summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/tests/unit/cluster
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
commit5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch)
tree1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/tests/unit/cluster
parentc7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff)
downloadcrep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/tests/unit/cluster')
-rw-r--r--examples/redis-unstable/tests/unit/cluster/announced-endpoints.tcl75
-rw-r--r--examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl3063
-rw-r--r--examples/redis-unstable/tests/unit/cluster/cli.tcl415
-rw-r--r--examples/redis-unstable/tests/unit/cluster/cluster-response-tls.tcl110
-rw-r--r--examples/redis-unstable/tests/unit/cluster/failure-marking.tcl53
-rw-r--r--examples/redis-unstable/tests/unit/cluster/hostnames.tcl230
-rw-r--r--examples/redis-unstable/tests/unit/cluster/human-announced-nodename.tcl29
-rw-r--r--examples/redis-unstable/tests/unit/cluster/internal-secret.tcl71
-rw-r--r--examples/redis-unstable/tests/unit/cluster/links.tcl292
-rw-r--r--examples/redis-unstable/tests/unit/cluster/misc.tcl36
-rw-r--r--examples/redis-unstable/tests/unit/cluster/multi-slot-operations.tcl182
-rw-r--r--examples/redis-unstable/tests/unit/cluster/scripting.tcl91
-rw-r--r--examples/redis-unstable/tests/unit/cluster/sharded-pubsub.tcl67
-rw-r--r--examples/redis-unstable/tests/unit/cluster/slot-ownership.tcl61
-rw-r--r--examples/redis-unstable/tests/unit/cluster/slot-stats.tcl1169
15 files changed, 5944 insertions, 0 deletions
diff --git a/examples/redis-unstable/tests/unit/cluster/announced-endpoints.tcl b/examples/redis-unstable/tests/unit/cluster/announced-endpoints.tcl
new file mode 100644
index 0000000..a37ca58
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/announced-endpoints.tcl
@@ -0,0 +1,75 @@
+start_cluster 2 2 {tags {external:skip cluster}} {
+
+ test "Test change cluster-announce-port and cluster-announce-tls-port at runtime" {
+ if {$::tls} {
+ set baseport [lindex [R 0 config get tls-port] 1]
+ } else {
+ set baseport [lindex [R 0 config get port] 1]
+ }
+ set count [expr [llength $::servers] + 1]
+ set used_port [find_available_port $baseport $count]
+
+ R 0 config set cluster-announce-tls-port $used_port
+ R 0 config set cluster-announce-port $used_port
+
+ assert_match "*:$used_port@*" [R 0 CLUSTER NODES]
+ wait_for_condition 50 100 {
+ [string match "*:$used_port@*" [R 1 CLUSTER NODES]]
+ } else {
+ fail "Cluster announced port was not propagated via gossip"
+ }
+
+ R 0 config set cluster-announce-tls-port 0
+ R 0 config set cluster-announce-port 0
+ assert_match "*:$baseport@*" [R 0 CLUSTER NODES]
+ }
+
+ test "Test change cluster-announce-bus-port at runtime" {
+ if {$::tls} {
+ set baseport [lindex [R 0 config get tls-port] 1]
+ } else {
+ set baseport [lindex [R 0 config get port] 1]
+ }
+ set count [expr [llength $::servers] + 1]
+ set used_port [find_available_port $baseport $count]
+
+ # Verify config set cluster-announce-bus-port
+ R 0 config set cluster-announce-bus-port $used_port
+ assert_match "*@$used_port *" [R 0 CLUSTER NODES]
+ wait_for_condition 50 100 {
+ [string match "*@$used_port *" [R 1 CLUSTER NODES]]
+ } else {
+ fail "Cluster announced port was not propagated via gossip"
+ }
+
+ # Verify restore default cluster-announce-port
+ set base_bus_port [expr $baseport + 10000]
+ R 0 config set cluster-announce-bus-port 0
+ assert_match "*@$base_bus_port *" [R 0 CLUSTER NODES]
+ }
+
+ test "CONFIG SET port updates cluster-announced port" {
+ set count [expr [llength $::servers] + 1]
+ # Get the original port and change to new_port
+ if {$::tls} {
+ set orig_port [lindex [R 0 config get tls-port] 1]
+ } else {
+ set orig_port [lindex [R 0 config get port] 1]
+ }
+ assert {$orig_port != ""}
+ set new_port [find_available_port $orig_port $count]
+
+ if {$::tls} {
+ R 0 config set tls-port $new_port
+ } else {
+ R 0 config set port $new_port
+ }
+
+ # Verify that the new port appears in the output of cluster slots
+ wait_for_condition 50 100 {
+ [string match "*$new_port*" [R 0 cluster slots]]
+ } else {
+ fail "Cluster announced port was not updated in cluster slots"
+ }
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl b/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl
new file mode 100644
index 0000000..f04257f
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl
@@ -0,0 +1,3063 @@
+set ::slot_prefixes [dict create \
+ 0 "{06S}" \
+ 1 "{Qi}" \
+ 2 "{5L5}" \
+ 3 "{4Iu}" \
+ 4 "{4gY}" \
+ 5 "{460}" \
+ 6 "{1Y7}" \
+ 7 "{1LV}" \
+ 101 "{1j2}" \
+ 102 "{75V}" \
+ 103 "{bno}" \
+ 5462 "{450}"\
+ 5463 "{4dY}"\
+ 6000 "{4L7}" \
+ 6001 "{4YV}" \
+ 6002 "{0bx}" \
+ 6003 "{AJ}" \
+ 6004 "{of}" \
+ 16383 "{6ZJ}" \
+]
+
+# Helper functions
+proc get_port {node_id} {
+ if {$::tls} {
+ return [lindex [R $node_id config get tls-port] 1]
+ } else {
+ return [lindex [R $node_id config get port] 1]
+ }
+}
+
+# return the prefix for the given slot
+proc slot_prefix {slot} {
+ return [dict get $::slot_prefixes $slot]
+}
+
+# return a key for the given slot
+proc slot_key {slot {suffix ""}} {
+ return "[slot_prefix $slot]$suffix"
+}
+
+# Populate a slot with keys
+# TODO: Consider merging with populate()
+proc populate_slot {num args} {
+ # Default values
+ set prefix "key:"
+ set size 3
+ set idx 0
+ set prints false
+ set expires 0
+ set slot -1
+
+ # Parse named arguments
+ foreach {key value} $args {
+ switch -- $key {
+ -prefix { set prefix $value }
+ -size { set size $value }
+ -idx { set idx $value }
+ -prints { set prints $value }
+ -expires { set expires $value }
+ -slot { set slot $value }
+ default { error "Unknown option: $key" }
+ }
+ }
+
+ # If slot is specified, use slot prefix from table
+ if {$slot >= 0} {
+ if {[dict exists $::slot_prefixes $slot]} {
+ set prefix [dict get $::slot_prefixes $slot]
+ } else {
+ error "Slot $slot not supported in slot_prefixes table, add it manually"
+ }
+ }
+
+ R $idx deferred 1
+ if {$num > 16} {set pipeline 16} else {set pipeline $num}
+ set val [string repeat A $size]
+ for {set j 0} {$j < $pipeline} {incr j} {
+ if {$expires > 0} {
+ R $idx set $prefix$j $val ex $expires
+ } else {
+ R $idx set $prefix$j $val
+ }
+ if {$prints} {puts $j}
+ }
+ for {} {$j < $num} {incr j} {
+ if {$expires > 0} {
+ R $idx set $prefix$j $val ex $expires
+ } else {
+ R $idx set $prefix$j $val
+ }
+ R $idx read
+ if {$prints} {puts $j}
+ }
+ for {set j 0} {$j < $pipeline} {incr j} {
+ R $idx read
+ if {$prints} {puts $j}
+ }
+ R $idx deferred 0
+}
+
+# Return 1 if all instances are idle
+proc asm_all_instances_idle {total} {
+ for {set i 0} {$i < $total} {incr i} {
+ if {[CI $i cluster_slot_migration_active_tasks] != 0} { return 0 }
+ if {[CI $i cluster_slot_migration_active_trim_running] != 0} { return 0 }
+ }
+ return 1
+}
+
+# Wait for all ASM tasks to complete in the cluster
+proc wait_for_asm_done {} {
+ set total_instances [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]
+
+ wait_for_condition 1000 10 {
+ [asm_all_instances_idle $total_instances] == 1
+ } else {
+ # Print the number of active tasks on each instance
+ for {set i 0} {$i < $total_instances} {incr i} {
+ set migration_count [CI $i cluster_slot_migration_active_tasks]
+ set trim_count [CI $i cluster_slot_migration_active_trim_running]
+ puts "Instance $i: migration_tasks=$migration_count, trim_tasks=$trim_count"
+ }
+ fail "ASM tasks did not complete on all instances"
+ }
+ # wait all nodes to reach the same cluster config after ASM
+ wait_for_cluster_propagation
+}
+
+proc failover_and_wait_for_done {node_id {failover_arg ""}} {
+ set max_attempts 5
+ for {set attempt 1} {$attempt <= $max_attempts} {incr attempt} {
+ if {$failover_arg eq ""} {
+ R $node_id cluster failover
+ } else {
+ R $node_id cluster failover $failover_arg
+ }
+
+ set completed 1
+ wait_for_condition 1000 10 {
+ [string match "*master*" [R $node_id role]]
+ } else {
+ set completed 0
+ }
+
+ if {$completed} {
+ wait_for_cluster_propagation
+ return
+ }
+ }
+ fail "Failover did not complete after $max_attempts attempts for node $node_id"
+}
+
+proc migration_status {node_id task_id field} {
+ set status [R $node_id CLUSTER MIGRATION STATUS ID $task_id]
+
+ # STATUS ID returns single task, so get first element
+ if {[llength $status] == 0} {
+ return ""
+ }
+
+ set task_status [lindex $status 0]
+ set field_value ""
+
+ # Parse the key-value pairs in the task
+ for {set i 0} {$i < [llength $task_status]} {incr i 2} {
+ set key [lindex $task_status $i]
+ set value [lindex $task_status [expr $i + 1]]
+
+ if {$key eq $field} {
+ set field_value $value
+ break
+ }
+ }
+
+ return $field_value
+}
+
+# Setup slot migration test with keys and delay, then start migration
+# Returns the task_id for the migration
+proc setup_slot_migration_with_delay {src_node dst_node start_slot end_slot {keys 2} {delay 1000000}} {
+ # Two keys on the start slot
+ populate_slot $keys -idx $src_node -slot $start_slot
+
+ # we set a delay to ensure migration takes time for testing,
+ # with default parameters, two keys cost 2s to save
+ R $src_node config set rdb-key-save-delay $delay
+
+ # migrate slot range from src_node to dst_node
+ set task_id [R $dst_node CLUSTER MIGRATION IMPORT $start_slot $end_slot]
+ wait_for_condition 2000 10 {
+ [string match {*send-bulk-and-stream*} [migration_status $src_node $task_id state]]
+ } else {
+ fail "ASM task did not start"
+ }
+
+ return $task_id
+}
+
+# Helper function to clear module internal event logs
+proc clear_module_event_log {} {
+ for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} {
+ R $i asm.clear_event_log
+ }
+}
+
+proc reset_default_trim_method {} {
+ for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} {
+ R $i debug asm-trim-method default
+ }
+}
+
+start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} {
+ foreach trim_method {"active" "bg"} {
+ test "Simple slot migration (trim method: $trim_method)" {
+ R 0 debug asm-trim-method $trim_method
+ R 3 debug asm-trim-method $trim_method
+
+ set slot0_key [slot_key 0 mykey]
+ R 0 set $slot0_key "a"
+ set slot1_key [slot_key 1 mykey]
+ R 0 set $slot1_key "b"
+ set slot101_key [slot_key 101 mykey]
+ R 0 set $slot101_key "c"
+ # 3 keys cost 3s to save
+ R 0 config set rdb-key-save-delay 1000000
+
+ # load a function
+ R 0 function load {#!lua name=test1
+ redis.register_function('test1', function() return 'hello1' end)
+ }
+
+ # migrate slot 0-100 to R 1
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100]
+ # migration is start, and in accumulating buffer stage
+ wait_for_condition 1000 50 {
+ [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]] &&
+ [string match {*accumulate-buffer*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not start"
+ }
+
+ # append 99 times during migration
+ for {set i 0} {$i < 99} {incr i} {
+ R 0 multi
+ R 0 append $slot0_key "a"
+ R 0 exec
+ R 0 append $slot1_key "b"
+ R 0 append $slot101_key "c"
+ }
+
+ # wait until migration of 0-100 successful
+ wait_for_asm_done
+
+ # verify task state became completed
+ assert_equal "completed" [migration_status 0 $task_id state]
+ assert_equal "completed" [migration_status 1 $task_id state]
+
+ # the appended 99 times should also be migrated
+ assert_equal [string repeat a 100] [R 1 get $slot0_key]
+ assert_equal [string repeat b 100] [R 1 get $slot1_key]
+
+ # function should be migrated
+ assert_equal [R 0 function dump] [R 1 function dump]
+ # the slave should also get the data
+ wait_for_ofs_sync [Rn 1] [Rn 4]
+
+ R 4 readonly
+ assert_equal [string repeat a 100] [R 4 get $slot0_key]
+ assert_equal [string repeat b 100] [R 4 get $slot1_key]
+ assert_equal [R 0 function dump] [R 4 function dump]
+
+ # verify key that was not in the slot range is not migrated
+ assert_equal [string repeat c 100] [R 0 get $slot101_key]
+ # verify changes in replica
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+ R 3 readonly
+ assert_equal [string repeat c 100] [R 3 get $slot101_key]
+
+ # cleanup
+ R 0 config set rdb-key-save-delay 0
+ R 0 flushall
+ R 0 function flush
+ R 1 flushall
+ R 1 function flush
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ }
+ }
+}
+
+# Skip most of the tests when running under valgrind since it is hard to
+# stabilize tests under valgrind.
+if {!$::valgrind} {
+start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} {
+ test "Test CLUSTER MIGRATION IMPORT input validation" {
+ # invalid arguments
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT 100}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT 100 200 300}
+ assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION UNKNOWN 1 2}
+
+ # invalid slot range
+ assert_error {*greater than end slot number*} {R 0 CLUSTER MIGRATION IMPORT 200 100}
+ assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 17000 18000}
+ assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 14000 18000}
+ assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 0 16384}
+ assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 0 -1}
+ assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT -1 2}
+ assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT -2 -1}
+ assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 10 a}
+ assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT sd sd}
+ assert_error {*already the owner of the slot*} {R 0 CLUSTER MIGRATION IMPORT 100 200}
+ }
+
+ test "Test CLUSTER MIGRATION CANCEL input validation" {
+ # invalid arguments
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ID}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ID 12345 EXTRAARG}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ALL EXTRAARG}
+ assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION CANCEL UNKNOWNARG}
+ assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION CANCEL abc def}
+ # empty string id should not cancel any task
+ assert_equal 0 [R 0 CLUSTER MIGRATION CANCEL ID ""]
+ }
+
+ test "Test CLUSTER MIGRATION STATUS input validation" {
+ # invalid arguments
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ID}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ID id EXTRAARG}
+ assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ALL EXTRAARG}
+ assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION STATUS ABC DEF}
+ assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION STATUS UNKNOWNARG}
+ # empty string id should not list any task
+ assert_equal {} [R 0 CLUSTER MIGRATION STATUS ID ""]
+ }
+
+ test "Test TRIMSLOTS input validation" {
+ # Wrong number of arguments
+ assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS}
+ assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES}
+ assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 1}
+ assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 2 100}
+ assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 17000 1}
+ assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES abc}
+
+ # Missing ranges argument
+ assert_error {*missing ranges argument*} {R 0 TRIMSLOTS UNKNOWN 1 100 200}
+
+ # Invalid number of ranges
+ assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES 0 1 1}
+ assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES -1 2 2}
+ assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES 17000 1 2}
+ assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES 2 100 200 300}
+
+ # Invalid slot numbers
+ assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 -1 0}
+ assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 -2 -1}
+ assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 0 16384}
+ assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 abc def}
+ assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 100 abc}
+
+ # Start slot greater than end slot
+ assert_error {*greater than end slot number*} {R 0 TRIMSLOTS RANGES 1 200 100}
+ }
+
+ test "Test IMPORT not allowed on replica" {
+ assert_error {* not allowed on replica*} {R 4 CLUSTER MIGRATION IMPORT 100 200}
+ }
+
+ test "Test IMPORT not allowed during manual migration" {
+ set dst_id [R 1 CLUSTER MYID]
+
+ # Set a slot to IMPORTING
+ R 0 CLUSTER SETSLOT 15000 IMPORTING $dst_id
+ assert_error {*must be STABLE to start*slot migration*} {R 0 CLUSTER MIGRATION IMPORT 100 200}
+ # Revert the change
+ R 0 CLUSTER SETSLOT 15000 STABLE
+
+ # Same test with setting a slot to MIGRATING
+ R 0 CLUSTER SETSLOT 5000 MIGRATING $dst_id
+ assert_error {*must be STABLE to start*slot migration*} {R 0 CLUSTER MIGRATION IMPORT 100 200}
+ # Revert the change
+ R 0 CLUSTER SETSLOT 5000 STABLE
+ }
+
+ test "Test IMPORT not allowed if the node is already the owner" {
+ assert_error {*already the owner of the slot*} {R 0 CLUSTER MIGRATION IMPORT 100 100}
+ }
+
+ test "Test IMPORT not allowed for a slot without an owner" {
+ # Slot will have no owner
+ R 0 CLUSTER DELSLOTS 5000
+
+ assert_error {*slot has no owner: 5000*} {R 0 CLUSTER MIGRATION IMPORT 5000 5000}
+
+ # Revert the change
+ R 0 CLUSTER ADDSLOTS 5000
+ }
+
+ test "Test IMPORT not allowed if slot ranges belong to different nodes" {
+ assert_error {*slots belong to different source nodes*} {R 0 CLUSTER MIGRATION IMPORT 7000 15000}
+ assert_error {*slots belong to different source nodes*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 14000 15000}
+ }
+
+ test "Test IMPORT not allowed if slot is given multiple times" {
+ assert_error {*Slot*specified multiple times*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 8000 9000}
+ assert_error {*Slot*specified multiple times*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 7900 9000}
+ }
+
+ test "Test CLUSTER MIGRATION STATUS ALL lists all tasks" {
+ # Create 3 completed tasks
+ R 0 CLUSTER MIGRATION IMPORT 7000 7001
+ wait_for_asm_done
+ R 0 CLUSTER MIGRATION IMPORT 7002 7003
+ wait_for_asm_done
+ R 0 CLUSTER MIGRATION IMPORT 7004 7005
+ wait_for_asm_done
+
+ # Get node IDs for verification
+ set node0_id [R 0 cluster myid]
+ set node1_id [R 1 cluster myid]
+
+ # Verify CLUSTER MIGRATION STATUS ALL reply from both nodes
+ foreach node_idx {0 1} {
+ set tasks [R $node_idx CLUSTER MIGRATION STATUS ALL]
+ assert_equal 3 [llength $tasks]
+
+ for {set i 0} {$i < 3} {incr i} {
+ set task [lindex $tasks $i]
+
+ # Verify field order
+ set expected_fields {id slots source dest operation state
+ last_error retries create_time start_time
+ end_time write_pause_ms}
+ for {set j 0} {$j < [llength $expected_fields]} {incr j} {
+ set expected_field [lindex $expected_fields $j]
+ set actual_field [lindex $task [expr $j * 2]]
+ assert_equal $expected_field $actual_field
+ }
+
+ # Verify basic fields
+ assert_equal "completed" [dict get $task state]
+ assert_equal "" [dict get $task last_error]
+ assert_equal 0 [dict get $task retries]
+ assert {[dict get $task write_pause_ms] >= 0}
+
+ # Verify operation based on node
+ if {$node_idx == 0} {
+ assert_equal "import" [dict get $task operation]
+ } else {
+ assert_equal "migrate" [dict get $task operation]
+ }
+
+ # Verify node IDs (all tasks: node1 -> node0)
+ assert_equal $node1_id [dict get $task source]
+ assert_equal $node0_id [dict get $task dest]
+
+ # Verify timestamps exist and are reasonable
+ set create_time [dict get $task create_time]
+ set start_time [dict get $task start_time]
+ set end_time [dict get $task end_time]
+ assert {$create_time > 0}
+ assert {$start_time >= $create_time}
+ assert {$end_time >= $start_time}
+
+ # Verify specific slot ranges for each task
+ set slots [dict get $task slots]
+ if {$i == 0} {
+ assert_equal "7004-7005" $slots
+ } elseif {$i == 1} {
+ assert_equal "7002-7003" $slots
+ } elseif {$i == 2} {
+ assert_equal "7000-7001" $slots
+ }
+ }
+ }
+
+ # cleanup
+ R 1 CLUSTER MIGRATION IMPORT 7000 7005
+ wait_for_asm_done
+ }
+
+ test "Test IMPORT not allowed if there is an overlapping import" {
+ # Let slot migration take long time, so that we can test overlapping import
+ R 1 config set rdb-key-save-delay 1000000
+ R 1 set tag22273 tag22273 ;# slot hash is 7000
+ R 1 set tag9283 tag9283 ;# slot hash is 8000
+
+ set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 8000]
+ assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 8000 9000}
+ assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 7500 8500}
+ assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 6000 7000}
+ assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 6500 7500}
+
+ wait_for_condition 1000 50 {
+ [string match {*completed*} [migration_status 0 $task_id state]] &&
+ [string match {*completed*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not start"
+ }
+ assert_equal "tag22273" [R 0 get tag22273]
+ assert_equal "tag9283" [R 0 get tag9283]
+ R 1 config set rdb-key-save-delay 0
+
+ # revert the migration
+ R 1 CLUSTER MIGRATION IMPORT 7000 8000
+ wait_for_asm_done
+ }
+
+ test "Test IMPORT with unsorted and adjacent ranges" {
+ # Redis should sort and merge adjacent ranges
+ # Adjacent means: prev.end + 1 == next.start
+ # e.g. 7000-7001 7002-7003 7004-7005 => 7000-7005
+
+ # Test with adjacent ranges
+ set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 7001 7002 7100]
+ wait_for_asm_done
+ # verify migration is successfully completed on both nodes
+ assert_equal "completed" [migration_status 0 $task_id state]
+ assert_equal "completed" [migration_status 1 $task_id state]
+ # verify slot ranges are merged correctly
+ assert_equal "7000-7100" [migration_status 0 $task_id slots]
+ assert_equal "7000-7100" [migration_status 1 $task_id slots]
+
+ # Test with unsorted and adjacent ranges
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 7050 7051 7010 7049 7000 7005]
+ wait_for_asm_done
+ # verify migration is successfully completed on both nodes
+ assert_equal "completed" [migration_status 0 $task_id state]
+ assert_equal "completed" [migration_status 1 $task_id state]
+ # verify slot ranges are merged correctly
+ assert_equal "7000-7005 7010-7051" [migration_status 0 $task_id slots]
+ assert_equal "7000-7005 7010-7051" [migration_status 1 $task_id slots]
+
+ # Another test with unsorted and adjacent ranges
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 7007 7007 7008 7009 7006 7006]
+ wait_for_asm_done
+ # verify migration is successfully completed on both nodes
+ assert_equal "completed" [migration_status 0 $task_id state]
+ assert_equal "completed" [migration_status 1 $task_id state]
+ # verify slot ranges are merged correctly
+ assert_equal "7006-7009" [migration_status 0 $task_id slots]
+ assert_equal "7006-7009" [migration_status 1 $task_id slots]
+ }
+
+ test "Simple slot migration with write load" {
+ # Perform slot migration while traffic is on and verify data consistency.
+ # Trimming is disabled on source nodes so, we can compare the dbs after
+ # migration via DEBUG DIGEST to ensure no data loss during migration.
+ # Steps:
+ # 1. Disable trimming on both nodes
+ # 2. Populate slot 0 on node-0 and slot 6000 on node-1
+ # 2. Start write traffic on both nodes
+ # 3. Migrate slot 0 from node-0 to node-1
+ # 4. Migrate slot 6000 from node-1 to node-0
+ # 5. Stop write traffic, verify db's are identical.
+
+ # This test runs slowly under the thread sanitizer.
+ # 1. Increase the lag threshold from the default 1 MB to 10 MB to let the destination catch up easily.
+ # 2. Increase the write pause timeout from the default 10s to 60s so the source can wait longer.
+ set prev_config_lag [lindex [R 0 config get cluster-slot-migration-handoff-max-lag-bytes] 1]
+ R 0 config set cluster-slot-migration-handoff-max-lag-bytes 10mb
+ R 1 config set cluster-slot-migration-handoff-max-lag-bytes 10mb
+ set prev_config_timeout [lindex [R 0 config get cluster-slot-migration-write-pause-timeout] 1]
+ R 0 config set cluster-slot-migration-write-pause-timeout 60000
+ R 1 config set cluster-slot-migration-write-pause-timeout 60000
+
+ R 0 flushall
+ R 0 debug asm-trim-method none
+ populate_slot 10000 -idx 0 -slot 0
+
+ R 1 flushall
+ R 1 debug asm-trim-method none
+ populate_slot 10000 -idx 1 -slot 6000
+
+ # Start write traffic on node-0
+ # Throws -MOVED error once asm is completed, catch block will ignore it.
+ catch {
+ # Start the slot 0 write load on the R 0
+ set port [get_port 0]
+ set key [slot_key 0 mykey]
+ set load_handle0 [start_write_load "127.0.0.1" $port 100 $key 0 5]
+ }
+
+ # Start write traffic on node-1
+ # Throws -MOVED error once asm is completed, catch block will ignore it.
+ catch {
+ # Start the slot 6000 write load on the R 1
+ set port [get_port 1]
+ set key [slot_key 6000 mykey]
+ set load_handle1 [start_write_load "127.0.0.1" $port 100 $key 0 5]
+ }
+
+ # Migrate keys
+ R 1 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ R 0 CLUSTER MIGRATION IMPORT 6000 6100
+ wait_for_asm_done
+
+ stop_write_load $load_handle0
+ stop_write_load $load_handle1
+
+ # verify data
+ assert_morethan [R 0 dbsize] 0
+ assert_equal [R 0 debug digest] [R 1 debug digest]
+
+ # cleanup
+ R 0 config set cluster-slot-migration-handoff-max-lag-bytes $prev_config_lag
+ R 0 config set cluster-slot-migration-write-pause-timeout $prev_config_timeout
+ R 0 debug asm-trim-method default
+ R 0 flushall
+ R 1 config set cluster-slot-migration-handoff-max-lag-bytes $prev_config_lag
+ R 1 config set cluster-slot-migration-write-pause-timeout $prev_config_timeout
+ R 1 debug asm-trim-method default
+ R 1 flushall
+
+ R 1 CLUSTER MIGRATION IMPORT 6000 6100
+ wait_for_asm_done
+ }
+
+ test "Verify expire time is migrated correctly" {
+ R 0 flushall
+ R 1 flushall
+
+ set string_key [slot_key 0 string_key]
+ set list_key [slot_key 0 list_key]
+ set hash_key [slot_key 0 hash_key]
+ set stream_key [slot_key 0 stream_key]
+
+ for {set i 0} {$i < 20} {incr i} {
+ R 1 hset $hash_key $i $i
+ R 1 xadd $stream_key * item $i
+ }
+ for {set i 0} {$i < 2000} {incr i} {
+ R 1 lpush $list_key $i
+ }
+
+ # set expire time of some keys
+ R 1 set $string_key "a" EX 1000
+ R 1 EXPIRE $list_key 1000
+ R 1 EXPIRE $hash_key 1000
+
+ # migrate slot 0-100 to R 0
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+
+ # check expire times are migrated correctly
+ assert_range [R 0 ttl $string_key] 900 1000
+ assert_range [R 0 ttl $list_key] 900 1000
+ assert_range [R 0 ttl $hash_key] 900 1000
+ assert_equal -1 [R 0 ttl $stream_key]
+
+ # cleanup
+ R 0 flushall
+ R 1 flushall
+ R 1 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ }
+
+ test "Slot migration with complex data types can work well" {
+ R 0 flushall
+ R 1 flushall
+
+ set list_key [slot_key 0 list_key]
+ set set_key [slot_key 0 set_key]
+ set zset_key [slot_key 0 zset_key]
+ set hash_key [slot_key 0 hash_key]
+ set stream_key [slot_key 0 stream_key]
+
+ # generate big keys for each data type
+ for {set i 0} {$i < 1000} {incr i} {
+ R 1 lpush $list_key $i
+ R 1 sadd $set_key $i
+ R 1 zadd $zset_key $i $i
+ R 1 hset $hash_key $i $i
+ R 1 xadd $stream_key * item $i
+ }
+
+ # migrate slot 0-100 to R 0
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ # check the data on destination node is correct
+ assert_equal 1000 [R 0 llen $list_key]
+ assert_equal 1000 [R 0 scard $set_key]
+ assert_equal 1000 [R 0 zcard $zset_key]
+ assert_equal 1000 [R 0 hlen $hash_key]
+ assert_equal 1000 [R 0 xlen $stream_key]
+ # migrate slot 0-100 to R 1
+ R 1 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ }
+
+ proc asm_basic_error_handling_test {operation channel all_states} {
+ foreach state $all_states {
+ if {$::verbose} { puts "Testing $operation $channel channel with state: $state"}
+
+ # For states that need incremental data streaming, set a longer delay
+ set streaming_states [list "streaming-buffer" "accumulate-buffer" "send-bulk-and-stream" "send-stream"]
+ if {$state in $streaming_states} {
+ R 1 config set rdb-key-save-delay 1000000
+ }
+
+ # Let the destination node take time to stream buffer, so the source node will handle
+ # slot snapshot child process exit, and then enter "send-stream" state.
+ if {$state == "send-stream"} {
+ R 0 config set key-load-delay 100000
+ }
+
+ # Start the slot 0 write load on the R 1
+ set slot0_key [slot_key 0 mykey]
+ set load_handle [start_write_load "127.0.0.1" [get_port 1] 100 $slot0_key 500]
+
+ # clear old fail points and set the new fail point
+ assert_equal {OK} [R 0 debug asm-failpoint "" ""]
+ assert_equal {OK} [R 1 debug asm-failpoint "" ""]
+ if {$operation eq "import"} {
+ assert_equal {OK} [R 0 debug asm-failpoint "import-$channel-channel" $state]
+ } elseif {$operation eq "migrate"} {
+ assert_equal {OK} [R 1 debug asm-failpoint "migrate-$channel-channel" $state]
+ } else {
+ fail "Unknown operation: $operation"
+ }
+
+ # Start the migration
+ set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100]
+
+ # The task should be failed due to the fail point
+ wait_for_condition 2000 10 {
+ [string match -nocase "*$channel*${state}*" [migration_status 0 $task_id last_error]] ||
+ [string match -nocase "*$channel*${state}*" [migration_status 1 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail with expected error -
+ (dst: [migration_status 0 $task_id last_error]
+ src: [migration_status 1 $task_id last_error]
+ expected: $channel $state)"
+ }
+ stop_write_load $load_handle
+
+ # Cancel the task
+ R 0 CLUSTER MIGRATION CANCEL ID $task_id
+ R 1 CLUSTER MIGRATION CANCEL ID $task_id
+
+ R 1 config set rdb-key-save-delay 0
+ R 0 config set key-load-delay 0
+ }
+ }
+
+ test "Destination node main channel basic error-handling tests " {
+ set all_states [list \
+ "connecting" \
+ "auth-reply" \
+ "handshake-reply" \
+ "syncslots-reply" \
+ "accumulate-buffer" \
+ "streaming-buffer" \
+ "wait-stream-eof" \
+ ]
+ asm_basic_error_handling_test "import" "main" $all_states
+ }
+
+ test "Destination node rdb channel basic error-handling tests" {
+ set all_states [list \
+ "connecting" \
+ "auth-reply" \
+ "rdbchannel-reply" \
+ "rdbchannel-transfer" \
+ ]
+ asm_basic_error_handling_test "import" "rdb" $all_states
+ }
+
+ test "Source node main channel basic error-handling tests " {
+ set all_states [list \
+ "wait-rdbchannel" \
+ "send-bulk-and-stream" \
+ "send-stream" \
+ "handoff" \
+ ]
+ asm_basic_error_handling_test "migrate" "main" $all_states
+ }
+
+ test "Source node rdb channel basic error-handling tests" {
+ set all_states [list \
+ "wait-bgsave-start" \
+ "send-bulk-and-stream" \
+ ]
+ asm_basic_error_handling_test "migrate" "rdb" $all_states
+ }
+
+ test "Migration will be successful after fail points are cleared" {
+ R 0 flushall
+ R 1 flushall
+ set slot0_key [slot_key 0 mykey]
+ set slot1_key [slot_key 1 mykey]
+ R 1 set $slot0_key "a"
+ R 1 set $slot1_key "b"
+
+ # we set a delay to write incremental data
+ R 1 config set rdb-key-save-delay 1000000
+
+ # Start the slot 0 write load on the R 1
+ set load_handle [start_write_load "127.0.0.1" [get_port 1] 100 $slot0_key]
+
+ # Clear all fail points
+ assert_equal {OK} [R 0 debug asm-failpoint "" ""]
+ assert_equal {OK} [R 1 debug asm-failpoint "" ""]
+
+ # Start the migration
+ set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100]
+
+ # Wait for the migration to complete
+ wait_for_asm_done
+
+ stop_write_load $load_handle
+
+ # Verify the data is migrated, slot 0 and 1 should belong to R 1
+ # slot 0 key should be changed by the write load
+ assert_not_equal "a" [R 0 get $slot0_key]
+ assert_equal "b" [R 0 get $slot1_key]
+ R 1 config set rdb-key-save-delay 0
+ }
+
+ test "Client output buffer limit is reached on source side" {
+ R 0 flushall
+ R 1 flushall
+ set r1_pid [S 1 process_id]
+ R 1 debug repl-pause on-streaming-repl-buf
+
+ # Set a small output buffer limit to trigger the error
+ R 0 config set client-output-buffer-limit "replica 4mb 0 0"
+
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+
+ # some write traffic is to have chance to enter streaming buffer state
+ set slot0_key [slot_key 0 mykey]
+ R 0 set $slot0_key "a"
+
+ # after 3 second, the slots snapshot (costs 2s to generate) should be transferred,
+ # then start streaming buffer
+ after 3000
+
+ set loglines [count_log_lines 0]
+
+ # Start the slot 0 write load on the R 0
+ set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key 1000]
+
+ # verify the metric is accessible, it is transient, will be reset on disconnect
+ assert {[S 0 mem_cluster_slot_migration_output_buffer] >= 0}
+
+ # After some time, the client output buffer limit should be reached
+ wait_for_log_messages 0 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 1000 10
+ wait_for_condition 1000 10 {
+ [string match {*send*stream*} [migration_status 0 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail as expected"
+ }
+
+ stop_write_load $load_handle
+
+ # Reset configurations
+ R 0 config set client-output-buffer-limit "replica 0 0 0"
+ R 0 config set rdb-key-save-delay 0
+
+ # resume server and clear pause point
+ resume_process $r1_pid
+ R 1 debug repl-pause clear
+
+ # Wait for the migration to complete
+ wait_for_asm_done
+ }
+
+ test "Full sync buffer limit is reached on destination side" {
+ # Set a small replication buffer limit to trigger the error
+ R 0 config set replica-full-sync-buffer-limit 1mb
+
+ # start migration from 1 to 0, cost 4s to transfer slots snapshot
+ set task_id [setup_slot_migration_with_delay 1 0 0 100 2 2000000]
+ set loglines [count_log_lines 0]
+
+ # Create some traffic on slot 0
+ populate_slot 100 -idx 1 -slot 0 -size 100000
+
+ # After some time, slots sync buffer limit should be reached, but migration would not fail
+ # since the buffer will be accumulated on source side from now.
+ wait_for_log_messages 0 {"*Slots sync buffer limit has been reached*"} $loglines 1000 10
+
+ # verify the peak value, should be greater than 1mb
+ assert {[S 0 mem_cluster_slot_migration_input_buffer_peak] > 1000000}
+ # verify the metric is accessible, it is transient, will be reset on disconnect
+ assert {[S 0 mem_cluster_slot_migration_input_buffer] >= 0}
+
+ wait_for_asm_done
+
+ # Reset configurations
+ R 0 config set replica-full-sync-buffer-limit 0
+ R 1 config set rdb-key-save-delay 0
+ R 1 cluster migration import 0 100
+ wait_for_asm_done
+ }
+
+ test "Expired key is not deleted and SCAN/KEYS/RANDOMKEY/CLUSTER GETKEYSINSLOT filter keys in importing slots" {
+ set slot0_key [slot_key 0 mykey]
+ set slot1_key [slot_key 1 mykey]
+ set slot2_key [slot_key 2 mykey]
+ R 1 flushall
+ R 0 flushall
+
+ # we set a delay to write incremental data
+ R 1 config set rdb-key-save-delay 1000000
+
+ # set expire time 2s. Generating slot snapshot will 3s, so these
+ # three keys will be expired after slot snapshot is transferred
+ R 1 setex $slot0_key 2 "a"
+ R 1 setex $slot1_key 2 "b"
+ R 1 hset $slot2_key "f1" "1"
+ R 1 expire $slot2_key 2
+ R 1 hexpire $slot2_key 2 FIELDS 1 "f1"
+
+ set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100]
+ wait_for_condition 2000 10 {
+ [string match {*send-bulk-and-stream*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not start"
+ }
+
+ # update expire time during mirgration
+ R 1 setex $slot0_key 100 "a"
+ R 1 expire $slot1_key 80
+ R 1 expire $slot2_key 60
+ R 1 hincrbyfloat $slot2_key "f1" 1
+ R 1 hexpire $slot2_key 60 FIELDS 1 "f1"
+
+ # after 2s, at least a key should be transferred, and should not be deleted
+ # due to expired, neither active nor lazy expiration (SCAN) takes effect,
+ # Besides SCAN/KEYS/RANDOMKEY/CLUSTER GETKEYSINSLOT command can not find them
+ after 2000
+ R 3 readonly
+ foreach id {0 3} { ;# 0 is the master, 3 is the replica
+ assert_equal {0 {}} [R $id scan 0 count 10]
+ assert_equal {} [R $id keys "*"]
+ assert_equal {} [R $id keys "{06S}*"]
+ assert_equal {} [R $id randomkey]
+ assert_equal {} [R $id cluster getkeysinslot 0 100]
+ assert_equal [R $id cluster countkeysinslot 0] 0
+ assert_equal [R $id dbsize] 0
+
+ # but we can see the number of keys is increased in INFO KEYSPACE
+ assert {[scan [regexp -inline {keys\=([\d]*)} [R $id info keyspace]] keys=%d] >= 1}
+ assert {[scan [regexp -inline {expires\=([\d]*)} [R $id info keyspace]] expires=%d] >= 1}
+ }
+
+ wait_for_asm_done
+
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+
+ foreach id {0 3} { ;# 0 is the master, 3 is the replica
+ # verify the keys are valid
+ assert_range [R $id ttl $slot0_key] 90 100
+ assert_range [R $id ttl $slot1_key] 70 80
+ assert_range [R $id ttl $slot2_key] 50 60
+ assert_range [R $id httl $slot2_key FIELDS 1 "f1"] 50 60
+
+ # KEYS/SCAN/RANDOMKEY/CLUSTER GETKEYSINSLOT will find the keys after migration
+ assert_equal [list 0 [list $slot0_key $slot1_key $slot2_key]] [R $id scan 0 count 10]
+ assert_equal [list $slot0_key $slot1_key $slot2_key] [R $id keys "*"]
+ assert_equal [list $slot0_key] [R $id keys "{06S}*"]
+ assert_not_equal {} [R $id randomkey]
+ assert_equal [list $slot0_key] [R $id cluster getkeysinslot 0 100]
+
+ # INFO KEYSPACE/DBSIZE/CLUSTER COUNTKEYSINSLOT will also reflect the keys
+ assert_equal 3 [scan [regexp -inline {keys\=([\d]*)} [R $id info keyspace]] keys=%d]
+ assert_equal 3 [scan [regexp -inline {expires\=([\d]*)} [R $id info keyspace]] expires=%d]
+ assert_equal 1 [scan [regexp -inline {subexpiry\=([\d]*)} [R $id info keyspace]] subexpiry=%d]
+ assert_equal 3 [R $id dbsize]
+ assert_equal 1 [R $id cluster countkeysinslot 0]
+ }
+
+ # update expire time to 10ms, after some time, the keys should be deleted due to
+ # active expiration
+ R 0 pexpire $slot0_key 10
+ R 0 pexpire $slot1_key 10
+ R 0 hpexpire $slot2_key 10 FIELDS 1 "f1" ;# the last field is expired, the key will be deleted
+ wait_for_condition 100 50 {
+ [scan [regexp -inline {keys\=([\d]*)} [R 0 info keyspace]] keys=%d] == {} &&
+ [scan [regexp -inline {keys\=([\d]*)} [R 3 info keyspace]] keys=%d] == {}
+ } else {
+ fail "keys did not expire"
+ }
+
+ R 1 config set rdb-key-save-delay 0
+ }
+
+ test "Eviction does not evict keys in importing slots" {
+ set slot0_key [slot_key 0 mykey]
+ set slot1_key [slot_key 1 mykey]
+ set slot2_key [slot_key 2 mykey]
+ set slot5462_key [slot_key 5462 mykey]
+ set slot5463_key [slot_key 5463 mykey]
+ R 1 flushall
+ R 0 flushall
+
+ # we set a delay to write incremental data
+ R 0 config set rdb-key-save-delay 1000000
+
+ set 1k_str [string repeat "a" 1024]
+ set 1m_str [string repeat "a" 1048576]
+
+ # set two keys to be evicted
+ R 1 set $slot5462_key $1k_str
+ R 1 set $slot5463_key $1k_str
+
+ # set maxmemory to 200kb more than current used memory,
+ # redis should evict some keys if importing some big keys
+ set r1_mem_used [S 1 used_memory]
+ set r1_max_mem [expr {$r1_mem_used + 200*1024}]
+ R 1 config set maxmemory $r1_max_mem
+ R 1 config set maxmemory-policy allkeys-lru
+
+ # set 3 keys to be migrated
+ R 0 set $slot0_key $1m_str
+ R 0 set $slot1_key $1m_str
+ R 0 set $slot2_key $1m_str
+
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100]
+ wait_for_condition 2000 10 {
+ [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]]
+ } else {
+ fail "ASM task did not start"
+ }
+
+ # after 2.2s, at least two keys should be transferred, they should not be evicted
+ # but other keys (slot5462_key and slot5463_key) should be evicted
+ after 2200
+ for {set j 0} {$j < 100} {incr j} { R 1 ping } ;# trigger eviction
+ assert_equal 0 [R 1 exists $slot5462_key]
+ assert_equal 0 [R 1 exists $slot5463_key]
+ assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 2}
+
+ # current used memory should be more than the maxmemory, since the big keys that
+ # belong importing slots can not be evicted.
+ set r1_mem_used [S 1 used_memory]
+ assert {$r1_mem_used > $r1_max_mem + 1024*1024}
+
+ wait_for_asm_done
+
+ # after migration, these big keys should be evicted
+ for {set j 0} {$j < 100} {incr j} { R 1 ping } ;# trigger eviction
+ assert_equal {} [scan [regexp -inline {expires\=([\d]*)} [R 1 info keyspace]] expires=%d]
+ }
+
+ test "Failover will cancel slot migration tasks" {
+ # migrate slot 0-100 from 1 to 0
+ set task_id [setup_slot_migration_with_delay 1 0 0 100]
+
+ # FAILOVER happens on the destination node, instance #3 become master, #0 become slave
+ failover_and_wait_for_done 3
+
+ # the old master will cancel the importing task, and the migrating task on
+ # the source node will be failed
+ wait_for_condition 1000 50 {
+ [string match {*canceled*} [migration_status 0 $task_id state]] &&
+ [string match {*failover*} [migration_status 0 $task_id last_error]] &&
+ [string match {*failed*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not cancel"
+ }
+
+ # We can restart ASM tasks on new master, migrate slot 0-100 from 1 to 3
+ R 1 config set rdb-key-save-delay 0
+ set task_id [R 3 CLUSTER MIGRATION IMPORT 0 100]
+ wait_for_asm_done
+
+ # migrate slot 0-100 from 3 to 1
+ set task_id [setup_slot_migration_with_delay 3 1 0 100]
+
+ # FAILOVER happens on the source node, instance #3 become slave, #0 become master
+ failover_and_wait_for_done 0
+
+ # the old master will cancel the migrating task, but the destination node will
+ # retry the importing task, and then succeed.
+ wait_for_condition 1000 50 {
+ [string match {*canceled*} [migration_status 3 $task_id state]]
+ } else {
+ fail "ASM task did not cancel"
+ }
+ wait_for_asm_done
+ }
+
+ test "Flush-like command can cancel slot migration task" {
+ # flushall, flushdb
+ foreach flushcmd {flushall flushdb} {
+ # start slot migration from 1 to 0
+ set task_id [setup_slot_migration_with_delay 1 0 0 100]
+
+ if {$::verbose} { puts "Testing flush command: $flushcmd"}
+ R 0 $flushcmd
+
+ # flush-like will cancel the task
+ wait_for_condition 1000 50 {
+ [string match {*canceled*} [migration_status 0 $task_id state]]
+ } else {
+ fail "ASM task did not cancel"
+ }
+ }
+
+ R 1 config set rdb-key-save-delay 0
+ R 0 cluster migration import 0 100
+ wait_for_asm_done
+ }
+
+ test "CLUSTER SETSLOT command when there is a slot migration task" {
+ # Setup slot migration test from node 0 to node 1
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+
+ # Cluster SETSLOT command is not allowed when there is a slot migration task
+ # on the slot. #0 and #1 are having migration task now.
+ foreach instance {0 1} {
+ set node_id [R $instance cluster myid]
+
+ catch {R $instance cluster setslot 0 migrating $node_id} err
+ assert_match {*in an active atomic slot migration*} $err
+
+ catch {R $instance cluster setslot 0 importing $node_id} err
+ assert_match {*in an active atomic slot migration*} $err
+
+ catch {R $instance cluster setslot 0 stable} err
+ assert_match {*in an active atomic slot migration*} $err
+
+ catch {R $instance cluster setslot 0 node $node_id} err
+ assert_match {*in an active atomic slot migration*} $err
+ }
+
+ # CLUSTER SETSLOT on other node will cancel the migration task, we update
+ # the owner of slot 0 (that is migrating from #0 to #1) to #2 on #2, we
+ # bump the config epoch to make sure the change can update #0 and #1
+ # slot configuration, so #0 and #1 will cancel the migration task.
+ # BTW, if config epoch is not bumped, the slot config of #2 may be
+ # updated by #0 and #1.
+ R 2 cluster bumpepoch
+ R 2 cluster setslot 0 node [R 2 cluster myid]
+ wait_for_condition 1000 50 {
+ [string match {*canceled*} [migration_status 0 $task_id state]] &&
+ [string match {*slots configuration updated*} [migration_status 0 $task_id last_error]] &&
+ [string match {*canceled*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not cancel"
+ }
+
+ # set slot 0 back to #0
+ R 0 cluster bumpepoch
+ R 0 cluster setslot 0 node [R 0 cluster myid]
+ wait_for_cluster_propagation
+ wait_for_cluster_state "ok"
+ }
+
+ test "CLUSTER DELSLOTSRANGE command cancels a slot migration task" {
+ # start slot migration from 0 to 1
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+
+ R 0 cluster delslotsrange 0 100
+ wait_for_condition 1000 50 {
+ [string match {*canceled*} [migration_status 0 $task_id state]] &&
+ [string match {*slots configuration updated*} [migration_status 0 $task_id last_error]] &&
+ [string match {*failed*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not cancel"
+ }
+ R 1 cluster migration cancel id $task_id
+
+ # add the slots back
+ R 0 cluster addslotsrange 0 100
+ wait_for_cluster_propagation
+ wait_for_cluster_state "ok"
+ }
+
+ # NOTE: this test needs more than 60s, maybe you can skip when testing
+ test "CLUSTER FORGET command cancels a slot migration task" {
+ R 0 config set rdb-key-save-delay 0
+ # Migrate all slot on #0 to #1, so we can forget #0
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 5461]
+ wait_for_asm_done
+
+ # start slot migration from 1 to 0
+ set task_id [setup_slot_migration_with_delay 1 0 0 5461]
+
+ # Forget #0 on #1, the migration task on #1 will be canceled due to node deleted,
+ # and the importing task on #0 will be failed
+ R 1 cluster forget [R 0 cluster myid]
+ wait_for_condition 1000 50 {
+ [string match {*canceled*} [migration_status 1 $task_id state]] &&
+ [string match {*node deleted*} [migration_status 1 $task_id last_error]] &&
+ [string match {*failed*} [migration_status 0 $task_id state]]
+ } else {
+ fail "ASM task did not cancel"
+ }
+
+ # Add #0 back into cluster
+ # NOTE: this will cost 60s to let #0 join the cluster since
+ # other nodes add #0 into black list for 60s after FORGET.
+ R 1 config set rdb-key-save-delay 0
+ R 1 cluster meet "127.0.0.1" [lindex [R 0 config get port] 1]
+
+ # the importing task on #0 will be retried, and eventually succeed
+ # since now #0 is back in the cluster
+ wait_for_condition 3000 50 {
+ [string match {*completed*} [migration_status 0 $task_id state]] &&
+ [string match {*completed*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not finish"
+ }
+
+ # make sure #0 is completely back to the cluster
+ wait_for_cluster_propagation
+ wait_for_cluster_state "ok"
+ }
+
+ test "CLIENT PAUSE can cancel slot migration task" {
+ # start slot migration from 0 to 1
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+
+ # CLIENT PAUSE happens on the destination node, #1 will cancel the importing task
+ R 1 client pause 100000 write ;# pause 100s
+ wait_for_condition 1000 50 {
+ [string match {*canceled*} [migration_status 1 $task_id state]] &&
+ [string match {*client pause*} [migration_status 1 $task_id last_error]]
+ } else {
+ fail "ASM task did not cancel"
+ }
+
+ # start task again
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100]
+ after 200 ;# give some time to have chance to schedule the task
+ # the task should not start since server is paused
+ assert {[string match {*none*} [migration_status 1 $task_id state]]}
+
+ # unpause the server, the task should start
+ R 1 client unpause
+ wait_for_asm_done
+
+ # migrate back to original node #0
+ R 0 config set rdb-key-save-delay 0
+ R 1 config set rdb-key-save-delay 0
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ }
+
+ test "Server shutdown can cancel slot migration task, exit with success" {
+ # start slot migration from 0 to 1
+ setup_slot_migration_with_delay 0 1 0 100
+
+ set loglines [count_log_lines -1]
+
+ # Shutdown the server, it should cancel the migration task
+ restart_server -1 true false true nosave
+
+ wait_for_log_messages -1 {"*Cancelled due to server shutdown*"} $loglines 100 100
+
+ wait_for_cluster_propagation
+ wait_for_cluster_state "ok"
+ }
+
+ test "Cancel import task when streaming buffer into db" {
+ # set a delay to have time to cancel import task that is streaming buf to db
+ R 1 config set key-load-delay 50000
+ # start slot migration from 0 to 1
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 5]
+
+ # start the slot 0 write load on the node 0
+ set slot0_key [slot_key 0 mykey]
+ set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key 500]
+
+ # wait for entering streaming buffer state
+ wait_for_condition 1000 10 {
+ [string match {*streaming-buffer*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not enter streaming buffer state"
+ }
+ stop_write_load $load_handle
+
+ # cancel the import task on #1, the destination node works fine
+ R 1 cluster migration cancel id $task_id
+ assert_match {*canceled*} [migration_status 1 $task_id state]
+
+ # reset config
+ R 0 config set key-load-delay 0
+ R 1 config set key-load-delay 0
+ }
+
+ test "Destination node main channel timeout when waiting stream EOF" {
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+ R 1 config set repl-timeout 5
+
+ # pause the source node to make EOF wait timeout. Do not pause
+ # the child process, so it can deliver slot snapshot to destination
+ set r0_process_id [S 0 process_id]
+ pause_process $r0_process_id
+
+ # the destination node will fail after 7s, 5s for EOF wait and 2s for slot snapshot
+ wait_for_condition 1000 20 {
+ [string match {*failed*} [migration_status 1 $task_id state]] &&
+ [string match {*Main channel*Connection timeout*wait-stream-eof*} \
+ [migration_status 1 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+
+ # resume the source node
+ resume_process $r0_process_id
+
+ # After the source node is resumed, the task on source node may receive
+ # ACKs from destination and consider the task is stream-done. In this case,
+ # the task on source node will be failed after several seconds
+ if {[string match {*stream-done*} [migration_status 0 $task_id state]]} {
+ wait_for_condition 1000 20 {
+ [string match {*failed*} [migration_status 0 $task_id state]] &&
+ [string match {*Server paused*} [migration_status 0 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+ }
+
+ R 1 config set repl-timeout 60
+ R 0 cluster migration cancel id $task_id
+ R 1 cluster migration cancel id $task_id
+ }
+
+ test "Destination node rdb channel timeout when transferring slots snapshot" {
+ # cost 10s to transfer each key
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 2 10000000]
+ R 1 config set repl-timeout 3
+
+ # the destination node will fail after 3s
+ wait_for_condition 1000 20 {
+ [string match {*failed*} [migration_status 1 $task_id state]] &&
+ [string match {*RDB channel*Connection timeout*rdbchannel-transfer*} \
+ [migration_status 1 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+
+ R 1 config set repl-timeout 60
+ R 0 cluster migration cancel id $task_id
+ R 1 cluster migration cancel id $task_id
+ }
+
+ test "Source node rdb channel timeout when transferring slots snapshot" {
+ set r1_pid [S 1 process_id]
+ R 0 flushall
+ R 0 config set save ""
+ # generate several large keys, make sure the memory usage is more than
+ # socket buffer size, so the rdb channel will block and timeout if
+ # no data is received by destination.
+ set val [string repeat "a" 102400] ;# 100kb
+ for {set i 0} {$i < 1000} {incr i} {
+ set key [slot_key 0 "key$i"]
+ R 0 set $key $val
+ }
+ R 0 config set repl-timeout 3 ;# 3s for rdb channel timeout
+ R 0 config set rdb-key-save-delay 10000 ;# 1000 keys cost 10s to save
+
+ # start migration from #0 to #1
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100]
+ wait_for_condition 1000 20 {
+ [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]]
+ } else {
+ fail "ASM task did not start"
+ }
+
+ # pause the destination node to make rdb channel timeout
+ pause_process $r1_pid
+
+ # the source node will fail, the rdb child process can not
+ # write data to destination, so it will timeout
+ wait_for_condition 1000 30 {
+ [string match {*failed*} [migration_status 0 $task_id state]] &&
+ [string match {*RDB channel*Failed to send slots snapshot*} \
+ [migration_status 0 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+ resume_process $r1_pid
+
+ R 0 config set repl-timeout 60
+ R 0 cluster migration cancel id $task_id
+ R 1 cluster migration cancel id $task_id
+ }
+
+ test "Source node main channel timeout when sending incremental stream" {
+ R 0 flushall
+ R 0 config set repl-timeout 2 ;# 2s for main channel timeout
+
+ set r1_pid [S 1 process_id]
+ # in order to have time to pause the destination node
+ R 1 config set key-load-delay 50000 ;# 50ms each 16k data
+
+ # start migration from #0 to #1
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+
+ # Create 200 keys of 16k size traffic on slot 0, streaming buffer need 10s (200*50ms)
+ populate_slot 200 -idx 0 -slot 0 -size 16384
+
+ # wait for streaming buffer state, then pause the destination node
+ wait_for_condition 1000 20 {
+ [string match {*streaming-buffer*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not stream buffer, state: [migration_status 1 $task_id state]"
+ }
+ pause_process $r1_pid
+
+ # Start the slot 0 write load on the R 0
+ set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 [slot_key 0 mykey] 500]
+
+ # the source node will fail after several seconds (including the time
+ # to fill the socket buffer of source node), the main channel can not
+ # write data to destination since the destination is paused
+ wait_for_condition 1000 30 {
+ [string match {*failed*} [migration_status 0 $task_id state]] &&
+ [string match {*Main channel*Connection timeout*} \
+ [migration_status 0 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+ stop_write_load $load_handle
+ resume_process $r1_pid
+
+ R 0 config set repl-timeout 60
+ R 1 config set key-load-delay 0
+ R 0 cluster migration cancel id $task_id
+ R 1 cluster migration cancel id $task_id
+ R 0 flushall
+ }
+
+ test "Source server paused timeout" {
+ # set timeout to 0, so the task will fail immediately when checking timeout
+ R 0 config set cluster-slot-migration-write-pause-timeout 0
+
+ # start migration from node 0 to 1
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+
+ # start the slot 0 write load on the node 0
+ set slot0_key [slot_key 0 mykey]
+ set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key]
+
+ # node 0 will fail since server paused timeout
+ wait_for_condition 2000 10 {
+ [string match {*failed*} [migration_status 0 $task_id state]] &&
+ [string match {*Server paused timeout*} \
+ [migration_status 0 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+
+ stop_write_load $load_handle
+
+ # reset config
+ R 0 config set cluster-slot-migration-write-pause-timeout 10000
+ R 0 cluster migration cancel id $task_id
+ R 1 cluster migration cancel id $task_id
+ }
+
+ test "Sync buffer drain timeout" {
+ # set a fail point to avoid the source node to enter handoff prep state
+ # to test the sync buffer drain timeout
+ R 0 debug asm-failpoint "migrate-main-channel" "handoff-prep"
+ R 0 config set cluster-slot-migration-sync-buffer-drain-timeout 5000
+
+ set r1_pid [S 1 process_id]
+
+ # start migration from node 0 to 1
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+
+ # start the slot 0 write load on the node 0
+ set slot0_key [slot_key 0 mykey]
+ set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key]
+
+ # wait for entering streaming buffer state
+ wait_for_condition 1000 10 {
+ [string match {*wait-stream-eof*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not enter wait-stream-eof state"
+ }
+
+ pause_process $r1_pid ;# avoid the destination to apply commands
+
+ # node 0 will fail since sync buffer drain timeout
+ wait_for_condition 2000 10 {
+ [string match {*failed*} [migration_status 0 $task_id state]] &&
+ [string match {*Sync buffer drain timeout*} \
+ [migration_status 0 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+
+ stop_write_load $load_handle
+ resume_process $r1_pid
+
+ # reset config
+ R 0 config set cluster-slot-migration-sync-buffer-drain-timeout 60000
+ R 0 debug asm-failpoint "" ""
+ R 0 cluster migration cancel id $task_id
+ R 1 cluster migration cancel id $task_id
+ }
+
+ test "Cluster implementation cannot start migrate task temporarily" {
+ # Inject a fail point to make the source node not ready
+ R 0 debug asm-failpoint "migrate-main-channel" "none"
+
+ # start migration from node 0 to 1
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100]
+
+ # verify source node replies SYNCSLOTS with -NOTREADY
+ set loglines [count_log_lines -1]
+ wait_for_log_messages -1 {"*Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later*"} $loglines 100 100
+
+ # clear the fail point and verify the task is completed
+ R 0 debug asm-failpoint "" ""
+ wait_for_asm_done
+ assert_equal "completed" [migration_status 0 $task_id state]
+ assert_equal "completed" [migration_status 1 $task_id state]
+
+ # cleanup
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ }
+}
+
+start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} {
+ test "Test bgtrim after a successful migration" {
+ R 0 debug asm-trim-method bg
+ R 3 debug asm-trim-method bg
+ R 0 CONFIG RESETSTAT
+ R 3 CONFIG RESETSTAT
+
+ R 0 flushall
+ # Fill slot 0
+ populate_slot 1000 -idx 0 -slot 0
+ # Fill slot 1 with keys that have TTL
+ populate_slot 1000 -idx 0 -slot 1 -prefix "expirekey" -expires 100
+ # HFE key on slot 2
+ set slot2_hfekey [slot_key 2 hfekey]
+ R 0 HSETEX $slot2_hfekey EX 10 FIELDS 1 f1 v1
+
+ # Fill slot 101, these keys won't be migrated
+ populate_slot 1000 -idx 0 -slot 101
+ # Fill slot 102 with keys that have TTL
+ populate_slot 1000 -idx 0 -slot 102 -prefix "expirekey" -expires 100
+ # HFE key on slot 103
+ set slot103_hfekey [slot_key 103 hfekey]
+ R 0 HSETEX $slot103_hfekey EX 10 FIELDS 1 f1 v1
+
+ # migrate slot 0 to node-1
+ R 1 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+
+ # Verify the data is migrated
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+ assert_equal 2001 [R 0 dbsize]
+ assert_equal 2001 [R 3 dbsize]
+ wait_for_ofs_sync [Rn 1] [Rn 4]
+ assert_equal 2001 [R 1 dbsize]
+ assert_equal 2001 [R 4 dbsize]
+
+ # Verify the keys are trimmed lazily
+ wait_for_condition 1000 10 {
+ [S 0 lazyfreed_objects] == 2001 &&
+ [S 3 lazyfreed_objects] == 2001
+ } else {
+ puts "lazyfreed_objects: [S 0 lazyfreed_objects] [S 3 lazyfreed_objects]"
+ fail "Background trim did not happen"
+ }
+
+ # Cleanup
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ R 0 flushall
+ R 0 debug asm-trim-method default
+ R 3 debug asm-trim-method default
+ }
+
+ test "Test bgtrim after a failed migration" {
+ R 0 debug asm-trim-method bg
+ R 3 debug asm-trim-method bg
+ R 1 CONFIG RESETSTAT
+ R 4 CONFIG RESETSTAT
+
+ # Fill slot 0 on node-0 and migrate it to node-1 (with some delay)
+ R 0 flushall
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000]
+ after 1000 ;# wait some time so that some keys are moved
+
+ # Fail the migration
+ R 1 CLUSTER MIGRATION CANCEL ID $task_id
+ wait_for_asm_done
+
+ # Verify the data is not migrated
+ assert_equal 10000 [R 0 dbsize]
+ assert_equal 10000 [R 3 dbsize]
+
+ # Verify the keys are trimmed lazily after a failed import on dest side.
+ wait_for_condition 1000 20 {
+ [R 1 dbsize] == 0 &&
+ [R 4 dbsize] == 0 &&
+ [S 1 lazyfreed_objects] > 0 &&
+ [S 4 lazyfreed_objects] > 0
+ } else {
+ fail "Background trim did not happen"
+ }
+
+ # Cleanup
+ wait_for_asm_done
+ R 0 flushall
+ R 0 debug asm-trim-method default
+ R 3 debug asm-trim-method default
+ }
+
+ test "Test bgtrim unblocks stream client" {
+ # Two clients waiting for data on two different streams which are in
+ # different slots. We are going to migrate one slot, which will unblock
+ # the client. The other client should still be blocked.
+ R 0 debug asm-trim-method bg
+
+ set key0 [slot_key 0 mystream]
+ set key1 [slot_key 1 mystream]
+
+ # First client waits on slot-0 key
+ R 0 DEL $key0
+ R 0 XADD $key0 666 f v
+ R 0 XGROUP CREATE $key0 mygroup $
+ set rd0 [redis_deferring_client]
+ $rd0 XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS $key0 ">"
+ wait_for_blocked_clients_count 1
+
+ # Second client waits on slot-1 key
+ R 0 DEL $key1
+ R 0 XADD $key1 666 f v
+ R 0 XGROUP CREATE $key1 mygroup $
+ set rd1 [redis_deferring_client]
+ $rd1 XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS $key1 ">"
+ wait_for_blocked_clients_count 2
+
+ # Migrate slot 0
+ R 1 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+
+ # First client should get MOVED error
+ assert_error "*MOVED*" {$rd0 read}
+ $rd0 close
+
+ # Second client should operate normally
+ R 0 XADD $key1 667 f v
+ set res [$rd1 read]
+ assert_equal [lindex $res 0 1 0] {667-0 {f v}}
+ $rd1 close
+
+ # cleanup
+ wait_for_asm_done
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ R 0 flushall
+ R 0 debug asm-trim-method default
+ }
+
+ test "Test bgtrim touches watched keys" {
+ R 0 debug asm-trim-method bg
+
+ # bgtrim should touch watched keys on migrated slots
+ set key0 [slot_key 0 key]
+ R 0 set $key0 30
+ R 0 watch $key0
+ R 1 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ R 0 multi
+ R 0 ping
+ assert_equal {} [R 0 exec]
+
+ # bgtrim should not touch watched keys on other slots
+ set key2 [slot_key 2 key]
+ R 0 set $key2 30
+ R 0 watch $key2
+ R 1 CLUSTER MIGRATION IMPORT 1 1
+ wait_for_asm_done
+ R 0 multi
+ R 0 ping
+ assert_equal PONG [R 0 exec]
+
+ # cleanup
+ wait_for_asm_done
+ R 0 CLUSTER MIGRATION IMPORT 0 1
+ wait_for_asm_done
+ R 0 flushall
+ R 0 debug asm-trim-method default
+ }
+
+ test "Test bgtrim after a FAILOVER on destination side" {
+ R 1 debug asm-trim-method bg
+ R 4 debug asm-trim-method bg
+
+ set loglines [count_log_lines -4]
+
+ # Fill slot 0 on node-0 and migrate it to node-1 (with some delay)
+ R 0 flushall
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000]
+ after 1000 ;# wait some time so that some keys are moved
+
+ # Trigger a failover with force to simulate unreachable master and
+ # verify unowned keys are trimmed once replica becomes master.
+ failover_and_wait_for_done 4 force
+ wait_for_log_messages -4 {"*Detected keys in slots that do not belong*Scheduling trim*"} $loglines 1000 10
+ wait_for_condition 1000 10 {
+ [R 1 dbsize] == 0 &&
+ [R 4 dbsize] == 0
+ } else {
+ fail "Background trim did not happen"
+ }
+
+ # cleanup
+ wait_for_cluster_propagation
+ failover_and_wait_for_done 1
+ R 0 config set rdb-key-save-delay 0
+ R 1 debug asm-trim-method default
+ R 4 debug asm-trim-method default
+ wait_for_asm_done
+ }
+
+ test "CLUSTER SETSLOT is not allowed if there is a pending trim job" {
+ R 0 debug asm-trim-method bg
+ R 3 debug asm-trim-method bg
+
+ # Fill slot 0 on node-0 and migrate it to node-1 (with some delay)
+ R 0 flushall
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000]
+
+ # Pause will cancel the task and there will be a pending trim job
+ # until writes are allowed again.
+ R 1 client pause 100000 write ;# pause 100s
+ wait_for_asm_done
+
+ # CLUSTER SETSLOT is not allowed if there is a pending trim job.
+ assert_error {*There is a pending trim job for slot 0*} {R 1 CLUSTER SETSLOT 0 STABLE}
+
+ # Unpause the server, trim will be triggered and SETSLOT will be allowed
+ R 1 client unpause
+ R 1 CLUSTER SETSLOT 0 STABLE
+ }
+}
+
+start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no save ""}} {
+ test "Test active trim after a successful migration" {
+ R 0 debug asm-trim-method active
+ R 3 debug asm-trim-method active
+ populate_slot 500 -slot 0
+ populate_slot 500 -slot 1
+ populate_slot 500 -slot 3
+ populate_slot 500 -slot 4
+
+ # Migrate 1500 keys
+ R 1 CLUSTER MIGRATION IMPORT 0 1 3 3
+ wait_for_asm_done
+
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_current_job_trimmed] == 1500 &&
+ [CI 3 cluster_slot_migration_active_trim_running] == 0 &&
+ [CI 3 cluster_slot_migration_active_trim_current_job_trimmed] == 1500
+ } else {
+ fail "trim failed"
+ }
+
+ assert_equal 1500 [CI 0 cluster_slot_migration_active_trim_current_job_keys]
+ assert_equal 1500 [CI 3 cluster_slot_migration_active_trim_current_job_keys]
+
+ assert_equal 500 [R 0 dbsize]
+ assert_equal 500 [R 3 dbsize]
+ assert_equal 1500 [R 1 dbsize]
+ assert_equal 1500 [R 4 dbsize]
+ assert_equal 0 [R 0 cluster countkeysinslot 0]
+ assert_equal 0 [R 0 cluster countkeysinslot 1]
+ assert_equal 0 [R 0 cluster countkeysinslot 3]
+ assert_equal 500 [R 0 cluster countkeysinslot 4]
+
+ # cleanup
+ R 0 debug asm-trim-method default
+ R 3 debug asm-trim-method default
+ R 0 CLUSTER MIGRATION IMPORT 0 1 3 3
+ wait_for_asm_done
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Test multiple active trim jobs can be scheduled" {
+ # Active trim will be scheduled but it won't run
+ R 0 debug asm-trim-method active -1
+ R 3 debug asm-trim-method active -1
+
+ populate_slot 500 -slot 0
+ populate_slot 500 -slot 1
+ populate_slot 500 -slot 3
+ populate_slot 500 -slot 4
+
+ # Migrate 1500 keys
+ R 1 CLUSTER MIGRATION IMPORT 0 1
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 1 &&
+ [CI 3 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ fail "migrate failed"
+ }
+
+ # Migrate another slot and verify there are two trim tasks on the source
+ R 1 CLUSTER MIGRATION IMPORT 3 3
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 2 &&
+ [CI 3 cluster_slot_migration_active_trim_running] == 2
+ } else {
+ fail "migrate failed"
+ }
+
+ # Enabled active trim and wait until it is completed.
+ R 0 debug asm-trim-method active 0
+ R 3 debug asm-trim-method active 0
+ wait_for_asm_done
+
+ assert_equal 500 [R 0 dbsize]
+ assert_equal 500 [R 3 dbsize]
+ assert_equal 0 [R 0 cluster countkeysinslot 0]
+ assert_equal 0 [R 0 cluster countkeysinslot 1]
+ assert_equal 0 [R 0 cluster countkeysinslot 3]
+ assert_equal 500 [R 0 cluster countkeysinslot 4]
+
+ # cleanup
+ R 0 debug asm-trim-method default
+ R 3 debug asm-trim-method default
+ R 0 CLUSTER MIGRATION IMPORT 0 1 3 3
+ wait_for_asm_done
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Test active-trim clears partially imported keys on cancel" {
+ R 1 debug asm-trim-method active
+ R 4 debug asm-trim-method active
+
+ # Rdb delivery will take 10 seconds
+ R 0 config set rdb-key-save-delay 10000
+ populate_slot 250 -slot 0
+ populate_slot 250 -slot 1
+ populate_slot 250 -slot 3
+ populate_slot 250 -slot 4
+
+ R 1 CLUSTER MIGRATION IMPORT 0 100
+ after 2000
+ R 1 CLUSTER MIGRATION CANCEL ALL
+ wait_for_asm_done
+
+ assert_morethan [CI 1 cluster_slot_migration_active_trim_current_job_keys] 0
+ assert_morethan [CI 4 cluster_slot_migration_active_trim_current_job_trimmed] 0
+
+ assert_equal 1000 [R 0 dbsize]
+ assert_equal 1000 [R 3 dbsize]
+ assert_equal 0 [R 1 dbsize]
+ assert_equal 0 [R 4 dbsize]
+
+ # Cleanup
+ R 1 debug asm-trim-method default
+ R 4 debug asm-trim-method default
+ R 0 config set rdb-key-save-delay 0
+ }
+
+ test "Test active-trim clears partially imported keys on failover" {
+ R 1 debug asm-trim-method active
+ R 4 debug asm-trim-method active
+
+ # Rdb delivery will take 10 seconds
+ R 0 config set rdb-key-save-delay 10000
+
+ populate_slot 250 -slot 0
+ populate_slot 250 -slot 1
+ populate_slot 250 -slot 3
+ populate_slot 250 -slot 4
+
+ set prev_trim_started_1 [CI 1 cluster_slot_migration_stats_active_trim_started]
+ set prev_trim_started_4 [CI 4 cluster_slot_migration_stats_active_trim_started]
+
+ R 1 CLUSTER MIGRATION IMPORT 0 100
+ after 2000
+ failover_and_wait_for_done 4
+ wait_for_asm_done
+
+ # Verify there is at least one trim job started
+ assert_morethan [CI 1 cluster_slot_migration_stats_active_trim_started] $prev_trim_started_1
+ assert_morethan [CI 4 cluster_slot_migration_stats_active_trim_started] $prev_trim_started_4
+
+ assert_equal 1000 [R 0 dbsize]
+ assert_equal 1000 [R 3 dbsize]
+ assert_equal 0 [R 1 dbsize]
+ assert_equal 0 [R 4 dbsize]
+
+ # Cleanup
+ failover_and_wait_for_done 1
+ R 1 debug asm-trim-method default
+ R 4 debug asm-trim-method default
+ R 0 config set rdb-key-save-delay 0
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Test import task does not start if active trim is in progress for the same slots" {
+ # Active trim will be scheduled but it won't run
+ R 0 flushall
+ R 1 flushall
+ R 0 debug asm-trim-method active -1
+
+ populate_slot 500 -slot 0
+ populate_slot 500 -slot 1
+
+ # Migrate 1000 keys
+ R 1 CLUSTER MIGRATION IMPORT 0 1
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ fail "migrate failed"
+ }
+
+ # Try to migrate slots back
+ R 0 CLUSTER MIGRATION IMPORT 0 1
+ wait_for_log_messages 0 {"*Can not start import task*trim in progress for some of the slots*"} 0 1000 10
+
+ # Enabled active trim and verify slots are imported back
+ R 0 debug asm-trim-method active 0
+ wait_for_asm_done
+
+ assert_equal 1000 [R 0 dbsize]
+ assert_equal 500 [R 0 cluster countkeysinslot 0]
+ assert_equal 500 [R 0 cluster countkeysinslot 1]
+
+ # cleanup
+ R 0 debug asm-trim-method default
+ R 0 flushall
+ }
+
+ test "Rdb save during active trim should skip keys in trimmed slots" {
+ # Insert some delay to activate trim
+ R 0 debug asm-trim-method active 1000
+ R 0 config set repl-diskless-sync-delay 0
+ R 0 flushall
+
+ populate_slot 5000 -idx 0 -slot 0
+ populate_slot 5000 -idx 0 -slot 1
+ populate_slot 5000 -idx 0 -slot 2
+
+ # Start migration and wait until trim is in progress
+ R 1 CLUSTER MIGRATION IMPORT 0 1
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 1 &&
+ [S 0 rdb_bgsave_in_progress] == 0
+ } else {
+ puts "[CI 0 cluster_slot_migration_active_tasks]"
+ puts "[CI 0 cluster_slot_migration_active_trim_running]"
+ fail "trim failed"
+ }
+
+ # Trigger save during active trim
+ R 0 save
+ # Wait until the log contains a "keys skipped" message with a non-zero value
+ wait_for_log_messages 0 {"*BGSAVE done, 5000 keys saved, [1-9]* keys skipped*"} 0 1000 10
+
+ restart_server 0 yes no yes nosave
+ assert_equal 5000 [R 0 dbsize]
+ assert_equal 0 [R 0 cluster countkeysinslot 0]
+ assert_equal 0 [R 0 cluster countkeysinslot 1]
+ assert_equal 5000 [R 0 cluster countkeysinslot 2]
+
+ # Cleanup
+ wait_for_cluster_propagation
+ wait_for_cluster_state "ok"
+ R 0 flushall
+ R 1 flushall
+ R 0 save
+ R 0 CLUSTER MIGRATION IMPORT 0 1
+ wait_for_asm_done
+ }
+
+ test "AOF rewrite during active trim should skip keys in trimmed slots" {
+ R 0 debug asm-trim-method active 1000
+ R 0 config set repl-diskless-sync-delay 0
+ R 0 config set aof-use-rdb-preamble no
+ R 0 config set appendonly yes
+ R 0 config rewrite
+ R 0 flushall
+ populate_slot 5000 -idx 0 -slot 0
+ populate_slot 5000 -idx 0 -slot 1
+ populate_slot 5000 -idx 0 -slot 2
+
+ R 1 CLUSTER MIGRATION IMPORT 0 1
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ puts "[CI 0 cluster_slot_migration_active_tasks]"
+ puts "[CI 0 cluster_slot_migration_active_trim_running]"
+ fail "trim failed"
+ }
+
+ wait_for_condition 50 100 {
+ [S 0 rdb_bgsave_in_progress] == 0
+ } else {
+ fail "bgsave is in progress"
+ }
+
+ R 0 bgrewriteaof
+ # Wait until the log contains a "keys skipped" message with a non-zero value
+ wait_for_log_messages 0 {"*AOF rewrite done, [1-9]* keys saved, [1-9]* keys skipped*"} 0 1000 10
+
+ restart_server 0 yes no yes nosave
+ assert_equal 5000 [R 0 dbsize]
+ assert_equal 0 [R 0 cluster countkeysinslot 0]
+ assert_equal 0 [R 0 cluster countkeysinslot 1]
+ assert_equal 5000 [R 0 cluster countkeysinslot 2]
+
+ # cleanup
+ R 0 config set appendonly no
+ R 0 config rewrite
+ restart_server 0 yes no yes nosave
+ wait_for_cluster_propagation
+ wait_for_cluster_state "ok"
+ R 0 flushall
+ R 1 flushall
+ R 0 save
+ R 0 CLUSTER MIGRATION IMPORT 0 1
+ wait_for_asm_done
+ }
+
+ test "Pause actions will stop active trimming" {
+ R 0 debug asm-trim-method active 1000
+ R 0 config set repl-diskless-sync-delay 0
+ R 0 flushall
+ populate_slot 10000 -idx 0 -slot 0
+
+ R 1 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ puts "[CI 0 cluster_slot_migration_active_tasks]"
+ puts "[CI 0 cluster_slot_migration_active_trim_running]"
+ fail "trim failed"
+ }
+
+ # Pause the server and verify no keys are trimmed
+ R 0 client pause 100000 write ;# pause 100s
+ set prev [CI 0 cluster_slot_migration_active_trim_current_job_trimmed]
+ after 1000 ; # wait some time to see if any keys are trimmed
+ set curr [CI 0 cluster_slot_migration_active_trim_current_job_trimmed]
+ assert_equal $prev $curr
+
+ R 0 client unpause
+ R 0 debug asm-trim-method default
+ wait_for_asm_done
+ assert_equal 0 [R 0 dbsize]
+
+ # revert
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ assert_equal 10000 [R 0 dbsize]
+ }
+
+ foreach diskless_load {"disabled" "swapdb" "on-empty-db"} {
+ test "Test fullsync cancels active trim (repl-diskless-load $diskless_load)" {
+ R 3 debug asm-trim-method active -10
+ R 3 config set repl-diskless-load $diskless_load
+ R 0 flushall
+
+ R 0 config set repl-diskless-sync-delay 0
+ populate_slot 10000 -idx 0 -slot 0
+
+ R 1 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 0 &&
+ [CI 3 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ puts "[CI 0 cluster_slot_migration_active_tasks]"
+ puts "[CI 0 cluster_slot_migration_active_trim_running]"
+ puts "[CI 3 cluster_slot_migration_active_trim_running]"
+ fail "trim failed"
+ }
+
+ set prev_cancelled [CI 3 cluster_slot_migration_stats_active_trim_cancelled]
+ R 0 config set client-output-buffer-limit "replica 1024 0 0"
+
+ # Trigger a fullsync
+ populate_slot 1 -idx 0 -size 2000000 -slot 2
+
+ wait_for_condition 1000 10 {
+ [CI 3 cluster_slot_migration_active_trim_running] == 0 &&
+ [CI 3 cluster_slot_migration_stats_active_trim_cancelled] == $prev_cancelled + 1
+ } else {
+ puts "[CI 3 cluster_slot_migration_active_trim_running]"
+ puts "[CI 3 cluster_slot_migration_stats_active_trim_cancelled]"
+ fail "trim failed"
+ }
+
+ R 3 debug asm-trim-method active 0
+ R 3 config set repl-diskless-load disabled
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+ assert_equal 10001 [R 0 dbsize]
+ assert_equal 10001 [R 3 dbsize]
+ assert_equal 0 [R 1 dbsize]
+ assert_equal 0 [R 4 dbsize]
+ R 0 flushall
+ }
+ }
+
+ test "Test importing slots while active-trim is in progress for the same slots on replica" {
+ R 3 debug asm-trim-method active 10000
+ R 0 flushall
+ populate_slot 10000 -slot 0
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+
+ # Wait until active trim is in progress on replica
+ R 1 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 0 &&
+ [CI 3 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ puts "[CI 0 cluster_slot_migration_active_tasks]"
+ puts "[CI 0 cluster_slot_migration_active_trim_running]"
+ puts "[CI 3 cluster_slot_migration_active_trim_running]"
+ fail "trim failed"
+ }
+
+ set loglines [count_log_lines -3]
+
+ # Get slots back
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_condition 1000 20 {
+ [CI 0 cluster_slot_migration_active_tasks] == 1 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 0 &&
+ [CI 3 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ fail "trim failed"
+ }
+
+ # Verify replica blocks master until trim is done
+ wait_for_log_messages -3 {"*Blocking master client until trim job is done*"} $loglines 1000 30
+ R 3 debug asm-trim-method active 0
+ wait_for_log_messages -3 {"*Unblocking master client after active trim*"} $loglines 1000 30
+
+ wait_for_asm_done
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+ assert_equal 10000 [R 0 dbsize]
+ assert_equal 10000 [R 3 dbsize]
+ assert_equal 0 [R 1 dbsize]
+ assert_equal 0 [R 4 dbsize]
+ }
+
+ test "TRIMSLOTS should not trim slots that this node is serving" {
+ assert_error {*the slot 0 is served by this node*} {R 0 trimslots ranges 1 0 0}
+ assert_error {*READONLY*} {R 3 trimslots ranges 1 0 100}
+ assert_equal {OK} [R 0 trimslots ranges 1 16383 16383]
+ assert_error {*READONLY*} {R 3 trimslots ranges 1 16383 16383}
+ }
+
+ test "Trigger multiple active trim jobs at the same time" {
+ R 1 debug asm-trim-method active 0
+ R 1 flushall
+
+ set prev_trim_done [CI 1 cluster_slot_migration_stats_active_trim_completed]
+
+ R 1 debug populate 1000 [slot_prefix 0] 100
+ R 1 debug populate 1000 [slot_prefix 1] 100
+ R 1 debug populate 1000 [slot_prefix 2] 100
+
+ R 1 multi
+ R 1 trimslots ranges 1 0 0
+ R 1 trimslots ranges 1 1 1
+ R 1 trimslots ranges 1 2 2
+ R 1 exec
+
+ wait_for_condition 1000 10 {
+ [CI 1 cluster_slot_migration_stats_active_trim_completed] == $prev_trim_done + 3
+ } else {
+ fail "active trim failed"
+ }
+
+ R 1 flushall
+ R 1 debug asm-trim-method default
+ }
+
+ test "Restart will clean up unowned slot keys" {
+ R 1 flushall
+
+ # generate 1000 keys belonging to slot 0
+ R 1 debug populate 1000 [slot_prefix 0] 100
+ assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1000}
+
+ # restart node-1
+ restart_server -1 true false true save
+ wait_for_cluster_propagation
+ wait_for_cluster_state "ok"
+
+ # Node-1 has no keys since unowned slot 0 keys were cleaned up during restart
+ assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] == {}}
+
+ R 1 flushall
+ }
+
+ test "Test active trim is used when client tracking is used" {
+ R 0 flushall
+ R 1 flushall
+ R 0 debug asm-trim-method default
+ R 1 debug asm-trim-method default
+
+ set prev_active_trim [CI 0 cluster_slot_migration_stats_active_trim_completed]
+
+ # Setup a tracking client that is redirected to a pubsub client
+ set rd_redirection [redis_deferring_client]
+ $rd_redirection client id
+ set redir_id [$rd_redirection read]
+ $rd_redirection subscribe __redis__:invalidate
+ $rd_redirection read ; # Consume the SUBSCRIBE reply.
+
+ # setup tracking
+ set key0 [slot_key 0 key]
+ R 0 CLIENT TRACKING on REDIRECT $redir_id
+ R 0 SET $key0 1
+ R 0 GET $key0
+ R 1 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_stats_active_trim_completed] == [expr $prev_active_trim + 1]
+ } else {
+ fail "active trim did not happen"
+ }
+
+ # Verify the tracking client received the invalidation message
+ set msg [$rd_redirection read]
+ set head [lindex $msg 0]
+
+ if {$head eq "message"} {
+ # RESP 2
+ set got_key [lindex [lindex $msg 2] 0]
+ } elseif {$head eq "invalidate"} {
+ # RESP 3
+ set got_key [lindex $msg 1 0]
+ } else {
+ fail "unexpected invalidation message: $msg"
+ }
+ assert_equal $got_key $key0
+
+ # cleanup
+ $rd_redirection close
+ wait_for_asm_done
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ R 0 flushall
+ }
+}
+
+set testmodule [file normalize tests/modules/atomicslotmigration.so]
+
+start_cluster 3 6 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no]] {
+ test "Module api sanity" {
+ R 0 asm.sanity ;# on master
+ R 3 asm.sanity ;# on replica
+ }
+
+ test "Module replicate cross slot command" {
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+ set listkey [slot_key 0 "asmlist"]
+ # replicate cross slot command during migrating
+ R 0 asm.lpush_replicate_crossslot_command $listkey "item1"
+
+ # node 0 will fail due to cross slot
+ wait_for_condition 2000 10 {
+ [string match {*canceled*} [migration_status 0 $task_id state]] &&
+ [string match {*cross slot*} [migration_status 0 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+ R 1 CLUSTER MIGRATION CANCEL ID $task_id
+
+ # sanity check if lpush replicated correctly to the replica
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+ assert_equal {item1} [R 0 lrange $listkey 0 -1]
+ R 3 readonly
+ assert_equal {item1} [R 3 lrange $listkey 0 -1]
+ }
+
+ test "Test RM_ClusterCanAccessKeysInSlot" {
+ # Test invalid slots
+ assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot -1]
+ assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 20000]
+ assert_equal 0 [R 2 asm.cluster_can_access_keys_in_slot 16384]
+ assert_equal 0 [R 5 asm.cluster_can_access_keys_in_slot 16384]
+
+ # Test on a master-replica pair
+ assert_equal 1 [R 0 asm.cluster_can_access_keys_in_slot 0]
+ assert_equal 1 [R 0 asm.cluster_can_access_keys_in_slot 100]
+ assert_equal 1 [R 3 asm.cluster_can_access_keys_in_slot 0]
+ assert_equal 1 [R 3 asm.cluster_can_access_keys_in_slot 100]
+
+ # Test on a master-replica pair
+ assert_equal 1 [R 2 asm.cluster_can_access_keys_in_slot 16383]
+ assert_equal 1 [R 5 asm.cluster_can_access_keys_in_slot 16383]
+ }
+
+ test "Test RM_ClusterCanAccessKeysInSlot returns false for unowned slots" {
+ # Active trim will be scheduled but it won't run
+ R 0 debug asm-trim-method active -1
+ R 3 debug asm-trim-method active -1
+
+ setup_slot_migration_with_delay 0 1 0 100 3 1000000
+
+ # Verify importing slots are not local
+ assert_equal 0 [R 1 asm.cluster_can_access_keys_in_slot 0]
+ assert_equal 0 [R 1 asm.cluster_can_access_keys_in_slot 100]
+ assert_equal 0 [R 4 asm.cluster_can_access_keys_in_slot 0]
+ assert_equal 0 [R 4 asm.cluster_can_access_keys_in_slot 100]
+
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 1 &&
+ [CI 3 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ fail "migrate failed"
+ }
+
+ # Wait for config propagation before checking the slot ownership on replica
+ wait_for_cluster_propagation
+
+ # Verify slots that are being trimmed are not local
+ assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 0]
+ assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 100]
+ assert_equal 0 [R 3 asm.cluster_can_access_keys_in_slot 0]
+ assert_equal 0 [R 3 asm.cluster_can_access_keys_in_slot 100]
+
+ # Enabled active trim and wait until it is completed.
+ R 0 debug asm-trim-method active 0
+ R 3 debug asm-trim-method active 0
+ wait_for_asm_done
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+
+ # Verify slots are local after migration
+ assert_equal 1 [R 1 asm.cluster_can_access_keys_in_slot 0]
+ assert_equal 1 [R 1 asm.cluster_can_access_keys_in_slot 100]
+ assert_equal 1 [R 4 asm.cluster_can_access_keys_in_slot 0]
+ assert_equal 1 [R 4 asm.cluster_can_access_keys_in_slot 100]
+
+ # cleanup
+ R 0 debug asm-trim-method default
+ R 3 debug asm-trim-method default
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ R 0 flushall
+ R 1 flushall
+ }
+
+ foreach trim_method {"active" "bg"} {
+ test "Test cluster module notifications on a successful migration ($trim_method-trim)" {
+ clear_module_event_log
+ R 0 debug asm-trim-method $trim_method
+ R 3 debug asm-trim-method $trim_method
+ R 6 debug asm-trim-method $trim_method
+
+ # Set a key in the slot range
+ set key [slot_key 0 mykey]
+ R 0 set $key "value"
+
+ # Migrate the slot ranges
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100 200 300]
+ wait_for_asm_done
+
+ set src_id [R 0 cluster myid]
+ set dest_id [R 1 cluster myid]
+
+ # Verify the events on source, both master and replica
+ set migrate_event_log [list \
+ "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \
+ "sub: cluster-slot-migration-migrate-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \
+ ]
+ assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log
+ assert_equal [R 3 asm.get_cluster_event_log] {}
+ assert_equal [R 6 asm.get_cluster_event_log] {}
+
+ # Verify the events on destination, both master and replica
+ set import_event_log [list \
+ "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \
+ "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \
+ ]
+ wait_for_condition 500 20 {
+ [R 1 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 4 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 7 asm.get_cluster_event_log] eq $import_event_log
+ } else {
+ puts "R1: [R 1 asm.get_cluster_event_log]"
+ puts "R4: [R 4 asm.get_cluster_event_log]"
+ puts "R7: [R 7 asm.get_cluster_event_log]"
+ fail "ASM import event not received"
+ }
+
+ # Verify the trim events
+ if {$trim_method eq "active"} {
+ set trim_event_log [list \
+ "sub: cluster-slot-migration-trim-started, slots:0-100,200-300" \
+ "keyspace: key_trimmed, key: $key" \
+ "sub: cluster-slot-migration-trim-completed, slots:0-100,200-300" \
+ ]
+ } else {
+ set trim_event_log [list \
+ "sub: cluster-slot-migration-trim-background, slots:0-100,200-300" \
+ ]
+ }
+ wait_for_condition 500 10 {
+ [R 0 asm.get_cluster_trim_event_log] eq $trim_event_log &&
+ [R 3 asm.get_cluster_trim_event_log] eq $trim_event_log &&
+ [R 6 asm.get_cluster_trim_event_log] eq $trim_event_log
+ } else {
+ fail "ASM source trim event not received"
+ }
+
+ # cleanup
+ R 0 CLUSTER MIGRATION IMPORT 0 100 200 300
+ wait_for_asm_done
+ clear_module_event_log
+ reset_default_trim_method
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Test cluster module notifications on a failed migration ($trim_method-trim)" {
+ clear_module_event_log
+ R 1 debug asm-trim-method $trim_method
+ R 4 debug asm-trim-method $trim_method
+ R 7 debug asm-trim-method $trim_method
+
+ # Set a key in the slot range
+ set key [slot_key 0 mykey]
+ R 0 set $key "value"
+
+ # Start migration and cancel it
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000]
+ # Wait until at least one key is moved to destination
+ wait_for_condition 1000 10 {
+ [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1
+ } else {
+ fail "Key not moved to destination"
+ }
+ R 1 CLUSTER MIGRATION CANCEL ID $task_id
+ wait_for_asm_done
+
+ set src_id [R 0 cluster myid]
+ set dest_id [R 1 cluster myid]
+
+ # Verify the events on source, both master and replica
+ set migrate_event_log [list \
+ "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ "sub: cluster-slot-migration-migrate-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ ]
+ assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log
+ assert_equal [R 3 asm.get_cluster_event_log] {}
+ assert_equal [R 6 asm.get_cluster_event_log] {}
+
+ # Verify the events on destination, both master and replica
+ set import_event_log [list \
+ "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ "sub: cluster-slot-migration-import-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ ]
+ wait_for_condition 500 10 {
+ [R 1 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 4 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 7 asm.get_cluster_event_log] eq $import_event_log
+ } else {
+ fail "ASM import event not received"
+ }
+
+ # Verify the trim events on destination (partially imported keys are trimmed)
+ if {$trim_method eq "active"} {
+ set trim_event_log [list \
+ "sub: cluster-slot-migration-trim-started, slots:0-100" \
+ "keyspace: key_trimmed, key: $key" \
+ "sub: cluster-slot-migration-trim-completed, slots:0-100" \
+ ]
+ } else {
+ set trim_event_log [list \
+ "sub: cluster-slot-migration-trim-background, slots:0-100" \
+ ]
+ }
+ wait_for_condition 500 10 {
+ [R 1 asm.get_cluster_trim_event_log] eq $trim_event_log &&
+ [R 4 asm.get_cluster_trim_event_log] eq $trim_event_log &&
+ [R 7 asm.get_cluster_trim_event_log] eq $trim_event_log
+ } else {
+ fail "ASM destination trim event not received"
+ }
+
+ # cleanup
+ clear_module_event_log
+ reset_default_trim_method
+ wait_for_asm_done
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Test cluster module notifications on failover ($trim_method-trim)" {
+ # NOTE: cluster legacy may have a bug, multiple manual failover will fail,
+ # so only perform one round of failover test, fix it later
+ if {$trim_method eq "bg"} {
+ clear_module_event_log
+ R 1 debug asm-trim-method $trim_method
+ R 4 debug asm-trim-method $trim_method
+ R 7 debug asm-trim-method $trim_method
+
+ # Set a key in the slot range
+ set key [slot_key 0 mykey]
+ R 0 set $key "value"
+
+ # Start migration
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000]
+ # Wait until at least one key is moved to destination
+ wait_for_condition 1000 10 {
+ [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1
+ } else {
+ fail "Key not moved to destination"
+ }
+
+ failover_and_wait_for_done 4
+ wait_for_asm_done
+
+ set src_id [R 0 cluster myid]
+ set dest_id [R 1 cluster myid]
+
+ # Verify the events on source, both master and replica
+ set migrate_event_log [list \
+ "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ "sub: cluster-slot-migration-migrate-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ ]
+ assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log
+ assert_equal [R 3 asm.get_cluster_event_log] {}
+ assert_equal [R 6 asm.get_cluster_event_log] {}
+
+ # Verify the events on destination, both master and replica
+ set import_event_log [list \
+ "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ "sub: cluster-slot-migration-import-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ ]
+ wait_for_condition 500 20 {
+ [R 1 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 4 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 7 asm.get_cluster_event_log] eq $import_event_log
+ } else {
+ puts "R1: [R 1 asm.get_cluster_event_log]"
+ puts "R4: [R 4 asm.get_cluster_event_log]"
+ puts "R7: [R 7 asm.get_cluster_event_log]"
+ fail "ASM import event not received"
+ }
+
+ # Verify the trim events on destination (partially imported keys are trimmed)
+ # NOTE: after failover, the new master will initiate the slot trimming,
+ # and only slot 0 has data, so only slot 0 is trimmed
+ if {$trim_method eq "active"} {
+ set trim_event_log [list \
+ "sub: cluster-slot-migration-trim-started, slots:0-0" \
+ "keyspace: key_trimmed, key: $key" \
+ "sub: cluster-slot-migration-trim-completed, slots:0-0" \
+ ]
+ } else {
+ set trim_event_log [list \
+ "sub: cluster-slot-migration-trim-background, slots:0-0" \
+ ]
+ }
+ wait_for_condition 500 20 {
+ [R 1 asm.get_cluster_trim_event_log] eq $trim_event_log &&
+ [R 4 asm.get_cluster_trim_event_log] eq $trim_event_log &&
+ [R 7 asm.get_cluster_trim_event_log] eq $trim_event_log
+ } else {
+ puts "R1: [R 1 asm.get_cluster_trim_event_log]"
+ puts "R4: [R 4 asm.get_cluster_trim_event_log]"
+ puts "R7: [R 7 asm.get_cluster_trim_event_log]"
+ fail "ASM destination trim event not received"
+ }
+
+ # cleanup
+ failover_and_wait_for_done 1
+ clear_module_event_log
+ reset_default_trim_method
+ R 0 flushall
+ R 1 flushall
+ }
+ }
+ }
+
+ foreach with_rdb {"with" "without"} {
+ test "Test cluster module notifications when replica restart $with_rdb RDB during importing" {
+ clear_module_event_log
+ R 1 debug asm-trim-method $trim_method
+ R 4 debug asm-trim-method $trim_method
+ R 7 debug asm-trim-method $trim_method
+ R 4 config set save ""
+
+ set src_id [R 0 cluster myid]
+ set dest_id [R 1 cluster myid]
+
+ # Set a key in the slot range
+ set key [slot_key 0 mykey]
+ R 0 set $key "value"
+
+ # Start migration, 2s delay
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000]
+ # Wait until at least one key is moved to destination
+ wait_for_condition 1000 10 {
+ [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1
+ } else {
+ fail "Key not moved to destination"
+ }
+ wait_for_ofs_sync [Rn 1] [Rn 4]
+
+ # restart node 4
+ if {$with_rdb eq "with"} {
+ restart_server -4 true false true save ;# rdb save
+ } else {
+ restart_server -4 true false true nosave ;# no rdb saved
+ }
+ wait_for_cluster_propagation
+
+ wait_for_asm_done
+
+ # started and completed are paired, and not duplicated
+ set import_event_log [list \
+ "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ ]
+ wait_for_condition 500 10 {
+ [R 1 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 4 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 7 asm.get_cluster_event_log] eq $import_event_log
+ } else {
+ fail "ASM import event not received"
+ }
+
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ R 4 save ;# save an empty rdb to override previous one
+ clear_module_event_log
+ reset_default_trim_method
+ R 0 flushall
+ R 1 flushall
+ }
+ }
+
+ test "Test cluster module notifications when replica is disconnected and full resync after importing" {
+ clear_module_event_log
+ R 1 debug asm-trim-method $trim_method
+ R 4 debug asm-trim-method $trim_method
+ R 7 debug asm-trim-method $trim_method
+
+ set src_id [R 0 cluster myid]
+ set dest_id [R 1 cluster myid]
+
+ # Set a key in the slot range
+ set key [slot_key 0 mykey]
+ R 0 set $key "value"
+
+ # Start migration, 2s delay
+ set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000]
+ # Wait until at least one key is moved to destination
+ wait_for_condition 1000 10 {
+ [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1
+ } else {
+ fail "Key not moved to destination"
+ }
+ wait_for_ofs_sync [Rn 1] [Rn 4]
+
+ # puase node-4
+ set r4_pid [S 4 process_id]
+ pause_process $r4_pid
+
+ # set a small repl-backlog-size and write some commands to make node-4
+ # full resync when reconnecting after waking up
+ set r1_full_sync [S 1 sync_full]
+ R 1 config set repl-backlog-size 16kb
+ R 1 client kill type replica
+ set 1k_str [string repeat "a" 1024]
+ for {set i 0} {$i < 2000} {incr i} {
+ R 1 set [slot_key 6000] $1k_str
+ }
+
+ # after ASM task is completed, wake up node-4
+ wait_for_condition 1000 10 {
+ [CI 1 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 1 cluster_slot_migration_active_trim_running] == 0
+ } else {
+ fail "ASM tasks did not completed"
+ }
+ resume_process $r4_pid
+
+ # make sure full resync happens
+ wait_for_sync [Rn 4]
+ wait_for_ofs_sync [Rn 1] [Rn 4]
+ assert_morethan [S 1 sync_full] $r1_full_sync
+
+ # started and completed are paired, and not duplicated
+ set import_event_log [list \
+ "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
+ ]
+ wait_for_condition 500 10 {
+ [R 1 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 4 asm.get_cluster_event_log] eq $import_event_log &&
+ [R 7 asm.get_cluster_event_log] eq $import_event_log
+ } else {
+ fail "ASM import event not received"
+ }
+
+ # since ASM task is completed on node-1 before node-4 reconnects,
+ # no trim event should be received on node-4
+ assert_equal {} [R 4 asm.get_cluster_trim_event_log]
+
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ clear_module_event_log
+ reset_default_trim_method
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Test new master can trim slots when migration is completed and failover occurs on source side" {
+ R 0 asm.disable_trim ;# can not start slot trimming on source side
+ set slot0_key [slot_key 0 mykey]
+ R 0 set $slot0_key "value"
+
+ # migrate slot 0 from #0 to #1, and wait it completed, but not allow to trim slots
+ # on source node
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0]
+ wait_for_condition 1000 10 {
+ [string match {*completed*} [migration_status 0 $task_id state]] &&
+ [string match {*completed*} [migration_status 1 $task_id state]]
+ } else {
+ fail "ASM task did not complete"
+ }
+ # verify trim is not allowed on source node, and replica node doesn't have trim job either
+ wait_for_ofs_sync [Rn 0] [Rn 3]
+ assert_equal 1 [R 0 asm.trim_in_progress]
+ assert_equal "value" [R 0 asm.read_pending_trim_key $slot0_key]
+ assert_equal 0 [R 3 asm.trim_in_progress]
+ assert_equal "value" [R 3 asm.read_pending_trim_key $slot0_key]
+
+ set loglines [count_log_lines 0]
+
+ # failover happens on source node, instance #3 become slave, #0 become master
+ failover_and_wait_for_done 3
+ R 0 asm.enable_trim ;# enable trim on old master
+
+ # old master should cancel the pending trim job
+ wait_for_log_messages 0 {"*Cancelling the pending trim job*"} $loglines 1000 10
+
+ wait_for_ofs_sync [Rn 3] [Rn 0]
+ # verify trim is allowed on new master, and the key is trimmed
+ wait_for_condition 1000 10 {
+ [R 3 asm.trim_in_progress] == 0 &&
+ [R 3 asm.read_pending_trim_key $slot0_key] eq "" &&
+ [R 0 asm.trim_in_progress] == 0 &&
+ [R 0 asm.read_pending_trim_key $slot0_key] eq ""
+ } else {
+ fail "Trim did not complete"
+ }
+
+ # verify the trim events, use active trim since module is subscribed to trimmed event
+ set trim_event_log [list \
+ "sub: cluster-slot-migration-trim-started, slots:0-0" \
+ "keyspace: key_trimmed, key: $slot0_key" \
+ "sub: cluster-slot-migration-trim-completed, slots:0-0" \
+ ]
+ wait_for_condition 500 20 {
+ [R 0 asm.get_cluster_trim_event_log] eq $trim_event_log &&
+ [R 3 asm.get_cluster_trim_event_log] eq $trim_event_log &&
+ [R 6 asm.get_cluster_trim_event_log] eq $trim_event_log
+ } else {
+ fail "ASM destination trim event not received"
+ }
+
+ # cleanup
+ failover_and_wait_for_done 0
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ clear_module_event_log
+ reset_default_trim_method
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Test module replicates commands at the beginning of slot migration " {
+ R 0 flushall
+ R 1 flushall
+
+ # Sanity check
+ assert_equal 0 [R 1 asm.read_keyless_cmd_val]
+ assert_equal 0 [R 4 asm.read_keyless_cmd_val]
+
+ # Enable module command replication and set a key to be replicated
+ # Module will replicate two commands:
+ # 1- A keyless command: asm.keyless_cmd
+ # 2- SET command for the given key and value
+ set keyname [slot_key 0 modulekey]
+ R 0 asm.replicate_module_command 1 $keyname "value"
+
+ setup_slot_migration_with_delay 0 1 0 100
+ wait_for_asm_done
+ wait_for_ofs_sync [Rn 1] [Rn 4]
+
+ # Verify the commands are replicated
+ assert_equal 1 [R 1 asm.read_keyless_cmd_val]
+ assert_equal value [R 1 get $keyname]
+
+ # Verify the commands are replicated to replica
+ R 4 readonly
+ assert_equal 1 [R 4 asm.read_keyless_cmd_val]
+ assert_equal value [R 4 get $keyname]
+
+ # cleanup
+ R 0 asm.replicate_module_command 0 "" ""
+ R 0 CLUSTER MIGRATION IMPORT 0 100
+ wait_for_asm_done
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Test subcommand propagation during slot migration" {
+ R 0 flushall
+ R 1 flushall
+ set task_id [setup_slot_migration_with_delay 0 1 0 100]
+
+ set key [slot_key 0 mykey]
+ R 0 asm.parent set $key "value" ;# execute a module subcommand
+ wait_for_asm_done
+ assert_equal "value" [R 1 GET $key]
+
+ # cleanup
+ R 0 cluster migration import 0 100
+ wait_for_asm_done
+ }
+
+ test "Test trim method selection based on module keyspace subscription" {
+ R 0 debug asm-trim-method default
+ R 1 debug asm-trim-method default
+
+ R 0 flushall
+ R 1 flushall
+
+ populate_slot 10 -idx 0 -slot 0
+
+ # Make sure module is subscribed to NOTIFY_KEY_TRIMMED event. In this
+ # case, active trim must be used.
+ R 0 asm.subscribe_trimmed_event 1
+ set loglines [count_log_lines 0]
+ R 1 CLUSTER MIGRATION IMPORT 0 15
+ wait_for_asm_done
+ wait_for_log_messages 0 {"*Active trim scheduled for slots: 0-15*"} $loglines 1000 10
+
+ # Move slots back to node-0. Make sure module is not subscribed to
+ # NOTIFY_KEY_TRIMMED event. In this case, background trim must be used.
+ R 1 asm.subscribe_trimmed_event 0
+ set loglines [count_log_lines -1]
+ R 0 CLUSTER MIGRATION IMPORT 0 15
+ wait_for_asm_done
+ wait_for_log_messages -1 {"*Background trim started for slots: 0-15*"} $loglines 1000 10
+
+ # cleanup
+ wait_for_asm_done
+ R 0 asm.subscribe_trimmed_event 1
+ R 1 asm.subscribe_trimmed_event 1
+ R 0 flushall
+ R 1 flushall
+ }
+
+ test "Verify trimmed key value can be read in the server event callback" {
+ R 0 flushall
+ set key [slot_key 0]
+ set value "value123random"
+ R 0 set $key $value
+
+ R 1 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ wait_for_condition 1000 10 {
+ [R 0 asm.get_last_deleted_key] eq "keyevent: key: $key, value: $value"
+ } else {
+ fail "Last deleted key event not received"
+ }
+
+ # cleanup
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ }
+
+ test "Verify module cannot open a key in a slot that is being trimmed" {
+ R 0 flushall
+ R 0 debug asm-trim-method active -1 ;# disable active trim
+
+ set key [slot_key 0]
+ R 0 set $key value
+
+ R 1 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 1 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 0 cluster_slot_migration_active_trim_running] == 1
+ } else {
+ fail "migrate failed"
+ }
+
+ # We cannot open the key since it is in a slot being trimmed
+ assert_equal {} [R 0 asm.get $key]
+
+ # cleanup
+ R 0 debug asm-trim-method default
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ }
+
+ test "Test RM_ClusterGetLocalSlotRanges" {
+ assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 5461}}
+ assert_equal [R 3 asm.cluster_get_local_slot_ranges] {{0 5461}}
+
+ R 0 cluster migration import 5463 6000
+ wait_for_asm_done
+ wait_for_cluster_propagation
+ assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 5461} {5463 6000}}
+ assert_equal [R 3 asm.cluster_get_local_slot_ranges] {{0 5461} {5463 6000}}
+
+ R 0 cluster migration import 5462 5462 6001 10922
+ wait_for_asm_done
+ wait_for_cluster_propagation
+ assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 10922}}
+ assert_equal [R 3 asm.cluster_get_local_slot_ranges] {{0 10922}}
+ assert_equal [R 1 asm.cluster_get_local_slot_ranges] {}
+ assert_equal [R 4 asm.cluster_get_local_slot_ranges] {}
+ }
+}
+
+set testmodule [file normalize tests/modules/atomicslotmigration.so]
+
+start_cluster 2 0 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no appendonly yes]] {
+ test "TRIMSLOTS in AOF will work synchronously on restart" {
+ # When TRIMSLOTS is replayed from AOF during restart, it must execute
+ # synchronously rather than using active trim. This prevents race
+ # conditions where subsequent AOF commands might operate on keys
+ # that should have been trimmed.
+
+ # Subscribe to key trimmed event to force active trim
+ R 0 asm.subscribe_trimmed_event 1
+ populate_slot 1000 -slot 0
+ populate_slot 1000 -slot 1
+ R 1 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+
+ # verify active trim is used
+ assert_equal 1 [CI 0 cluster_slot_migration_stats_active_trim_completed]
+
+ # restart server and verify aof is loaded
+ restart_server 0 yes no yes nosave
+ assert {[scan [regexp -inline {aof_current_size:([\d]*)} [R 0 info persistence]] aof_current_size=%d] > 0}
+ wait_for_cluster_state "ok"
+
+ # verify TRIMSLOTS in AOF is executed synchronously
+ assert_equal 0 [CI 0 cluster_slot_migration_stats_active_trim_completed]
+ assert_equal 1000 [R 0 dbsize]
+
+ # cleanup
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ assert_equal 2000 [R 0 dbsize]
+ R 0 flushall
+ R 1 flushall
+ clear_module_event_log
+
+ }
+
+ test "Test trim is disabled when module requests it" {
+ R 0 asm.disable_trim
+
+ set slot0_key [slot_key 0 mykey]
+ R 0 set $slot0_key "value"
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0]
+ wait_for_condition 1000 10 {
+ [string match {*completed*} [migration_status 0 $task_id state]]
+ } else {
+ fail "ASM task did not complete"
+ }
+ # since we disable trim, the key should still exist on source,
+ # we can read it with REDISMODULE_OPEN_KEY_ACCESS_TRIMMED flag
+ assert_equal "value" [R 0 asm.read_pending_trim_key $slot0_key]
+ assert_equal 1 [R 0 asm.trim_in_progress]
+
+ # enable trim and verify the key is trimmed
+ R 0 asm.enable_trim
+ wait_for_condition 1000 10 {
+ [R 0 asm.read_pending_trim_key $slot0_key] eq "" &&
+ [R 0 asm.trim_in_progress] == 0
+ } else {
+ fail "Trim did not complete"
+ }
+ wait_for_asm_done
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_asm_done
+ clear_module_event_log
+ }
+
+ test "Can not start new asm task when trim is not allowed" {
+ # start a migration task, wait it completed but not allow to trim slots
+ R 0 asm.disable_trim
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0]
+ wait_for_condition 1000 10 {
+ [string match {*completed*} [migration_status 0 $task_id state]]
+ } else {
+ fail "ASM task did not complete"
+ }
+ # Can not start new migrating task since trim is disabled
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 1 1]
+ wait_for_condition 1000 10 {
+ [string match {*fail*} [migration_status 1 $task_id state]] &&
+ [string match {*Trim is disabled by module*} [migration_status 1 $task_id last_error]]
+ } else {
+ fail "ASM task did not fail"
+ }
+ R 0 asm.enable_trim
+ wait_for_asm_done
+
+ # start a migration task, wait it completed but not allow to trim slots
+ R 0 asm.disable_trim
+ set task_id [R 1 CLUSTER MIGRATION IMPORT 2 2]
+ wait_for_condition 1000 10 {
+ [string match {*completed*} [migration_status 0 $task_id state]]
+ } else {
+ fail "ASM task did not complete"
+ }
+ set logline [count_log_lines 0]
+ # Can not start new importing task since trim is disabled
+ set task_id [R 0 CLUSTER MIGRATION IMPORT 0 1]
+ wait_for_log_messages 0 {"*Can not start import task*trim is disabled by module*"} $logline 1000 10
+ R 0 asm.enable_trim
+ wait_for_asm_done
+ }
+}
+
+start_server {tags "cluster external:skip"} {
+ test "Test RM_ClusterGetLocalSlotRanges without cluster" {
+ r module load $testmodule
+ assert_equal [r asm.cluster_get_local_slot_ranges] {{0 16383}}
+ }
+}
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/cli.tcl b/examples/redis-unstable/tests/unit/cluster/cli.tcl
new file mode 100644
index 0000000..ce4629e
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/cli.tcl
@@ -0,0 +1,415 @@
+# Primitive tests on cluster-enabled redis using redis-cli
+
+source tests/support/cli.tcl
+
+# make sure the test infra won't use SELECT
+set old_singledb $::singledb
+set ::singledb 1
+
+# cluster creation is complicated with TLS, and the current tests don't really need that coverage
+tags {tls:skip external:skip cluster} {
+
+# start three servers
+set base_conf [list cluster-enabled yes cluster-node-timeout 1000]
+start_multiple_servers 3 [list overrides $base_conf] {
+
+ set node1 [srv 0 client]
+ set node2 [srv -1 client]
+ set node3 [srv -2 client]
+ set node3_pid [srv -2 pid]
+ set node3_rd [redis_deferring_client -2]
+
+ test {Create 3 node cluster} {
+ exec src/redis-cli --cluster-yes --cluster create \
+ 127.0.0.1:[srv 0 port] \
+ 127.0.0.1:[srv -1 port] \
+ 127.0.0.1:[srv -2 port]
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+ }
+
+ test "Run blocking command on cluster node3" {
+ # key9184688 is mapped to slot 10923 (first slot of node 3)
+ $node3_rd brpop key9184688 0
+ $node3_rd flush
+
+ wait_for_condition 50 100 {
+ [s -2 blocked_clients] eq {1}
+ } else {
+ fail "Client not blocked"
+ }
+ }
+
+ test "Perform a Resharding" {
+ exec src/redis-cli --cluster-yes --cluster reshard 127.0.0.1:[srv -2 port] \
+ --cluster-to [$node1 cluster myid] \
+ --cluster-from [$node3 cluster myid] \
+ --cluster-slots 1
+ }
+
+ test "Verify command got unblocked after resharding" {
+ # this (read) will wait for the node3 to realize the new topology
+ assert_error {*MOVED*} {$node3_rd read}
+
+ # verify there are no blocked clients
+ assert_equal [s 0 blocked_clients] {0}
+ assert_equal [s -1 blocked_clients] {0}
+ assert_equal [s -2 blocked_clients] {0}
+ }
+
+ test "Wait for cluster to be stable" {
+ # Cluster check just verifies the config state is self-consistent,
+ # waiting for cluster_state to be okay is an independent check that all the
+ # nodes actually believe each other are healthy, prevent cluster down error.
+ wait_for_condition 1000 50 {
+ [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv 0 port]}] == 0 &&
+ [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -1 port]}] == 0 &&
+ [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -2 port]}] == 0 &&
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+ }
+
+ set node1_rd [redis_deferring_client 0]
+
+ test "use previous hostip in \"cluster-preferred-endpoint-type unknown-endpoint\" mode" {
+
+ # backup and set cluster-preferred-endpoint-type unknown-endpoint
+ set endpoint_type_before_set [lindex [split [$node1 CONFIG GET cluster-preferred-endpoint-type] " "] 1]
+ $node1 CONFIG SET cluster-preferred-endpoint-type unknown-endpoint
+
+ # when redis-cli not in cluster mode, return MOVE with empty host
+ set slot_for_foo [$node1 CLUSTER KEYSLOT foo]
+ assert_error "*MOVED $slot_for_foo :*" {$node1 set foo bar}
+
+ # when in cluster mode, redirect using previous hostip
+ assert_equal "[exec src/redis-cli -h 127.0.0.1 -p [srv 0 port] -c set foo bar]" {OK}
+ assert_match "[exec src/redis-cli -h 127.0.0.1 -p [srv 0 port] -c get foo]" {bar}
+
+ assert_equal [$node1 CONFIG SET cluster-preferred-endpoint-type "$endpoint_type_before_set"] {OK}
+ }
+
+ test "Sanity test push cmd after resharding" {
+ assert_error {*MOVED*} {$node3 lpush key9184688 v1}
+
+ $node1_rd brpop key9184688 0
+ $node1_rd flush
+
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ puts "Client not blocked"
+ puts "read from blocked client: [$node1_rd read]"
+ fail "Client not blocked"
+ }
+
+ $node1 lpush key9184688 v2
+ assert_equal {key9184688 v2} [$node1_rd read]
+ }
+
+ $node3_rd close
+
+ test "Run blocking command again on cluster node1" {
+ $node1 del key9184688
+ # key9184688 is mapped to slot 10923 which has been moved to node1
+ $node1_rd brpop key9184688 0
+ $node1_rd flush
+
+ wait_for_condition 50 100 {
+ [s 0 blocked_clients] eq {1}
+ } else {
+ fail "Client not blocked"
+ }
+ }
+
+ test "Kill a cluster node and wait for fail state" {
+ # kill node3 in cluster
+ pause_process $node3_pid
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {fail} &&
+ [CI 1 cluster_state] eq {fail}
+ } else {
+ fail "Cluster doesn't fail"
+ }
+ }
+
+ test "Verify command got unblocked after cluster failure" {
+ assert_error {*CLUSTERDOWN*} {$node1_rd read}
+
+ # verify there are no blocked clients
+ assert_equal [s 0 blocked_clients] {0}
+ assert_equal [s -1 blocked_clients] {0}
+ }
+
+ resume_process $node3_pid
+ $node1_rd close
+
+} ;# stop servers
+
+# Test redis-cli -- cluster create, add-node, call.
+# Test that functions are propagated on add-node
+start_multiple_servers 5 [list overrides $base_conf] {
+
+ set node4_rd [redis_client -3]
+ set node5_rd [redis_client -4]
+
+ test {Functions are added to new node on redis-cli cluster add-node} {
+ exec src/redis-cli --cluster-yes --cluster create \
+ 127.0.0.1:[srv 0 port] \
+ 127.0.0.1:[srv -1 port] \
+ 127.0.0.1:[srv -2 port]
+
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # upload a function to all the cluster
+ exec src/redis-cli --cluster-yes --cluster call 127.0.0.1:[srv 0 port] \
+ FUNCTION LOAD {#!lua name=TEST
+ redis.register_function('test', function() return 'hello' end)
+ }
+
+ # adding node to the cluster
+ exec src/redis-cli --cluster-yes --cluster add-node \
+ 127.0.0.1:[srv -3 port] \
+ 127.0.0.1:[srv 0 port]
+
+ wait_for_cluster_size 4
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok} &&
+ [CI 3 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # make sure 'test' function was added to the new node
+ assert_equal {{library_name TEST engine LUA functions {{name test description {} flags {}}}}} [$node4_rd FUNCTION LIST]
+
+ # add function to node 5
+ assert_equal {TEST} [$node5_rd FUNCTION LOAD {#!lua name=TEST
+ redis.register_function('test', function() return 'hello' end)
+ }]
+
+ # make sure functions was added to node 5
+ assert_equal {{library_name TEST engine LUA functions {{name test description {} flags {}}}}} [$node5_rd FUNCTION LIST]
+
+ # adding node 5 to the cluster should failed because it already contains the 'test' function
+ catch {
+ exec src/redis-cli --cluster-yes --cluster add-node \
+ 127.0.0.1:[srv -4 port] \
+ 127.0.0.1:[srv 0 port]
+ } e
+ assert_match {*node already contains functions*} $e
+ }
+} ;# stop servers
+
+# Test redis-cli --cluster create, add-node.
+# Test that one slot can be migrated to and then away from the new node.
+test {Migrate the last slot away from a node using redis-cli} {
+ start_multiple_servers 4 [list overrides $base_conf] {
+
+ # Create a cluster of 3 nodes
+ exec src/redis-cli --cluster-yes --cluster create \
+ 127.0.0.1:[srv 0 port] \
+ 127.0.0.1:[srv -1 port] \
+ 127.0.0.1:[srv -2 port]
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Insert some data
+ assert_equal OK [exec src/redis-cli -c -p [srv 0 port] SET foo bar]
+ set slot [exec src/redis-cli -c -p [srv 0 port] CLUSTER KEYSLOT foo]
+
+ # Add new node to the cluster
+ exec src/redis-cli --cluster-yes --cluster add-node \
+ 127.0.0.1:[srv -3 port] \
+ 127.0.0.1:[srv 0 port]
+
+ # First we wait for new node to be recognized by entire cluster
+ wait_for_cluster_size 4
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok} &&
+ [CI 3 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ set newnode_r [redis_client -3]
+ set newnode_id [$newnode_r CLUSTER MYID]
+
+ # Find out which node has the key "foo" by asking the new node for a
+ # redirect.
+ catch { $newnode_r get foo } e
+ assert_match "MOVED $slot *" $e
+ lassign [split [lindex $e 2] :] owner_host owner_port
+ set owner_r [redis $owner_host $owner_port 0 $::tls]
+ set owner_id [$owner_r CLUSTER MYID]
+
+ # Move slot to new node using plain Redis commands
+ assert_equal OK [$newnode_r CLUSTER SETSLOT $slot IMPORTING $owner_id]
+ assert_equal OK [$owner_r CLUSTER SETSLOT $slot MIGRATING $newnode_id]
+ assert_equal {foo} [$owner_r CLUSTER GETKEYSINSLOT $slot 10]
+ assert_equal OK [$owner_r MIGRATE 127.0.0.1 [srv -3 port] "" 0 5000 KEYS foo]
+ assert_equal OK [$newnode_r CLUSTER SETSLOT $slot NODE $newnode_id]
+ assert_equal OK [$owner_r CLUSTER SETSLOT $slot NODE $newnode_id]
+
+ # Using --cluster check make sure we won't get `Not all slots are covered by nodes`.
+ # Wait for the cluster to become stable make sure the cluster is up during MIGRATE.
+ wait_for_condition 1000 50 {
+ [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv 0 port]}] == 0 &&
+ [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -1 port]}] == 0 &&
+ [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -2 port]}] == 0 &&
+ [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -3 port]}] == 0 &&
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok} &&
+ [CI 3 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Move the only slot back to original node using redis-cli
+ exec src/redis-cli --cluster reshard 127.0.0.1:[srv -3 port] \
+ --cluster-from $newnode_id \
+ --cluster-to $owner_id \
+ --cluster-slots 1 \
+ --cluster-yes
+
+ # The empty node will become a replica of the new owner before the
+ # `MOVED` check, so let's wait for the cluster to become stable.
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok} &&
+ [CI 3 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Check that the key foo has been migrated back to the original owner.
+ catch { $newnode_r get foo } e
+ assert_equal "MOVED $slot $owner_host:$owner_port" $e
+
+ # Check that the empty node has turned itself into a replica of the new
+ # owner and that the new owner knows that.
+ wait_for_condition 1000 50 {
+ [string match "*slave*" [$owner_r CLUSTER REPLICAS $owner_id]]
+ } else {
+ fail "Empty node didn't turn itself into a replica."
+ }
+ }
+}
+
+foreach ip_or_localhost {127.0.0.1 localhost} {
+
+# Test redis-cli --cluster create, add-node with cluster-port.
+# Create five nodes, three with custom cluster_port and two with default values.
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] {
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] {
+start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] {
+
+ # The first three are used to test --cluster create.
+ # The last two are used to test --cluster add-node
+
+ test "redis-cli -4 --cluster create using $ip_or_localhost with cluster-port" {
+ exec src/redis-cli -4 --cluster-yes --cluster create \
+ $ip_or_localhost:[srv 0 port] \
+ $ip_or_localhost:[srv -1 port] \
+ $ip_or_localhost:[srv -2 port]
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Make sure each node can meet other nodes
+ assert_equal 3 [CI 0 cluster_known_nodes]
+ assert_equal 3 [CI 1 cluster_known_nodes]
+ assert_equal 3 [CI 2 cluster_known_nodes]
+ }
+
+ test "redis-cli -4 --cluster add-node using $ip_or_localhost with cluster-port" {
+ # Adding node to the cluster (without cluster-port)
+ exec src/redis-cli -4 --cluster-yes --cluster add-node \
+ $ip_or_localhost:[srv -3 port] \
+ $ip_or_localhost:[srv 0 port]
+
+ wait_for_cluster_size 4
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok} &&
+ [CI 3 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Adding node to the cluster (with cluster-port)
+ exec src/redis-cli -4 --cluster-yes --cluster add-node \
+ $ip_or_localhost:[srv -4 port] \
+ $ip_or_localhost:[srv 0 port]
+
+ wait_for_cluster_size 5
+
+ wait_for_condition 1000 50 {
+ [CI 0 cluster_state] eq {ok} &&
+ [CI 1 cluster_state] eq {ok} &&
+ [CI 2 cluster_state] eq {ok} &&
+ [CI 3 cluster_state] eq {ok} &&
+ [CI 4 cluster_state] eq {ok}
+ } else {
+ fail "Cluster doesn't stabilize"
+ }
+
+ # Make sure each node can meet other nodes
+ assert_equal 5 [CI 0 cluster_known_nodes]
+ assert_equal 5 [CI 1 cluster_known_nodes]
+ assert_equal 5 [CI 2 cluster_known_nodes]
+ assert_equal 5 [CI 3 cluster_known_nodes]
+ assert_equal 5 [CI 4 cluster_known_nodes]
+ }
+# stop 5 servers
+}
+}
+}
+}
+}
+
+} ;# foreach ip_or_localhost
+
+} ;# tags
+
+set ::singledb $old_singledb
diff --git a/examples/redis-unstable/tests/unit/cluster/cluster-response-tls.tcl b/examples/redis-unstable/tests/unit/cluster/cluster-response-tls.tcl
new file mode 100644
index 0000000..a099fa7
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/cluster-response-tls.tcl
@@ -0,0 +1,110 @@
+source tests/support/cluster.tcl
+
+proc get_port_from_moved_error {e} {
+ set ip_port [lindex [split $e " "] 2]
+ return [lindex [split $ip_port ":"] 1]
+}
+
+proc get_pport_by_port {port} {
+ foreach srv $::servers {
+ set srv_port [dict get $srv port]
+ if {$port == $srv_port} {
+ return [dict get $srv pport]
+ }
+ }
+ return 0
+}
+
+proc get_port_from_node_info {line} {
+ set fields [split $line " "]
+ set addr [lindex $fields 1]
+ set ip_port [lindex [split $addr "@"] 0]
+ return [lindex [split $ip_port ":"] 1]
+}
+
+proc cluster_response_tls {tls_cluster} {
+
+ test "CLUSTER SLOTS with different connection type -- tls-cluster $tls_cluster" {
+ set slots1 [R 0 cluster slots]
+ set pport [srv 0 pport]
+ set cluster_client [redis_cluster 127.0.0.1:$pport 0]
+ set slots2 [$cluster_client cluster slots]
+ $cluster_client close
+ # Compare the ports in the first row
+ assert_no_match [lindex $slots1 0 2 1] [lindex $slots2 0 2 1]
+ }
+
+ test "CLUSTER NODES return port according to connection type -- tls-cluster $tls_cluster" {
+ set nodes [R 0 cluster nodes]
+ set port1 [get_port_from_node_info [lindex [split $nodes "\r\n"] 0]]
+ set pport [srv 0 pport]
+ set cluster_client [redis_cluster 127.0.0.1:$pport 0]
+ set nodes [$cluster_client cluster nodes]
+ set port2 [get_port_from_node_info [lindex [split $nodes "\r\n"] 0]]
+ $cluster_client close
+ assert_not_equal $port1 $port2
+ }
+
+ set cluster [redis_cluster 127.0.0.1:[srv 0 port]]
+ set cluster_pport [redis_cluster 127.0.0.1:[srv 0 pport] 0]
+ $cluster refresh_nodes_map
+
+ test "Set many keys in the cluster -- tls-cluster $tls_cluster" {
+ for {set i 0} {$i < 5000} {incr i} {
+ $cluster set $i $i
+ assert { [$cluster get $i] eq $i }
+ }
+ }
+
+ test "Test cluster responses during migration of slot x -- tls-cluster $tls_cluster" {
+ set slot 10
+ array set nodefrom [$cluster masternode_for_slot $slot]
+ array set nodeto [$cluster masternode_notfor_slot $slot]
+ $nodeto(link) cluster setslot $slot importing $nodefrom(id)
+ $nodefrom(link) cluster setslot $slot migrating $nodeto(id)
+
+ # Get a key from that slot
+ set key [$nodefrom(link) cluster GETKEYSINSLOT $slot "1"]
+ # MOVED REPLY
+ catch {$nodeto(link) set $key "newVal"} e_moved1
+ assert_match "*MOVED*" $e_moved1
+ # ASK REPLY
+ catch {$nodefrom(link) set "abc{$key}" "newVal"} e_ask1
+ assert_match "*ASK*" $e_ask1
+
+ # UNSTABLE REPLY
+ assert_error "*TRYAGAIN*" {$nodefrom(link) mset "a{$key}" "newVal" $key "newVal2"}
+
+ # Connecting using another protocol
+ array set nodefrom_pport [$cluster_pport masternode_for_slot $slot]
+ array set nodeto_pport [$cluster_pport masternode_notfor_slot $slot]
+
+ # MOVED REPLY
+ catch {$nodeto_pport(link) set $key "newVal"} e_moved2
+ assert_match "*MOVED*" $e_moved2
+ # ASK REPLY
+ catch {$nodefrom_pport(link) set "abc{$key}" "newVal"} e_ask2
+ assert_match "*ASK*" $e_ask2
+ # Compare MOVED error's port
+ set port1 [get_port_from_moved_error $e_moved1]
+ set port2 [get_port_from_moved_error $e_moved2]
+ assert_not_equal $port1 $port2
+ assert_equal $port1 $nodefrom(port)
+ assert_equal $port2 [get_pport_by_port $nodefrom(port)]
+ # Compare ASK error's port
+ set port1 [get_port_from_moved_error $e_ask1]
+ set port2 [get_port_from_moved_error $e_ask2]
+ assert_not_equal $port1 $port2
+ assert_equal $port1 $nodeto(port)
+ assert_equal $port2 [get_pport_by_port $nodeto(port)]
+ }
+}
+
+if {$::tls} {
+ start_cluster 3 3 {tags {external:skip cluster tls} overrides {tls-cluster yes tls-replication yes}} {
+ cluster_response_tls yes
+ }
+ start_cluster 3 3 {tags {external:skip cluster tls} overrides {tls-cluster no tls-replication no}} {
+ cluster_response_tls no
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/failure-marking.tcl b/examples/redis-unstable/tests/unit/cluster/failure-marking.tcl
new file mode 100644
index 0000000..c4746c8
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/failure-marking.tcl
@@ -0,0 +1,53 @@
+# Test a single primary can mark replica as `fail`
+start_cluster 1 1 {tags {external:skip cluster}} {
+
+ test "Verify that single primary marks replica as failed" {
+ set primary [srv -0 client]
+
+ set replica1 [srv -1 client]
+ set replica1_pid [srv -1 pid]
+ set replica1_instance_id [dict get [cluster_get_myself 1] id]
+
+ assert {[lindex [$primary role] 0] eq {master}}
+ assert {[lindex [$replica1 role] 0] eq {slave}}
+
+ wait_for_sync $replica1
+
+ pause_process $replica1_pid
+
+ wait_node_marked_fail 0 $replica1_instance_id
+ }
+}
+
+# Test multiple primaries wait for a quorum and then mark a replica as `fail`
+start_cluster 2 1 {tags {external:skip cluster}} {
+
+ test "Verify that multiple primaries mark replica as failed" {
+ set primary1 [srv -0 client]
+
+ set primary2 [srv -1 client]
+ set primary2_pid [srv -1 pid]
+
+ set replica1 [srv -2 client]
+ set replica1_pid [srv -2 pid]
+ set replica1_instance_id [dict get [cluster_get_myself 2] id]
+
+ assert {[lindex [$primary1 role] 0] eq {master}}
+ assert {[lindex [$primary2 role] 0] eq {master}}
+ assert {[lindex [$replica1 role] 0] eq {slave}}
+
+ wait_for_sync $replica1
+
+ pause_process $replica1_pid
+
+ # Pause other primary to allow time for pfail flag to appear
+ pause_process $primary2_pid
+
+ wait_node_marked_pfail 0 $replica1_instance_id
+
+ # Resume other primary and wait for to show replica as failed
+ resume_process $primary2_pid
+
+ wait_node_marked_fail 0 $replica1_instance_id
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/hostnames.tcl b/examples/redis-unstable/tests/unit/cluster/hostnames.tcl
new file mode 100644
index 0000000..2236228
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/hostnames.tcl
@@ -0,0 +1,230 @@
+#
+# Copyright (c) 2009-Present, Redis Ltd.
+# All rights reserved.
+#
+# Copyright (c) 2024-present, Valkey contributors.
+# All rights reserved.
+#
+# Licensed under your choice of (a) the Redis Source Available License 2.0
+# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+# GNU Affero General Public License v3 (AGPLv3).
+#
+# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+#
+
+proc get_slot_field {slot_output shard_id node_id attrib_id} {
+ return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id]
+}
+
+# Start a cluster with 3 masters and 4 replicas.
+# These tests rely on specific node ordering, so make sure no node fails over.
+start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-replica-no-failover yes}} {
+test "Set cluster hostnames and verify they are propagated" {
+ for {set j 0} {$j < [llength $::servers]} {incr j} {
+ R $j config set cluster-announce-hostname "host-$j.com"
+ }
+
+ wait_for_condition 50 100 {
+ [are_hostnames_propagated "host-*.com"] eq 1
+ } else {
+ fail "cluster hostnames were not propagated"
+ }
+
+ # Now that everything is propagated, assert everyone agrees
+ wait_for_cluster_propagation
+}
+
+test "Update hostnames and make sure they are all eventually propagated" {
+ for {set j 0} {$j < [llength $::servers]} {incr j} {
+ R $j config set cluster-announce-hostname "host-updated-$j.com"
+ }
+
+ wait_for_condition 50 100 {
+ [are_hostnames_propagated "host-updated-*.com"] eq 1
+ } else {
+ fail "cluster hostnames were not propagated"
+ }
+
+ # Now that everything is propagated, assert everyone agrees
+ wait_for_cluster_propagation
+}
+
+test "Remove hostnames and make sure they are all eventually propagated" {
+ for {set j 0} {$j < [llength $::servers]} {incr j} {
+ R $j config set cluster-announce-hostname ""
+ }
+
+ wait_for_condition 50 100 {
+ [are_hostnames_propagated ""] eq 1
+ } else {
+ fail "cluster hostnames were not propagated"
+ }
+
+ # Now that everything is propagated, assert everyone agrees
+ wait_for_cluster_propagation
+}
+
+test "Verify cluster-preferred-endpoint-type behavior for redirects and info" {
+ R 0 config set cluster-announce-hostname "me.com"
+ R 1 config set cluster-announce-hostname ""
+ R 2 config set cluster-announce-hostname "them.com"
+
+ wait_for_cluster_propagation
+
+ # Verify default behavior
+ set slot_result [R 0 cluster slots]
+ assert_equal "" [lindex [get_slot_field $slot_result 0 2 0] 1]
+ assert_equal "" [lindex [get_slot_field $slot_result 2 2 0] 1]
+ assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 0]
+ assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 1]
+ assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 0]
+ assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 1]
+
+ # Redirect will use the IP address
+ catch {R 0 set foo foo} redir_err
+ assert_match "MOVED * 127.0.0.1:*" $redir_err
+
+ # Verify prefer hostname behavior
+ R 0 config set cluster-preferred-endpoint-type hostname
+
+ set slot_result [R 0 cluster slots]
+ assert_equal "me.com" [get_slot_field $slot_result 0 2 0]
+ assert_equal "them.com" [get_slot_field $slot_result 2 2 0]
+
+ # Redirect should use hostname
+ catch {R 0 set foo foo} redir_err
+ assert_match "MOVED * them.com:*" $redir_err
+
+ # Redirect to an unknown hostname returns ?
+ catch {R 0 set barfoo bar} redir_err
+ assert_match "MOVED * ?:*" $redir_err
+
+ # Verify unknown hostname behavior
+ R 0 config set cluster-preferred-endpoint-type unknown-endpoint
+
+ # Verify default behavior
+ set slot_result [R 0 cluster slots]
+ assert_equal "ip" [lindex [get_slot_field $slot_result 0 2 3] 0]
+ assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 0 2 3] 1]
+ assert_equal "ip" [lindex [get_slot_field $slot_result 2 2 3] 0]
+ assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 2 2 3] 1]
+ assert_equal "ip" [lindex [get_slot_field $slot_result 1 2 3] 0]
+ assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 1 2 3] 1]
+ # Not required by the protocol, but IP comes before hostname
+ assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 2]
+ assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 3]
+ assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 2]
+ assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 3]
+
+ # This node doesn't have a hostname
+ assert_equal 2 [llength [get_slot_field $slot_result 1 2 3]]
+
+ # Redirect should use empty string
+ catch {R 0 set foo foo} redir_err
+ assert_match "MOVED * :*" $redir_err
+
+ R 0 config set cluster-preferred-endpoint-type ip
+}
+
+test "Verify the nodes configured with prefer hostname only show hostname for new nodes" {
+ # Have everyone forget node 6 and isolate it from the cluster.
+ isolate_node 6
+
+ set primaries 3
+ for {set j 0} {$j < $primaries} {incr j} {
+ # Set hostnames for the masters, now that the node is isolated
+ R $j config set cluster-announce-hostname "shard-$j.com"
+ }
+
+ # Prevent Node 0 and Node 6 from properly meeting,
+ # they'll hang in the handshake phase. This allows us to
+ # test the case where we "know" about it but haven't
+ # successfully retrieved information about it yet.
+ R 0 DEBUG DROP-CLUSTER-PACKET-FILTER 0
+ R 6 DEBUG DROP-CLUSTER-PACKET-FILTER 0
+
+ # Have a replica meet the isolated node
+ R 3 cluster meet 127.0.0.1 [srv -6 port]
+
+ # Wait for the isolated node to learn about the rest of the cluster,
+ # which correspond to a single entry in cluster nodes. Note this
+ # doesn't mean the isolated node has successfully contacted each
+ # node.
+ wait_for_condition 50 100 {
+ [llength [split [R 6 CLUSTER NODES] "\n"]] eq [expr [llength $::servers] + 1]
+ } else {
+ fail "Isolated node didn't learn about the rest of the cluster *"
+ }
+
+ # Now, we wait until the two nodes that aren't filtering packets
+ # to accept our isolated nodes connections. At this point they will
+ # start showing up in cluster slots.
+ wait_for_condition 50 100 {
+ [llength [R 6 CLUSTER SLOTS]] eq 2
+ } else {
+ fail "Node did not learn about the 2 shards it can talk to"
+ }
+ wait_for_condition 50 100 {
+ [lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
+ } else {
+ fail "hostname for shard-1 didn't reach node 6"
+ }
+
+ wait_for_condition 50 100 {
+ [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
+ } else {
+ fail "hostname for shard-2 didn't reach node 6"
+ }
+
+ # Also make sure we know about the isolated master, we
+ # just can't reach it.
+ set master_id [R 0 CLUSTER MYID]
+ assert_match "*$master_id*" [R 6 CLUSTER NODES]
+
+ # Stop dropping cluster packets, and make sure everything
+ # stabilizes
+ R 0 DEBUG DROP-CLUSTER-PACKET-FILTER -1
+ R 6 DEBUG DROP-CLUSTER-PACKET-FILTER -1
+
+ # This operation sometimes spikes to around 5 seconds to resolve the state,
+ # so it has a higher timeout.
+ wait_for_condition 50 500 {
+ [llength [R 6 CLUSTER SLOTS]] eq 3
+ } else {
+ fail "Node did not learn about the 2 shards it can talk to"
+ }
+
+ for {set j 0} {$j < $primaries} {incr j} {
+ wait_for_condition 50 100 {
+ [lindex [get_slot_field [R 6 CLUSTER SLOTS] $j 2 3] 1] eq "shard-$j.com"
+ } else {
+ fail "hostname information for shard-$j didn't reach node 6"
+ }
+ }
+}
+
+test "Test restart will keep hostname information" {
+ # Set a new hostname, reboot and make sure it sticks
+ R 0 config set cluster-announce-hostname "restart-1.com"
+
+ # Store the hostname in the config
+ R 0 config rewrite
+
+ restart_server 0 true false
+ set slot_result [R 0 CLUSTER SLOTS]
+ assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "restart-1.com"
+
+ # As a sanity check, make sure everyone eventually agrees
+ wait_for_cluster_propagation
+}
+
+test "Test hostname validation" {
+ catch {R 0 config set cluster-announce-hostname [string repeat x 256]} err
+ assert_match "*Hostnames must be less than 256 characters*" $err
+ catch {R 0 config set cluster-announce-hostname "?.com"} err
+ assert_match "*Hostnames may only contain alphanumeric characters, hyphens or dots*" $err
+
+ # Note this isn't a valid hostname, but it passes our internal validation
+ R 0 config set cluster-announce-hostname "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-."
+}
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/human-announced-nodename.tcl b/examples/redis-unstable/tests/unit/cluster/human-announced-nodename.tcl
new file mode 100644
index 0000000..a595ca6
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/human-announced-nodename.tcl
@@ -0,0 +1,29 @@
+# Check if cluster's view of human announced nodename is reported in logs
+start_cluster 3 0 {tags {external:skip cluster}} {
+ test "Set cluster human announced nodename and let it propagate" {
+ for {set j 0} {$j < [llength $::servers]} {incr j} {
+ R $j config set cluster-announce-hostname "host-$j.com"
+ R $j config set cluster-announce-human-nodename "nodename-$j"
+ }
+
+ # We wait for everyone to agree on the hostnames. Since they are gossiped
+ # the same way as nodenames, it implies everyone knows the nodenames too.
+ wait_for_condition 50 100 {
+ [are_hostnames_propagated "host-*.com"] eq 1
+ } else {
+ fail "cluster hostnames were not propagated"
+ }
+ }
+
+ test "Human nodenames are visible in log messages" {
+ # Pause instance 0, so everyone thinks it is dead
+ pause_process [srv 0 pid]
+
+ # We're going to use a message we will know will be sent, node unreachable,
+ # since it includes the other node gossiping.
+ wait_for_log_messages -1 {"*Node * (nodename-2) reported node * (nodename-0) as not reachable*"} 0 20 500
+ wait_for_log_messages -2 {"*Node * (nodename-1) reported node * (nodename-0) as not reachable*"} 0 20 500
+
+ resume_process [srv 0 pid]
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/internal-secret.tcl b/examples/redis-unstable/tests/unit/cluster/internal-secret.tcl
new file mode 100644
index 0000000..f310b74
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/internal-secret.tcl
@@ -0,0 +1,71 @@
+proc num_unique_secrets {num_nodes} {
+ set secrets [list]
+ for {set i 0} {$i < $num_nodes} {incr i} {
+ lappend secrets [R $i debug internal_secret]
+ }
+ set num_secrets [llength [lsort -unique $secrets]]
+ return $num_secrets
+}
+
+proc wait_for_secret_sync {maxtries delay num_nodes} {
+ wait_for_condition $maxtries $delay {
+ [num_unique_secrets $num_nodes] eq 1
+ } else {
+ fail "Failed waiting for secrets to sync"
+ }
+}
+
+start_cluster 3 3 {tags {external:skip cluster}} {
+ test "Test internal secret sync" {
+ wait_for_secret_sync 50 100 6
+ }
+
+
+ set first_shard_host [srv 0 host]
+ set first_shard_port [srv 0 port]
+
+ if {$::verbose} {
+ puts {cluster internal secret:}
+ puts [R 1 debug internal_secret]
+ }
+
+ test "Join a node to the cluster and make sure it gets the same secret" {
+ start_server {tags {"external:skip"} overrides {cluster-enabled {yes}}} {
+ r cluster meet $first_shard_host $first_shard_port
+ wait_for_condition 50 100 {
+ [r debug internal_secret] eq [R 1 debug internal_secret]
+ } else {
+ puts [r debug internal_secret]
+ puts [R 1 debug internal_secret]
+ fail "Secrets not match"
+ }
+ }
+ }
+
+ test "Join another cluster, make sure clusters sync on the internal secret" {
+ start_server {tags {"external:skip"} overrides {cluster-enabled {yes}}} {
+ set new_shard_host [srv 0 host]
+ set new_shard_port [srv 0 port]
+ start_server {tags {"external:skip"} overrides {cluster-enabled {yes}}} {
+ r cluster meet $new_shard_host $new_shard_port
+ wait_for_condition 50 100 {
+ [r debug internal_secret] eq [r -1 debug internal_secret]
+ } else {
+ puts [r debug internal_secret]
+ puts [r -1 debug internal_secret]
+ fail "Secrets not match"
+ }
+ if {$::verbose} {
+ puts {new cluster internal secret:}
+ puts [r -1 debug internal_secret]
+ }
+ r cluster meet $first_shard_host $first_shard_port
+ wait_for_secret_sync 50 100 8
+ if {$::verbose} {
+ puts {internal secret after join to bigger cluster:}
+ puts [r -1 debug internal_secret]
+ }
+ }
+ }
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/links.tcl b/examples/redis-unstable/tests/unit/cluster/links.tcl
new file mode 100644
index 0000000..a202c37
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/links.tcl
@@ -0,0 +1,292 @@
+proc get_links_with_peer {this_instance_id peer_nodename} {
+ set links [R $this_instance_id cluster links]
+ set links_with_peer {}
+ foreach l $links {
+ if {[dict get $l node] eq $peer_nodename} {
+ lappend links_with_peer $l
+ }
+ }
+ return $links_with_peer
+}
+
+# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that
+# corresponds to the link established toward a peer identified by `peer_nodename`
+proc get_link_to_peer {this_instance_id peer_nodename} {
+ set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
+ foreach l $links_with_peer {
+ if {[dict get $l direction] eq "to"} {
+ return $l
+ }
+ }
+ return {}
+}
+
+# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that
+# corresponds to the link accepted from a peer identified by `peer_nodename`
+proc get_link_from_peer {this_instance_id peer_nodename} {
+ set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
+ foreach l $links_with_peer {
+ if {[dict get $l direction] eq "from"} {
+ return $l
+ }
+ }
+ return {}
+}
+
+# Reset cluster links to their original state
+proc reset_links {id} {
+ set limit [lindex [R $id CONFIG get cluster-link-sendbuf-limit] 1]
+
+ # Set a 1 byte limit and wait for cluster cron to run
+ # (executes every 100ms) and terminate links
+ R $id CONFIG SET cluster-link-sendbuf-limit 1
+ after 150
+
+ # Reset limit
+ R $id CONFIG SET cluster-link-sendbuf-limit $limit
+
+ # Wait until the cluster links come back up for each node
+ wait_for_condition 50 100 {
+ [number_of_links $id] == [expr [number_of_peers $id] * 2]
+ } else {
+ fail "Cluster links did not come back up"
+ }
+}
+
+proc number_of_peers {id} {
+ expr [llength $::servers] - 1
+}
+
+proc number_of_links {id} {
+ llength [R $id cluster links]
+}
+
+proc publish_messages {server num_msgs msg_size} {
+ for {set i 0} {$i < $num_msgs} {incr i} {
+ $server PUBLISH channel [string repeat "x" $msg_size]
+ }
+}
+
+start_cluster 1 2 {tags {external:skip cluster}} {
+ set primary_id 0
+ set replica1_id 1
+
+ set primary [Rn $primary_id]
+ set replica1 [Rn $replica1_id]
+
+ test "Broadcast message across a cluster shard while a cluster link is down" {
+ set replica1_node_id [$replica1 CLUSTER MYID]
+
+ set channelname ch3
+
+ # subscribe on replica1
+ set subscribeclient1 [redis_deferring_client -1]
+ $subscribeclient1 deferred 1
+ $subscribeclient1 SSUBSCRIBE $channelname
+ $subscribeclient1 read
+
+ # subscribe on replica2
+ set subscribeclient2 [redis_deferring_client -2]
+ $subscribeclient2 deferred 1
+ $subscribeclient2 SSUBSCRIBE $channelname
+ $subscribeclient2 read
+
+ # Verify number of links with cluster stable state
+ assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id]
+
+ # Disconnect the cluster between primary and replica1 and publish a message.
+ $primary MULTI
+ $primary DEBUG CLUSTERLINK KILL TO $replica1_node_id
+ $primary SPUBLISH $channelname hello
+ set res [$primary EXEC]
+
+ # Verify no client exists on the primary to receive the published message.
+ assert_equal $res {OK 0}
+
+ # Wait for all the cluster links are healthy
+ wait_for_condition 50 100 {
+ [number_of_peers $primary_id]*2 == [number_of_links $primary_id]
+ } else {
+ fail "All peer links couldn't be established"
+ }
+
+ # Publish a message afterwards.
+ $primary SPUBLISH $channelname world
+
+ # Verify replica1 has received only (world) / hello is lost.
+ assert_equal "smessage ch3 world" [$subscribeclient1 read]
+
+ # Verify replica2 has received both messages (hello/world)
+ assert_equal "smessage ch3 hello" [$subscribeclient2 read]
+ assert_equal "smessage ch3 world" [$subscribeclient2 read]
+ } {} {needs:debug}
+}
+
+start_cluster 3 0 {tags {external:skip cluster}} {
+ test "Each node has two links with each peer" {
+ for {set id 0} {$id < [llength $::servers]} {incr id} {
+ # Assert that from point of view of each node, there are two links for
+ # each peer. It might take a while for cluster to stabilize so wait up
+ # to 5 seconds.
+ wait_for_condition 50 100 {
+ [number_of_peers $id]*2 == [number_of_links $id]
+ } else {
+ assert_equal [expr [number_of_peers $id]*2] [number_of_links $id]
+ }
+
+ set nodes [get_cluster_nodes $id]
+ set links [R $id cluster links]
+
+ # For each peer there should be exactly one
+ # link "to" it and one link "from" it.
+ foreach n $nodes {
+ if {[cluster_has_flag $n myself]} continue
+ set peer [dict get $n id]
+ set to 0
+ set from 0
+ foreach l $links {
+ if {[dict get $l node] eq $peer} {
+ if {[dict get $l direction] eq "to"} {
+ incr to
+ } elseif {[dict get $l direction] eq "from"} {
+ incr from
+ }
+ }
+ }
+ assert {$to eq 1}
+ assert {$from eq 1}
+ }
+ }
+ }
+
+ test {Validate cluster links format} {
+ set lines [R 0 cluster links]
+ foreach l $lines {
+ if {$l eq {}} continue
+ assert_equal [llength $l] 12
+ assert_equal 1 [dict exists $l "direction"]
+ assert_equal 1 [dict exists $l "node"]
+ assert_equal 1 [dict exists $l "create-time"]
+ assert_equal 1 [dict exists $l "events"]
+ assert_equal 1 [dict exists $l "send-buffer-allocated"]
+ assert_equal 1 [dict exists $l "send-buffer-used"]
+ }
+ }
+
+ set primary1_id 0
+ set primary2_id 1
+
+ set primary1 [Rn $primary1_id]
+ set primary2 [Rn $primary2_id]
+
+ test "Disconnect link when send buffer limit reached" {
+ # On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts
+ set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
+ $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]
+
+ # Get primary1's links with primary2
+ set primary2_name [dict get [cluster_get_myself $primary2_id] id]
+ set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
+ set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
+
+ # On primary1, set cluster link send buffer limit to 256KB, which is large enough to not be
+ # overflowed by regular gossip messages but also small enough that it doesn't take too much
+ # memory to overflow it. If it is set too high, Redis may get OOM killed by kernel before this
+ # limit is overflowed in some RAM-limited test environments.
+ set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
+ $primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024]
+ assert {[CI $primary1_id total_cluster_links_buffer_limit_exceeded] eq 0}
+
+ # To manufacture an ever-growing send buffer from primary1 to primary2,
+ # make primary2 unresponsive.
+ set primary2_pid [srv [expr -1*$primary2_id] pid]
+ pause_process $primary2_pid
+
+ # On primary1, send 128KB Pubsub messages in a loop until the send buffer of the link from
+ # primary1 to primary2 exceeds buffer limit therefore be dropped.
+ # For the send buffer to grow, we need to first exhaust TCP send buffer of primary1 and TCP
+ # receive buffer of primary2 first. The sizes of these two buffers vary by OS, but 100 128KB
+ # messages should be sufficient.
+ set i 0
+ wait_for_condition 100 0 {
+ [catch {incr i} e] == 0 &&
+ [catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 &&
+ [catch {after 500} e] == 0 &&
+ [CI $primary1_id total_cluster_links_buffer_limit_exceeded] >= 1
+ } else {
+ fail "Cluster link not freed as expected"
+ }
+
+ # A new link to primary2 should have been recreated
+ set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
+ assert {[dict get $new_link_p1_to_p2 create-time] > [dict get $orig_link_p1_to_p2 create-time]}
+
+ # Link from primary2 should not be affected
+ set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
+ assert {[dict get $same_link_p1_from_p2 create-time] eq [dict get $orig_link_p1_from_p2 create-time]}
+
+ # Revive primary2
+ resume_process $primary2_pid
+
+ # Reset configs on primary1 so config changes don't leak out to other tests
+ $primary1 CONFIG set cluster-node-timeout $oldtimeout
+ $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit
+
+ reset_links $primary1_id
+ }
+
+ test "Link memory increases with publishes" {
+ set server_id 0
+ set server [Rn $server_id]
+ set msg_size 10000
+ set num_msgs 10
+
+ # Remove any sendbuf limit
+ $primary1 CONFIG set cluster-link-sendbuf-limit 0
+
+ # Publish ~100KB to one of the servers
+ $server MULTI
+ $server INFO memory
+ publish_messages $server $num_msgs $msg_size
+ $server INFO memory
+ set res [$server EXEC]
+
+ set link_mem_before_pubs [getInfoProperty $res mem_cluster_links]
+
+ # Remove the first half of the response string which contains the
+ # first "INFO memory" results and search for the property again
+ set res [string range $res [expr [string length $res] / 2] end]
+ set link_mem_after_pubs [getInfoProperty $res mem_cluster_links]
+
+ # We expect the memory to have increased by more than
+ # the culmulative size of the publish messages
+ set mem_diff_floor [expr $msg_size * $num_msgs]
+ set mem_diff [expr $link_mem_after_pubs - $link_mem_before_pubs]
+ assert {$mem_diff > $mem_diff_floor}
+
+ # Reset links to ensure no leftover data for the next test
+ reset_links $server_id
+ }
+
+ test "Link memory resets after publish messages flush" {
+ set server [Rn 0]
+ set msg_size 100000
+ set num_msgs 10
+
+ set link_mem_before [status $server mem_cluster_links]
+
+ # Publish ~1MB to one of the servers
+ $server MULTI
+ publish_messages $server $num_msgs $msg_size
+ $server EXEC
+
+ # Wait until the cluster link memory has returned to below the pre-publish value.
+ # We can't guarantee it returns to the exact same value since gossip messages
+ # can cause the values to fluctuate.
+ wait_for_condition 1000 500 {
+ [status $server mem_cluster_links] <= $link_mem_before
+ } else {
+ fail "Cluster link memory did not settle back to expected range"
+ }
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/misc.tcl b/examples/redis-unstable/tests/unit/cluster/misc.tcl
new file mode 100644
index 0000000..62bdcf7
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/misc.tcl
@@ -0,0 +1,36 @@
+start_cluster 2 2 {tags {external:skip cluster}} {
+ test {Key lazy expires during key migration} {
+ R 0 DEBUG SET-ACTIVE-EXPIRE 0
+
+ set key_slot [R 0 CLUSTER KEYSLOT FOO]
+ R 0 set FOO BAR PX 10
+ set src_id [R 0 CLUSTER MYID]
+ set trg_id [R 1 CLUSTER MYID]
+ R 0 CLUSTER SETSLOT $key_slot MIGRATING $trg_id
+ R 1 CLUSTER SETSLOT $key_slot IMPORTING $src_id
+ after 11
+ assert_error {ASK*} {R 0 GET FOO}
+ R 0 ping
+ } {PONG}
+
+ test "Coverage: Basic cluster commands" {
+ assert_equal {OK} [R 0 CLUSTER saveconfig]
+
+ set id [R 0 CLUSTER MYID]
+ assert_equal {0} [R 0 CLUSTER count-failure-reports $id]
+
+ R 0 flushall
+ assert_equal {OK} [R 0 CLUSTER flushslots]
+ }
+
+ test "CROSSSLOT error for keys in different slots" {
+ # Test MSET with keys in different slots
+ assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MSET foo bar baz qux}
+
+ # Test DEL with keys in different slots
+ assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 DEL foo bar}
+
+ # Test MGET with keys in different slots
+ assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MGET foo bar}
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/multi-slot-operations.tcl b/examples/redis-unstable/tests/unit/cluster/multi-slot-operations.tcl
new file mode 100644
index 0000000..5d2d03e
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/multi-slot-operations.tcl
@@ -0,0 +1,182 @@
+# This test uses a custom slot allocation for testing
+proc cluster_allocate_with_continuous_slots_local {n} {
+ R 0 cluster ADDSLOTSRANGE 0 3276
+ R 1 cluster ADDSLOTSRANGE 3277 6552
+ R 2 cluster ADDSLOTSRANGE 6553 9828
+ R 3 cluster ADDSLOTSRANGE 9829 13104
+ R 4 cluster ADDSLOTSRANGE 13105 16383
+}
+
+start_cluster 5 0 {tags {external:skip cluster}} {
+
+set master1 [srv 0 "client"]
+set master2 [srv -1 "client"]
+set master3 [srv -2 "client"]
+set master4 [srv -3 "client"]
+set master5 [srv -4 "client"]
+
+test "Continuous slots distribution" {
+ assert_match "* 0-3276*" [$master1 CLUSTER NODES]
+ assert_match "* 3277-6552*" [$master2 CLUSTER NODES]
+ assert_match "* 6553-9828*" [$master3 CLUSTER NODES]
+ assert_match "* 9829-13104*" [$master4 CLUSTER NODES]
+ assert_match "* 13105-16383*" [$master5 CLUSTER NODES]
+ assert_match "*0 3276*" [$master1 CLUSTER SLOTS]
+ assert_match "*3277 6552*" [$master2 CLUSTER SLOTS]
+ assert_match "*6553 9828*" [$master3 CLUSTER SLOTS]
+ assert_match "*9829 13104*" [$master4 CLUSTER SLOTS]
+ assert_match "*13105 16383*" [$master5 CLUSTER SLOTS]
+
+ $master1 CLUSTER DELSLOTSRANGE 3001 3050
+ assert_match "* 0-3000 3051-3276*" [$master1 CLUSTER NODES]
+ assert_match "*0 3000*3051 3276*" [$master1 CLUSTER SLOTS]
+
+ $master2 CLUSTER DELSLOTSRANGE 5001 5500
+ assert_match "* 3277-5000 5501-6552*" [$master2 CLUSTER NODES]
+ assert_match "*3277 5000*5501 6552*" [$master2 CLUSTER SLOTS]
+
+ $master3 CLUSTER DELSLOTSRANGE 7001 7100 8001 8500
+ assert_match "* 6553-7000 7101-8000 8501-9828*" [$master3 CLUSTER NODES]
+ assert_match "*6553 7000*7101 8000*8501 9828*" [$master3 CLUSTER SLOTS]
+
+ $master4 CLUSTER DELSLOTSRANGE 11001 12000 12101 12200
+ assert_match "* 9829-11000 12001-12100 12201-13104*" [$master4 CLUSTER NODES]
+ assert_match "*9829 11000*12001 12100*12201 13104*" [$master4 CLUSTER SLOTS]
+
+ $master5 CLUSTER DELSLOTSRANGE 13501 14000 15001 16000
+ assert_match "* 13105-13500 14001-15000 16001-16383*" [$master5 CLUSTER NODES]
+ assert_match "*13105 13500*14001 15000*16001 16383*" [$master5 CLUSTER SLOTS]
+}
+
+test "ADDSLOTS command with several boundary conditions test suite" {
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTS 3001 aaa}
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTS 3001 -1000}
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTS 3001 30003}
+
+ assert_error "ERR Slot 3200 is already busy" {R 0 cluster ADDSLOTS 3200}
+ assert_error "ERR Slot 8501 is already busy" {R 0 cluster ADDSLOTS 8501}
+
+ assert_error "ERR Slot 3001 specified multiple times" {R 0 cluster ADDSLOTS 3001 3002 3001}
+}
+
+test "ADDSLOTSRANGE command with several boundary conditions test suite" {
+ # Add multiple slots with incorrect argument number
+ assert_error "ERR wrong number of arguments for 'cluster|addslotsrange' command" {R 0 cluster ADDSLOTSRANGE 3001 3020 3030}
+
+ # Add multiple slots with invalid input slot
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTSRANGE 3001 3020 3030 aaa}
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTSRANGE 3001 3020 3030 70000}
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTSRANGE 3001 3020 -1000 3030}
+
+ # Add multiple slots when start slot number is greater than the end slot
+ assert_error "ERR start slot number 3030 is greater than end slot number 3025" {R 0 cluster ADDSLOTSRANGE 3001 3020 3030 3025}
+
+ # Add multiple slots with busy slot
+ assert_error "ERR Slot 3200 is already busy" {R 0 cluster ADDSLOTSRANGE 3001 3020 3200 3250}
+
+ # Add multiple slots with assigned multiple times
+ assert_error "ERR Slot 3001 specified multiple times" {R 0 cluster ADDSLOTSRANGE 3001 3020 3001 3020}
+}
+
+test "DELSLOTSRANGE command with several boundary conditions test suite" {
+ # Delete multiple slots with incorrect argument number
+ assert_error "ERR wrong number of arguments for 'cluster|delslotsrange' command" {R 0 cluster DELSLOTSRANGE 1000 2000 2100}
+ assert_match "* 0-3000 3051-3276*" [$master1 CLUSTER NODES]
+ assert_match "*0 3000*3051 3276*" [$master1 CLUSTER SLOTS]
+
+ # Delete multiple slots with invalid input slot
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster DELSLOTSRANGE 1000 2000 2100 aaa}
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster DELSLOTSRANGE 1000 2000 2100 70000}
+ assert_error "ERR Invalid or out of range slot" {R 0 cluster DELSLOTSRANGE 1000 2000 -2100 2200}
+ assert_match "* 0-3000 3051-3276*" [$master1 CLUSTER NODES]
+ assert_match "*0 3000*3051 3276*" [$master1 CLUSTER SLOTS]
+
+ # Delete multiple slots when start slot number is greater than the end slot
+ assert_error "ERR start slot number 5800 is greater than end slot number 5750" {R 1 cluster DELSLOTSRANGE 5600 5700 5800 5750}
+ assert_match "* 3277-5000 5501-6552*" [$master2 CLUSTER NODES]
+ assert_match "*3277 5000*5501 6552*" [$master2 CLUSTER SLOTS]
+
+ # Delete multiple slots with already unassigned
+ assert_error "ERR Slot 7001 is already unassigned" {R 2 cluster DELSLOTSRANGE 7001 7100 9000 9200}
+ assert_match "* 6553-7000 7101-8000 8501-9828*" [$master3 CLUSTER NODES]
+ assert_match "*6553 7000*7101 8000*8501 9828*" [$master3 CLUSTER SLOTS]
+
+ # Delete multiple slots with assigned multiple times
+ assert_error "ERR Slot 12500 specified multiple times" {R 3 cluster DELSLOTSRANGE 12500 12600 12500 12600}
+ assert_match "* 9829-11000 12001-12100 12201-13104*" [$master4 CLUSTER NODES]
+ assert_match "*9829 11000*12001 12100*12201 13104*" [$master4 CLUSTER SLOTS]
+}
+} cluster_allocate_with_continuous_slots_local
+
+start_cluster 2 0 {tags {external:skip cluster experimental}} {
+
+set master1 [srv 0 "client"]
+set master2 [srv -1 "client"]
+
+test "SFLUSH - Errors and output validation" {
+ assert_match "* 0-8191*" [$master1 CLUSTER NODES]
+ assert_match "* 8192-16383*" [$master2 CLUSTER NODES]
+ assert_match "*0 8191*" [$master1 CLUSTER SLOTS]
+ assert_match "*8192 16383*" [$master2 CLUSTER SLOTS]
+
+ # make master1 non-continuous slots
+ $master1 cluster DELSLOTSRANGE 1000 2000
+
+ # Test SFLUSH errors validation
+ assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4}
+ assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4 SYNC}
+ assert_error {ERR Invalid or out of range slot} {$master1 SFLUSH x 4}
+ assert_error {ERR Invalid or out of range slot} {$master1 SFLUSH 0 12x}
+ assert_error {ERR Slot 3 specified multiple times} {$master1 SFLUSH 2 4 3 5}
+ assert_error {ERR start slot number 8 is greater than*} {$master1 SFLUSH 8 4}
+ assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4 8 10}
+ assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 0 999 2001 8191 ASYNCX}
+
+ # Test SFLUSH output validation
+ assert_match "" [$master1 SFLUSH 2 4]
+ assert_match "" [$master1 SFLUSH 0 4]
+ assert_match "" [$master2 SFLUSH 0 4]
+ assert_match "" [$master1 SFLUSH 1 8191]
+ assert_match "" [$master1 SFLUSH 0 8190]
+ assert_match "" [$master1 SFLUSH 0 998 2001 8191]
+ assert_match "" [$master1 SFLUSH 1 999 2001 8191]
+ assert_match "" [$master1 SFLUSH 0 999 2001 8190]
+ assert_match "" [$master1 SFLUSH 0 999 2002 8191]
+ assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 999 2001 8191]
+ assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 8191]
+ assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 4000 4001 8191]
+ assert_match "" [$master2 SFLUSH 8193 16383]
+ assert_match "" [$master2 SFLUSH 8192 16382]
+ assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383]
+ assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 SYNC]
+ assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 ASYNC]
+ assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383]
+ assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383 SYNC]
+ assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383 ASYNC]
+
+ # restore master1 continuous slots
+ $master1 cluster ADDSLOTSRANGE 1000 2000
+}
+
+test "SFLUSH - Deletes the keys with argument <NONE>/SYNC/ASYNC" {
+ foreach op {"" "SYNC" "ASYNC"} {
+ for {set i 0} {$i < 100} {incr i} {
+ catch {$master1 SET key$i val$i}
+ catch {$master2 SET key$i val$i}
+ }
+
+ assert {[$master1 DBSIZE] > 0}
+ assert {[$master2 DBSIZE] > 0}
+ if {$op eq ""} {
+ assert_match "{0 8191}" [ $master1 SFLUSH 0 8191]
+ } else {
+ assert_match "{0 8191}" [ $master1 SFLUSH 0 8191 $op]
+ }
+ assert {[$master1 DBSIZE] == 0}
+ assert {[$master2 DBSIZE] > 0}
+ assert_match "{8192 16383}" [ $master2 SFLUSH 8192 16383]
+ assert {[$master2 DBSIZE] == 0}
+ }
+}
+
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/scripting.tcl b/examples/redis-unstable/tests/unit/cluster/scripting.tcl
new file mode 100644
index 0000000..76aa882
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/scripting.tcl
@@ -0,0 +1,91 @@
+start_cluster 1 0 {tags {external:skip cluster}} {
+
+ test {Eval scripts with shebangs and functions default to no cross slots} {
+ # Test that scripts with shebang block cross slot operations
+ assert_error "ERR Script attempted to access keys that do not hash to the same slot*" {
+ r 0 eval {#!lua
+ redis.call('set', 'foo', 'bar')
+ redis.call('set', 'bar', 'foo')
+ return 'OK'
+ } 0}
+
+ # Test the functions by default block cross slot operations
+ r 0 function load REPLACE {#!lua name=crossslot
+ local function test_cross_slot(keys, args)
+ redis.call('set', 'foo', 'bar')
+ redis.call('set', 'bar', 'foo')
+ return 'OK'
+ end
+
+ redis.register_function('test_cross_slot', test_cross_slot)}
+ assert_error "ERR Script attempted to access keys that do not hash to the same slot*" {r FCALL test_cross_slot 0}
+ }
+
+ test {Cross slot commands are allowed by default for eval scripts and with allow-cross-slot-keys flag} {
+ # Old style lua scripts are allowed to access cross slot operations
+ r 0 eval "redis.call('set', 'foo', 'bar'); redis.call('set', 'bar', 'foo')" 0
+
+ # scripts with allow-cross-slot-keys flag are allowed
+ r 0 eval {#!lua flags=allow-cross-slot-keys
+ redis.call('set', 'foo', 'bar'); redis.call('set', 'bar', 'foo')
+ } 0
+
+ # Retrieve data from different slot to verify data has been stored in the correct dictionary in cluster-enabled setup
+ # during cross-slot operation from the above lua script.
+ assert_equal "bar" [r 0 get foo]
+ assert_equal "foo" [r 0 get bar]
+ r 0 del foo
+ r 0 del bar
+
+ # Functions with allow-cross-slot-keys flag are allowed
+ r 0 function load REPLACE {#!lua name=crossslot
+ local function test_cross_slot(keys, args)
+ redis.call('set', 'foo', 'bar')
+ redis.call('set', 'bar', 'foo')
+ return 'OK'
+ end
+
+ redis.register_function{function_name='test_cross_slot', callback=test_cross_slot, flags={ 'allow-cross-slot-keys' }}}
+ r FCALL test_cross_slot 0
+
+ # Retrieve data from different slot to verify data has been stored in the correct dictionary in cluster-enabled setup
+ # during cross-slot operation from the above lua function.
+ assert_equal "bar" [r 0 get foo]
+ assert_equal "foo" [r 0 get bar]
+ }
+
+ test {Cross slot commands are also blocked if they disagree with pre-declared keys} {
+ assert_error "ERR Script attempted to access keys that do not hash to the same slot*" {
+ r 0 eval {#!lua
+ redis.call('set', 'foo', 'bar')
+ return 'OK'
+ } 1 bar}
+ }
+
+ test {Cross slot commands are allowed by default if they disagree with pre-declared keys} {
+ r 0 flushall
+ r 0 eval "redis.call('set', 'foo', 'bar')" 1 bar
+
+ # Make sure the script writes to the right slot
+ assert_equal 1 [r 0 cluster COUNTKEYSINSLOT 12182] ;# foo slot
+ assert_equal 0 [r 0 cluster COUNTKEYSINSLOT 5061] ;# bar slot
+ }
+
+ test "Function no-cluster flag" {
+ R 0 function load {#!lua name=test
+ redis.register_function{function_name='f1', callback=function() return 'hello' end, flags={'no-cluster'}}
+ }
+ catch {R 0 fcall f1 0} e
+ assert_match {*Can not run script on cluster, 'no-cluster' flag is set*} $e
+ }
+
+ test "Script no-cluster flag" {
+ catch {
+ R 0 eval {#!lua flags=no-cluster
+ return 1
+ } 0
+ } e
+
+ assert_match {*Can not run script on cluster, 'no-cluster' flag is set*} $e
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/sharded-pubsub.tcl b/examples/redis-unstable/tests/unit/cluster/sharded-pubsub.tcl
new file mode 100644
index 0000000..57b550a
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/sharded-pubsub.tcl
@@ -0,0 +1,67 @@
+#
+# Copyright (c) 2009-Present, Redis Ltd.
+# All rights reserved.
+#
+# Licensed under your choice of (a) the Redis Source Available License 2.0
+# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+# GNU Affero General Public License v3 (AGPLv3).
+#
+# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+#
+
+start_cluster 1 1 {tags {external:skip cluster}} {
+ set primary_id 0
+ set replica1_id 1
+
+ set primary [Rn $primary_id]
+ set replica [Rn $replica1_id]
+
+ test "Sharded pubsub publish behavior within multi/exec" {
+ foreach {node} {primary replica} {
+ set node [set $node]
+ $node MULTI
+ $node SPUBLISH ch1 "hello"
+ $node EXEC
+ }
+ }
+
+ test "Sharded pubsub within multi/exec with cross slot operation" {
+ $primary MULTI
+ $primary SPUBLISH ch1 "hello"
+ $primary GET foo
+ catch {$primary EXEC} err
+ assert_match {CROSSSLOT*} $err
+ }
+
+ test "Sharded pubsub publish behavior within multi/exec with read operation on primary" {
+ $primary MULTI
+ $primary SPUBLISH foo "hello"
+ $primary GET foo
+ $primary EXEC
+ } {0 {}}
+
+ test "Sharded pubsub publish behavior within multi/exec with read operation on replica" {
+ $replica MULTI
+ $replica SPUBLISH foo "hello"
+ catch {[$replica GET foo]} err
+ assert_match {MOVED*} $err
+ catch {[$replica EXEC]} err
+ assert_match {EXECABORT*} $err
+ }
+
+ test "Sharded pubsub publish behavior within multi/exec with write operation on primary" {
+ $primary MULTI
+ $primary SPUBLISH foo "hello"
+ $primary SET foo bar
+ $primary EXEC
+ } {0 OK}
+
+ test "Sharded pubsub publish behavior within multi/exec with write operation on replica" {
+ $replica MULTI
+ $replica SPUBLISH foo "hello"
+ catch {[$replica SET foo bar]} err
+ assert_match {MOVED*} $err
+ catch {[$replica EXEC]} err
+ assert_match {EXECABORT*} $err
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/slot-ownership.tcl b/examples/redis-unstable/tests/unit/cluster/slot-ownership.tcl
new file mode 100644
index 0000000..0f3e3cc
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/slot-ownership.tcl
@@ -0,0 +1,61 @@
+start_cluster 2 2 {tags {external:skip cluster}} {
+
+ test "Verify that slot ownership transfer through gossip propagates deletes to replicas" {
+ assert {[s -2 role] eq {slave}}
+ wait_for_condition 1000 50 {
+ [s -2 master_link_status] eq {up}
+ } else {
+ fail "Instance #2 master link status is not up"
+ }
+
+ assert {[s -3 role] eq {slave}}
+ wait_for_condition 1000 50 {
+ [s -3 master_link_status] eq {up}
+ } else {
+ fail "Instance #3 master link status is not up"
+ }
+
+ # Set a single key that will be used to test deletion
+ set key "FOO"
+ R 0 SET $key TEST
+ set key_slot [R 0 cluster keyslot $key]
+ set slot_keys_num [R 0 cluster countkeysinslot $key_slot]
+ assert {$slot_keys_num > 0}
+
+ # Wait for replica to have the key
+ R 2 readonly
+ wait_for_condition 1000 50 {
+ [R 2 exists $key] eq "1"
+ } else {
+ fail "Test key was not replicated"
+ }
+
+ assert_equal [R 2 cluster countkeysinslot $key_slot] $slot_keys_num
+
+ # Assert other shards in cluster doesn't have the key
+ assert_equal [R 1 cluster countkeysinslot $key_slot] "0"
+ assert_equal [R 3 cluster countkeysinslot $key_slot] "0"
+
+ set nodeid [R 1 cluster myid]
+
+ R 1 cluster bumpepoch
+ # Move $key_slot to node 1
+ assert_equal [R 1 cluster setslot $key_slot node $nodeid] "OK"
+
+ wait_for_cluster_propagation
+
+ # src master will delete keys in the slot
+ wait_for_condition 50 100 {
+ [R 0 cluster countkeysinslot $key_slot] eq 0
+ } else {
+ fail "master 'countkeysinslot $key_slot' did not eq 0"
+ }
+
+ # src replica will delete keys in the slot
+ wait_for_condition 50 100 {
+ [R 2 cluster countkeysinslot $key_slot] eq 0
+ } else {
+ fail "replica 'countkeysinslot $key_slot' did not eq 0"
+ }
+ }
+}
diff --git a/examples/redis-unstable/tests/unit/cluster/slot-stats.tcl b/examples/redis-unstable/tests/unit/cluster/slot-stats.tcl
new file mode 100644
index 0000000..1123731
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/slot-stats.tcl
@@ -0,0 +1,1169 @@
+#
+# Copyright (c) 2009-Present, Redis Ltd.
+# All rights reserved.
+#
+# Copyright (c) 2024-present, Valkey contributors.
+# All rights reserved.
+#
+# Licensed under your choice of (a) the Redis Source Available License 2.0
+# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+# GNU Affero General Public License v3 (AGPLv3).
+#
+# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+#
+
+# Integration tests for CLUSTER SLOT-STATS command.
+
+# -----------------------------------------------------------------------------
+# Helper functions for CLUSTER SLOT-STATS test cases.
+# -----------------------------------------------------------------------------
+
+# Converts array RESP response into a dict.
+# This is useful for many test cases, where unnecessary nesting is removed.
+proc convert_array_into_dict {slot_stats} {
+ set res [dict create]
+ foreach slot_stat $slot_stats {
+ # slot_stat is an array of size 2, where 0th index represents (int) slot,
+ # and 1st index represents (map) usage statistics.
+ dict set res [lindex $slot_stat 0] [lindex $slot_stat 1]
+ }
+ return $res
+}
+
+proc get_cmdstat_usec {cmd r} {
+ set cmdstatline [cmdrstat $cmd r]
+ regexp "usec=(.*?),usec_per_call=(.*?),rejected_calls=0,failed_calls=0" $cmdstatline -> usec _
+ return $usec
+}
+
+proc initialize_expected_slots_dict {} {
+ set expected_slots [dict create]
+ for {set i 0} {$i < 16384} {incr i 1} {
+ dict set expected_slots $i 0
+ }
+ return $expected_slots
+}
+
+proc initialize_expected_slots_dict_with_range {start_slot end_slot} {
+ assert {$start_slot <= $end_slot}
+ set expected_slots [dict create]
+ for {set i $start_slot} {$i <= $end_slot} {incr i 1} {
+ dict set expected_slots $i 0
+ }
+ return $expected_slots
+}
+
+proc assert_empty_slot_stats {slot_stats metrics_to_assert} {
+ set slot_stats [convert_array_into_dict $slot_stats]
+ dict for {slot stats} $slot_stats {
+ foreach metric_name $metrics_to_assert {
+ set metric_value [dict get $stats $metric_name]
+ assert {$metric_value == 0}
+ }
+ }
+}
+
+proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_to_assert} {
+ set slot_stats [convert_array_into_dict $slot_stats]
+ dict for {slot stats} $exception_slots {
+ assert {[dict exists $slot_stats $slot]} ;# slot_stats must contain the expected slots.
+ }
+ dict for {slot stats} $slot_stats {
+ if {[dict exists $exception_slots $slot]} {
+ foreach metric_name $metrics_to_assert {
+ set metric_value [dict get $exception_slots $slot $metric_name]
+ assert {[dict get $stats $metric_name] == $metric_value}
+ }
+ } else {
+ dict for {metric value} $stats {
+ assert {$value == 0}
+ }
+ }
+ }
+}
+
+proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 deterministic_metrics non_deterministic_metrics} {
+ set slot_stats_1 [convert_array_into_dict $slot_stats_1]
+ set slot_stats_2 [convert_array_into_dict $slot_stats_2]
+ assert {[dict size $slot_stats_1] == [dict size $slot_stats_2]}
+
+ dict for {slot stats_1} $slot_stats_1 {
+ assert {[dict exists $slot_stats_2 $slot]}
+ set stats_2 [dict get $slot_stats_2 $slot]
+
+ # For deterministic metrics, we assert their equality.
+ foreach metric $deterministic_metrics {
+ assert {[dict get $stats_1 $metric] == [dict get $stats_2 $metric]}
+ }
+ # For non-deterministic metrics, we assert their non-zeroness as a best-effort.
+ foreach metric $non_deterministic_metrics {
+ assert {([dict get $stats_1 $metric] == 0 && [dict get $stats_2 $metric] == 0) || \
+ ([dict get $stats_1 $metric] != 0 && [dict get $stats_2 $metric] != 0)}
+ }
+ }
+}
+
+proc assert_all_slots_have_been_seen {expected_slots} {
+ dict for {k v} $expected_slots {
+ assert {$v == 1}
+ }
+}
+
+proc assert_slot_visibility {slot_stats expected_slots} {
+ set slot_stats [convert_array_into_dict $slot_stats]
+ dict for {slot _} $slot_stats {
+ assert {[dict exists $expected_slots $slot]}
+ dict set expected_slots $slot 1
+ }
+
+ assert_all_slots_have_been_seen $expected_slots
+}
+
+proc assert_slot_stats_monotonic_order {slot_stats orderby is_desc} {
+ # For Tcl dict, the order of iteration is the order in which the keys were inserted into the dictionary
+ # Thus, the response ordering is preserved upon calling 'convert_array_into_dict()'.
+ # Source: https://www.tcl.tk/man/tcl8.6.11/TclCmd/dict.htm
+ set slot_stats [convert_array_into_dict $slot_stats]
+ set prev_metric -1
+ dict for {_ stats} $slot_stats {
+ set curr_metric [dict get $stats $orderby]
+ if {$prev_metric != -1} {
+ if {$is_desc == 1} {
+ assert {$prev_metric >= $curr_metric}
+ } else {
+ assert {$prev_metric <= $curr_metric}
+ }
+ }
+ set prev_metric $curr_metric
+ }
+}
+
+proc assert_slot_stats_monotonic_descent {slot_stats orderby} {
+ assert_slot_stats_monotonic_order $slot_stats $orderby 1
+}
+
+proc assert_slot_stats_monotonic_ascent {slot_stats orderby} {
+ assert_slot_stats_monotonic_order $slot_stats $orderby 0
+}
+
+proc wait_for_replica_key_exists {key key_count} {
+ wait_for_condition 1000 50 {
+ [R 1 exists $key] eq "$key_count"
+ } else {
+ fail "Test key was not replicated"
+ }
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS cpu-usec metric correctness.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+
+ # Define shared variables.
+ set key "FOO"
+ set key_slot [R 0 cluster keyslot $key]
+ set key_secondary "FOO2"
+ set key_secondary_slot [R 0 cluster keyslot $key_secondary]
+ set metrics_to_assert [list cpu-usec]
+
+ test "CLUSTER SLOT-STATS cpu-usec reset upon CONFIG RESETSTAT." {
+ R 0 SET $key VALUE
+ R 0 DEL $key
+ R 0 CONFIG RESETSTAT
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec reset upon slot migration." {
+ R 0 SET $key VALUE
+
+ R 0 CLUSTER DELSLOTS $key_slot
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+
+ R 0 CLUSTER ADDSLOTS $key_slot
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for non-slot specific commands." {
+ R 0 INFO
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for slot specific commands." {
+ R 0 SET $key VALUE
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set usec [get_cmdstat_usec set r]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create cpu-usec $usec
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on keyspace update." {
+ # Blocking command with no timeout. Only keyspace update can unblock this client.
+ set rd [redis_deferring_client]
+ $rd BLPOP $key 0
+ wait_for_blocked_clients_count 1
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ # When the client is blocked, no accumulation is made. This behaviour is identical to INFO COMMANDSTATS.
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+
+ # Unblocking command.
+ R 0 LPUSH $key value
+ wait_for_blocked_clients_count 0
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set lpush_usec [get_cmdstat_usec lpush r]
+ set blpop_usec [get_cmdstat_usec blpop r]
+
+ # Assert that both blocking and non-blocking command times have been accumulated.
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create cpu-usec [expr $lpush_usec + $blpop_usec]
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on timeout." {
+ # Blocking command with 0.5 seconds timeout.
+ set rd [redis_deferring_client]
+ $rd BLPOP $key 0.5
+
+ # Confirm that the client is blocked, then unblocked within 1 second.
+ wait_for_blocked_clients_count 1
+ wait_for_blocked_clients_count 0
+
+ # Assert that the blocking command time has been accumulated.
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set blpop_usec [get_cmdstat_usec blpop r]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create cpu-usec $blpop_usec
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for transactions." {
+ set r1 [redis_client]
+ $r1 MULTI
+ $r1 SET $key value
+ $r1 GET $key
+
+ # CPU metric is not accumulated until EXEC is reached. This behaviour is identical to INFO COMMANDSTATS.
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+
+ # Execute transaction, and assert that all nested command times have been accumulated.
+ $r1 EXEC
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set exec_usec [get_cmdstat_usec exec r]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create cpu-usec $exec_usec
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, without cross-slot keys." {
+ R 0 eval {#!lua
+ redis.call('set', KEYS[1], 'bar') redis.call('get', KEYS[2])
+ } 2 $key $key
+
+ set eval_usec [get_cmdstat_usec eval r]
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create cpu-usec $eval_usec
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, with cross-slot keys." {
+ R 0 eval {#!lua flags=allow-cross-slot-keys
+ redis.call('set', KEYS[1], 'bar') redis.call('get', ARGV[1])
+ } 1 $key $key_secondary
+
+ # For cross-slot, we do not accumulate at all.
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for functions, without cross-slot keys." {
+ R 0 function load replace {#!lua name=f1
+ redis.register_function{
+ function_name='f1',
+ callback=function(keys, args) redis.call('set', keys[1], '1') redis.call('get', keys[2]) end
+ }
+ }
+ R 0 fcall f1 2 $key $key
+
+ set fcall_usec [get_cmdstat_usec fcall r]
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create cpu-usec $fcall_usec
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS cpu-usec for functions, with cross-slot keys." {
+ R 0 function load replace {#!lua name=f1
+ redis.register_function{
+ function_name='f1',
+ callback=function(keys, args) redis.call('set', keys[1], '1') redis.call('get', args[1]) end,
+ flags={'allow-cross-slot-keys'}
+ }
+ }
+ R 0 fcall f1 1 $key $key_secondary
+
+ # For cross-slot, we do not accumulate at all.
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS network-bytes-in.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+
+ # Define shared variables.
+ set key "key"
+ set key_slot [R 0 cluster keyslot $key]
+ set metrics_to_assert [list network-bytes-in]
+
+ test "CLUSTER SLOT-STATS network-bytes-in, multi bulk buffer processing." {
+ # *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
+ R 0 SET $key value
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-in 33
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS network-bytes-in, in-line buffer processing." {
+ set rd [redis_deferring_client]
+ # SET key value\r\n --> 15 bytes.
+ $rd write "SET $key value\r\n"
+ $rd flush
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-in 15
+ ]
+ ]
+
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS network-bytes-in, blocking command." {
+ set rd [redis_deferring_client]
+ # *3\r\n$5\r\nblpop\r\n$3\r\nkey\r\n$1\r\n0\r\n --> 31 bytes.
+ $rd BLPOP $key 0
+ wait_for_blocked_clients_count 1
+
+ # Slot-stats must be empty here, as the client is yet to be unblocked.
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+
+ # *3\r\n$5\r\nlpush\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 35 bytes.
+ R 0 LPUSH $key value
+ wait_for_blocked_clients_count 0
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-in 66 ;# 31 + 35 bytes.
+ ]
+ ]
+
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS network-bytes-in, multi-exec transaction." {
+ set r [redis_client]
+ # *1\r\n$5\r\nmulti\r\n --> 15 bytes.
+ $r MULTI
+ # *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
+ assert {[$r SET $key value] eq {QUEUED}}
+ # *1\r\n$4\r\nexec\r\n --> 14 bytes.
+ assert {[$r EXEC] eq {OK}}
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-in 62 ;# 15 + 33 + 14 bytes.
+ ]
+ ]
+
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS network-bytes-in, non slot specific command." {
+ R 0 INFO
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS network-bytes-in, pub/sub." {
+ # PUB/SUB does not get accumulated at per-slot basis,
+ # as it is cluster-wide and is not slot specific.
+ set rd [redis_deferring_client]
+ $rd subscribe channel
+ R 0 publish channel message
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+}
+
+start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+ set channel "channel"
+ set key_slot [R 0 cluster keyslot $channel]
+ set metrics_to_assert [list network-bytes-in]
+
+ # Setup replication.
+ assert {[s -1 role] eq {slave}}
+ wait_for_condition 1000 50 {
+ [s -1 master_link_status] eq {up}
+ } else {
+ fail "Instance #1 master link status is not up"
+ }
+ R 1 readonly
+
+ test "CLUSTER SLOT-STATS network-bytes-in, sharded pub/sub." {
+ set slot [R 0 cluster keyslot $channel]
+ set primary [Rn 0]
+ set replica [Rn 1]
+ set replica_subcriber [redis_deferring_client -1]
+ $replica_subcriber SSUBSCRIBE $channel
+ # *2\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n --> 34 bytes.
+ $primary SPUBLISH $channel hello
+ # *3\r\n$8\r\nspublish\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
+
+ set slot_stats [$primary CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-in 42
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+
+ set slot_stats [$replica CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-in 34
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS network-bytes-out correctness.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 0 {tags {external:skip cluster}} {
+ # Define shared variables.
+ set key "FOO"
+ set key_slot [R 0 cluster keyslot $key]
+ set expected_slots_to_key_count [dict create $key_slot 1]
+ set metrics_to_assert [list network-bytes-out]
+ R 0 CONFIG SET cluster-slot-stats-enabled yes
+
+ test "CLUSTER SLOT-STATS network-bytes-out, for non-slot specific commands." {
+ R 0 INFO
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS network-bytes-out, for slot specific commands." {
+ R 0 SET $key value
+ # +OK\r\n --> 5 bytes
+
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-out 5
+ ]
+ ]
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+
+ test "CLUSTER SLOT-STATS network-bytes-out, blocking commands." {
+ set rd [redis_deferring_client]
+ $rd BLPOP $key 0
+ wait_for_blocked_clients_count 1
+
+ # Assert empty slot stats here, since COB is yet to be flushed due to the block.
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+
+ # Unblock the command.
+ # LPUSH client) :1\r\n --> 4 bytes.
+ # BLPOP client) *2\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 24 bytes, upon unblocking.
+ R 0 LPUSH $key value
+ wait_for_blocked_clients_count 0
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-out 28 ;# 4 + 24 bytes.
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ R 0 CONFIG RESETSTAT
+ R 0 FLUSHALL
+}
+
+start_cluster 1 1 {tags {external:skip cluster}} {
+
+ # Define shared variables.
+ set key "FOO"
+ set key_slot [R 0 CLUSTER KEYSLOT $key]
+ set metrics_to_assert [list network-bytes-out]
+ R 0 CONFIG SET cluster-slot-stats-enabled yes
+
+ # Setup replication.
+ assert {[s -1 role] eq {slave}}
+ wait_for_condition 1000 50 {
+ [s -1 master_link_status] eq {up}
+ } else {
+ fail "Instance #1 master link status is not up"
+ }
+ R 1 readonly
+
+ test "CLUSTER SLOT-STATS network-bytes-out, replication stream egress." {
+ assert_equal [R 0 SET $key VALUE] {OK}
+ # Local client) +OK\r\n --> 5 bytes.
+ # Replication stream) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-out 38 ;# 5 + 33 bytes.
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+}
+
+start_cluster 1 1 {tags {external:skip cluster}} {
+
+ # Define shared variables.
+ set channel "channel"
+ set key_slot [R 0 cluster keyslot $channel]
+ set channel_secondary "channel2"
+ set key_slot_secondary [R 0 cluster keyslot $channel_secondary]
+ set metrics_to_assert [list network-bytes-out]
+ R 0 CONFIG SET cluster-slot-stats-enabled yes
+
+ test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, single channel." {
+ set slot [R 0 cluster keyslot $channel]
+ set publisher [Rn 0]
+ set subscriber [redis_client]
+ set replica [redis_deferring_client -1]
+
+ # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes
+ $subscriber SSUBSCRIBE $channel
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-out 38
+ ]
+ ]
+ R 0 CONFIG RESETSTAT
+
+ # Publisher client) :1\r\n --> 4 bytes.
+ # Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
+ assert_equal 1 [$publisher SPUBLISH $channel hello]
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create network-bytes-out 46 ;# 4 + 42 bytes.
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+ $subscriber QUIT
+ R 0 FLUSHALL
+ R 0 CONFIG RESETSTAT
+
+ test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, cross-slot channels." {
+ set slot [R 0 cluster keyslot $channel]
+ set publisher [Rn 0]
+ set subscriber [redis_client]
+ set replica [redis_deferring_client -1]
+
+ # Stack multi-slot subscriptions against a single client.
+ # For primary channel;
+ # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes
+ # For secondary channel;
+ # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$8\r\nchannel2\r\n:1\r\n --> 39 bytes
+ $subscriber SSUBSCRIBE $channel
+ $subscriber SSUBSCRIBE $channel_secondary
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create \
+ $key_slot [ \
+ dict create network-bytes-out 38
+ ] \
+ $key_slot_secondary [ \
+ dict create network-bytes-out 39
+ ]
+ ]
+ R 0 CONFIG RESETSTAT
+
+ # For primary channel;
+ # Publisher client) :1\r\n --> 4 bytes.
+ # Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
+ # For secondary channel;
+ # Publisher client) :1\r\n --> 4 bytes.
+ # Subscriber client) *3\r\n$8\r\nsmessage\r\n$8\r\nchannel2\r\n$5\r\nhello\r\n --> 43 bytes.
+ assert_equal 1 [$publisher SPUBLISH $channel hello]
+ assert_equal 1 [$publisher SPUBLISH $channel_secondary hello]
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set expected_slot_stats [
+ dict create \
+ $key_slot [ \
+ dict create network-bytes-out 46 ;# 4 + 42 bytes.
+ ] \
+ $key_slot_secondary [ \
+ dict create network-bytes-out 47 ;# 4 + 43 bytes.
+ ]
+ ]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS key-count metric correctness.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+
+ # Define shared variables.
+ set key "FOO"
+ set key_slot [R 0 cluster keyslot $key]
+ set metrics_to_assert [list key-count]
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create key-count 1
+ ]
+ ]
+
+ test "CLUSTER SLOT-STATS contains default value upon redis-server startup" {
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+
+ test "CLUSTER SLOT-STATS contains correct metrics upon key introduction" {
+ R 0 SET $key TEST
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+
+ test "CLUSTER SLOT-STATS contains correct metrics upon key mutation" {
+ R 0 SET $key NEW_VALUE
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+
+ test "CLUSTER SLOT-STATS contains correct metrics upon key deletion" {
+ R 0 DEL $key
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats $slot_stats $metrics_to_assert
+ }
+
+ test "CLUSTER SLOT-STATS slot visibility based on slot ownership changes" {
+ R 0 CONFIG SET cluster-require-full-coverage no
+
+ R 0 CLUSTER DELSLOTS $key_slot
+ set expected_slots [initialize_expected_slots_dict]
+ dict unset expected_slots $key_slot
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert {[dict size $expected_slots] == 16383}
+ assert_slot_visibility $slot_stats $expected_slots
+
+ R 0 CLUSTER ADDSLOTS $key_slot
+ set expected_slots [initialize_expected_slots_dict]
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert {[dict size $expected_slots] == 16384}
+ assert_slot_visibility $slot_stats $expected_slots
+ }
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS SLOTSRANGE sub-argument.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 0 {tags {external:skip cluster}} {
+
+ test "CLUSTER SLOT-STATS SLOTSRANGE all slots present" {
+ set start_slot 100
+ set end_slot 102
+ set expected_slots [initialize_expected_slots_dict_with_range $start_slot $end_slot]
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE $start_slot $end_slot]
+ assert_slot_visibility $slot_stats $expected_slots
+ }
+
+ test "CLUSTER SLOT-STATS SLOTSRANGE some slots missing" {
+ set start_slot 100
+ set end_slot 102
+ set expected_slots [initialize_expected_slots_dict_with_range $start_slot $end_slot]
+
+ R 0 CLUSTER DELSLOTS $start_slot
+ dict unset expected_slots $start_slot
+
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE $start_slot $end_slot]
+ assert_slot_visibility $slot_stats $expected_slots
+ }
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS ORDERBY sub-argument.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+
+ set metrics [list "key-count" "memory-bytes" "cpu-usec" "network-bytes-in" "network-bytes-out"]
+
+ # SET keys for target hashslots, to encourage ordering.
+ set hash_tags [list 0 1 2 3 4]
+ set num_keys 1
+ foreach hash_tag $hash_tags {
+ for {set i 0} {$i < $num_keys} {incr i 1} {
+ R 0 SET "$i{$hash_tag}" VALUE
+ }
+ incr num_keys 1
+ }
+
+ # SET keys for random hashslots, for random noise.
+ set num_keys 0
+ while {$num_keys < 1000} {
+ set random_key [randomInt 16384]
+ R 0 SET $random_key VALUE
+ incr num_keys 1
+ }
+
+ test "CLUSTER SLOT-STATS ORDERBY DESC correct ordering" {
+ foreach orderby $metrics {
+ set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC]
+ assert_slot_stats_monotonic_descent $slot_stats $orderby
+ }
+ }
+
+ test "CLUSTER SLOT-STATS ORDERBY ASC correct ordering" {
+ foreach orderby $metrics {
+ set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC]
+ assert_slot_stats_monotonic_ascent $slot_stats $orderby
+ }
+ }
+
+ test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is less than number of assigned slots" {
+ R 0 FLUSHALL SYNC
+ R 0 CONFIG RESETSTAT
+
+ foreach orderby $metrics {
+ set limit 5
+ set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
+ set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
+ set slot_stats_desc_length [llength $slot_stats_desc]
+ set slot_stats_asc_length [llength $slot_stats_asc]
+ assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length}
+
+ # All slot statistics have been reset to 0, so we will order by slot in ascending order.
+ set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0]
+ assert_slot_visibility $slot_stats_desc $expected_slots
+ assert_slot_visibility $slot_stats_asc $expected_slots
+ }
+ }
+
+ test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is greater than number of assigned slots" {
+ R 0 CONFIG SET cluster-require-full-coverage no
+ R 0 FLUSHALL SYNC
+ R 0 CLUSTER FLUSHSLOTS
+ R 0 CLUSTER ADDSLOTS 100 101
+
+ foreach orderby $metrics {
+ set num_assigned_slots 2
+ set limit 5
+ set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
+ set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
+ set slot_stats_desc_length [llength $slot_stats_desc]
+ set slot_stats_asc_length [llength $slot_stats_asc]
+ set expected_response_length [expr min($num_assigned_slots, $limit)]
+ assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length}
+
+ set expected_slots [dict create 100 0 101 0]
+ assert_slot_visibility $slot_stats_desc $expected_slots
+ assert_slot_visibility $slot_stats_asc $expected_slots
+ }
+ }
+
+ test "CLUSTER SLOT-STATS ORDERBY arg sanity check." {
+ # Non-existent argument.
+ assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count non-existent-arg}
+ # Negative LIMIT.
+ assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count DESC LIMIT -1}
+ # Non-existent ORDERBY metric.
+ assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY non-existent-metric}
+ # When cluster-slot-stats-enabled config is disabled, you cannot sort using advanced metrics.
+ R 0 CONFIG SET cluster-slot-stats-enabled no
+ set orderby "cpu-usec"
+ assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
+ set orderby "network-bytes-in"
+ assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
+ set orderby "network-bytes-out"
+ assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
+ set orderby "memory-bytes"
+ assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
+
+ # When only cpu net is enabled, memory-bytes ORDERBY should fail
+ R 0 CONFIG SET cluster-slot-stats-enabled "cpu net"
+ assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY memory-bytes}
+ }
+
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS replication.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+
+ # Define shared variables.
+ set key "key"
+ set key_slot [R 0 CLUSTER KEYSLOT $key]
+ set primary [Rn 0]
+ set replica [Rn 1]
+
+ # For replication, assertions are split between deterministic and non-deterministic metrics.
+ # * For deterministic metrics, strict equality assertions are made.
+ # * For non-deterministic metrics, non-zeroness assertions are made.
+ # Non-zeroness as in, both primary and replica should either have some value, or no value at all.
+ #
+ # * key-count is deterministic between primary and its replica.
+ # * cpu-usec is non-deterministic between primary and its replica.
+ # * network-bytes-in is deterministic between primary and its replica.
+ # * network-bytes-out will remain empty in the replica, since primary client do not receive replies, unless for replicationSendAck().
+ set deterministic_metrics [list key-count network-bytes-in]
+ set non_deterministic_metrics [list cpu-usec]
+ set empty_metrics [list network-bytes-out]
+
+ # Setup replication.
+ assert {[s -1 role] eq {slave}}
+ wait_for_condition 1000 50 {
+ [s -1 master_link_status] eq {up}
+ } else {
+ fail "Instance #1 master link status is not up"
+ }
+ R 1 readonly
+
+ test "CLUSTER SLOT-STATS metrics replication for new keys" {
+ # *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
+ R 0 SET $key VALUE
+
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create key-count 1 network-bytes-in 33
+ ]
+ ]
+ set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
+
+ wait_for_condition 500 10 {
+ [string match {*calls=1,*} [cmdrstat set $replica]]
+ } else {
+ fail "Replica did not receive the command."
+ }
+ set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
+ assert_empty_slot_stats $slot_stats_replica $empty_metrics
+ }
+ R 0 CONFIG RESETSTAT
+ R 1 CONFIG RESETSTAT
+
+ test "CLUSTER SLOT-STATS metrics replication for existing keys" {
+ # *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$13\r\nvalue_updated\r\n --> 42 bytes.
+ R 0 SET $key VALUE_UPDATED
+
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create key-count 1 network-bytes-in 42
+ ]
+ ]
+ set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
+
+ wait_for_condition 500 10 {
+ [string match {*calls=1,*} [cmdrstat set $replica]]
+ } else {
+ fail "Replica did not receive the command."
+ }
+ set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
+ assert_empty_slot_stats $slot_stats_replica $empty_metrics
+ }
+ R 0 CONFIG RESETSTAT
+ R 1 CONFIG RESETSTAT
+
+ test "CLUSTER SLOT-STATS metrics replication for deleting keys" {
+ # *2\r\n$3\r\ndel\r\n$3\r\nkey\r\n --> 22 bytes.
+ R 0 DEL $key
+
+ set expected_slot_stats [
+ dict create $key_slot [
+ dict create key-count 0 network-bytes-in 22
+ ]
+ ]
+ set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
+
+ wait_for_condition 500 10 {
+ [string match {*calls=1,*} [cmdrstat del $replica]]
+ } else {
+ fail "Replica did not receive the command."
+ }
+ set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
+ assert_empty_slot_stats $slot_stats_replica $empty_metrics
+ }
+ R 0 CONFIG RESETSTAT
+ R 1 CONFIG RESETSTAT
+}
+
+start_cluster 2 2 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+ test "CLUSTER SLOT-STATS reset upon atomic slot migration" {
+ # key on slot-0
+ set key0 "{06S}mykey0"
+ set key0_slot [R 0 CLUSTER KEYSLOT $key0]
+ R 0 SET $key0 VALUE
+
+ # Migrate slot-0 to node-1
+ R 1 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 1 cluster_slot_migration_active_tasks] == 0
+ } else {
+ fail "ASM tasks did not complete"
+ }
+
+ set expected_slot_stats [
+ dict create \
+ $key0_slot [ \
+ dict create key-count 1 \
+ dict create cpu-usec 0 \
+ dict create network-bytes-in 0 \
+ dict create network-bytes-out 0 \
+ ]
+ ]
+ set metrics_to_assert [list key-count cpu-usec network-bytes-in network-bytes-out]
+
+ # Verify metrics are reset except key-count
+ set slot_stats [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 0]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+
+ # Migrate slot-0 back to node-0
+ R 0 CLUSTER MIGRATION IMPORT 0 0
+ wait_for_condition 1000 10 {
+ [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+ [CI 1 cluster_slot_migration_active_tasks] == 0
+ } else {
+ fail "ASM tasks did not complete"
+ }
+
+ # Verify metrics are reset except key-count
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 0]
+ assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+ }
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS memory-bytes field presence.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+ # Define shared variables.
+ set key "FOO"
+ set key_slot [R 0 cluster keyslot $key]
+
+ test "CLUSTER SLOT-STATS memory-bytes field present when cluster-slot-stats-enabled set on startup" {
+ R 0 SET $key VALUE
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set slot_stats [convert_array_into_dict $slot_stats]
+
+ # Verify memory-bytes field is present
+ assert {[dict exists $slot_stats $key_slot]}
+ set stats [dict get $slot_stats $key_slot]
+ assert {[dict exists $stats memory-bytes]}
+ assert {[dict get $stats memory-bytes] > 0}
+ }
+
+ test "CLUSTER SLOT-STATS net mem combination shows only net and mem stats" {
+ R 0 CONFIG SET cluster-slot-stats-enabled "net mem"
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set slot_stats [convert_array_into_dict $slot_stats]
+
+ set stats [dict get $slot_stats $key_slot]
+ assert {[dict exists $stats memory-bytes]}
+ assert {[dict exists $stats network-bytes-in]}
+ assert {[dict exists $stats network-bytes-out]}
+ assert {![dict exists $stats cpu-usec]}
+ }
+
+ test "CLUSTER SLOT-STATS cpu mem combination shows only cpu and mem stats" {
+ R 0 CONFIG SET cluster-slot-stats-enabled "cpu mem"
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set slot_stats [convert_array_into_dict $slot_stats]
+
+ set stats [dict get $slot_stats $key_slot]
+ assert {[dict exists $stats memory-bytes]}
+ assert {[dict exists $stats cpu-usec]}
+ assert {![dict exists $stats network-bytes-in]}
+ assert {![dict exists $stats network-bytes-out]}
+
+ # Restore to yes for subsequent tests
+ R 0 CONFIG SET cluster-slot-stats-enabled yes
+ }
+
+ test "CLUSTER SLOT-STATS memory-bytes field not present after disabling cluster-slot-stats-enabled" {
+ R 0 CONFIG SET cluster-slot-stats-enabled no
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set slot_stats [convert_array_into_dict $slot_stats]
+
+ # Verify memory-bytes field is not present after disabling config
+ # (memory tracking is disabled when MEM flag is removed)
+ assert {[dict exists $slot_stats $key_slot]}
+ set stats [dict get $slot_stats $key_slot]
+ assert {![dict exists $stats memory-bytes]}
+
+ # Verify other stats fields are not present
+ assert {![dict exists $stats cpu-usec]}
+ assert {![dict exists $stats network-bytes-in]}
+ assert {![dict exists $stats network-bytes-out]}
+ }
+
+ test "CLUSTER SLOT-STATS memory tracking cannot be re-enabled after being disabled" {
+ # Once memory tracking is disabled, it cannot be re-enabled at runtime
+ assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled yes}
+ assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled mem}
+
+ # But cpu and net can still be enabled
+ R 0 CONFIG SET cluster-slot-stats-enabled "cpu net"
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set slot_stats [convert_array_into_dict $slot_stats]
+
+ assert {[dict exists $slot_stats $key_slot]}
+ set stats [dict get $slot_stats $key_slot]
+ assert {![dict exists $stats memory-bytes]}
+ assert {[dict exists $stats cpu-usec]}
+ assert {[dict exists $stats network-bytes-in]}
+ assert {[dict exists $stats network-bytes-out]}
+ }
+}
+
+start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled no}} {
+ # Define shared variables.
+ set key "FOO"
+ set key_slot [R 0 cluster keyslot $key]
+
+ test "CLUSTER SLOT-STATS memory-bytes field not present when cluster-slot-stats-enabled not set on startup" {
+ R 0 SET $key VALUE
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set slot_stats [convert_array_into_dict $slot_stats]
+
+ # Verify memory-bytes field is not present
+ assert {[dict exists $slot_stats $key_slot]}
+ set stats [dict get $slot_stats $key_slot]
+ assert {![dict exists $stats memory-bytes]}
+
+ # Only key-count should be present
+ assert {[dict exists $stats key-count]}
+ assert {[dict get $stats key-count] == 1}
+ }
+
+ test "CLUSTER SLOT-STATS enabling mem at runtime fails when not enabled at startup" {
+ # Trying to enable memory tracking at runtime should fail
+ assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled mem}
+ assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled yes}
+ assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled "cpu net mem"}
+ }
+
+ test "CLUSTER SLOT-STATS enabling cpu and net at runtime works" {
+ R 0 CONFIG SET cluster-slot-stats-enabled "cpu net"
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+ set slot_stats [convert_array_into_dict $slot_stats]
+
+ # Verify memory-bytes field is still not present
+ assert {[dict exists $slot_stats $key_slot]}
+ set stats [dict get $slot_stats $key_slot]
+ assert {![dict exists $stats memory-bytes]}
+
+ # Other stats fields should now be present
+ assert {[dict exists $stats cpu-usec]}
+ assert {[dict exists $stats network-bytes-in]}
+ assert {[dict exists $stats network-bytes-out]}
+ }
+}