From 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda Mon Sep 17 00:00:00 2001 From: Mitja Felicijan Date: Wed, 21 Jan 2026 22:40:55 +0100 Subject: Add Redis source code for testing --- .../tests/unit/cluster/announced-endpoints.tcl | 75 + .../tests/unit/cluster/atomic-slot-migration.tcl | 3063 ++++++++++++++++++++ examples/redis-unstable/tests/unit/cluster/cli.tcl | 415 +++ .../tests/unit/cluster/cluster-response-tls.tcl | 110 + .../tests/unit/cluster/failure-marking.tcl | 53 + .../tests/unit/cluster/hostnames.tcl | 230 ++ .../unit/cluster/human-announced-nodename.tcl | 29 + .../tests/unit/cluster/internal-secret.tcl | 71 + .../redis-unstable/tests/unit/cluster/links.tcl | 292 ++ .../redis-unstable/tests/unit/cluster/misc.tcl | 36 + .../tests/unit/cluster/multi-slot-operations.tcl | 182 ++ .../tests/unit/cluster/scripting.tcl | 91 + .../tests/unit/cluster/sharded-pubsub.tcl | 67 + .../tests/unit/cluster/slot-ownership.tcl | 61 + .../tests/unit/cluster/slot-stats.tcl | 1169 ++++++++ 15 files changed, 5944 insertions(+) create mode 100644 examples/redis-unstable/tests/unit/cluster/announced-endpoints.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/cli.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/cluster-response-tls.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/failure-marking.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/hostnames.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/human-announced-nodename.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/internal-secret.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/links.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/misc.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/multi-slot-operations.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/scripting.tcl 
create mode 100644 examples/redis-unstable/tests/unit/cluster/sharded-pubsub.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/slot-ownership.tcl create mode 100644 examples/redis-unstable/tests/unit/cluster/slot-stats.tcl (limited to 'examples/redis-unstable/tests/unit/cluster') diff --git a/examples/redis-unstable/tests/unit/cluster/announced-endpoints.tcl b/examples/redis-unstable/tests/unit/cluster/announced-endpoints.tcl new file mode 100644 index 0000000..a37ca58 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/announced-endpoints.tcl @@ -0,0 +1,75 @@ +start_cluster 2 2 {tags {external:skip cluster}} { + + test "Test change cluster-announce-port and cluster-announce-tls-port at runtime" { + if {$::tls} { + set baseport [lindex [R 0 config get tls-port] 1] + } else { + set baseport [lindex [R 0 config get port] 1] + } + set count [expr [llength $::servers] + 1] + set used_port [find_available_port $baseport $count] + + R 0 config set cluster-announce-tls-port $used_port + R 0 config set cluster-announce-port $used_port + + assert_match "*:$used_port@*" [R 0 CLUSTER NODES] + wait_for_condition 50 100 { + [string match "*:$used_port@*" [R 1 CLUSTER NODES]] + } else { + fail "Cluster announced port was not propagated via gossip" + } + + R 0 config set cluster-announce-tls-port 0 + R 0 config set cluster-announce-port 0 + assert_match "*:$baseport@*" [R 0 CLUSTER NODES] + } + + test "Test change cluster-announce-bus-port at runtime" { + if {$::tls} { + set baseport [lindex [R 0 config get tls-port] 1] + } else { + set baseport [lindex [R 0 config get port] 1] + } + set count [expr [llength $::servers] + 1] + set used_port [find_available_port $baseport $count] + + # Verify config set cluster-announce-bus-port + R 0 config set cluster-announce-bus-port $used_port + assert_match "*@$used_port *" [R 0 CLUSTER NODES] + wait_for_condition 50 100 { + [string match "*@$used_port *" [R 1 CLUSTER NODES]] + } else { + fail "Cluster announced 
port was not propagated via gossip" + } + + # Verify restore default cluster-announce-port + set base_bus_port [expr $baseport + 10000] + R 0 config set cluster-announce-bus-port 0 + assert_match "*@$base_bus_port *" [R 0 CLUSTER NODES] + } + + test "CONFIG SET port updates cluster-announced port" { + set count [expr [llength $::servers] + 1] + # Get the original port and change to new_port + if {$::tls} { + set orig_port [lindex [R 0 config get tls-port] 1] + } else { + set orig_port [lindex [R 0 config get port] 1] + } + assert {$orig_port != ""} + set new_port [find_available_port $orig_port $count] + + if {$::tls} { + R 0 config set tls-port $new_port + } else { + R 0 config set port $new_port + } + + # Verify that the new port appears in the output of cluster slots + wait_for_condition 50 100 { + [string match "*$new_port*" [R 0 cluster slots]] + } else { + fail "Cluster announced port was not updated in cluster slots" + } + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl b/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl new file mode 100644 index 0000000..f04257f --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl @@ -0,0 +1,3063 @@ +set ::slot_prefixes [dict create \ + 0 "{06S}" \ + 1 "{Qi}" \ + 2 "{5L5}" \ + 3 "{4Iu}" \ + 4 "{4gY}" \ + 5 "{460}" \ + 6 "{1Y7}" \ + 7 "{1LV}" \ + 101 "{1j2}" \ + 102 "{75V}" \ + 103 "{bno}" \ + 5462 "{450}"\ + 5463 "{4dY}"\ + 6000 "{4L7}" \ + 6001 "{4YV}" \ + 6002 "{0bx}" \ + 6003 "{AJ}" \ + 6004 "{of}" \ + 16383 "{6ZJ}" \ +] + +# Helper functions +proc get_port {node_id} { + if {$::tls} { + return [lindex [R $node_id config get tls-port] 1] + } else { + return [lindex [R $node_id config get port] 1] + } +} + +# return the prefix for the given slot +proc slot_prefix {slot} { + return [dict get $::slot_prefixes $slot] +} + +# return a key for the given slot +proc slot_key {slot {suffix ""}} { + return "[slot_prefix $slot]$suffix" +} 
+ +# Populate a slot with keys +# TODO: Consider merging with populate() +proc populate_slot {num args} { + # Default values + set prefix "key:" + set size 3 + set idx 0 + set prints false + set expires 0 + set slot -1 + + # Parse named arguments + foreach {key value} $args { + switch -- $key { + -prefix { set prefix $value } + -size { set size $value } + -idx { set idx $value } + -prints { set prints $value } + -expires { set expires $value } + -slot { set slot $value } + default { error "Unknown option: $key" } + } + } + + # If slot is specified, use slot prefix from table + if {$slot >= 0} { + if {[dict exists $::slot_prefixes $slot]} { + set prefix [dict get $::slot_prefixes $slot] + } else { + error "Slot $slot not supported in slot_prefixes table, add it manually" + } + } + + R $idx deferred 1 + if {$num > 16} {set pipeline 16} else {set pipeline $num} + set val [string repeat A $size] + for {set j 0} {$j < $pipeline} {incr j} { + if {$expires > 0} { + R $idx set $prefix$j $val ex $expires + } else { + R $idx set $prefix$j $val + } + if {$prints} {puts $j} + } + for {} {$j < $num} {incr j} { + if {$expires > 0} { + R $idx set $prefix$j $val ex $expires + } else { + R $idx set $prefix$j $val + } + R $idx read + if {$prints} {puts $j} + } + for {set j 0} {$j < $pipeline} {incr j} { + R $idx read + if {$prints} {puts $j} + } + R $idx deferred 0 +} + +# Return 1 if all instances are idle +proc asm_all_instances_idle {total} { + for {set i 0} {$i < $total} {incr i} { + if {[CI $i cluster_slot_migration_active_tasks] != 0} { return 0 } + if {[CI $i cluster_slot_migration_active_trim_running] != 0} { return 0 } + } + return 1 +} + +# Wait for all ASM tasks to complete in the cluster +proc wait_for_asm_done {} { + set total_instances [expr {$::cluster_master_nodes + $::cluster_replica_nodes}] + + wait_for_condition 1000 10 { + [asm_all_instances_idle $total_instances] == 1 + } else { + # Print the number of active tasks on each instance + for {set i 0} {$i < 
$total_instances} {incr i} { + set migration_count [CI $i cluster_slot_migration_active_tasks] + set trim_count [CI $i cluster_slot_migration_active_trim_running] + puts "Instance $i: migration_tasks=$migration_count, trim_tasks=$trim_count" + } + fail "ASM tasks did not complete on all instances" + } + # wait all nodes to reach the same cluster config after ASM + wait_for_cluster_propagation +} + +proc failover_and_wait_for_done {node_id {failover_arg ""}} { + set max_attempts 5 + for {set attempt 1} {$attempt <= $max_attempts} {incr attempt} { + if {$failover_arg eq ""} { + R $node_id cluster failover + } else { + R $node_id cluster failover $failover_arg + } + + set completed 1 + wait_for_condition 1000 10 { + [string match "*master*" [R $node_id role]] + } else { + set completed 0 + } + + if {$completed} { + wait_for_cluster_propagation + return + } + } + fail "Failover did not complete after $max_attempts attempts for node $node_id" +} + +proc migration_status {node_id task_id field} { + set status [R $node_id CLUSTER MIGRATION STATUS ID $task_id] + + # STATUS ID returns single task, so get first element + if {[llength $status] == 0} { + return "" + } + + set task_status [lindex $status 0] + set field_value "" + + # Parse the key-value pairs in the task + for {set i 0} {$i < [llength $task_status]} {incr i 2} { + set key [lindex $task_status $i] + set value [lindex $task_status [expr $i + 1]] + + if {$key eq $field} { + set field_value $value + break + } + } + + return $field_value +} + +# Setup slot migration test with keys and delay, then start migration +# Returns the task_id for the migration +proc setup_slot_migration_with_delay {src_node dst_node start_slot end_slot {keys 2} {delay 1000000}} { + # Two keys on the start slot + populate_slot $keys -idx $src_node -slot $start_slot + + # we set a delay to ensure migration takes time for testing, + # with default parameters, two keys cost 2s to save + R $src_node config set rdb-key-save-delay $delay + + # 
migrate slot range from src_node to dst_node + set task_id [R $dst_node CLUSTER MIGRATION IMPORT $start_slot $end_slot] + wait_for_condition 2000 10 { + [string match {*send-bulk-and-stream*} [migration_status $src_node $task_id state]] + } else { + fail "ASM task did not start" + } + + return $task_id +} + +# Helper function to clear module internal event logs +proc clear_module_event_log {} { + for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} { + R $i asm.clear_event_log + } +} + +proc reset_default_trim_method {} { + for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} { + R $i debug asm-trim-method default + } +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} { + foreach trim_method {"active" "bg"} { + test "Simple slot migration (trim method: $trim_method)" { + R 0 debug asm-trim-method $trim_method + R 3 debug asm-trim-method $trim_method + + set slot0_key [slot_key 0 mykey] + R 0 set $slot0_key "a" + set slot1_key [slot_key 1 mykey] + R 0 set $slot1_key "b" + set slot101_key [slot_key 101 mykey] + R 0 set $slot101_key "c" + # 3 keys cost 3s to save + R 0 config set rdb-key-save-delay 1000000 + + # load a function + R 0 function load {#!lua name=test1 + redis.register_function('test1', function() return 'hello1' end) + } + + # migrate slot 0-100 to R 1 + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + # migration is start, and in accumulating buffer stage + wait_for_condition 1000 50 { + [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]] && + [string match {*accumulate-buffer*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not start" + } + + # append 99 times during migration + for {set i 0} {$i < 99} {incr i} { + R 0 multi + R 0 append $slot0_key "a" + R 0 exec + R 0 append $slot1_key "b" + R 0 append $slot101_key "c" + } + + # wait until migration of 0-100 successful + 
wait_for_asm_done + + # verify task state became completed + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + + # the appended 99 times should also be migrated + assert_equal [string repeat a 100] [R 1 get $slot0_key] + assert_equal [string repeat b 100] [R 1 get $slot1_key] + + # function should be migrated + assert_equal [R 0 function dump] [R 1 function dump] + # the slave should also get the data + wait_for_ofs_sync [Rn 1] [Rn 4] + + R 4 readonly + assert_equal [string repeat a 100] [R 4 get $slot0_key] + assert_equal [string repeat b 100] [R 4 get $slot1_key] + assert_equal [R 0 function dump] [R 4 function dump] + + # verify key that was not in the slot range is not migrated + assert_equal [string repeat c 100] [R 0 get $slot101_key] + # verify changes in replica + wait_for_ofs_sync [Rn 0] [Rn 3] + R 3 readonly + assert_equal [string repeat c 100] [R 3 get $slot101_key] + + # cleanup + R 0 config set rdb-key-save-delay 0 + R 0 flushall + R 0 function flush + R 1 flushall + R 1 function flush + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } + } +} + +# Skip most of the tests when running under valgrind since it is hard to +# stabilize tests under valgrind. 
+if {!$::valgrind} { +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} { + test "Test CLUSTER MIGRATION IMPORT input validation" { + # invalid arguments + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT 100} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT 100 200 300} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION UNKNOWN 1 2} + + # invalid slot range + assert_error {*greater than end slot number*} {R 0 CLUSTER MIGRATION IMPORT 200 100} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 17000 18000} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 14000 18000} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 0 16384} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 0 -1} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT -1 2} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT -2 -1} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 10 a} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT sd sd} + assert_error {*already the owner of the slot*} {R 0 CLUSTER MIGRATION IMPORT 100 200} + } + + test "Test CLUSTER MIGRATION CANCEL input validation" { + # invalid arguments + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ID} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ID 12345 EXTRAARG} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ALL EXTRAARG} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION CANCEL UNKNOWNARG} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION CANCEL abc def} + # 
empty string id should not cancel any task + assert_equal 0 [R 0 CLUSTER MIGRATION CANCEL ID ""] + } + + test "Test CLUSTER MIGRATION STATUS input validation" { + # invalid arguments + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ID} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ID id EXTRAARG} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ALL EXTRAARG} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION STATUS ABC DEF} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION STATUS UNKNOWNARG} + # empty string id should not list any task + assert_equal {} [R 0 CLUSTER MIGRATION STATUS ID ""] + } + + test "Test TRIMSLOTS input validation" { + # Wrong number of arguments + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 1} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 2 100} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 17000 1} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES abc} + + # Missing ranges argument + assert_error {*missing ranges argument*} {R 0 TRIMSLOTS UNKNOWN 1 100 200} + + # Invalid number of ranges + assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES 0 1 1} + assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES -1 2 2} + assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES 17000 1 2} + assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES 2 100 200 300} + + # Invalid slot numbers + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 -1 0} + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 -2 -1} + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 0 16384} + assert_error {*out of range slot*} {R 0 TRIMSLOTS 
RANGES 1 abc def} + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 100 abc} + + # Start slot greater than end slot + assert_error {*greater than end slot number*} {R 0 TRIMSLOTS RANGES 1 200 100} + } + + test "Test IMPORT not allowed on replica" { + assert_error {* not allowed on replica*} {R 4 CLUSTER MIGRATION IMPORT 100 200} + } + + test "Test IMPORT not allowed during manual migration" { + set dst_id [R 1 CLUSTER MYID] + + # Set a slot to IMPORTING + R 0 CLUSTER SETSLOT 15000 IMPORTING $dst_id + assert_error {*must be STABLE to start*slot migration*} {R 0 CLUSTER MIGRATION IMPORT 100 200} + # Revert the change + R 0 CLUSTER SETSLOT 15000 STABLE + + # Same test with setting a slot to MIGRATING + R 0 CLUSTER SETSLOT 5000 MIGRATING $dst_id + assert_error {*must be STABLE to start*slot migration*} {R 0 CLUSTER MIGRATION IMPORT 100 200} + # Revert the change + R 0 CLUSTER SETSLOT 5000 STABLE + } + + test "Test IMPORT not allowed if the node is already the owner" { + assert_error {*already the owner of the slot*} {R 0 CLUSTER MIGRATION IMPORT 100 100} + } + + test "Test IMPORT not allowed for a slot without an owner" { + # Slot will have no owner + R 0 CLUSTER DELSLOTS 5000 + + assert_error {*slot has no owner: 5000*} {R 0 CLUSTER MIGRATION IMPORT 5000 5000} + + # Revert the change + R 0 CLUSTER ADDSLOTS 5000 + } + + test "Test IMPORT not allowed if slot ranges belong to different nodes" { + assert_error {*slots belong to different source nodes*} {R 0 CLUSTER MIGRATION IMPORT 7000 15000} + assert_error {*slots belong to different source nodes*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 14000 15000} + } + + test "Test IMPORT not allowed if slot is given multiple times" { + assert_error {*Slot*specified multiple times*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 8000 9000} + assert_error {*Slot*specified multiple times*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 7900 9000} + } + + test "Test CLUSTER MIGRATION STATUS ALL lists all tasks" { + # Create 3 completed 
tasks + R 0 CLUSTER MIGRATION IMPORT 7000 7001 + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 7002 7003 + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 7004 7005 + wait_for_asm_done + + # Get node IDs for verification + set node0_id [R 0 cluster myid] + set node1_id [R 1 cluster myid] + + # Verify CLUSTER MIGRATION STATUS ALL reply from both nodes + foreach node_idx {0 1} { + set tasks [R $node_idx CLUSTER MIGRATION STATUS ALL] + assert_equal 3 [llength $tasks] + + for {set i 0} {$i < 3} {incr i} { + set task [lindex $tasks $i] + + # Verify field order + set expected_fields {id slots source dest operation state + last_error retries create_time start_time + end_time write_pause_ms} + for {set j 0} {$j < [llength $expected_fields]} {incr j} { + set expected_field [lindex $expected_fields $j] + set actual_field [lindex $task [expr $j * 2]] + assert_equal $expected_field $actual_field + } + + # Verify basic fields + assert_equal "completed" [dict get $task state] + assert_equal "" [dict get $task last_error] + assert_equal 0 [dict get $task retries] + assert {[dict get $task write_pause_ms] >= 0} + + # Verify operation based on node + if {$node_idx == 0} { + assert_equal "import" [dict get $task operation] + } else { + assert_equal "migrate" [dict get $task operation] + } + + # Verify node IDs (all tasks: node1 -> node0) + assert_equal $node1_id [dict get $task source] + assert_equal $node0_id [dict get $task dest] + + # Verify timestamps exist and are reasonable + set create_time [dict get $task create_time] + set start_time [dict get $task start_time] + set end_time [dict get $task end_time] + assert {$create_time > 0} + assert {$start_time >= $create_time} + assert {$end_time >= $start_time} + + # Verify specific slot ranges for each task + set slots [dict get $task slots] + if {$i == 0} { + assert_equal "7004-7005" $slots + } elseif {$i == 1} { + assert_equal "7002-7003" $slots + } elseif {$i == 2} { + assert_equal "7000-7001" $slots + } + } + } + + # 
cleanup + R 1 CLUSTER MIGRATION IMPORT 7000 7005 + wait_for_asm_done + } + + test "Test IMPORT not allowed if there is an overlapping import" { + # Let slot migration take long time, so that we can test overlapping import + R 1 config set rdb-key-save-delay 1000000 + R 1 set tag22273 tag22273 ;# slot hash is 7000 + R 1 set tag9283 tag9283 ;# slot hash is 8000 + + set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 8000] + assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 8000 9000} + assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 7500 8500} + assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 6000 7000} + assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 6500 7500} + + wait_for_condition 1000 50 { + [string match {*completed*} [migration_status 0 $task_id state]] && + [string match {*completed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not start" + } + assert_equal "tag22273" [R 0 get tag22273] + assert_equal "tag9283" [R 0 get tag9283] + R 1 config set rdb-key-save-delay 0 + + # revert the migration + R 1 CLUSTER MIGRATION IMPORT 7000 8000 + wait_for_asm_done + } + + test "Test IMPORT with unsorted and adjacent ranges" { + # Redis should sort and merge adjacent ranges + # Adjacent means: prev.end + 1 == next.start + # e.g. 
7000-7001 7002-7003 7004-7005 => 7000-7005 + + # Test with adjacent ranges + set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 7001 7002 7100] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7000-7100" [migration_status 0 $task_id slots] + assert_equal "7000-7100" [migration_status 1 $task_id slots] + + # Test with unsorted and adjacent ranges + set task_id [R 1 CLUSTER MIGRATION IMPORT 7050 7051 7010 7049 7000 7005] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7000-7005 7010-7051" [migration_status 0 $task_id slots] + assert_equal "7000-7005 7010-7051" [migration_status 1 $task_id slots] + + # Another test with unsorted and adjacent ranges + set task_id [R 1 CLUSTER MIGRATION IMPORT 7007 7007 7008 7009 7006 7006] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7006-7009" [migration_status 0 $task_id slots] + assert_equal "7006-7009" [migration_status 1 $task_id slots] + } + + test "Simple slot migration with write load" { + # Perform slot migration while traffic is on and verify data consistency. + # Trimming is disabled on source nodes so, we can compare the dbs after + # migration via DEBUG DIGEST to ensure no data loss during migration. + # Steps: + # 1. Disable trimming on both nodes + # 2. Populate slot 0 on node-0 and slot 6000 on node-1 + # 2. Start write traffic on both nodes + # 3. 
Migrate slot 0 from node-0 to node-1 + # 4. Migrate slot 6000 from node-1 to node-0 + # 5. Stop write traffic, verify db's are identical. + + # This test runs slowly under the thread sanitizer. + # 1. Increase the lag threshold from the default 1 MB to 10 MB to let the destination catch up easily. + # 2. Increase the write pause timeout from the default 10s to 60s so the source can wait longer. + set prev_config_lag [lindex [R 0 config get cluster-slot-migration-handoff-max-lag-bytes] 1] + R 0 config set cluster-slot-migration-handoff-max-lag-bytes 10mb + R 1 config set cluster-slot-migration-handoff-max-lag-bytes 10mb + set prev_config_timeout [lindex [R 0 config get cluster-slot-migration-write-pause-timeout] 1] + R 0 config set cluster-slot-migration-write-pause-timeout 60000 + R 1 config set cluster-slot-migration-write-pause-timeout 60000 + + R 0 flushall + R 0 debug asm-trim-method none + populate_slot 10000 -idx 0 -slot 0 + + R 1 flushall + R 1 debug asm-trim-method none + populate_slot 10000 -idx 1 -slot 6000 + + # Start write traffic on node-0 + # Throws -MOVED error once asm is completed, catch block will ignore it. + catch { + # Start the slot 0 write load on the R 0 + set port [get_port 0] + set key [slot_key 0 mykey] + set load_handle0 [start_write_load "127.0.0.1" $port 100 $key 0 5] + } + + # Start write traffic on node-1 + # Throws -MOVED error once asm is completed, catch block will ignore it. 
+ catch { + # Start the slot 6000 write load on the R 1 + set port [get_port 1] + set key [slot_key 6000 mykey] + set load_handle1 [start_write_load "127.0.0.1" $port 100 $key 0 5] + } + + # Migrate keys + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 6000 6100 + wait_for_asm_done + + stop_write_load $load_handle0 + stop_write_load $load_handle1 + + # verify data + assert_morethan [R 0 dbsize] 0 + assert_equal [R 0 debug digest] [R 1 debug digest] + + # cleanup + R 0 config set cluster-slot-migration-handoff-max-lag-bytes $prev_config_lag + R 0 config set cluster-slot-migration-write-pause-timeout $prev_config_timeout + R 0 debug asm-trim-method default + R 0 flushall + R 1 config set cluster-slot-migration-handoff-max-lag-bytes $prev_config_lag + R 1 config set cluster-slot-migration-write-pause-timeout $prev_config_timeout + R 1 debug asm-trim-method default + R 1 flushall + + R 1 CLUSTER MIGRATION IMPORT 6000 6100 + wait_for_asm_done + } + + test "Verify expire time is migrated correctly" { + R 0 flushall + R 1 flushall + + set string_key [slot_key 0 string_key] + set list_key [slot_key 0 list_key] + set hash_key [slot_key 0 hash_key] + set stream_key [slot_key 0 stream_key] + + for {set i 0} {$i < 20} {incr i} { + R 1 hset $hash_key $i $i + R 1 xadd $stream_key * item $i + } + for {set i 0} {$i < 2000} {incr i} { + R 1 lpush $list_key $i + } + + # set expire time of some keys + R 1 set $string_key "a" EX 1000 + R 1 EXPIRE $list_key 1000 + R 1 EXPIRE $hash_key 1000 + + # migrate slot 0-100 to R 0 + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + + # check expire times are migrated correctly + assert_range [R 0 ttl $string_key] 900 1000 + assert_range [R 0 ttl $list_key] 900 1000 + assert_range [R 0 ttl $hash_key] 900 1000 + assert_equal -1 [R 0 ttl $stream_key] + + # cleanup + R 0 flushall + R 1 flushall + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } + + test "Slot migration with complex data types 
can work well" { + R 0 flushall + R 1 flushall + + set list_key [slot_key 0 list_key] + set set_key [slot_key 0 set_key] + set zset_key [slot_key 0 zset_key] + set hash_key [slot_key 0 hash_key] + set stream_key [slot_key 0 stream_key] + + # generate big keys for each data type + for {set i 0} {$i < 1000} {incr i} { + R 1 lpush $list_key $i + R 1 sadd $set_key $i + R 1 zadd $zset_key $i $i + R 1 hset $hash_key $i $i + R 1 xadd $stream_key * item $i + } + + # migrate slot 0-100 to R 0 + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + # check the data on destination node is correct + assert_equal 1000 [R 0 llen $list_key] + assert_equal 1000 [R 0 scard $set_key] + assert_equal 1000 [R 0 zcard $zset_key] + assert_equal 1000 [R 0 hlen $hash_key] + assert_equal 1000 [R 0 xlen $stream_key] + # migrate slot 0-100 to R 1 + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } + + proc asm_basic_error_handling_test {operation channel all_states} { + foreach state $all_states { + if {$::verbose} { puts "Testing $operation $channel channel with state: $state"} + + # For states that need incremental data streaming, set a longer delay + set streaming_states [list "streaming-buffer" "accumulate-buffer" "send-bulk-and-stream" "send-stream"] + if {$state in $streaming_states} { + R 1 config set rdb-key-save-delay 1000000 + } + + # Let the destination node take time to stream buffer, so the source node will handle + # slot snapshot child process exit, and then enter "send-stream" state. 
+ if {$state == "send-stream"} { + R 0 config set key-load-delay 100000 + } + + # Start the slot 0 write load on the R 1 + set slot0_key [slot_key 0 mykey] + set load_handle [start_write_load "127.0.0.1" [get_port 1] 100 $slot0_key 500] + + # clear old fail points and set the new fail point + assert_equal {OK} [R 0 debug asm-failpoint "" ""] + assert_equal {OK} [R 1 debug asm-failpoint "" ""] + if {$operation eq "import"} { + assert_equal {OK} [R 0 debug asm-failpoint "import-$channel-channel" $state] + } elseif {$operation eq "migrate"} { + assert_equal {OK} [R 1 debug asm-failpoint "migrate-$channel-channel" $state] + } else { + fail "Unknown operation: $operation" + } + + # Start the migration + set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100] + + # The task should be failed due to the fail point + wait_for_condition 2000 10 { + [string match -nocase "*$channel*${state}*" [migration_status 0 $task_id last_error]] || + [string match -nocase "*$channel*${state}*" [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not fail with expected error - + (dst: [migration_status 0 $task_id last_error] + src: [migration_status 1 $task_id last_error] + expected: $channel $state)" + } + stop_write_load $load_handle + + # Cancel the task + R 0 CLUSTER MIGRATION CANCEL ID $task_id + R 1 CLUSTER MIGRATION CANCEL ID $task_id + + R 1 config set rdb-key-save-delay 0 + R 0 config set key-load-delay 0 + } + } + + test "Destination node main channel basic error-handling tests " { + set all_states [list \ + "connecting" \ + "auth-reply" \ + "handshake-reply" \ + "syncslots-reply" \ + "accumulate-buffer" \ + "streaming-buffer" \ + "wait-stream-eof" \ + ] + asm_basic_error_handling_test "import" "main" $all_states + } + + test "Destination node rdb channel basic error-handling tests" { + set all_states [list \ + "connecting" \ + "auth-reply" \ + "rdbchannel-reply" \ + "rdbchannel-transfer" \ + ] + asm_basic_error_handling_test "import" "rdb" $all_states + } + + test 
"Source node main channel basic error-handling tests " { + set all_states [list \ + "wait-rdbchannel" \ + "send-bulk-and-stream" \ + "send-stream" \ + "handoff" \ + ] + asm_basic_error_handling_test "migrate" "main" $all_states + } + + test "Source node rdb channel basic error-handling tests" { + set all_states [list \ + "wait-bgsave-start" \ + "send-bulk-and-stream" \ + ] + asm_basic_error_handling_test "migrate" "rdb" $all_states + } + + test "Migration will be successful after fail points are cleared" { + R 0 flushall + R 1 flushall + set slot0_key [slot_key 0 mykey] + set slot1_key [slot_key 1 mykey] + R 1 set $slot0_key "a" + R 1 set $slot1_key "b" + + # we set a delay to write incremental data + R 1 config set rdb-key-save-delay 1000000 + + # Start the slot 0 write load on the R 1 + set load_handle [start_write_load "127.0.0.1" [get_port 1] 100 $slot0_key] + + # Clear all fail points + assert_equal {OK} [R 0 debug asm-failpoint "" ""] + assert_equal {OK} [R 1 debug asm-failpoint "" ""] + + # Start the migration + set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100] + + # Wait for the migration to complete + wait_for_asm_done + + stop_write_load $load_handle + + # Verify the data is migrated, slot 0 and 1 should belong to R 1 + # slot 0 key should be changed by the write load + assert_not_equal "a" [R 0 get $slot0_key] + assert_equal "b" [R 0 get $slot1_key] + R 1 config set rdb-key-save-delay 0 + } + + test "Client output buffer limit is reached on source side" { + R 0 flushall + R 1 flushall + set r1_pid [S 1 process_id] + R 1 debug repl-pause on-streaming-repl-buf + + # Set a small output buffer limit to trigger the error + R 0 config set client-output-buffer-limit "replica 4mb 0 0" + + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # some write traffic is to have chance to enter streaming buffer state + set slot0_key [slot_key 0 mykey] + R 0 set $slot0_key "a" + + # after 3 second, the slots snapshot (costs 2s to generate) should be transferred, + 
# then start streaming buffer + after 3000 + + set loglines [count_log_lines 0] + + # Start the slot 0 write load on the R 0 + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key 1000] + + # verify the metric is accessible, it is transient, will be reset on disconnect + assert {[S 0 mem_cluster_slot_migration_output_buffer] >= 0} + + # After some time, the client output buffer limit should be reached + wait_for_log_messages 0 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 1000 10 + wait_for_condition 1000 10 { + [string match {*send*stream*} [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail as expected" + } + + stop_write_load $load_handle + + # Reset configurations + R 0 config set client-output-buffer-limit "replica 0 0 0" + R 0 config set rdb-key-save-delay 0 + + # resume server and clear pause point + resume_process $r1_pid + R 1 debug repl-pause clear + + # Wait for the migration to complete + wait_for_asm_done + } + + test "Full sync buffer limit is reached on destination side" { + # Set a small replication buffer limit to trigger the error + R 0 config set replica-full-sync-buffer-limit 1mb + + # start migration from 1 to 0, cost 4s to transfer slots snapshot + set task_id [setup_slot_migration_with_delay 1 0 0 100 2 2000000] + set loglines [count_log_lines 0] + + # Create some traffic on slot 0 + populate_slot 100 -idx 1 -slot 0 -size 100000 + + # After some time, slots sync buffer limit should be reached, but migration would not fail + # since the buffer will be accumulated on source side from now. 
+ wait_for_log_messages 0 {"*Slots sync buffer limit has been reached*"} $loglines 1000 10 + + # verify the peak value, should be greater than 1mb + assert {[S 0 mem_cluster_slot_migration_input_buffer_peak] > 1000000} + # verify the metric is accessible, it is transient, will be reset on disconnect + assert {[S 0 mem_cluster_slot_migration_input_buffer] >= 0} + + wait_for_asm_done + + # Reset configurations + R 0 config set replica-full-sync-buffer-limit 0 + R 1 config set rdb-key-save-delay 0 + R 1 cluster migration import 0 100 + wait_for_asm_done + } + + test "Expired key is not deleted and SCAN/KEYS/RANDOMKEY/CLUSTER GETKEYSINSLOT filter keys in importing slots" { + set slot0_key [slot_key 0 mykey] + set slot1_key [slot_key 1 mykey] + set slot2_key [slot_key 2 mykey] + R 1 flushall + R 0 flushall + + # we set a delay to write incremental data + R 1 config set rdb-key-save-delay 1000000 + + # set expire time 2s. Generating slot snapshot will 3s, so these + # three keys will be expired after slot snapshot is transferred + R 1 setex $slot0_key 2 "a" + R 1 setex $slot1_key 2 "b" + R 1 hset $slot2_key "f1" "1" + R 1 expire $slot2_key 2 + R 1 hexpire $slot2_key 2 FIELDS 1 "f1" + + set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100] + wait_for_condition 2000 10 { + [string match {*send-bulk-and-stream*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not start" + } + + # update expire time during mirgration + R 1 setex $slot0_key 100 "a" + R 1 expire $slot1_key 80 + R 1 expire $slot2_key 60 + R 1 hincrbyfloat $slot2_key "f1" 1 + R 1 hexpire $slot2_key 60 FIELDS 1 "f1" + + # after 2s, at least a key should be transferred, and should not be deleted + # due to expired, neither active nor lazy expiration (SCAN) takes effect, + # Besides SCAN/KEYS/RANDOMKEY/CLUSTER GETKEYSINSLOT command can not find them + after 2000 + R 3 readonly + foreach id {0 3} { ;# 0 is the master, 3 is the replica + assert_equal {0 {}} [R $id scan 0 count 10] + assert_equal {} [R 
$id keys "*"] + assert_equal {} [R $id keys "{06S}*"] + assert_equal {} [R $id randomkey] + assert_equal {} [R $id cluster getkeysinslot 0 100] + assert_equal [R $id cluster countkeysinslot 0] 0 + assert_equal [R $id dbsize] 0 + + # but we can see the number of keys is increased in INFO KEYSPACE + assert {[scan [regexp -inline {keys\=([\d]*)} [R $id info keyspace]] keys=%d] >= 1} + assert {[scan [regexp -inline {expires\=([\d]*)} [R $id info keyspace]] expires=%d] >= 1} + } + + wait_for_asm_done + + wait_for_ofs_sync [Rn 0] [Rn 3] + + foreach id {0 3} { ;# 0 is the master, 3 is the replica + # verify the keys are valid + assert_range [R $id ttl $slot0_key] 90 100 + assert_range [R $id ttl $slot1_key] 70 80 + assert_range [R $id ttl $slot2_key] 50 60 + assert_range [R $id httl $slot2_key FIELDS 1 "f1"] 50 60 + + # KEYS/SCAN/RANDOMKEY/CLUSTER GETKEYSINSLOT will find the keys after migration + assert_equal [list 0 [list $slot0_key $slot1_key $slot2_key]] [R $id scan 0 count 10] + assert_equal [list $slot0_key $slot1_key $slot2_key] [R $id keys "*"] + assert_equal [list $slot0_key] [R $id keys "{06S}*"] + assert_not_equal {} [R $id randomkey] + assert_equal [list $slot0_key] [R $id cluster getkeysinslot 0 100] + + # INFO KEYSPACE/DBSIZE/CLUSTER COUNTKEYSINSLOT will also reflect the keys + assert_equal 3 [scan [regexp -inline {keys\=([\d]*)} [R $id info keyspace]] keys=%d] + assert_equal 3 [scan [regexp -inline {expires\=([\d]*)} [R $id info keyspace]] expires=%d] + assert_equal 1 [scan [regexp -inline {subexpiry\=([\d]*)} [R $id info keyspace]] subexpiry=%d] + assert_equal 3 [R $id dbsize] + assert_equal 1 [R $id cluster countkeysinslot 0] + } + + # update expire time to 10ms, after some time, the keys should be deleted due to + # active expiration + R 0 pexpire $slot0_key 10 + R 0 pexpire $slot1_key 10 + R 0 hpexpire $slot2_key 10 FIELDS 1 "f1" ;# the last field is expired, the key will be deleted + wait_for_condition 100 50 { + [scan [regexp -inline {keys\=([\d]*)} 
[R 0 info keyspace]] keys=%d] == {} && + [scan [regexp -inline {keys\=([\d]*)} [R 3 info keyspace]] keys=%d] == {} + } else { + fail "keys did not expire" + } + + R 1 config set rdb-key-save-delay 0 + } + + test "Eviction does not evict keys in importing slots" { + set slot0_key [slot_key 0 mykey] + set slot1_key [slot_key 1 mykey] + set slot2_key [slot_key 2 mykey] + set slot5462_key [slot_key 5462 mykey] + set slot5463_key [slot_key 5463 mykey] + R 1 flushall + R 0 flushall + + # we set a delay to write incremental data + R 0 config set rdb-key-save-delay 1000000 + + set 1k_str [string repeat "a" 1024] + set 1m_str [string repeat "a" 1048576] + + # set two keys to be evicted + R 1 set $slot5462_key $1k_str + R 1 set $slot5463_key $1k_str + + # set maxmemory to 200kb more than current used memory, + # redis should evict some keys if importing some big keys + set r1_mem_used [S 1 used_memory] + set r1_max_mem [expr {$r1_mem_used + 200*1024}] + R 1 config set maxmemory $r1_max_mem + R 1 config set maxmemory-policy allkeys-lru + + # set 3 keys to be migrated + R 0 set $slot0_key $1m_str + R 0 set $slot1_key $1m_str + R 0 set $slot2_key $1m_str + + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + wait_for_condition 2000 10 { + [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not start" + } + + # after 2.2s, at least two keys should be transferred, they should not be evicted + # but other keys (slot5462_key and slot5463_key) should be evicted + after 2200 + for {set j 0} {$j < 100} {incr j} { R 1 ping } ;# trigger eviction + assert_equal 0 [R 1 exists $slot5462_key] + assert_equal 0 [R 1 exists $slot5463_key] + assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 2} + + # current used memory should be more than the maxmemory, since the big keys that + # belong importing slots can not be evicted. 
+ set r1_mem_used [S 1 used_memory] + assert {$r1_mem_used > $r1_max_mem + 1024*1024} + + wait_for_asm_done + + # after migration, these big keys should be evicted + for {set j 0} {$j < 100} {incr j} { R 1 ping } ;# trigger eviction + assert_equal {} [scan [regexp -inline {expires\=([\d]*)} [R 1 info keyspace]] expires=%d] + } + + test "Failover will cancel slot migration tasks" { + # migrate slot 0-100 from 1 to 0 + set task_id [setup_slot_migration_with_delay 1 0 0 100] + + # FAILOVER happens on the destination node, instance #3 become master, #0 become slave + failover_and_wait_for_done 3 + + # the old master will cancel the importing task, and the migrating task on + # the source node will be failed + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 0 $task_id state]] && + [string match {*failover*} [migration_status 0 $task_id last_error]] && + [string match {*failed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not cancel" + } + + # We can restart ASM tasks on new master, migrate slot 0-100 from 1 to 3 + R 1 config set rdb-key-save-delay 0 + set task_id [R 3 CLUSTER MIGRATION IMPORT 0 100] + wait_for_asm_done + + # migrate slot 0-100 from 3 to 1 + set task_id [setup_slot_migration_with_delay 3 1 0 100] + + # FAILOVER happens on the source node, instance #3 become slave, #0 become master + failover_and_wait_for_done 0 + + # the old master will cancel the migrating task, but the destination node will + # retry the importing task, and then succeed. 
+ wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 3 $task_id state]] + } else { + fail "ASM task did not cancel" + } + wait_for_asm_done + } + + test "Flush-like command can cancel slot migration task" { + # flushall, flushdb + foreach flushcmd {flushall flushdb} { + # start slot migration from 1 to 0 + set task_id [setup_slot_migration_with_delay 1 0 0 100] + + if {$::verbose} { puts "Testing flush command: $flushcmd"} + R 0 $flushcmd + + # flush-like will cancel the task + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not cancel" + } + } + + R 1 config set rdb-key-save-delay 0 + R 0 cluster migration import 0 100 + wait_for_asm_done + } + + test "CLUSTER SETSLOT command when there is a slot migration task" { + # Setup slot migration test from node 0 to node 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # Cluster SETSLOT command is not allowed when there is a slot migration task + # on the slot. #0 and #1 are having migration task now. + foreach instance {0 1} { + set node_id [R $instance cluster myid] + + catch {R $instance cluster setslot 0 migrating $node_id} err + assert_match {*in an active atomic slot migration*} $err + + catch {R $instance cluster setslot 0 importing $node_id} err + assert_match {*in an active atomic slot migration*} $err + + catch {R $instance cluster setslot 0 stable} err + assert_match {*in an active atomic slot migration*} $err + + catch {R $instance cluster setslot 0 node $node_id} err + assert_match {*in an active atomic slot migration*} $err + } + + # CLUSTER SETSLOT on other node will cancel the migration task, we update + # the owner of slot 0 (that is migrating from #0 to #1) to #2 on #2, we + # bump the config epoch to make sure the change can update #0 and #1 + # slot configuration, so #0 and #1 will cancel the migration task. 
+ # BTW, if config epoch is not bumped, the slot config of #2 may be + # updated by #0 and #1. + R 2 cluster bumpepoch + R 2 cluster setslot 0 node [R 2 cluster myid] + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 0 $task_id state]] && + [string match {*slots configuration updated*} [migration_status 0 $task_id last_error]] && + [string match {*canceled*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not cancel" + } + + # set slot 0 back to #0 + R 0 cluster bumpepoch + R 0 cluster setslot 0 node [R 0 cluster myid] + wait_for_cluster_propagation + wait_for_cluster_state "ok" + } + + test "CLUSTER DELSLOTSRANGE command cancels a slot migration task" { + # start slot migration from 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + R 0 cluster delslotsrange 0 100 + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 0 $task_id state]] && + [string match {*slots configuration updated*} [migration_status 0 $task_id last_error]] && + [string match {*failed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not cancel" + } + R 1 cluster migration cancel id $task_id + + # add the slots back + R 0 cluster addslotsrange 0 100 + wait_for_cluster_propagation + wait_for_cluster_state "ok" + } + + # NOTE: this test needs more than 60s, maybe you can skip when testing + test "CLUSTER FORGET command cancels a slot migration task" { + R 0 config set rdb-key-save-delay 0 + # Migrate all slot on #0 to #1, so we can forget #0 + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 5461] + wait_for_asm_done + + # start slot migration from 1 to 0 + set task_id [setup_slot_migration_with_delay 1 0 0 5461] + + # Forget #0 on #1, the migration task on #1 will be canceled due to node deleted, + # and the importing task on #0 will be failed + R 1 cluster forget [R 0 cluster myid] + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 1 $task_id state]] && + [string 
match {*node deleted*} [migration_status 1 $task_id last_error]] && + [string match {*failed*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not cancel" + } + + # Add #0 back into cluster + # NOTE: this will cost 60s to let #0 join the cluster since + # other nodes add #0 into black list for 60s after FORGET. + R 1 config set rdb-key-save-delay 0 + R 1 cluster meet "127.0.0.1" [lindex [R 0 config get port] 1] + + # the importing task on #0 will be retried, and eventually succeed + # since now #0 is back in the cluster + wait_for_condition 3000 50 { + [string match {*completed*} [migration_status 0 $task_id state]] && + [string match {*completed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not finish" + } + + # make sure #0 is completely back to the cluster + wait_for_cluster_propagation + wait_for_cluster_state "ok" + } + + test "CLIENT PAUSE can cancel slot migration task" { + # start slot migration from 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # CLIENT PAUSE happens on the destination node, #1 will cancel the importing task + R 1 client pause 100000 write ;# pause 100s + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 1 $task_id state]] && + [string match {*client pause*} [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not cancel" + } + + # start task again + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + after 200 ;# give some time to have chance to schedule the task + # the task should not start since server is paused + assert {[string match {*none*} [migration_status 1 $task_id state]]} + + # unpause the server, the task should start + R 1 client unpause + wait_for_asm_done + + # migrate back to original node #0 + R 0 config set rdb-key-save-delay 0 + R 1 config set rdb-key-save-delay 0 + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } + + test "Server shutdown can cancel slot migration task, exit with success" { + # 
start slot migration from 0 to 1 + setup_slot_migration_with_delay 0 1 0 100 + + set loglines [count_log_lines -1] + + # Shutdown the server, it should cancel the migration task + restart_server -1 true false true nosave + + wait_for_log_messages -1 {"*Cancelled due to server shutdown*"} $loglines 100 100 + + wait_for_cluster_propagation + wait_for_cluster_state "ok" + } + + test "Cancel import task when streaming buffer into db" { + # set a delay to have time to cancel import task that is streaming buf to db + R 1 config set key-load-delay 50000 + # start slot migration from 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100 5] + + # start the slot 0 write load on the node 0 + set slot0_key [slot_key 0 mykey] + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key 500] + + # wait for entering streaming buffer state + wait_for_condition 1000 10 { + [string match {*streaming-buffer*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not enter streaming buffer state" + } + stop_write_load $load_handle + + # cancel the import task on #1, the destination node works fine + R 1 cluster migration cancel id $task_id + assert_match {*canceled*} [migration_status 1 $task_id state] + + # reset config + R 0 config set key-load-delay 0 + R 1 config set key-load-delay 0 + } + + test "Destination node main channel timeout when waiting stream EOF" { + set task_id [setup_slot_migration_with_delay 0 1 0 100] + R 1 config set repl-timeout 5 + + # pause the source node to make EOF wait timeout. 
Do not pause + # the child process, so it can deliver slot snapshot to destination + set r0_process_id [S 0 process_id] + pause_process $r0_process_id + + # the destination node will fail after 7s, 5s for EOF wait and 2s for slot snapshot + wait_for_condition 1000 20 { + [string match {*failed*} [migration_status 1 $task_id state]] && + [string match {*Main channel*Connection timeout*wait-stream-eof*} \ + [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + + # resume the source node + resume_process $r0_process_id + + # After the source node is resumed, the task on source node may receive + # ACKs from destination and consider the task is stream-done. In this case, + # the task on source node will be failed after several seconds + if {[string match {*stream-done*} [migration_status 0 $task_id state]]} { + wait_for_condition 1000 20 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*Server paused*} [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + } + + R 1 config set repl-timeout 60 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Destination node rdb channel timeout when transferring slots snapshot" { + # cost 10s to transfer each key + set task_id [setup_slot_migration_with_delay 0 1 0 100 2 10000000] + R 1 config set repl-timeout 3 + + # the destination node will fail after 3s + wait_for_condition 1000 20 { + [string match {*failed*} [migration_status 1 $task_id state]] && + [string match {*RDB channel*Connection timeout*rdbchannel-transfer*} \ + [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + + R 1 config set repl-timeout 60 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Source node rdb channel timeout when transferring slots snapshot" { + set r1_pid [S 1 process_id] + R 0 flushall + R 0 config set save "" 
+ # generate several large keys, make sure the memory usage is more than + # socket buffer size, so the rdb channel will block and timeout if + # no data is received by destination. + set val [string repeat "a" 102400] ;# 100kb + for {set i 0} {$i < 1000} {incr i} { + set key [slot_key 0 "key$i"] + R 0 set $key $val + } + R 0 config set repl-timeout 3 ;# 3s for rdb channel timeout + R 0 config set rdb-key-save-delay 10000 ;# 1000 keys cost 10s to save + + # start migration from #0 to #1 + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + wait_for_condition 1000 20 { + [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not start" + } + + # pause the destination node to make rdb channel timeout + pause_process $r1_pid + + # the source node will fail, the rdb child process can not + # write data to destination, so it will timeout + wait_for_condition 1000 30 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*RDB channel*Failed to send slots snapshot*} \ + [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + resume_process $r1_pid + + R 0 config set repl-timeout 60 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Source node main channel timeout when sending incremental stream" { + R 0 flushall + R 0 config set repl-timeout 2 ;# 2s for main channel timeout + + set r1_pid [S 1 process_id] + # in order to have time to pause the destination node + R 1 config set key-load-delay 50000 ;# 50ms each 16k data + + # start migration from #0 to #1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # Create 200 keys of 16k size traffic on slot 0, streaming buffer need 10s (200*50ms) + populate_slot 200 -idx 0 -slot 0 -size 16384 + + # wait for streaming buffer state, then pause the destination node + wait_for_condition 1000 20 { + [string match {*streaming-buffer*} [migration_status 1 
$task_id state]] + } else { + fail "ASM task did not stream buffer, state: [migration_status 1 $task_id state]" + } + pause_process $r1_pid + + # Start the slot 0 write load on the R 0 + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 [slot_key 0 mykey] 500] + + # the source node will fail after several seconds (including the time + # to fill the socket buffer of source node), the main channel can not + # write data to destination since the destination is paused + wait_for_condition 1000 30 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*Main channel*Connection timeout*} \ + [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + stop_write_load $load_handle + resume_process $r1_pid + + R 0 config set repl-timeout 60 + R 1 config set key-load-delay 0 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + R 0 flushall + } + + test "Source server paused timeout" { + # set timeout to 0, so the task will fail immediately when checking timeout + R 0 config set cluster-slot-migration-write-pause-timeout 0 + + # start migration from node 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # start the slot 0 write load on the node 0 + set slot0_key [slot_key 0 mykey] + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key] + + # node 0 will fail since server paused timeout + wait_for_condition 2000 10 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*Server paused timeout*} \ + [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + + stop_write_load $load_handle + + # reset config + R 0 config set cluster-slot-migration-write-pause-timeout 10000 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Sync buffer drain timeout" { + # set a fail point to avoid the source node to enter handoff prep state + 
# to test the sync buffer drain timeout + R 0 debug asm-failpoint "migrate-main-channel" "handoff-prep" + R 0 config set cluster-slot-migration-sync-buffer-drain-timeout 5000 + + set r1_pid [S 1 process_id] + + # start migration from node 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # start the slot 0 write load on the node 0 + set slot0_key [slot_key 0 mykey] + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key] + + # wait for entering streaming buffer state + wait_for_condition 1000 10 { + [string match {*wait-stream-eof*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not enter wait-stream-eof state" + } + + pause_process $r1_pid ;# avoid the destination to apply commands + + # node 0 will fail since sync buffer drain timeout + wait_for_condition 2000 10 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*Sync buffer drain timeout*} \ + [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + + stop_write_load $load_handle + resume_process $r1_pid + + # reset config + R 0 config set cluster-slot-migration-sync-buffer-drain-timeout 60000 + R 0 debug asm-failpoint "" "" + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Cluster implementation cannot start migrate task temporarily" { + # Inject a fail point to make the source node not ready + R 0 debug asm-failpoint "migrate-main-channel" "none" + + # start migration from node 0 to 1 + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + + # verify source node replies SYNCSLOTS with -NOTREADY + set loglines [count_log_lines -1] + wait_for_log_messages -1 {"*Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later*"} $loglines 100 100 + + # clear the fail point and verify the task is completed + R 0 debug asm-failpoint "" "" + wait_for_asm_done + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal 
"completed" [migration_status 1 $task_id state] + + # cleanup + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} { + test "Test bgtrim after a successful migration" { + R 0 debug asm-trim-method bg + R 3 debug asm-trim-method bg + R 0 CONFIG RESETSTAT + R 3 CONFIG RESETSTAT + + R 0 flushall + # Fill slot 0 + populate_slot 1000 -idx 0 -slot 0 + # Fill slot 1 with keys that have TTL + populate_slot 1000 -idx 0 -slot 1 -prefix "expirekey" -expires 100 + # HFE key on slot 2 + set slot2_hfekey [slot_key 2 hfekey] + R 0 HSETEX $slot2_hfekey EX 10 FIELDS 1 f1 v1 + + # Fill slot 101, these keys won't be migrated + populate_slot 1000 -idx 0 -slot 101 + # Fill slot 102 with keys that have TTL + populate_slot 1000 -idx 0 -slot 102 -prefix "expirekey" -expires 100 + # HFE key on slot 103 + set slot103_hfekey [slot_key 103 hfekey] + R 0 HSETEX $slot103_hfekey EX 10 FIELDS 1 f1 v1 + + # migrate slot 0 to node-1 + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + + # Verify the data is migrated + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal 2001 [R 0 dbsize] + assert_equal 2001 [R 3 dbsize] + wait_for_ofs_sync [Rn 1] [Rn 4] + assert_equal 2001 [R 1 dbsize] + assert_equal 2001 [R 4 dbsize] + + # Verify the keys are trimmed lazily + wait_for_condition 1000 10 { + [S 0 lazyfreed_objects] == 2001 && + [S 3 lazyfreed_objects] == 2001 + } else { + puts "lazyfreed_objects: [S 0 lazyfreed_objects] [S 3 lazyfreed_objects]" + fail "Background trim did not happen" + } + + # Cleanup + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + } + + test "Test bgtrim after a failed migration" { + R 0 debug asm-trim-method bg + R 3 debug asm-trim-method bg + R 1 CONFIG RESETSTAT + R 4 CONFIG RESETSTAT + + # Fill slot 0 on node-0 and migrate it to node-1 (with some 
delay) + R 0 flushall + set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000] + after 1000 ;# wait some time so that some keys are moved + + # Fail the migration + R 1 CLUSTER MIGRATION CANCEL ID $task_id + wait_for_asm_done + + # Verify the data is not migrated + assert_equal 10000 [R 0 dbsize] + assert_equal 10000 [R 3 dbsize] + + # Verify the keys are trimmed lazily after a failed import on dest side. + wait_for_condition 1000 20 { + [R 1 dbsize] == 0 && + [R 4 dbsize] == 0 && + [S 1 lazyfreed_objects] > 0 && + [S 4 lazyfreed_objects] > 0 + } else { + fail "Background trim did not happen" + } + + # Cleanup + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + } + + test "Test bgtrim unblocks stream client" { + # Two clients waiting for data on two different streams which are in + # different slots. We are going to migrate one slot, which will unblock + # the client. The other client should still be blocked. + R 0 debug asm-trim-method bg + + set key0 [slot_key 0 mystream] + set key1 [slot_key 1 mystream] + + # First client waits on slot-0 key + R 0 DEL $key0 + R 0 XADD $key0 666 f v + R 0 XGROUP CREATE $key0 mygroup $ + set rd0 [redis_deferring_client] + $rd0 XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS $key0 ">" + wait_for_blocked_clients_count 1 + + # Second client waits on slot-1 key + R 0 DEL $key1 + R 0 XADD $key1 666 f v + R 0 XGROUP CREATE $key1 mygroup $ + set rd1 [redis_deferring_client] + $rd1 XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS $key1 ">" + wait_for_blocked_clients_count 2 + + # Migrate slot 0 + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + + # First client should get MOVED error + assert_error "*MOVED*" {$rd0 read} + $rd0 close + + # Second client should operate normally + R 0 XADD $key1 667 f v + set res [$rd1 read] + assert_equal [lindex $res 0 1 0] {667-0 {f v}} + $rd1 close + + # cleanup + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 0 + 
wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + } + + test "Test bgtrim touches watched keys" { + R 0 debug asm-trim-method bg + + # bgtrim should touch watched keys on migrated slots + set key0 [slot_key 0 key] + R 0 set $key0 30 + R 0 watch $key0 + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + R 0 multi + R 0 ping + assert_equal {} [R 0 exec] + + # bgtrim should not touch watched keys on other slots + set key2 [slot_key 2 key] + R 0 set $key2 30 + R 0 watch $key2 + R 1 CLUSTER MIGRATION IMPORT 1 1 + wait_for_asm_done + R 0 multi + R 0 ping + assert_equal PONG [R 0 exec] + + # cleanup + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 1 + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + } + + test "Test bgtrim after a FAILOVER on destination side" { + R 1 debug asm-trim-method bg + R 4 debug asm-trim-method bg + + set loglines [count_log_lines -4] + + # Fill slot 0 on node-0 and migrate it to node-1 (with some delay) + R 0 flushall + set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000] + after 1000 ;# wait some time so that some keys are moved + + # Trigger a failover with force to simulate unreachable master and + # verify unowned keys are trimmed once replica becomes master. 
+ failover_and_wait_for_done 4 force + wait_for_log_messages -4 {"*Detected keys in slots that do not belong*Scheduling trim*"} $loglines 1000 10 + wait_for_condition 1000 10 { + [R 1 dbsize] == 0 && + [R 4 dbsize] == 0 + } else { + fail "Background trim did not happen" + } + + # cleanup + wait_for_cluster_propagation + failover_and_wait_for_done 1 + R 0 config set rdb-key-save-delay 0 + R 1 debug asm-trim-method default + R 4 debug asm-trim-method default + wait_for_asm_done + } + + test "CLUSTER SETSLOT is not allowed if there is a pending trim job" { + R 0 debug asm-trim-method bg + R 3 debug asm-trim-method bg + + # Fill slot 0 on node-0 and migrate it to node-1 (with some delay) + R 0 flushall + set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000] + + # Pause will cancel the task and there will be a pending trim job + # until writes are allowed again. + R 1 client pause 100000 write ;# pause 100s + wait_for_asm_done + + # CLUSTER SETSLOT is not allowed if there is a pending trim job. 
+ assert_error {*There is a pending trim job for slot 0*} {R 1 CLUSTER SETSLOT 0 STABLE} + + # Unpause the server, trim will be triggered and SETSLOT will be allowed + R 1 client unpause + R 1 CLUSTER SETSLOT 0 STABLE + } +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no save ""}} { + test "Test active trim after a successful migration" { + R 0 debug asm-trim-method active + R 3 debug asm-trim-method active + populate_slot 500 -slot 0 + populate_slot 500 -slot 1 + populate_slot 500 -slot 3 + populate_slot 500 -slot 4 + + # Migrate 1500 keys + R 1 CLUSTER MIGRATION IMPORT 0 1 3 3 + wait_for_asm_done + + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 0 && + [CI 0 cluster_slot_migration_active_trim_current_job_trimmed] == 1500 && + [CI 3 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_active_trim_current_job_trimmed] == 1500 + } else { + fail "trim failed" + } + + assert_equal 1500 [CI 0 cluster_slot_migration_active_trim_current_job_keys] + assert_equal 1500 [CI 3 cluster_slot_migration_active_trim_current_job_keys] + + assert_equal 500 [R 0 dbsize] + assert_equal 500 [R 3 dbsize] + assert_equal 1500 [R 1 dbsize] + assert_equal 1500 [R 4 dbsize] + assert_equal 0 [R 0 cluster countkeysinslot 0] + assert_equal 0 [R 0 cluster countkeysinslot 1] + assert_equal 0 [R 0 cluster countkeysinslot 3] + assert_equal 500 [R 0 cluster countkeysinslot 4] + + # cleanup + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + R 0 CLUSTER MIGRATION IMPORT 0 1 3 3 + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + test "Test multiple active trim jobs can be scheduled" { + # Active trim will be scheduled but it won't run + R 0 debug asm-trim-method active -1 + R 3 debug asm-trim-method active -1 + + populate_slot 500 -slot 0 + populate_slot 500 -slot 1 + 
populate_slot 500 -slot 3 + populate_slot 500 -slot 4 + + # Migrate 1500 keys + R 1 CLUSTER MIGRATION IMPORT 0 1 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "migrate failed" + } + + # Migrate another slot and verify there are two trim tasks on the source + R 1 CLUSTER MIGRATION IMPORT 3 3 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 2 && + [CI 3 cluster_slot_migration_active_trim_running] == 2 + } else { + fail "migrate failed" + } + + # Enabled active trim and wait until it is completed. + R 0 debug asm-trim-method active 0 + R 3 debug asm-trim-method active 0 + wait_for_asm_done + + assert_equal 500 [R 0 dbsize] + assert_equal 500 [R 3 dbsize] + assert_equal 0 [R 0 cluster countkeysinslot 0] + assert_equal 0 [R 0 cluster countkeysinslot 1] + assert_equal 0 [R 0 cluster countkeysinslot 3] + assert_equal 500 [R 0 cluster countkeysinslot 4] + + # cleanup + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + R 0 CLUSTER MIGRATION IMPORT 0 1 3 3 + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + test "Test active-trim clears partially imported keys on cancel" { + R 1 debug asm-trim-method active + R 4 debug asm-trim-method active + + # Rdb delivery will take 10 seconds + R 0 config set rdb-key-save-delay 10000 + populate_slot 250 -slot 0 + populate_slot 250 -slot 1 + populate_slot 250 -slot 3 + populate_slot 250 -slot 4 + + R 1 CLUSTER MIGRATION IMPORT 0 100 + after 2000 + R 1 CLUSTER MIGRATION CANCEL ALL + wait_for_asm_done + + assert_morethan [CI 1 cluster_slot_migration_active_trim_current_job_keys] 0 + assert_morethan [CI 4 cluster_slot_migration_active_trim_current_job_trimmed] 0 + + assert_equal 1000 [R 0 dbsize] + assert_equal 1000 [R 3 dbsize] + assert_equal 0 [R 1 
dbsize] + assert_equal 0 [R 4 dbsize] + + # Cleanup + R 1 debug asm-trim-method default + R 4 debug asm-trim-method default + R 0 config set rdb-key-save-delay 0 + } + + test "Test active-trim clears partially imported keys on failover" { + R 1 debug asm-trim-method active + R 4 debug asm-trim-method active + + # Rdb delivery will take 10 seconds + R 0 config set rdb-key-save-delay 10000 + + populate_slot 250 -slot 0 + populate_slot 250 -slot 1 + populate_slot 250 -slot 3 + populate_slot 250 -slot 4 + + set prev_trim_started_1 [CI 1 cluster_slot_migration_stats_active_trim_started] + set prev_trim_started_4 [CI 4 cluster_slot_migration_stats_active_trim_started] + + R 1 CLUSTER MIGRATION IMPORT 0 100 + after 2000 + failover_and_wait_for_done 4 + wait_for_asm_done + + # Verify there is at least one trim job started + assert_morethan [CI 1 cluster_slot_migration_stats_active_trim_started] $prev_trim_started_1 + assert_morethan [CI 4 cluster_slot_migration_stats_active_trim_started] $prev_trim_started_4 + + assert_equal 1000 [R 0 dbsize] + assert_equal 1000 [R 3 dbsize] + assert_equal 0 [R 1 dbsize] + assert_equal 0 [R 4 dbsize] + + # Cleanup + failover_and_wait_for_done 1 + R 1 debug asm-trim-method default + R 4 debug asm-trim-method default + R 0 config set rdb-key-save-delay 0 + R 0 flushall + R 1 flushall + } + + test "Test import task does not start if active trim is in progress for the same slots" { + # Active trim will be scheduled but it won't run + R 0 flushall + R 1 flushall + R 0 debug asm-trim-method active -1 + + populate_slot 500 -slot 0 + populate_slot 500 -slot 1 + + # Migrate 1000 keys + R 1 CLUSTER MIGRATION IMPORT 0 1 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "migrate failed" + } + + # Try to migrate slots back + R 0 CLUSTER MIGRATION IMPORT 0 1 + wait_for_log_messages 0 {"*Can not start import task*trim in progress for some of the 
slots*"} 0 1000 10 + + # Enabled active trim and verify slots are imported back + R 0 debug asm-trim-method active 0 + wait_for_asm_done + + assert_equal 1000 [R 0 dbsize] + assert_equal 500 [R 0 cluster countkeysinslot 0] + assert_equal 500 [R 0 cluster countkeysinslot 1] + + # cleanup + R 0 debug asm-trim-method default + R 0 flushall + } + + test "Rdb save during active trim should skip keys in trimmed slots" { + # Insert some delay to activate trim + R 0 debug asm-trim-method active 1000 + R 0 config set repl-diskless-sync-delay 0 + R 0 flushall + + populate_slot 5000 -idx 0 -slot 0 + populate_slot 5000 -idx 0 -slot 1 + populate_slot 5000 -idx 0 -slot 2 + + # Start migration and wait until trim is in progress + R 1 CLUSTER MIGRATION IMPORT 0 1 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 && + [S 0 rdb_bgsave_in_progress] == 0 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + # Trigger save during active trim + R 0 save + # Wait until the log contains a "keys skipped" message with a non-zero value + wait_for_log_messages 0 {"*BGSAVE done, 5000 keys saved, [1-9]* keys skipped*"} 0 1000 10 + + restart_server 0 yes no yes nosave + assert_equal 5000 [R 0 dbsize] + assert_equal 0 [R 0 cluster countkeysinslot 0] + assert_equal 0 [R 0 cluster countkeysinslot 1] + assert_equal 5000 [R 0 cluster countkeysinslot 2] + + # Cleanup + wait_for_cluster_propagation + wait_for_cluster_state "ok" + R 0 flushall + R 1 flushall + R 0 save + R 0 CLUSTER MIGRATION IMPORT 0 1 + wait_for_asm_done + } + + test "AOF rewrite during active trim should skip keys in trimmed slots" { + R 0 debug asm-trim-method active 1000 + R 0 config set repl-diskless-sync-delay 0 + R 0 config set aof-use-rdb-preamble no + R 0 config set appendonly yes + R 0 config rewrite + R 0 flushall + populate_slot 5000 -idx 
0 -slot 0 + populate_slot 5000 -idx 0 -slot 1 + populate_slot 5000 -idx 0 -slot 2 + + R 1 CLUSTER MIGRATION IMPORT 0 1 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + wait_for_condition 50 100 { + [S 0 rdb_bgsave_in_progress] == 0 + } else { + fail "bgsave is in progress" + } + + R 0 bgrewriteaof + # Wait until the log contains a "keys skipped" message with a non-zero value + wait_for_log_messages 0 {"*AOF rewrite done, [1-9]* keys saved, [1-9]* keys skipped*"} 0 1000 10 + + restart_server 0 yes no yes nosave + assert_equal 5000 [R 0 dbsize] + assert_equal 0 [R 0 cluster countkeysinslot 0] + assert_equal 0 [R 0 cluster countkeysinslot 1] + assert_equal 5000 [R 0 cluster countkeysinslot 2] + + # cleanup + R 0 config set appendonly no + R 0 config rewrite + restart_server 0 yes no yes nosave + wait_for_cluster_propagation + wait_for_cluster_state "ok" + R 0 flushall + R 1 flushall + R 0 save + R 0 CLUSTER MIGRATION IMPORT 0 1 + wait_for_asm_done + } + + test "Pause actions will stop active trimming" { + R 0 debug asm-trim-method active 1000 + R 0 config set repl-diskless-sync-delay 0 + R 0 flushall + populate_slot 10000 -idx 0 -slot 0 + + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + # Pause the server and verify no keys are trimmed + R 0 client pause 100000 write ;# pause 100s + set prev [CI 0 cluster_slot_migration_active_trim_current_job_trimmed] + after 1000 ; # wait some time to see if any keys are trimmed + set curr [CI 0 
cluster_slot_migration_active_trim_current_job_trimmed] + assert_equal $prev $curr + + R 0 client unpause + R 0 debug asm-trim-method default + wait_for_asm_done + assert_equal 0 [R 0 dbsize] + + # revert + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + assert_equal 10000 [R 0 dbsize] + } + + foreach diskless_load {"disabled" "swapdb" "on-empty-db"} { + test "Test fullsync cancels active trim (repl-diskless-load $diskless_load)" { + R 3 debug asm-trim-method active -10 + R 3 config set repl-diskless-load $diskless_load + R 0 flushall + + R 0 config set repl-diskless-sync-delay 0 + populate_slot 10000 -idx 0 -slot 0 + + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + puts "[CI 3 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + set prev_cancelled [CI 3 cluster_slot_migration_stats_active_trim_cancelled] + R 0 config set client-output-buffer-limit "replica 1024 0 0" + + # Trigger a fullsync + populate_slot 1 -idx 0 -size 2000000 -slot 2 + + wait_for_condition 1000 10 { + [CI 3 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_stats_active_trim_cancelled] == $prev_cancelled + 1 + } else { + puts "[CI 3 cluster_slot_migration_active_trim_running]" + puts "[CI 3 cluster_slot_migration_stats_active_trim_cancelled]" + fail "trim failed" + } + + R 3 debug asm-trim-method active 0 + R 3 config set repl-diskless-load disabled + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal 10001 [R 0 dbsize] + assert_equal 10001 [R 3 dbsize] + assert_equal 0 [R 1 dbsize] + assert_equal 0 [R 4 dbsize] + R 0 flushall + } + } + + test "Test importing slots while 
active-trim is in progress for the same slots on replica" { + R 3 debug asm-trim-method active 10000 + R 0 flushall + populate_slot 10000 -slot 0 + wait_for_ofs_sync [Rn 0] [Rn 3] + + # Wait until active trim is in progress on replica + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + puts "[CI 3 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + set loglines [count_log_lines -3] + + # Get slots back + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_condition 1000 20 { + [CI 0 cluster_slot_migration_active_tasks] == 1 && + [CI 0 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "trim failed" + } + + # Verify replica blocks master until trim is done + wait_for_log_messages -3 {"*Blocking master client until trim job is done*"} $loglines 1000 30 + R 3 debug asm-trim-method active 0 + wait_for_log_messages -3 {"*Unblocking master client after active trim*"} $loglines 1000 30 + + wait_for_asm_done + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal 10000 [R 0 dbsize] + assert_equal 10000 [R 3 dbsize] + assert_equal 0 [R 1 dbsize] + assert_equal 0 [R 4 dbsize] + } + + test "TRIMSLOTS should not trim slots that this node is serving" { + assert_error {*the slot 0 is served by this node*} {R 0 trimslots ranges 1 0 0} + assert_error {*READONLY*} {R 3 trimslots ranges 1 0 100} + assert_equal {OK} [R 0 trimslots ranges 1 16383 16383] + assert_error {*READONLY*} {R 3 trimslots ranges 1 16383 16383} + } + + test "Trigger multiple active trim jobs at the same time" { + R 1 debug asm-trim-method active 0 + R 1 flushall + + set prev_trim_done [CI 1 
cluster_slot_migration_stats_active_trim_completed] + + R 1 debug populate 1000 [slot_prefix 0] 100 + R 1 debug populate 1000 [slot_prefix 1] 100 + R 1 debug populate 1000 [slot_prefix 2] 100 + + R 1 multi + R 1 trimslots ranges 1 0 0 + R 1 trimslots ranges 1 1 1 + R 1 trimslots ranges 1 2 2 + R 1 exec + + wait_for_condition 1000 10 { + [CI 1 cluster_slot_migration_stats_active_trim_completed] == $prev_trim_done + 3 + } else { + fail "active trim failed" + } + + R 1 flushall + R 1 debug asm-trim-method default + } + + test "Restart will clean up unowned slot keys" { + R 1 flushall + + # generate 1000 keys belonging to slot 0 + R 1 debug populate 1000 [slot_prefix 0] 100 + assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1000} + + # restart node-1 + restart_server -1 true false true save + wait_for_cluster_propagation + wait_for_cluster_state "ok" + + # Node-1 has no keys since unowned slot 0 keys were cleaned up during restart + assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] == {}} + + R 1 flushall + } + + test "Test active trim is used when client tracking is used" { + R 0 flushall + R 1 flushall + R 0 debug asm-trim-method default + R 1 debug asm-trim-method default + + set prev_active_trim [CI 0 cluster_slot_migration_stats_active_trim_completed] + + # Setup a tracking client that is redirected to a pubsub client + set rd_redirection [redis_deferring_client] + $rd_redirection client id + set redir_id [$rd_redirection read] + $rd_redirection subscribe __redis__:invalidate + $rd_redirection read ; # Consume the SUBSCRIBE reply. 
+ + # setup tracking + set key0 [slot_key 0 key] + R 0 CLIENT TRACKING on REDIRECT $redir_id + R 0 SET $key0 1 + R 0 GET $key0 + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_stats_active_trim_completed] == [expr $prev_active_trim + 1] + } else { + fail "active trim did not happen" + } + + # Verify the tracking client received the invalidation message + set msg [$rd_redirection read] + set head [lindex $msg 0] + + if {$head eq "message"} { + # RESP 2 + set got_key [lindex [lindex $msg 2] 0] + } elseif {$head eq "invalidate"} { + # RESP 3 + set got_key [lindex $msg 1 0] + } else { + fail "unexpected invalidation message: $msg" + } + assert_equal $got_key $key0 + + # cleanup + $rd_redirection close + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + R 0 flushall + } +} + +set testmodule [file normalize tests/modules/atomicslotmigration.so] + +start_cluster 3 6 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no]] { + test "Module api sanity" { + R 0 asm.sanity ;# on master + R 3 asm.sanity ;# on replica + } + + test "Module replicate cross slot command" { + set task_id [setup_slot_migration_with_delay 0 1 0 100] + set listkey [slot_key 0 "asmlist"] + # replicate cross slot command during migrating + R 0 asm.lpush_replicate_crossslot_command $listkey "item1" + + # node 0 will fail due to cross slot + wait_for_condition 2000 10 { + [string match {*canceled*} [migration_status 0 $task_id state]] && + [string match {*cross slot*} [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + R 1 CLUSTER MIGRATION CANCEL ID $task_id + + # sanity check if lpush replicated correctly to the replica + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal {item1} [R 0 lrange $listkey 0 -1] + R 3 readonly + assert_equal {item1} [R 3 lrange $listkey 0 -1] + } + + test 
"Test RM_ClusterCanAccessKeysInSlot" { + # Test invalid slots + assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot -1] + assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 20000] + assert_equal 0 [R 2 asm.cluster_can_access_keys_in_slot 16384] + assert_equal 0 [R 5 asm.cluster_can_access_keys_in_slot 16384] + + # Test on a master-replica pair + assert_equal 1 [R 0 asm.cluster_can_access_keys_in_slot 0] + assert_equal 1 [R 0 asm.cluster_can_access_keys_in_slot 100] + assert_equal 1 [R 3 asm.cluster_can_access_keys_in_slot 0] + assert_equal 1 [R 3 asm.cluster_can_access_keys_in_slot 100] + + # Test on a master-replica pair + assert_equal 1 [R 2 asm.cluster_can_access_keys_in_slot 16383] + assert_equal 1 [R 5 asm.cluster_can_access_keys_in_slot 16383] + } + + test "Test RM_ClusterCanAccessKeysInSlot returns false for unowned slots" { + # Active trim will be scheduled but it won't run + R 0 debug asm-trim-method active -1 + R 3 debug asm-trim-method active -1 + + setup_slot_migration_with_delay 0 1 0 100 3 1000000 + + # Verify importing slots are not local + assert_equal 0 [R 1 asm.cluster_can_access_keys_in_slot 0] + assert_equal 0 [R 1 asm.cluster_can_access_keys_in_slot 100] + assert_equal 0 [R 4 asm.cluster_can_access_keys_in_slot 0] + assert_equal 0 [R 4 asm.cluster_can_access_keys_in_slot 100] + + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "migrate failed" + } + + # Wait for config propagation before checking the slot ownership on replica + wait_for_cluster_propagation + + # Verify slots that are being trimmed are not local + assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 0] + assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 100] + assert_equal 0 [R 3 asm.cluster_can_access_keys_in_slot 0] + assert_equal 0 [R 3 asm.cluster_can_access_keys_in_slot 100] + + # Enabled 
active trim and wait until it is completed. + R 0 debug asm-trim-method active 0 + R 3 debug asm-trim-method active 0 + wait_for_asm_done + wait_for_ofs_sync [Rn 0] [Rn 3] + + # Verify slots are local after migration + assert_equal 1 [R 1 asm.cluster_can_access_keys_in_slot 0] + assert_equal 1 [R 1 asm.cluster_can_access_keys_in_slot 100] + assert_equal 1 [R 4 asm.cluster_can_access_keys_in_slot 0] + assert_equal 1 [R 4 asm.cluster_can_access_keys_in_slot 100] + + # cleanup + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + foreach trim_method {"active" "bg"} { + test "Test cluster module notifications on a successful migration ($trim_method-trim)" { + clear_module_event_log + R 0 debug asm-trim-method $trim_method + R 3 debug asm-trim-method $trim_method + R 6 debug asm-trim-method $trim_method + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Migrate the slot ranges + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100 200 300] + wait_for_asm_done + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Verify the events on source, both master and replica + set migrate_event_log [list \ + "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \ + "sub: cluster-slot-migration-migrate-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \ + ] + assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log + assert_equal [R 3 asm.get_cluster_event_log] {} + assert_equal [R 6 asm.get_cluster_event_log] {} + + # Verify the events on destination, both master and replica + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \ + "sub: 
cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \ + ] + wait_for_condition 500 20 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + puts "R1: [R 1 asm.get_cluster_event_log]" + puts "R4: [R 4 asm.get_cluster_event_log]" + puts "R7: [R 7 asm.get_cluster_event_log]" + fail "ASM import event not received" + } + + # Verify the trim events + if {$trim_method eq "active"} { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-started, slots:0-100,200-300" \ + "keyspace: key_trimmed, key: $key" \ + "sub: cluster-slot-migration-trim-completed, slots:0-100,200-300" \ + ] + } else { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-background, slots:0-100,200-300" \ + ] + } + wait_for_condition 500 10 { + [R 0 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 3 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 6 asm.get_cluster_trim_event_log] eq $trim_event_log + } else { + fail "ASM source trim event not received" + } + + # cleanup + R 0 CLUSTER MIGRATION IMPORT 0 100 200 300 + wait_for_asm_done + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + + test "Test cluster module notifications on a failed migration ($trim_method-trim)" { + clear_module_event_log + R 1 debug asm-trim-method $trim_method + R 4 debug asm-trim-method $trim_method + R 7 debug asm-trim-method $trim_method + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Start migration and cancel it + set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000] + # Wait until at least one key is moved to destination + wait_for_condition 1000 10 { + [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1 + } else { + fail "Key not moved to destination" + } 
+ R 1 CLUSTER MIGRATION CANCEL ID $task_id + wait_for_asm_done + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Verify the events on source, both master and replica + set migrate_event_log [list \ + "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-migrate-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log + assert_equal [R 3 asm.get_cluster_event_log] {} + assert_equal [R 6 asm.get_cluster_event_log] {} + + # Verify the events on destination, both master and replica + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-import-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + wait_for_condition 500 10 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + fail "ASM import event not received" + } + + # Verify the trim events on destination (partially imported keys are trimmed) + if {$trim_method eq "active"} { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-started, slots:0-100" \ + "keyspace: key_trimmed, key: $key" \ + "sub: cluster-slot-migration-trim-completed, slots:0-100" \ + ] + } else { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-background, slots:0-100" \ + ] + } + wait_for_condition 500 10 { + [R 1 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 4 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 7 asm.get_cluster_trim_event_log] eq $trim_event_log + } else { + fail "ASM destination trim event not received" + } + + # cleanup + 
clear_module_event_log + reset_default_trim_method + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + test "Test cluster module notifications on failover ($trim_method-trim)" { + # NOTE: cluster legacy may have a bug, multiple manual failover will fail, + # so only perform one round of failover test, fix it later + if {$trim_method eq "bg"} { + clear_module_event_log + R 1 debug asm-trim-method $trim_method + R 4 debug asm-trim-method $trim_method + R 7 debug asm-trim-method $trim_method + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Start migration + set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000] + # Wait until at least one key is moved to destination + wait_for_condition 1000 10 { + [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1 + } else { + fail "Key not moved to destination" + } + + failover_and_wait_for_done 4 + wait_for_asm_done + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Verify the events on source, both master and replica + set migrate_event_log [list \ + "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-migrate-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log + assert_equal [R 3 asm.get_cluster_event_log] {} + assert_equal [R 6 asm.get_cluster_event_log] {} + + # Verify the events on destination, both master and replica + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-import-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + wait_for_condition 500 20 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 
4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + puts "R1: [R 1 asm.get_cluster_event_log]" + puts "R4: [R 4 asm.get_cluster_event_log]" + puts "R7: [R 7 asm.get_cluster_event_log]" + fail "ASM import event not received" + } + + # Verify the trim events on destination (partially imported keys are trimmed) + # NOTE: after failover, the new master will initiate the slot trimming, + # and only slot 0 has data, so only slot 0 is trimmed + if {$trim_method eq "active"} { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-started, slots:0-0" \ + "keyspace: key_trimmed, key: $key" \ + "sub: cluster-slot-migration-trim-completed, slots:0-0" \ + ] + } else { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-background, slots:0-0" \ + ] + } + wait_for_condition 500 20 { + [R 1 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 4 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 7 asm.get_cluster_trim_event_log] eq $trim_event_log + } else { + puts "R1: [R 1 asm.get_cluster_trim_event_log]" + puts "R4: [R 4 asm.get_cluster_trim_event_log]" + puts "R7: [R 7 asm.get_cluster_trim_event_log]" + fail "ASM destination trim event not received" + } + + # cleanup + failover_and_wait_for_done 1 + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + } + } + + foreach with_rdb {"with" "without"} { + test "Test cluster module notifications when replica restart $with_rdb RDB during importing" { + clear_module_event_log + R 1 debug asm-trim-method $trim_method + R 4 debug asm-trim-method $trim_method + R 7 debug asm-trim-method $trim_method + R 4 config set save "" + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Start migration, 2s delay + set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000] + # Wait until at least one 
key is moved to destination + wait_for_condition 1000 10 { + [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1 + } else { + fail "Key not moved to destination" + } + wait_for_ofs_sync [Rn 1] [Rn 4] + + # restart node 4 + if {$with_rdb eq "with"} { + restart_server -4 true false true save ;# rdb save + } else { + restart_server -4 true false true nosave ;# no rdb saved + } + wait_for_cluster_propagation + + wait_for_asm_done + + # started and completed are paired, and not duplicated + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + wait_for_condition 500 10 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + fail "ASM import event not received" + } + + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 4 save ;# save an empty rdb to override previous one + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + } + + test "Test cluster module notifications when replica is disconnected and full resync after importing" { + clear_module_event_log + R 1 debug asm-trim-method $trim_method + R 4 debug asm-trim-method $trim_method + R 7 debug asm-trim-method $trim_method + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Start migration, 2s delay + set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000] + # Wait until at least one key is moved to destination + wait_for_condition 1000 10 { + [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1 + } else { + fail "Key not moved to destination" + 
} + wait_for_ofs_sync [Rn 1] [Rn 4] + + # puase node-4 + set r4_pid [S 4 process_id] + pause_process $r4_pid + + # set a small repl-backlog-size and write some commands to make node-4 + # full resync when reconnecting after waking up + set r1_full_sync [S 1 sync_full] + R 1 config set repl-backlog-size 16kb + R 1 client kill type replica + set 1k_str [string repeat "a" 1024] + for {set i 0} {$i < 2000} {incr i} { + R 1 set [slot_key 6000] $1k_str + } + + # after ASM task is completed, wake up node-4 + wait_for_condition 1000 10 { + [CI 1 cluster_slot_migration_active_tasks] == 0 && + [CI 1 cluster_slot_migration_active_trim_running] == 0 + } else { + fail "ASM tasks did not completed" + } + resume_process $r4_pid + + # make sure full resync happens + wait_for_sync [Rn 4] + wait_for_ofs_sync [Rn 1] [Rn 4] + assert_morethan [S 1 sync_full] $r1_full_sync + + # started and completed are paired, and not duplicated + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + wait_for_condition 500 10 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + fail "ASM import event not received" + } + + # since ASM task is completed on node-1 before node-4 reconnects, + # no trim event should be received on node-4 + assert_equal {} [R 4 asm.get_cluster_trim_event_log] + + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + + test "Test new master can trim slots when migration is completed and failover occurs on source side" { + R 0 asm.disable_trim ;# can not start slot trimming on source side + set slot0_key 
[slot_key 0 mykey] + R 0 set $slot0_key "value" + + # migrate slot 0 from #0 to #1, and wait it completed, but not allow to trim slots + # on source node + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0] + wait_for_condition 1000 10 { + [string match {*completed*} [migration_status 0 $task_id state]] && + [string match {*completed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not complete" + } + # verify trim is not allowed on source node, and replica node doesn't have trim job either + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal 1 [R 0 asm.trim_in_progress] + assert_equal "value" [R 0 asm.read_pending_trim_key $slot0_key] + assert_equal 0 [R 3 asm.trim_in_progress] + assert_equal "value" [R 3 asm.read_pending_trim_key $slot0_key] + + set loglines [count_log_lines 0] + + # failover happens on source node, instance #3 become slave, #0 become master + failover_and_wait_for_done 3 + R 0 asm.enable_trim ;# enable trim on old master + + # old master should cancel the pending trim job + wait_for_log_messages 0 {"*Cancelling the pending trim job*"} $loglines 1000 10 + + wait_for_ofs_sync [Rn 3] [Rn 0] + # verify trim is allowed on new master, and the key is trimmed + wait_for_condition 1000 10 { + [R 3 asm.trim_in_progress] == 0 && + [R 3 asm.read_pending_trim_key $slot0_key] eq "" && + [R 0 asm.trim_in_progress] == 0 && + [R 0 asm.read_pending_trim_key $slot0_key] eq "" + } else { + fail "Trim did not complete" + } + + # verify the trim events, use active trim since module is subscribed to trimmed event + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-started, slots:0-0" \ + "keyspace: key_trimmed, key: $slot0_key" \ + "sub: cluster-slot-migration-trim-completed, slots:0-0" \ + ] + wait_for_condition 500 20 { + [R 0 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 3 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 6 asm.get_cluster_trim_event_log] eq $trim_event_log + } else { + fail "ASM destination trim 
event not received" + } + + # cleanup + failover_and_wait_for_done 0 + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + + test "Test module replicates commands at the beginning of slot migration " { + R 0 flushall + R 1 flushall + + # Sanity check + assert_equal 0 [R 1 asm.read_keyless_cmd_val] + assert_equal 0 [R 4 asm.read_keyless_cmd_val] + + # Enable module command replication and set a key to be replicated + # Module will replicate two commands: + # 1- A keyless command: asm.keyless_cmd + # 2- SET command for the given key and value + set keyname [slot_key 0 modulekey] + R 0 asm.replicate_module_command 1 $keyname "value" + + setup_slot_migration_with_delay 0 1 0 100 + wait_for_asm_done + wait_for_ofs_sync [Rn 1] [Rn 4] + + # Verify the commands are replicated + assert_equal 1 [R 1 asm.read_keyless_cmd_val] + assert_equal value [R 1 get $keyname] + + # Verify the commands are replicated to replica + R 4 readonly + assert_equal 1 [R 4 asm.read_keyless_cmd_val] + assert_equal value [R 4 get $keyname] + + # cleanup + R 0 asm.replicate_module_command 0 "" "" + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + test "Test subcommand propagation during slot migration" { + R 0 flushall + R 1 flushall + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + set key [slot_key 0 mykey] + R 0 asm.parent set $key "value" ;# execute a module subcommand + wait_for_asm_done + assert_equal "value" [R 1 GET $key] + + # cleanup + R 0 cluster migration import 0 100 + wait_for_asm_done + } + + test "Test trim method selection based on module keyspace subscription" { + R 0 debug asm-trim-method default + R 1 debug asm-trim-method default + + R 0 flushall + R 1 flushall + + populate_slot 10 -idx 0 -slot 0 + + # Make sure module is subscribed to NOTIFY_KEY_TRIMMED event. In this + # case, active trim must be used. 
+ R 0 asm.subscribe_trimmed_event 1 + set loglines [count_log_lines 0] + R 1 CLUSTER MIGRATION IMPORT 0 15 + wait_for_asm_done + wait_for_log_messages 0 {"*Active trim scheduled for slots: 0-15*"} $loglines 1000 10 + + # Move slots back to node-0. Make sure module is not subscribed to + # NOTIFY_KEY_TRIMMED event. In this case, background trim must be used. + R 1 asm.subscribe_trimmed_event 0 + set loglines [count_log_lines -1] + R 0 CLUSTER MIGRATION IMPORT 0 15 + wait_for_asm_done + wait_for_log_messages -1 {"*Background trim started for slots: 0-15*"} $loglines 1000 10 + + # cleanup + wait_for_asm_done + R 0 asm.subscribe_trimmed_event 1 + R 1 asm.subscribe_trimmed_event 1 + R 0 flushall + R 1 flushall + } + + test "Verify trimmed key value can be read in the server event callback" { + R 0 flushall + set key [slot_key 0] + set value "value123random" + R 0 set $key $value + + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + wait_for_condition 1000 10 { + [R 0 asm.get_last_deleted_key] eq "keyevent: key: $key, value: $value" + } else { + fail "Last deleted key event not received" + } + + # cleanup + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + } + + test "Verify module cannot open a key in a slot that is being trimmed" { + R 0 flushall + R 0 debug asm-trim-method active -1 ;# disable active trim + + set key [slot_key 0] + R 0 set $key value + + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 1 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "migrate failed" + } + + # We cannot open the key since it is in a slot being trimmed + assert_equal {} [R 0 asm.get $key] + + # cleanup + R 0 debug asm-trim-method default + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + } + + test "Test RM_ClusterGetLocalSlotRanges" { + assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 5461}} + assert_equal [R 3 
asm.cluster_get_local_slot_ranges] {{0 5461}} + + R 0 cluster migration import 5463 6000 + wait_for_asm_done + wait_for_cluster_propagation + assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 5461} {5463 6000}} + assert_equal [R 3 asm.cluster_get_local_slot_ranges] {{0 5461} {5463 6000}} + + R 0 cluster migration import 5462 5462 6001 10922 + wait_for_asm_done + wait_for_cluster_propagation + assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 10922}} + assert_equal [R 3 asm.cluster_get_local_slot_ranges] {{0 10922}} + assert_equal [R 1 asm.cluster_get_local_slot_ranges] {} + assert_equal [R 4 asm.cluster_get_local_slot_ranges] {} + } +} + +set testmodule [file normalize tests/modules/atomicslotmigration.so] + +start_cluster 2 0 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no appendonly yes]] { + test "TRIMSLOTS in AOF will work synchronously on restart" { + # When TRIMSLOTS is replayed from AOF during restart, it must execute + # synchronously rather than using active trim. This prevents race + # conditions where subsequent AOF commands might operate on keys + # that should have been trimmed. 
+ + # Subscribe to key trimmed event to force active trim + R 0 asm.subscribe_trimmed_event 1 + populate_slot 1000 -slot 0 + populate_slot 1000 -slot 1 + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + + # verify active trim is used + assert_equal 1 [CI 0 cluster_slot_migration_stats_active_trim_completed] + + # restart server and verify aof is loaded + restart_server 0 yes no yes nosave + assert {[scan [regexp -inline {aof_current_size:([\d]*)} [R 0 info persistence]] aof_current_size=%d] > 0} + wait_for_cluster_state "ok" + + # verify TRIMSLOTS in AOF is executed synchronously + assert_equal 0 [CI 0 cluster_slot_migration_stats_active_trim_completed] + assert_equal 1000 [R 0 dbsize] + + # cleanup + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + assert_equal 2000 [R 0 dbsize] + R 0 flushall + R 1 flushall + clear_module_event_log + + } + + test "Test trim is disabled when module requests it" { + R 0 asm.disable_trim + + set slot0_key [slot_key 0 mykey] + R 0 set $slot0_key "value" + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0] + wait_for_condition 1000 10 { + [string match {*completed*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not complete" + } + # since we disable trim, the key should still exist on source, + # we can read it with REDISMODULE_OPEN_KEY_ACCESS_TRIMMED flag + assert_equal "value" [R 0 asm.read_pending_trim_key $slot0_key] + assert_equal 1 [R 0 asm.trim_in_progress] + + # enable trim and verify the key is trimmed + R 0 asm.enable_trim + wait_for_condition 1000 10 { + [R 0 asm.read_pending_trim_key $slot0_key] eq "" && + [R 0 asm.trim_in_progress] == 0 + } else { + fail "Trim did not complete" + } + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + clear_module_event_log + } + + test "Can not start new asm task when trim is not allowed" { + # start a migration task, wait it completed but not allow to trim slots + R 0 asm.disable_trim + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0] 
+ wait_for_condition 1000 10 { + [string match {*completed*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not complete" + } + # Can not start new migrating task since trim is disabled + set task_id [R 1 CLUSTER MIGRATION IMPORT 1 1] + wait_for_condition 1000 10 { + [string match {*fail*} [migration_status 1 $task_id state]] && + [string match {*Trim is disabled by module*} [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + R 0 asm.enable_trim + wait_for_asm_done + + # start a migration task, wait it completed but not allow to trim slots + R 0 asm.disable_trim + set task_id [R 1 CLUSTER MIGRATION IMPORT 2 2] + wait_for_condition 1000 10 { + [string match {*completed*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not complete" + } + set logline [count_log_lines 0] + # Can not start new importing task since trim is disabled + set task_id [R 0 CLUSTER MIGRATION IMPORT 0 1] + wait_for_log_messages 0 {"*Can not start import task*trim is disabled by module*"} $logline 1000 10 + R 0 asm.enable_trim + wait_for_asm_done + } +} + +start_server {tags "cluster external:skip"} { + test "Test RM_ClusterGetLocalSlotRanges without cluster" { + r module load $testmodule + assert_equal [r asm.cluster_get_local_slot_ranges] {{0 16383}} + } +} +} diff --git a/examples/redis-unstable/tests/unit/cluster/cli.tcl b/examples/redis-unstable/tests/unit/cluster/cli.tcl new file mode 100644 index 0000000..ce4629e --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/cli.tcl @@ -0,0 +1,415 @@ +# Primitive tests on cluster-enabled redis using redis-cli + +source tests/support/cli.tcl + +# make sure the test infra won't use SELECT +set old_singledb $::singledb +set ::singledb 1 + +# cluster creation is complicated with TLS, and the current tests don't really need that coverage +tags {tls:skip external:skip cluster} { + +# start three servers +set base_conf [list cluster-enabled yes cluster-node-timeout 
1000] +start_multiple_servers 3 [list overrides $base_conf] { + + set node1 [srv 0 client] + set node2 [srv -1 client] + set node3 [srv -2 client] + set node3_pid [srv -2 pid] + set node3_rd [redis_deferring_client -2] + + test {Create 3 node cluster} { + exec src/redis-cli --cluster-yes --cluster create \ + 127.0.0.1:[srv 0 port] \ + 127.0.0.1:[srv -1 port] \ + 127.0.0.1:[srv -2 port] + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + } + + test "Run blocking command on cluster node3" { + # key9184688 is mapped to slot 10923 (first slot of node 3) + $node3_rd brpop key9184688 0 + $node3_rd flush + + wait_for_condition 50 100 { + [s -2 blocked_clients] eq {1} + } else { + fail "Client not blocked" + } + } + + test "Perform a Resharding" { + exec src/redis-cli --cluster-yes --cluster reshard 127.0.0.1:[srv -2 port] \ + --cluster-to [$node1 cluster myid] \ + --cluster-from [$node3 cluster myid] \ + --cluster-slots 1 + } + + test "Verify command got unblocked after resharding" { + # this (read) will wait for the node3 to realize the new topology + assert_error {*MOVED*} {$node3_rd read} + + # verify there are no blocked clients + assert_equal [s 0 blocked_clients] {0} + assert_equal [s -1 blocked_clients] {0} + assert_equal [s -2 blocked_clients] {0} + } + + test "Wait for cluster to be stable" { + # Cluster check just verifies the config state is self-consistent, + # waiting for cluster_state to be okay is an independent check that all the + # nodes actually believe each other are healthy, prevent cluster down error. 
+ wait_for_condition 1000 50 { + [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv 0 port]}] == 0 && + [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -1 port]}] == 0 && + [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -2 port]}] == 0 && + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + } + + set node1_rd [redis_deferring_client 0] + + test "use previous hostip in \"cluster-preferred-endpoint-type unknown-endpoint\" mode" { + + # backup and set cluster-preferred-endpoint-type unknown-endpoint + set endpoint_type_before_set [lindex [split [$node1 CONFIG GET cluster-preferred-endpoint-type] " "] 1] + $node1 CONFIG SET cluster-preferred-endpoint-type unknown-endpoint + + # when redis-cli not in cluster mode, return MOVE with empty host + set slot_for_foo [$node1 CLUSTER KEYSLOT foo] + assert_error "*MOVED $slot_for_foo :*" {$node1 set foo bar} + + # when in cluster mode, redirect using previous hostip + assert_equal "[exec src/redis-cli -h 127.0.0.1 -p [srv 0 port] -c set foo bar]" {OK} + assert_match "[exec src/redis-cli -h 127.0.0.1 -p [srv 0 port] -c get foo]" {bar} + + assert_equal [$node1 CONFIG SET cluster-preferred-endpoint-type "$endpoint_type_before_set"] {OK} + } + + test "Sanity test push cmd after resharding" { + assert_error {*MOVED*} {$node3 lpush key9184688 v1} + + $node1_rd brpop key9184688 0 + $node1_rd flush + + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + puts "Client not blocked" + puts "read from blocked client: [$node1_rd read]" + fail "Client not blocked" + } + + $node1 lpush key9184688 v2 + assert_equal {key9184688 v2} [$node1_rd read] + } + + $node3_rd close + + test "Run blocking command again on cluster node1" { + $node1 del key9184688 + # key9184688 is mapped to slot 10923 which has been moved to node1 + $node1_rd brpop key9184688 0 + $node1_rd flush + + wait_for_condition 50 100 { + [s 
0 blocked_clients] eq {1} + } else { + fail "Client not blocked" + } + } + + test "Kill a cluster node and wait for fail state" { + # kill node3 in cluster + pause_process $node3_pid + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {fail} && + [CI 1 cluster_state] eq {fail} + } else { + fail "Cluster doesn't fail" + } + } + + test "Verify command got unblocked after cluster failure" { + assert_error {*CLUSTERDOWN*} {$node1_rd read} + + # verify there are no blocked clients + assert_equal [s 0 blocked_clients] {0} + assert_equal [s -1 blocked_clients] {0} + } + + resume_process $node3_pid + $node1_rd close + +} ;# stop servers + +# Test redis-cli -- cluster create, add-node, call. +# Test that functions are propagated on add-node +start_multiple_servers 5 [list overrides $base_conf] { + + set node4_rd [redis_client -3] + set node5_rd [redis_client -4] + + test {Functions are added to new node on redis-cli cluster add-node} { + exec src/redis-cli --cluster-yes --cluster create \ + 127.0.0.1:[srv 0 port] \ + 127.0.0.1:[srv -1 port] \ + 127.0.0.1:[srv -2 port] + + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # upload a function to all the cluster + exec src/redis-cli --cluster-yes --cluster call 127.0.0.1:[srv 0 port] \ + FUNCTION LOAD {#!lua name=TEST + redis.register_function('test', function() return 'hello' end) + } + + # adding node to the cluster + exec src/redis-cli --cluster-yes --cluster add-node \ + 127.0.0.1:[srv -3 port] \ + 127.0.0.1:[srv 0 port] + + wait_for_cluster_size 4 + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} && + [CI 3 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # make sure 'test' function was added to the new node + assert_equal {{library_name TEST engine LUA functions {{name test 
description {} flags {}}}}} [$node4_rd FUNCTION LIST] + + # add function to node 5 + assert_equal {TEST} [$node5_rd FUNCTION LOAD {#!lua name=TEST + redis.register_function('test', function() return 'hello' end) + }] + + # make sure functions was added to node 5 + assert_equal {{library_name TEST engine LUA functions {{name test description {} flags {}}}}} [$node5_rd FUNCTION LIST] + + # adding node 5 to the cluster should failed because it already contains the 'test' function + catch { + exec src/redis-cli --cluster-yes --cluster add-node \ + 127.0.0.1:[srv -4 port] \ + 127.0.0.1:[srv 0 port] + } e + assert_match {*node already contains functions*} $e + } +} ;# stop servers + +# Test redis-cli --cluster create, add-node. +# Test that one slot can be migrated to and then away from the new node. +test {Migrate the last slot away from a node using redis-cli} { + start_multiple_servers 4 [list overrides $base_conf] { + + # Create a cluster of 3 nodes + exec src/redis-cli --cluster-yes --cluster create \ + 127.0.0.1:[srv 0 port] \ + 127.0.0.1:[srv -1 port] \ + 127.0.0.1:[srv -2 port] + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Insert some data + assert_equal OK [exec src/redis-cli -c -p [srv 0 port] SET foo bar] + set slot [exec src/redis-cli -c -p [srv 0 port] CLUSTER KEYSLOT foo] + + # Add new node to the cluster + exec src/redis-cli --cluster-yes --cluster add-node \ + 127.0.0.1:[srv -3 port] \ + 127.0.0.1:[srv 0 port] + + # First we wait for new node to be recognized by entire cluster + wait_for_cluster_size 4 + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} && + [CI 3 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + set newnode_r [redis_client -3] + set newnode_id [$newnode_r CLUSTER MYID] + + # Find out which 
node has the key "foo" by asking the new node for a + # redirect. + catch { $newnode_r get foo } e + assert_match "MOVED $slot *" $e + lassign [split [lindex $e 2] :] owner_host owner_port + set owner_r [redis $owner_host $owner_port 0 $::tls] + set owner_id [$owner_r CLUSTER MYID] + + # Move slot to new node using plain Redis commands + assert_equal OK [$newnode_r CLUSTER SETSLOT $slot IMPORTING $owner_id] + assert_equal OK [$owner_r CLUSTER SETSLOT $slot MIGRATING $newnode_id] + assert_equal {foo} [$owner_r CLUSTER GETKEYSINSLOT $slot 10] + assert_equal OK [$owner_r MIGRATE 127.0.0.1 [srv -3 port] "" 0 5000 KEYS foo] + assert_equal OK [$newnode_r CLUSTER SETSLOT $slot NODE $newnode_id] + assert_equal OK [$owner_r CLUSTER SETSLOT $slot NODE $newnode_id] + + # Using --cluster check make sure we won't get `Not all slots are covered by nodes`. + # Wait for the cluster to become stable make sure the cluster is up during MIGRATE. + wait_for_condition 1000 50 { + [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv 0 port]}] == 0 && + [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -1 port]}] == 0 && + [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -2 port]}] == 0 && + [catch {exec src/redis-cli --cluster check 127.0.0.1:[srv -3 port]}] == 0 && + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} && + [CI 3 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Move the only slot back to original node using redis-cli + exec src/redis-cli --cluster reshard 127.0.0.1:[srv -3 port] \ + --cluster-from $newnode_id \ + --cluster-to $owner_id \ + --cluster-slots 1 \ + --cluster-yes + + # The empty node will become a replica of the new owner before the + # `MOVED` check, so let's wait for the cluster to become stable. 
+ wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} && + [CI 3 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Check that the key foo has been migrated back to the original owner. + catch { $newnode_r get foo } e + assert_equal "MOVED $slot $owner_host:$owner_port" $e + + # Check that the empty node has turned itself into a replica of the new + # owner and that the new owner knows that. + wait_for_condition 1000 50 { + [string match "*slave*" [$owner_r CLUSTER REPLICAS $owner_id]] + } else { + fail "Empty node didn't turn itself into a replica." + } + } +} + +foreach ip_or_localhost {127.0.0.1 localhost} { + +# Test redis-cli --cluster create, add-node with cluster-port. +# Create five nodes, three with custom cluster_port and two with default values. +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] { +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] { +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] { +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1]] { +start_server [list overrides [list cluster-enabled yes cluster-node-timeout 1 cluster-port [find_available_port $::baseport $::portcount]]] { + + # The first three are used to test --cluster create. 
+ # The last two are used to test --cluster add-node + + test "redis-cli -4 --cluster create using $ip_or_localhost with cluster-port" { + exec src/redis-cli -4 --cluster-yes --cluster create \ + $ip_or_localhost:[srv 0 port] \ + $ip_or_localhost:[srv -1 port] \ + $ip_or_localhost:[srv -2 port] + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Make sure each node can meet other nodes + assert_equal 3 [CI 0 cluster_known_nodes] + assert_equal 3 [CI 1 cluster_known_nodes] + assert_equal 3 [CI 2 cluster_known_nodes] + } + + test "redis-cli -4 --cluster add-node using $ip_or_localhost with cluster-port" { + # Adding node to the cluster (without cluster-port) + exec src/redis-cli -4 --cluster-yes --cluster add-node \ + $ip_or_localhost:[srv -3 port] \ + $ip_or_localhost:[srv 0 port] + + wait_for_cluster_size 4 + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} && + [CI 3 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Adding node to the cluster (with cluster-port) + exec src/redis-cli -4 --cluster-yes --cluster add-node \ + $ip_or_localhost:[srv -4 port] \ + $ip_or_localhost:[srv 0 port] + + wait_for_cluster_size 5 + + wait_for_condition 1000 50 { + [CI 0 cluster_state] eq {ok} && + [CI 1 cluster_state] eq {ok} && + [CI 2 cluster_state] eq {ok} && + [CI 3 cluster_state] eq {ok} && + [CI 4 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # Make sure each node can meet other nodes + assert_equal 5 [CI 0 cluster_known_nodes] + assert_equal 5 [CI 1 cluster_known_nodes] + assert_equal 5 [CI 2 cluster_known_nodes] + assert_equal 5 [CI 3 cluster_known_nodes] + assert_equal 5 [CI 4 cluster_known_nodes] + } +# stop 5 servers +} +} +} +} +} + +} ;# foreach ip_or_localhost + +} ;# tags + +set ::singledb 
$old_singledb diff --git a/examples/redis-unstable/tests/unit/cluster/cluster-response-tls.tcl b/examples/redis-unstable/tests/unit/cluster/cluster-response-tls.tcl new file mode 100644 index 0000000..a099fa7 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/cluster-response-tls.tcl @@ -0,0 +1,110 @@ +source tests/support/cluster.tcl + +proc get_port_from_moved_error {e} { + set ip_port [lindex [split $e " "] 2] + return [lindex [split $ip_port ":"] 1] +} + +proc get_pport_by_port {port} { + foreach srv $::servers { + set srv_port [dict get $srv port] + if {$port == $srv_port} { + return [dict get $srv pport] + } + } + return 0 +} + +proc get_port_from_node_info {line} { + set fields [split $line " "] + set addr [lindex $fields 1] + set ip_port [lindex [split $addr "@"] 0] + return [lindex [split $ip_port ":"] 1] +} + +proc cluster_response_tls {tls_cluster} { + + test "CLUSTER SLOTS with different connection type -- tls-cluster $tls_cluster" { + set slots1 [R 0 cluster slots] + set pport [srv 0 pport] + set cluster_client [redis_cluster 127.0.0.1:$pport 0] + set slots2 [$cluster_client cluster slots] + $cluster_client close + # Compare the ports in the first row + assert_no_match [lindex $slots1 0 2 1] [lindex $slots2 0 2 1] + } + + test "CLUSTER NODES return port according to connection type -- tls-cluster $tls_cluster" { + set nodes [R 0 cluster nodes] + set port1 [get_port_from_node_info [lindex [split $nodes "\r\n"] 0]] + set pport [srv 0 pport] + set cluster_client [redis_cluster 127.0.0.1:$pport 0] + set nodes [$cluster_client cluster nodes] + set port2 [get_port_from_node_info [lindex [split $nodes "\r\n"] 0]] + $cluster_client close + assert_not_equal $port1 $port2 + } + + set cluster [redis_cluster 127.0.0.1:[srv 0 port]] + set cluster_pport [redis_cluster 127.0.0.1:[srv 0 pport] 0] + $cluster refresh_nodes_map + + test "Set many keys in the cluster -- tls-cluster $tls_cluster" { + for {set i 0} {$i < 5000} {incr i} { + $cluster set $i $i + 
assert { [$cluster get $i] eq $i } + } + } + + test "Test cluster responses during migration of slot x -- tls-cluster $tls_cluster" { + set slot 10 + array set nodefrom [$cluster masternode_for_slot $slot] + array set nodeto [$cluster masternode_notfor_slot $slot] + $nodeto(link) cluster setslot $slot importing $nodefrom(id) + $nodefrom(link) cluster setslot $slot migrating $nodeto(id) + + # Get a key from that slot + set key [$nodefrom(link) cluster GETKEYSINSLOT $slot "1"] + # MOVED REPLY + catch {$nodeto(link) set $key "newVal"} e_moved1 + assert_match "*MOVED*" $e_moved1 + # ASK REPLY + catch {$nodefrom(link) set "abc{$key}" "newVal"} e_ask1 + assert_match "*ASK*" $e_ask1 + + # UNSTABLE REPLY + assert_error "*TRYAGAIN*" {$nodefrom(link) mset "a{$key}" "newVal" $key "newVal2"} + + # Connecting using another protocol + array set nodefrom_pport [$cluster_pport masternode_for_slot $slot] + array set nodeto_pport [$cluster_pport masternode_notfor_slot $slot] + + # MOVED REPLY + catch {$nodeto_pport(link) set $key "newVal"} e_moved2 + assert_match "*MOVED*" $e_moved2 + # ASK REPLY + catch {$nodefrom_pport(link) set "abc{$key}" "newVal"} e_ask2 + assert_match "*ASK*" $e_ask2 + # Compare MOVED error's port + set port1 [get_port_from_moved_error $e_moved1] + set port2 [get_port_from_moved_error $e_moved2] + assert_not_equal $port1 $port2 + assert_equal $port1 $nodefrom(port) + assert_equal $port2 [get_pport_by_port $nodefrom(port)] + # Compare ASK error's port + set port1 [get_port_from_moved_error $e_ask1] + set port2 [get_port_from_moved_error $e_ask2] + assert_not_equal $port1 $port2 + assert_equal $port1 $nodeto(port) + assert_equal $port2 [get_pport_by_port $nodeto(port)] + } +} + +if {$::tls} { + start_cluster 3 3 {tags {external:skip cluster tls} overrides {tls-cluster yes tls-replication yes}} { + cluster_response_tls yes + } + start_cluster 3 3 {tags {external:skip cluster tls} overrides {tls-cluster no tls-replication no}} { + cluster_response_tls no + } +} 
diff --git a/examples/redis-unstable/tests/unit/cluster/failure-marking.tcl b/examples/redis-unstable/tests/unit/cluster/failure-marking.tcl new file mode 100644 index 0000000..c4746c8 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/failure-marking.tcl @@ -0,0 +1,53 @@ +# Test a single primary can mark replica as `fail` +start_cluster 1 1 {tags {external:skip cluster}} { + + test "Verify that single primary marks replica as failed" { + set primary [srv -0 client] + + set replica1 [srv -1 client] + set replica1_pid [srv -1 pid] + set replica1_instance_id [dict get [cluster_get_myself 1] id] + + assert {[lindex [$primary role] 0] eq {master}} + assert {[lindex [$replica1 role] 0] eq {slave}} + + wait_for_sync $replica1 + + pause_process $replica1_pid + + wait_node_marked_fail 0 $replica1_instance_id + } +} + +# Test multiple primaries wait for a quorum and then mark a replica as `fail` +start_cluster 2 1 {tags {external:skip cluster}} { + + test "Verify that multiple primaries mark replica as failed" { + set primary1 [srv -0 client] + + set primary2 [srv -1 client] + set primary2_pid [srv -1 pid] + + set replica1 [srv -2 client] + set replica1_pid [srv -2 pid] + set replica1_instance_id [dict get [cluster_get_myself 2] id] + + assert {[lindex [$primary1 role] 0] eq {master}} + assert {[lindex [$primary2 role] 0] eq {master}} + assert {[lindex [$replica1 role] 0] eq {slave}} + + wait_for_sync $replica1 + + pause_process $replica1_pid + + # Pause other primary to allow time for pfail flag to appear + pause_process $primary2_pid + + wait_node_marked_pfail 0 $replica1_instance_id + + # Resume other primary and wait for to show replica as failed + resume_process $primary2_pid + + wait_node_marked_fail 0 $replica1_instance_id + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/hostnames.tcl b/examples/redis-unstable/tests/unit/cluster/hostnames.tcl new file mode 100644 index 0000000..2236228 --- /dev/null +++ 
b/examples/redis-unstable/tests/unit/cluster/hostnames.tcl @@ -0,0 +1,230 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of (a) the Redis Source Available License 2.0 +# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the +# GNU Affero General Public License v3 (AGPLv3). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + +proc get_slot_field {slot_output shard_id node_id attrib_id} { + return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id] +} + +# Start a cluster with 3 masters and 4 replicas. +# These tests rely on specific node ordering, so make sure no node fails over. +start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-replica-no-failover yes}} { +test "Set cluster hostnames and verify they are propagated" { + for {set j 0} {$j < [llength $::servers]} {incr j} { + R $j config set cluster-announce-hostname "host-$j.com" + } + + wait_for_condition 50 100 { + [are_hostnames_propagated "host-*.com"] eq 1 + } else { + fail "cluster hostnames were not propagated" + } + + # Now that everything is propagated, assert everyone agrees + wait_for_cluster_propagation +} + +test "Update hostnames and make sure they are all eventually propagated" { + for {set j 0} {$j < [llength $::servers]} {incr j} { + R $j config set cluster-announce-hostname "host-updated-$j.com" + } + + wait_for_condition 50 100 { + [are_hostnames_propagated "host-updated-*.com"] eq 1 + } else { + fail "cluster hostnames were not propagated" + } + + # Now that everything is propagated, assert everyone agrees + wait_for_cluster_propagation +} + +test "Remove hostnames and make sure they are all eventually propagated" { + for {set j 0} {$j < [llength $::servers]} {incr j} { + R $j config set cluster-announce-hostname "" + } + + wait_for_condition 50 100 { + 
[are_hostnames_propagated ""] eq 1 + } else { + fail "cluster hostnames were not propagated" + } + + # Now that everything is propagated, assert everyone agrees + wait_for_cluster_propagation +} + +test "Verify cluster-preferred-endpoint-type behavior for redirects and info" { + R 0 config set cluster-announce-hostname "me.com" + R 1 config set cluster-announce-hostname "" + R 2 config set cluster-announce-hostname "them.com" + + wait_for_cluster_propagation + + # Verify default behavior + set slot_result [R 0 cluster slots] + assert_equal "" [lindex [get_slot_field $slot_result 0 2 0] 1] + assert_equal "" [lindex [get_slot_field $slot_result 2 2 0] 1] + assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 0] + assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 1] + assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 0] + assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 1] + + # Redirect will use the IP address + catch {R 0 set foo foo} redir_err + assert_match "MOVED * 127.0.0.1:*" $redir_err + + # Verify prefer hostname behavior + R 0 config set cluster-preferred-endpoint-type hostname + + set slot_result [R 0 cluster slots] + assert_equal "me.com" [get_slot_field $slot_result 0 2 0] + assert_equal "them.com" [get_slot_field $slot_result 2 2 0] + + # Redirect should use hostname + catch {R 0 set foo foo} redir_err + assert_match "MOVED * them.com:*" $redir_err + + # Redirect to an unknown hostname returns ? 
+ catch {R 0 set barfoo bar} redir_err + assert_match "MOVED * ?:*" $redir_err + + # Verify unknown hostname behavior + R 0 config set cluster-preferred-endpoint-type unknown-endpoint + + # Verify default behavior + set slot_result [R 0 cluster slots] + assert_equal "ip" [lindex [get_slot_field $slot_result 0 2 3] 0] + assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 0 2 3] 1] + assert_equal "ip" [lindex [get_slot_field $slot_result 2 2 3] 0] + assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 2 2 3] 1] + assert_equal "ip" [lindex [get_slot_field $slot_result 1 2 3] 0] + assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 1 2 3] 1] + # Not required by the protocol, but IP comes before hostname + assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 2] + assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 3] + assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 2] + assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 3] + + # This node doesn't have a hostname + assert_equal 2 [llength [get_slot_field $slot_result 1 2 3]] + + # Redirect should use empty string + catch {R 0 set foo foo} redir_err + assert_match "MOVED * :*" $redir_err + + R 0 config set cluster-preferred-endpoint-type ip +} + +test "Verify the nodes configured with prefer hostname only show hostname for new nodes" { + # Have everyone forget node 6 and isolate it from the cluster. + isolate_node 6 + + set primaries 3 + for {set j 0} {$j < $primaries} {incr j} { + # Set hostnames for the masters, now that the node is isolated + R $j config set cluster-announce-hostname "shard-$j.com" + } + + # Prevent Node 0 and Node 6 from properly meeting, + # they'll hang in the handshake phase. This allows us to + # test the case where we "know" about it but haven't + # successfully retrieved information about it yet. 
+ R 0 DEBUG DROP-CLUSTER-PACKET-FILTER 0 + R 6 DEBUG DROP-CLUSTER-PACKET-FILTER 0 + + # Have a replica meet the isolated node + R 3 cluster meet 127.0.0.1 [srv -6 port] + + # Wait for the isolated node to learn about the rest of the cluster, + # which correspond to a single entry in cluster nodes. Note this + # doesn't mean the isolated node has successfully contacted each + # node. + wait_for_condition 50 100 { + [llength [split [R 6 CLUSTER NODES] "\n"]] eq [expr [llength $::servers] + 1] + } else { + fail "Isolated node didn't learn about the rest of the cluster *" + } + + # Now, we wait until the two nodes that aren't filtering packets + # to accept our isolated nodes connections. At this point they will + # start showing up in cluster slots. + wait_for_condition 50 100 { + [llength [R 6 CLUSTER SLOTS]] eq 2 + } else { + fail "Node did not learn about the 2 shards it can talk to" + } + wait_for_condition 50 100 { + [lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com" + } else { + fail "hostname for shard-1 didn't reach node 6" + } + + wait_for_condition 50 100 { + [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com" + } else { + fail "hostname for shard-2 didn't reach node 6" + } + + # Also make sure we know about the isolated master, we + # just can't reach it. + set master_id [R 0 CLUSTER MYID] + assert_match "*$master_id*" [R 6 CLUSTER NODES] + + # Stop dropping cluster packets, and make sure everything + # stabilizes + R 0 DEBUG DROP-CLUSTER-PACKET-FILTER -1 + R 6 DEBUG DROP-CLUSTER-PACKET-FILTER -1 + + # This operation sometimes spikes to around 5 seconds to resolve the state, + # so it has a higher timeout. 
+ wait_for_condition 50 500 { + [llength [R 6 CLUSTER SLOTS]] eq 3 + } else { + fail "Node did not learn about the 2 shards it can talk to" + } + + for {set j 0} {$j < $primaries} {incr j} { + wait_for_condition 50 100 { + [lindex [get_slot_field [R 6 CLUSTER SLOTS] $j 2 3] 1] eq "shard-$j.com" + } else { + fail "hostname information for shard-$j didn't reach node 6" + } + } +} + +test "Test restart will keep hostname information" { + # Set a new hostname, reboot and make sure it sticks + R 0 config set cluster-announce-hostname "restart-1.com" + + # Store the hostname in the config + R 0 config rewrite + + restart_server 0 true false + set slot_result [R 0 CLUSTER SLOTS] + assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "restart-1.com" + + # As a sanity check, make sure everyone eventually agrees + wait_for_cluster_propagation +} + +test "Test hostname validation" { + catch {R 0 config set cluster-announce-hostname [string repeat x 256]} err + assert_match "*Hostnames must be less than 256 characters*" $err + catch {R 0 config set cluster-announce-hostname "?.com"} err + assert_match "*Hostnames may only contain alphanumeric characters, hyphens or dots*" $err + + # Note this isn't a valid hostname, but it passes our internal validation + R 0 config set cluster-announce-hostname "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-." 
+} +} diff --git a/examples/redis-unstable/tests/unit/cluster/human-announced-nodename.tcl b/examples/redis-unstable/tests/unit/cluster/human-announced-nodename.tcl new file mode 100644 index 0000000..a595ca6 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/human-announced-nodename.tcl @@ -0,0 +1,29 @@ +# Check if cluster's view of human announced nodename is reported in logs +start_cluster 3 0 {tags {external:skip cluster}} { + test "Set cluster human announced nodename and let it propagate" { + for {set j 0} {$j < [llength $::servers]} {incr j} { + R $j config set cluster-announce-hostname "host-$j.com" + R $j config set cluster-announce-human-nodename "nodename-$j" + } + + # We wait for everyone to agree on the hostnames. Since they are gossiped + # the same way as nodenames, it implies everyone knows the nodenames too. + wait_for_condition 50 100 { + [are_hostnames_propagated "host-*.com"] eq 1 + } else { + fail "cluster hostnames were not propagated" + } + } + + test "Human nodenames are visible in log messages" { + # Pause instance 0, so everyone thinks it is dead + pause_process [srv 0 pid] + + # We're going to use a message we will know will be sent, node unreachable, + # since it includes the other node gossiping. 
+ wait_for_log_messages -1 {"*Node * (nodename-2) reported node * (nodename-0) as not reachable*"} 0 20 500 + wait_for_log_messages -2 {"*Node * (nodename-1) reported node * (nodename-0) as not reachable*"} 0 20 500 + + resume_process [srv 0 pid] + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/internal-secret.tcl b/examples/redis-unstable/tests/unit/cluster/internal-secret.tcl new file mode 100644 index 0000000..f310b74 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/internal-secret.tcl @@ -0,0 +1,71 @@ +proc num_unique_secrets {num_nodes} { + set secrets [list] + for {set i 0} {$i < $num_nodes} {incr i} { + lappend secrets [R $i debug internal_secret] + } + set num_secrets [llength [lsort -unique $secrets]] + return $num_secrets +} + +proc wait_for_secret_sync {maxtries delay num_nodes} { + wait_for_condition $maxtries $delay { + [num_unique_secrets $num_nodes] eq 1 + } else { + fail "Failed waiting for secrets to sync" + } +} + +start_cluster 3 3 {tags {external:skip cluster}} { + test "Test internal secret sync" { + wait_for_secret_sync 50 100 6 + } + + + set first_shard_host [srv 0 host] + set first_shard_port [srv 0 port] + + if {$::verbose} { + puts {cluster internal secret:} + puts [R 1 debug internal_secret] + } + + test "Join a node to the cluster and make sure it gets the same secret" { + start_server {tags {"external:skip"} overrides {cluster-enabled {yes}}} { + r cluster meet $first_shard_host $first_shard_port + wait_for_condition 50 100 { + [r debug internal_secret] eq [R 1 debug internal_secret] + } else { + puts [r debug internal_secret] + puts [R 1 debug internal_secret] + fail "Secrets not match" + } + } + } + + test "Join another cluster, make sure clusters sync on the internal secret" { + start_server {tags {"external:skip"} overrides {cluster-enabled {yes}}} { + set new_shard_host [srv 0 host] + set new_shard_port [srv 0 port] + start_server {tags {"external:skip"} overrides {cluster-enabled {yes}}} { + r cluster 
meet $new_shard_host $new_shard_port + wait_for_condition 50 100 { + [r debug internal_secret] eq [r -1 debug internal_secret] + } else { + puts [r debug internal_secret] + puts [r -1 debug internal_secret] + fail "Secrets not match" + } + if {$::verbose} { + puts {new cluster internal secret:} + puts [r -1 debug internal_secret] + } + r cluster meet $first_shard_host $first_shard_port + wait_for_secret_sync 50 100 8 + if {$::verbose} { + puts {internal secret after join to bigger cluster:} + puts [r -1 debug internal_secret] + } + } + } + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/links.tcl b/examples/redis-unstable/tests/unit/cluster/links.tcl new file mode 100644 index 0000000..a202c37 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/links.tcl @@ -0,0 +1,292 @@ +proc get_links_with_peer {this_instance_id peer_nodename} { + set links [R $this_instance_id cluster links] + set links_with_peer {} + foreach l $links { + if {[dict get $l node] eq $peer_nodename} { + lappend links_with_peer $l + } + } + return $links_with_peer +} + +# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that +# corresponds to the link established toward a peer identified by `peer_nodename` +proc get_link_to_peer {this_instance_id peer_nodename} { + set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename] + foreach l $links_with_peer { + if {[dict get $l direction] eq "to"} { + return $l + } + } + return {} +} + +# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that +# corresponds to the link accepted from a peer identified by `peer_nodename` +proc get_link_from_peer {this_instance_id peer_nodename} { + set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename] + foreach l $links_with_peer { + if {[dict get $l direction] eq "from"} { + return $l + } + } + return {} +} + +# Reset cluster links to their original state +proc reset_links {id} { + set 
limit [lindex [R $id CONFIG get cluster-link-sendbuf-limit] 1] + + # Set a 1 byte limit and wait for cluster cron to run + # (executes every 100ms) and terminate links + R $id CONFIG SET cluster-link-sendbuf-limit 1 + after 150 + + # Reset limit + R $id CONFIG SET cluster-link-sendbuf-limit $limit + + # Wait until the cluster links come back up for each node + wait_for_condition 50 100 { + [number_of_links $id] == [expr [number_of_peers $id] * 2] + } else { + fail "Cluster links did not come back up" + } +} + +proc number_of_peers {id} { + expr [llength $::servers] - 1 +} + +proc number_of_links {id} { + llength [R $id cluster links] +} + +proc publish_messages {server num_msgs msg_size} { + for {set i 0} {$i < $num_msgs} {incr i} { + $server PUBLISH channel [string repeat "x" $msg_size] + } +} + +start_cluster 1 2 {tags {external:skip cluster}} { + set primary_id 0 + set replica1_id 1 + + set primary [Rn $primary_id] + set replica1 [Rn $replica1_id] + + test "Broadcast message across a cluster shard while a cluster link is down" { + set replica1_node_id [$replica1 CLUSTER MYID] + + set channelname ch3 + + # subscribe on replica1 + set subscribeclient1 [redis_deferring_client -1] + $subscribeclient1 deferred 1 + $subscribeclient1 SSUBSCRIBE $channelname + $subscribeclient1 read + + # subscribe on replica2 + set subscribeclient2 [redis_deferring_client -2] + $subscribeclient2 deferred 1 + $subscribeclient2 SSUBSCRIBE $channelname + $subscribeclient2 read + + # Verify number of links with cluster stable state + assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id] + + # Disconnect the cluster between primary and replica1 and publish a message. + $primary MULTI + $primary DEBUG CLUSTERLINK KILL TO $replica1_node_id + $primary SPUBLISH $channelname hello + set res [$primary EXEC] + + # Verify no client exists on the primary to receive the published message. 
+ assert_equal $res {OK 0} + + # Wait until all the cluster links are healthy + wait_for_condition 50 100 { + [number_of_peers $primary_id]*2 == [number_of_links $primary_id] + } else { + fail "All peer links couldn't be established" + } + + # Publish a message afterwards. + $primary SPUBLISH $channelname world + + # Verify replica1 has received only (world) / hello is lost. + assert_equal "smessage ch3 world" [$subscribeclient1 read] + + # Verify replica2 has received both messages (hello/world) + assert_equal "smessage ch3 hello" [$subscribeclient2 read] + assert_equal "smessage ch3 world" [$subscribeclient2 read] + } {} {needs:debug} +} + +start_cluster 3 0 {tags {external:skip cluster}} { + test "Each node has two links with each peer" { + for {set id 0} {$id < [llength $::servers]} {incr id} { + # Assert that from point of view of each node, there are two links for + # each peer. It might take a while for cluster to stabilize so wait up + # to 5 seconds. + wait_for_condition 50 100 { + [number_of_peers $id]*2 == [number_of_links $id] + } else { + assert_equal [expr [number_of_peers $id]*2] [number_of_links $id] + } + + set nodes [get_cluster_nodes $id] + set links [R $id cluster links] + + # For each peer there should be exactly one + # link "to" it and one link "from" it.
+ foreach n $nodes { + if {[cluster_has_flag $n myself]} continue + set peer [dict get $n id] + set to 0 + set from 0 + foreach l $links { + if {[dict get $l node] eq $peer} { + if {[dict get $l direction] eq "to"} { + incr to + } elseif {[dict get $l direction] eq "from"} { + incr from + } + } + } + assert {$to eq 1} + assert {$from eq 1} + } + } + } + + test {Validate cluster links format} { + set lines [R 0 cluster links] + foreach l $lines { + if {$l eq {}} continue + assert_equal [llength $l] 12 + assert_equal 1 [dict exists $l "direction"] + assert_equal 1 [dict exists $l "node"] + assert_equal 1 [dict exists $l "create-time"] + assert_equal 1 [dict exists $l "events"] + assert_equal 1 [dict exists $l "send-buffer-allocated"] + assert_equal 1 [dict exists $l "send-buffer-used"] + } + } + + set primary1_id 0 + set primary2_id 1 + + set primary1 [Rn $primary1_id] + set primary2 [Rn $primary2_id] + + test "Disconnect link when send buffer limit reached" { + # On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts + set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1] + $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000] + + # Get primary1's links with primary2 + set primary2_name [dict get [cluster_get_myself $primary2_id] id] + set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name] + set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name] + + # On primary1, set cluster link send buffer limit to 256KB, which is large enough to not be + # overflowed by regular gossip messages but also small enough that it doesn't take too much + # memory to overflow it. If it is set too high, Redis may get OOM killed by kernel before this + # limit is overflowed in some RAM-limited test environments. 
+ set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1] + $primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024] + assert {[CI $primary1_id total_cluster_links_buffer_limit_exceeded] eq 0} + + # To manufacture an ever-growing send buffer from primary1 to primary2, + # make primary2 unresponsive. + set primary2_pid [srv [expr -1*$primary2_id] pid] + pause_process $primary2_pid + + # On primary1, send 128KB Pubsub messages in a loop until the send buffer of the link from + # primary1 to primary2 exceeds buffer limit therefore be dropped. + # For the send buffer to grow, we need to first exhaust TCP send buffer of primary1 and TCP + # receive buffer of primary2 first. The sizes of these two buffers vary by OS, but 100 128KB + # messages should be sufficient. + set i 0 + wait_for_condition 100 0 { + [catch {incr i} e] == 0 && + [catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 && + [catch {after 500} e] == 0 && + [CI $primary1_id total_cluster_links_buffer_limit_exceeded] >= 1 + } else { + fail "Cluster link not freed as expected" + } + + # A new link to primary2 should have been recreated + set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name] + assert {[dict get $new_link_p1_to_p2 create-time] > [dict get $orig_link_p1_to_p2 create-time]} + + # Link from primary2 should not be affected + set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name] + assert {[dict get $same_link_p1_from_p2 create-time] eq [dict get $orig_link_p1_from_p2 create-time]} + + # Revive primary2 + resume_process $primary2_pid + + # Reset configs on primary1 so config changes don't leak out to other tests + $primary1 CONFIG set cluster-node-timeout $oldtimeout + $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit + + reset_links $primary1_id + } + + test "Link memory increases with publishes" { + set server_id 0 + set server [Rn $server_id] + set msg_size 10000 + set num_msgs 10 + + # Remove any sendbuf 
limit + $primary1 CONFIG set cluster-link-sendbuf-limit 0 + + # Publish ~100KB to one of the servers + $server MULTI + $server INFO memory + publish_messages $server $num_msgs $msg_size + $server INFO memory + set res [$server EXEC] + + set link_mem_before_pubs [getInfoProperty $res mem_cluster_links] + + # Remove the first half of the response string which contains the + # first "INFO memory" results and search for the property again + set res [string range $res [expr [string length $res] / 2] end] + set link_mem_after_pubs [getInfoProperty $res mem_cluster_links] + + # We expect the memory to have increased by more than + # the cumulative size of the publish messages + set mem_diff_floor [expr $msg_size * $num_msgs] + set mem_diff [expr $link_mem_after_pubs - $link_mem_before_pubs] + assert {$mem_diff > $mem_diff_floor} + + # Reset links to ensure no leftover data for the next test + reset_links $server_id + } + + test "Link memory resets after publish messages flush" { + set server [Rn 0] + set msg_size 100000 + set num_msgs 10 + + set link_mem_before [status $server mem_cluster_links] + + # Publish ~1MB to one of the servers + $server MULTI + publish_messages $server $num_msgs $msg_size + $server EXEC + + # Wait until the cluster link memory has returned to below the pre-publish value. + # We can't guarantee it returns to the exact same value since gossip messages + # can cause the values to fluctuate.
+ wait_for_condition 1000 500 { + [status $server mem_cluster_links] <= $link_mem_before + } else { + fail "Cluster link memory did not settle back to expected range" + } + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/misc.tcl b/examples/redis-unstable/tests/unit/cluster/misc.tcl new file mode 100644 index 0000000..62bdcf7 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/misc.tcl @@ -0,0 +1,36 @@ +start_cluster 2 2 {tags {external:skip cluster}} { + test {Key lazy expires during key migration} { + R 0 DEBUG SET-ACTIVE-EXPIRE 0 + + set key_slot [R 0 CLUSTER KEYSLOT FOO] + R 0 set FOO BAR PX 10 + set src_id [R 0 CLUSTER MYID] + set trg_id [R 1 CLUSTER MYID] + R 0 CLUSTER SETSLOT $key_slot MIGRATING $trg_id + R 1 CLUSTER SETSLOT $key_slot IMPORTING $src_id + after 11 + assert_error {ASK*} {R 0 GET FOO} + R 0 ping + } {PONG} + + test "Coverage: Basic cluster commands" { + assert_equal {OK} [R 0 CLUSTER saveconfig] + + set id [R 0 CLUSTER MYID] + assert_equal {0} [R 0 CLUSTER count-failure-reports $id] + + R 0 flushall + assert_equal {OK} [R 0 CLUSTER flushslots] + } + + test "CROSSSLOT error for keys in different slots" { + # Test MSET with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MSET foo bar baz qux} + + # Test DEL with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 DEL foo bar} + + # Test MGET with keys in different slots + assert_error {*CROSSSLOT Keys in request don't hash to the same slot*} {R 0 MGET foo bar} + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/multi-slot-operations.tcl b/examples/redis-unstable/tests/unit/cluster/multi-slot-operations.tcl new file mode 100644 index 0000000..5d2d03e --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/multi-slot-operations.tcl @@ -0,0 +1,182 @@ +# This test uses a custom slot allocation for testing +proc cluster_allocate_with_continuous_slots_local 
{n} { + R 0 cluster ADDSLOTSRANGE 0 3276 + R 1 cluster ADDSLOTSRANGE 3277 6552 + R 2 cluster ADDSLOTSRANGE 6553 9828 + R 3 cluster ADDSLOTSRANGE 9829 13104 + R 4 cluster ADDSLOTSRANGE 13105 16383 +} + +start_cluster 5 0 {tags {external:skip cluster}} { + +set master1 [srv 0 "client"] +set master2 [srv -1 "client"] +set master3 [srv -2 "client"] +set master4 [srv -3 "client"] +set master5 [srv -4 "client"] + +test "Continuous slots distribution" { + assert_match "* 0-3276*" [$master1 CLUSTER NODES] + assert_match "* 3277-6552*" [$master2 CLUSTER NODES] + assert_match "* 6553-9828*" [$master3 CLUSTER NODES] + assert_match "* 9829-13104*" [$master4 CLUSTER NODES] + assert_match "* 13105-16383*" [$master5 CLUSTER NODES] + assert_match "*0 3276*" [$master1 CLUSTER SLOTS] + assert_match "*3277 6552*" [$master2 CLUSTER SLOTS] + assert_match "*6553 9828*" [$master3 CLUSTER SLOTS] + assert_match "*9829 13104*" [$master4 CLUSTER SLOTS] + assert_match "*13105 16383*" [$master5 CLUSTER SLOTS] + + $master1 CLUSTER DELSLOTSRANGE 3001 3050 + assert_match "* 0-3000 3051-3276*" [$master1 CLUSTER NODES] + assert_match "*0 3000*3051 3276*" [$master1 CLUSTER SLOTS] + + $master2 CLUSTER DELSLOTSRANGE 5001 5500 + assert_match "* 3277-5000 5501-6552*" [$master2 CLUSTER NODES] + assert_match "*3277 5000*5501 6552*" [$master2 CLUSTER SLOTS] + + $master3 CLUSTER DELSLOTSRANGE 7001 7100 8001 8500 + assert_match "* 6553-7000 7101-8000 8501-9828*" [$master3 CLUSTER NODES] + assert_match "*6553 7000*7101 8000*8501 9828*" [$master3 CLUSTER SLOTS] + + $master4 CLUSTER DELSLOTSRANGE 11001 12000 12101 12200 + assert_match "* 9829-11000 12001-12100 12201-13104*" [$master4 CLUSTER NODES] + assert_match "*9829 11000*12001 12100*12201 13104*" [$master4 CLUSTER SLOTS] + + $master5 CLUSTER DELSLOTSRANGE 13501 14000 15001 16000 + assert_match "* 13105-13500 14001-15000 16001-16383*" [$master5 CLUSTER NODES] + assert_match "*13105 13500*14001 15000*16001 16383*" [$master5 CLUSTER SLOTS] +} + +test 
"ADDSLOTS command with several boundary conditions test suite" { + assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTS 3001 aaa} + assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTS 3001 -1000} + assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTS 3001 30003} + + assert_error "ERR Slot 3200 is already busy" {R 0 cluster ADDSLOTS 3200} + assert_error "ERR Slot 8501 is already busy" {R 0 cluster ADDSLOTS 8501} + + assert_error "ERR Slot 3001 specified multiple times" {R 0 cluster ADDSLOTS 3001 3002 3001} +} + +test "ADDSLOTSRANGE command with several boundary conditions test suite" { + # Add multiple slots with incorrect argument number + assert_error "ERR wrong number of arguments for 'cluster|addslotsrange' command" {R 0 cluster ADDSLOTSRANGE 3001 3020 3030} + + # Add multiple slots with invalid input slot + assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTSRANGE 3001 3020 3030 aaa} + assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTSRANGE 3001 3020 3030 70000} + assert_error "ERR Invalid or out of range slot" {R 0 cluster ADDSLOTSRANGE 3001 3020 -1000 3030} + + # Add multiple slots when start slot number is greater than the end slot + assert_error "ERR start slot number 3030 is greater than end slot number 3025" {R 0 cluster ADDSLOTSRANGE 3001 3020 3030 3025} + + # Add multiple slots with busy slot + assert_error "ERR Slot 3200 is already busy" {R 0 cluster ADDSLOTSRANGE 3001 3020 3200 3250} + + # Add multiple slots with assigned multiple times + assert_error "ERR Slot 3001 specified multiple times" {R 0 cluster ADDSLOTSRANGE 3001 3020 3001 3020} +} + +test "DELSLOTSRANGE command with several boundary conditions test suite" { + # Delete multiple slots with incorrect argument number + assert_error "ERR wrong number of arguments for 'cluster|delslotsrange' command" {R 0 cluster DELSLOTSRANGE 1000 2000 2100} + assert_match "* 0-3000 3051-3276*" [$master1 CLUSTER NODES] + 
assert_match "*0 3000*3051 3276*" [$master1 CLUSTER SLOTS] + + # Delete multiple slots with invalid input slot + assert_error "ERR Invalid or out of range slot" {R 0 cluster DELSLOTSRANGE 1000 2000 2100 aaa} + assert_error "ERR Invalid or out of range slot" {R 0 cluster DELSLOTSRANGE 1000 2000 2100 70000} + assert_error "ERR Invalid or out of range slot" {R 0 cluster DELSLOTSRANGE 1000 2000 -2100 2200} + assert_match "* 0-3000 3051-3276*" [$master1 CLUSTER NODES] + assert_match "*0 3000*3051 3276*" [$master1 CLUSTER SLOTS] + + # Delete multiple slots when start slot number is greater than the end slot + assert_error "ERR start slot number 5800 is greater than end slot number 5750" {R 1 cluster DELSLOTSRANGE 5600 5700 5800 5750} + assert_match "* 3277-5000 5501-6552*" [$master2 CLUSTER NODES] + assert_match "*3277 5000*5501 6552*" [$master2 CLUSTER SLOTS] + + # Delete multiple slots with already unassigned + assert_error "ERR Slot 7001 is already unassigned" {R 2 cluster DELSLOTSRANGE 7001 7100 9000 9200} + assert_match "* 6553-7000 7101-8000 8501-9828*" [$master3 CLUSTER NODES] + assert_match "*6553 7000*7101 8000*8501 9828*" [$master3 CLUSTER SLOTS] + + # Delete multiple slots with assigned multiple times + assert_error "ERR Slot 12500 specified multiple times" {R 3 cluster DELSLOTSRANGE 12500 12600 12500 12600} + assert_match "* 9829-11000 12001-12100 12201-13104*" [$master4 CLUSTER NODES] + assert_match "*9829 11000*12001 12100*12201 13104*" [$master4 CLUSTER SLOTS] +} +} cluster_allocate_with_continuous_slots_local + +start_cluster 2 0 {tags {external:skip cluster experimental}} { + +set master1 [srv 0 "client"] +set master2 [srv -1 "client"] + +test "SFLUSH - Errors and output validation" { + assert_match "* 0-8191*" [$master1 CLUSTER NODES] + assert_match "* 8192-16383*" [$master2 CLUSTER NODES] + assert_match "*0 8191*" [$master1 CLUSTER SLOTS] + assert_match "*8192 16383*" [$master2 CLUSTER SLOTS] + + # make master1 non-continuous slots + $master1 cluster 
DELSLOTSRANGE 1000 2000 + + # Test SFLUSH errors validation + assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4} + assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4 SYNC} + assert_error {ERR Invalid or out of range slot} {$master1 SFLUSH x 4} + assert_error {ERR Invalid or out of range slot} {$master1 SFLUSH 0 12x} + assert_error {ERR Slot 3 specified multiple times} {$master1 SFLUSH 2 4 3 5} + assert_error {ERR start slot number 8 is greater than*} {$master1 SFLUSH 8 4} + assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4 8 10} + assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 0 999 2001 8191 ASYNCX} + + # Test SFLUSH output validation + assert_match "" [$master1 SFLUSH 2 4] + assert_match "" [$master1 SFLUSH 0 4] + assert_match "" [$master2 SFLUSH 0 4] + assert_match "" [$master1 SFLUSH 1 8191] + assert_match "" [$master1 SFLUSH 0 8190] + assert_match "" [$master1 SFLUSH 0 998 2001 8191] + assert_match "" [$master1 SFLUSH 1 999 2001 8191] + assert_match "" [$master1 SFLUSH 0 999 2001 8190] + assert_match "" [$master1 SFLUSH 0 999 2002 8191] + assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 999 2001 8191] + assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 8191] + assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 4000 4001 8191] + assert_match "" [$master2 SFLUSH 8193 16383] + assert_match "" [$master2 SFLUSH 8192 16382] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 SYNC] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 ASYNC] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383 SYNC] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383 ASYNC] + + # restore master1 continuous slots + $master1 cluster ADDSLOTSRANGE 1000 2000 +} + +test "SFLUSH - Deletes the keys with argument /SYNC/ASYNC" { + foreach op {"" "SYNC" 
"ASYNC"} { + for {set i 0} {$i < 100} {incr i} { + catch {$master1 SET key$i val$i} + catch {$master2 SET key$i val$i} + } + + assert {[$master1 DBSIZE] > 0} + assert {[$master2 DBSIZE] > 0} + if {$op eq ""} { + assert_match "{0 8191}" [ $master1 SFLUSH 0 8191] + } else { + assert_match "{0 8191}" [ $master1 SFLUSH 0 8191 $op] + } + assert {[$master1 DBSIZE] == 0} + assert {[$master2 DBSIZE] > 0} + assert_match "{8192 16383}" [ $master2 SFLUSH 8192 16383] + assert {[$master2 DBSIZE] == 0} + } +} + +} diff --git a/examples/redis-unstable/tests/unit/cluster/scripting.tcl b/examples/redis-unstable/tests/unit/cluster/scripting.tcl new file mode 100644 index 0000000..76aa882 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/scripting.tcl @@ -0,0 +1,91 @@ +start_cluster 1 0 {tags {external:skip cluster}} { + + test {Eval scripts with shebangs and functions default to no cross slots} { + # Test that scripts with shebang block cross slot operations + assert_error "ERR Script attempted to access keys that do not hash to the same slot*" { + r 0 eval {#!lua + redis.call('set', 'foo', 'bar') + redis.call('set', 'bar', 'foo') + return 'OK' + } 0} + + # Test the functions by default block cross slot operations + r 0 function load REPLACE {#!lua name=crossslot + local function test_cross_slot(keys, args) + redis.call('set', 'foo', 'bar') + redis.call('set', 'bar', 'foo') + return 'OK' + end + + redis.register_function('test_cross_slot', test_cross_slot)} + assert_error "ERR Script attempted to access keys that do not hash to the same slot*" {r FCALL test_cross_slot 0} + } + + test {Cross slot commands are allowed by default for eval scripts and with allow-cross-slot-keys flag} { + # Old style lua scripts are allowed to access cross slot operations + r 0 eval "redis.call('set', 'foo', 'bar'); redis.call('set', 'bar', 'foo')" 0 + + # scripts with allow-cross-slot-keys flag are allowed + r 0 eval {#!lua flags=allow-cross-slot-keys + redis.call('set', 'foo', 'bar'); 
redis.call('set', 'bar', 'foo') + } 0 + + # Retrieve data from different slot to verify data has been stored in the correct dictionary in cluster-enabled setup + # during cross-slot operation from the above lua script. + assert_equal "bar" [r 0 get foo] + assert_equal "foo" [r 0 get bar] + r 0 del foo + r 0 del bar + + # Functions with allow-cross-slot-keys flag are allowed + r 0 function load REPLACE {#!lua name=crossslot + local function test_cross_slot(keys, args) + redis.call('set', 'foo', 'bar') + redis.call('set', 'bar', 'foo') + return 'OK' + end + + redis.register_function{function_name='test_cross_slot', callback=test_cross_slot, flags={ 'allow-cross-slot-keys' }}} + r FCALL test_cross_slot 0 + + # Retrieve data from different slot to verify data has been stored in the correct dictionary in cluster-enabled setup + # during cross-slot operation from the above lua function. + assert_equal "bar" [r 0 get foo] + assert_equal "foo" [r 0 get bar] + } + + test {Cross slot commands are also blocked if they disagree with pre-declared keys} { + assert_error "ERR Script attempted to access keys that do not hash to the same slot*" { + r 0 eval {#!lua + redis.call('set', 'foo', 'bar') + return 'OK' + } 1 bar} + } + + test {Cross slot commands are allowed by default if they disagree with pre-declared keys} { + r 0 flushall + r 0 eval "redis.call('set', 'foo', 'bar')" 1 bar + + # Make sure the script writes to the right slot + assert_equal 1 [r 0 cluster COUNTKEYSINSLOT 12182] ;# foo slot + assert_equal 0 [r 0 cluster COUNTKEYSINSLOT 5061] ;# bar slot + } + + test "Function no-cluster flag" { + R 0 function load {#!lua name=test + redis.register_function{function_name='f1', callback=function() return 'hello' end, flags={'no-cluster'}} + } + catch {R 0 fcall f1 0} e + assert_match {*Can not run script on cluster, 'no-cluster' flag is set*} $e + } + + test "Script no-cluster flag" { + catch { + R 0 eval {#!lua flags=no-cluster + return 1 + } 0 + } e + + assert_match {*Can 
not run script on cluster, 'no-cluster' flag is set*} $e + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/sharded-pubsub.tcl b/examples/redis-unstable/tests/unit/cluster/sharded-pubsub.tcl new file mode 100644 index 0000000..57b550a --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/sharded-pubsub.tcl @@ -0,0 +1,67 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Licensed under your choice of (a) the Redis Source Available License 2.0 +# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the +# GNU Affero General Public License v3 (AGPLv3). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + +start_cluster 1 1 {tags {external:skip cluster}} { + set primary_id 0 + set replica1_id 1 + + set primary [Rn $primary_id] + set replica [Rn $replica1_id] + + test "Sharded pubsub publish behavior within multi/exec" { + foreach {node} {primary replica} { + set node [set $node] + $node MULTI + $node SPUBLISH ch1 "hello" + $node EXEC + } + } + + test "Sharded pubsub within multi/exec with cross slot operation" { + $primary MULTI + $primary SPUBLISH ch1 "hello" + $primary GET foo + catch {$primary EXEC} err + assert_match {CROSSSLOT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with read operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary GET foo + $primary EXEC + } {0 {}} + + test "Sharded pubsub publish behavior within multi/exec with read operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica GET foo]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with write operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary SET foo bar + $primary EXEC + } {0 OK} + + test "Sharded pubsub publish behavior within multi/exec with 
write operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica SET foo bar]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/slot-ownership.tcl b/examples/redis-unstable/tests/unit/cluster/slot-ownership.tcl new file mode 100644 index 0000000..0f3e3cc --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/slot-ownership.tcl @@ -0,0 +1,61 @@ +start_cluster 2 2 {tags {external:skip cluster}} { + + test "Verify that slot ownership transfer through gossip propagates deletes to replicas" { + assert {[s -2 role] eq {slave}} + wait_for_condition 1000 50 { + [s -2 master_link_status] eq {up} + } else { + fail "Instance #2 master link status is not up" + } + + assert {[s -3 role] eq {slave}} + wait_for_condition 1000 50 { + [s -3 master_link_status] eq {up} + } else { + fail "Instance #3 master link status is not up" + } + + # Set a single key that will be used to test deletion + set key "FOO" + R 0 SET $key TEST + set key_slot [R 0 cluster keyslot $key] + set slot_keys_num [R 0 cluster countkeysinslot $key_slot] + assert {$slot_keys_num > 0} + + # Wait for replica to have the key + R 2 readonly + wait_for_condition 1000 50 { + [R 2 exists $key] eq "1" + } else { + fail "Test key was not replicated" + } + + assert_equal [R 2 cluster countkeysinslot $key_slot] $slot_keys_num + + # Assert other shards in cluster doesn't have the key + assert_equal [R 1 cluster countkeysinslot $key_slot] "0" + assert_equal [R 3 cluster countkeysinslot $key_slot] "0" + + set nodeid [R 1 cluster myid] + + R 1 cluster bumpepoch + # Move $key_slot to node 1 + assert_equal [R 1 cluster setslot $key_slot node $nodeid] "OK" + + wait_for_cluster_propagation + + # src master will delete keys in the slot + wait_for_condition 50 100 { + [R 0 cluster countkeysinslot $key_slot] eq 0 + } else { + fail "master 'countkeysinslot $key_slot' did not eq 0" + } + 
+ # src replica will delete keys in the slot + wait_for_condition 50 100 { + [R 2 cluster countkeysinslot $key_slot] eq 0 + } else { + fail "replica 'countkeysinslot $key_slot' did not eq 0" + } + } +} diff --git a/examples/redis-unstable/tests/unit/cluster/slot-stats.tcl b/examples/redis-unstable/tests/unit/cluster/slot-stats.tcl new file mode 100644 index 0000000..1123731 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/slot-stats.tcl @@ -0,0 +1,1169 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of (a) the Redis Source Available License 2.0 +# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the +# GNU Affero General Public License v3 (AGPLv3). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + +# Integration tests for CLUSTER SLOT-STATS command. + +# ----------------------------------------------------------------------------- +# Helper functions for CLUSTER SLOT-STATS test cases. +# ----------------------------------------------------------------------------- + +# Converts array RESP response into a dict. +# This is useful for many test cases, where unnecessary nesting is removed. +proc convert_array_into_dict {slot_stats} { + set res [dict create] + foreach slot_stat $slot_stats { + # slot_stat is an array of size 2, where 0th index represents (int) slot, + # and 1st index represents (map) usage statistics. 
+ dict set res [lindex $slot_stat 0] [lindex $slot_stat 1] + } + return $res +} + +proc get_cmdstat_usec {cmd r} { + set cmdstatline [cmdrstat $cmd $r] + regexp "usec=(.*?),usec_per_call=(.*?),rejected_calls=0,failed_calls=0" $cmdstatline -> usec _ + return $usec +} + +proc initialize_expected_slots_dict {} { + set expected_slots [dict create] + for {set i 0} {$i < 16384} {incr i 1} { + dict set expected_slots $i 0 + } + return $expected_slots +} + +proc initialize_expected_slots_dict_with_range {start_slot end_slot} { + assert {$start_slot <= $end_slot} + set expected_slots [dict create] + for {set i $start_slot} {$i <= $end_slot} {incr i 1} { + dict set expected_slots $i 0 + } + return $expected_slots +} + +proc assert_empty_slot_stats {slot_stats metrics_to_assert} { + set slot_stats [convert_array_into_dict $slot_stats] + dict for {slot stats} $slot_stats { + foreach metric_name $metrics_to_assert { + set metric_value [dict get $stats $metric_name] + assert {$metric_value == 0} + } + } +} + +proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_to_assert} { + set slot_stats [convert_array_into_dict $slot_stats] + dict for {slot stats} $exception_slots { + assert {[dict exists $slot_stats $slot]} ;# slot_stats must contain the expected slots. 
+ } + dict for {slot stats} $slot_stats { + if {[dict exists $exception_slots $slot]} { + foreach metric_name $metrics_to_assert { + set metric_value [dict get $exception_slots $slot $metric_name] + assert {[dict get $stats $metric_name] == $metric_value} + } + } else { + dict for {metric value} $stats { + assert {$value == 0} + } + } + } +} + +proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 deterministic_metrics non_deterministic_metrics} { + set slot_stats_1 [convert_array_into_dict $slot_stats_1] + set slot_stats_2 [convert_array_into_dict $slot_stats_2] + assert {[dict size $slot_stats_1] == [dict size $slot_stats_2]} + + dict for {slot stats_1} $slot_stats_1 { + assert {[dict exists $slot_stats_2 $slot]} + set stats_2 [dict get $slot_stats_2 $slot] + + # For deterministic metrics, we assert their equality. + foreach metric $deterministic_metrics { + assert {[dict get $stats_1 $metric] == [dict get $stats_2 $metric]} + } + # For non-deterministic metrics, we assert their non-zeroness as a best-effort. + foreach metric $non_deterministic_metrics { + assert {([dict get $stats_1 $metric] == 0 && [dict get $stats_2 $metric] == 0) || \ + ([dict get $stats_1 $metric] != 0 && [dict get $stats_2 $metric] != 0)} + } + } +} + +proc assert_all_slots_have_been_seen {expected_slots} { + dict for {k v} $expected_slots { + assert {$v == 1} + } +} + +proc assert_slot_visibility {slot_stats expected_slots} { + set slot_stats [convert_array_into_dict $slot_stats] + dict for {slot _} $slot_stats { + assert {[dict exists $expected_slots $slot]} + dict set expected_slots $slot 1 + } + + assert_all_slots_have_been_seen $expected_slots +} + +proc assert_slot_stats_monotonic_order {slot_stats orderby is_desc} { + # For Tcl dict, the order of iteration is the order in which the keys were inserted into the dictionary + # Thus, the response ordering is preserved upon calling 'convert_array_into_dict()'. 
+ # Source: https://www.tcl.tk/man/tcl8.6.11/TclCmd/dict.htm + set slot_stats [convert_array_into_dict $slot_stats] + set prev_metric -1 + dict for {_ stats} $slot_stats { + set curr_metric [dict get $stats $orderby] + if {$prev_metric != -1} { + if {$is_desc == 1} { + assert {$prev_metric >= $curr_metric} + } else { + assert {$prev_metric <= $curr_metric} + } + } + set prev_metric $curr_metric + } +} + +proc assert_slot_stats_monotonic_descent {slot_stats orderby} { + assert_slot_stats_monotonic_order $slot_stats $orderby 1 +} + +proc assert_slot_stats_monotonic_ascent {slot_stats orderby} { + assert_slot_stats_monotonic_order $slot_stats $orderby 0 +} + +proc wait_for_replica_key_exists {key key_count} { + wait_for_condition 1000 50 { + [R 1 exists $key] eq "$key_count" + } else { + fail "Test key was not replicated" + } +} + +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS cpu-usec metric correctness. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { + + # Define shared variables. + set key "FOO" + set key_slot [R 0 cluster keyslot $key] + set key_secondary "FOO2" + set key_secondary_slot [R 0 cluster keyslot $key_secondary] + set metrics_to_assert [list cpu-usec] + + test "CLUSTER SLOT-STATS cpu-usec reset upon CONFIG RESETSTAT." { + R 0 SET $key VALUE + R 0 DEL $key + R 0 CONFIG RESETSTAT + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec reset upon slot migration." 
{ + R 0 SET $key VALUE + + R 0 CLUSTER DELSLOTS $key_slot + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + + R 0 CLUSTER ADDSLOTS $key_slot + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for non-slot specific commands." { + R 0 INFO + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for slot specific commands." { + R 0 SET $key VALUE + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set usec [get_cmdstat_usec set r] + set expected_slot_stats [ + dict create $key_slot [ + dict create cpu-usec $usec + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on keyspace update." { + # Blocking command with no timeout. Only keyspace update can unblock this client. + set rd [redis_deferring_client] + $rd BLPOP $key 0 + wait_for_blocked_clients_count 1 + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + # When the client is blocked, no accumulation is made. This behaviour is identical to INFO COMMANDSTATS. + assert_empty_slot_stats $slot_stats $metrics_to_assert + + # Unblocking command. + R 0 LPUSH $key value + wait_for_blocked_clients_count 0 + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set lpush_usec [get_cmdstat_usec lpush r] + set blpop_usec [get_cmdstat_usec blpop r] + + # Assert that both blocking and non-blocking command times have been accumulated. 
+ set expected_slot_stats [ + dict create $key_slot [ + dict create cpu-usec [expr $lpush_usec + $blpop_usec] + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on timeout." { + # Blocking command with 0.5 seconds timeout. + set rd [redis_deferring_client] + $rd BLPOP $key 0.5 + + # Confirm that the client is blocked, then unblocked within 1 second. + wait_for_blocked_clients_count 1 + wait_for_blocked_clients_count 0 + + # Assert that the blocking command time has been accumulated. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set blpop_usec [get_cmdstat_usec blpop r] + set expected_slot_stats [ + dict create $key_slot [ + dict create cpu-usec $blpop_usec + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for transactions." { + set r1 [redis_client] + $r1 MULTI + $r1 SET $key value + $r1 GET $key + + # CPU metric is not accumulated until EXEC is reached. This behaviour is identical to INFO COMMANDSTATS. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + + # Execute transaction, and assert that all nested command times have been accumulated. + $r1 EXEC + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set exec_usec [get_cmdstat_usec exec r] + set expected_slot_stats [ + dict create $key_slot [ + dict create cpu-usec $exec_usec + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, without cross-slot keys." 
{ + R 0 eval {#!lua + redis.call('set', KEYS[1], 'bar') redis.call('get', KEYS[2]) + } 2 $key $key + + set eval_usec [get_cmdstat_usec eval r] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + + set expected_slot_stats [ + dict create $key_slot [ + dict create cpu-usec $eval_usec + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, with cross-slot keys." { + R 0 eval {#!lua flags=allow-cross-slot-keys + redis.call('set', KEYS[1], 'bar') redis.call('get', ARGV[1]) + } 1 $key $key_secondary + + # For cross-slot, we do not accumulate at all. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for functions, without cross-slot keys." { + R 0 function load replace {#!lua name=f1 + redis.register_function{ + function_name='f1', + callback=function(keys, args) redis.call('set', keys[1], '1') redis.call('get', keys[2]) end + } + } + R 0 fcall f1 2 $key $key + + set fcall_usec [get_cmdstat_usec fcall r] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + + set expected_slot_stats [ + dict create $key_slot [ + dict create cpu-usec $fcall_usec + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS cpu-usec for functions, with cross-slot keys." { + R 0 function load replace {#!lua name=f1 + redis.register_function{ + function_name='f1', + callback=function(keys, args) redis.call('set', keys[1], '1') redis.call('get', args[1]) end, + flags={'allow-cross-slot-keys'} + } + } + R 0 fcall f1 1 $key $key_secondary + + # For cross-slot, we do not accumulate at all. 
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL +} + +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS network-bytes-in. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { + + # Define shared variables. + set key "key" + set key_slot [R 0 cluster keyslot $key] + set metrics_to_assert [list network-bytes-in] + + test "CLUSTER SLOT-STATS network-bytes-in, multi bulk buffer processing." { + # *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes. + R 0 SET $key value + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 33 + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, in-line buffer processing." { + set rd [redis_deferring_client] + # SET key value\r\n --> 15 bytes. + $rd write "SET $key value\r\n" + $rd flush + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 15 + ] + ] + + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, blocking command." { + set rd [redis_deferring_client] + # *3\r\n$5\r\nblpop\r\n$3\r\nkey\r\n$1\r\n0\r\n --> 31 bytes. + $rd BLPOP $key 0 + wait_for_blocked_clients_count 1 + + # Slot-stats must be empty here, as the client is yet to be unblocked. 
+ set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + + # *3\r\n$5\r\nlpush\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 35 bytes. + R 0 LPUSH $key value + wait_for_blocked_clients_count 0 + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 66 ;# 31 + 35 bytes. + ] + ] + + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, multi-exec transaction." { + set r [redis_client] + # *1\r\n$5\r\nmulti\r\n --> 15 bytes. + $r MULTI + # *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes. + assert {[$r SET $key value] eq {QUEUED}} + # *1\r\n$4\r\nexec\r\n --> 14 bytes. + assert {[$r EXEC] eq {OK}} + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 62 ;# 15 + 33 + 14 bytes. + ] + ] + + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, non slot specific command." { + R 0 INFO + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-in, pub/sub." { + # PUB/SUB does not get accumulated at per-slot basis, + # as it is cluster-wide and is not slot specific. 
+ set rd [redis_deferring_client] + $rd subscribe channel + R 0 publish channel message + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL +} + +start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { + set channel "channel" + set key_slot [R 0 cluster keyslot $channel] + set metrics_to_assert [list network-bytes-in] + + # Setup replication. + assert {[s -1 role] eq {slave}} + wait_for_condition 1000 50 { + [s -1 master_link_status] eq {up} + } else { + fail "Instance #1 master link status is not up" + } + R 1 readonly + + test "CLUSTER SLOT-STATS network-bytes-in, sharded pub/sub." { + set slot [R 0 cluster keyslot $channel] + set primary [Rn 0] + set replica [Rn 1] + set replica_subscriber [redis_deferring_client -1] + $replica_subscriber SSUBSCRIBE $channel + # *2\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n --> 34 bytes. + $primary SPUBLISH $channel hello + # *3\r\n$8\r\nspublish\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes. + + set slot_stats [$primary CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 42 + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + + set slot_stats [$replica CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-in 34 + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL +} + +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS network-bytes-out correctness. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster}} { + # Define shared variables. 
+ set key "FOO" + set key_slot [R 0 cluster keyslot $key] + set expected_slots_to_key_count [dict create $key_slot 1] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + test "CLUSTER SLOT-STATS network-bytes-out, for non-slot specific commands." { + R 0 INFO + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-out, for slot specific commands." { + R 0 SET $key value + # +OK\r\n --> 5 bytes + + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 5 + ] + ] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-out, blocking commands." { + set rd [redis_deferring_client] + $rd BLPOP $key 0 + wait_for_blocked_clients_count 1 + + # Assert empty slot stats here, since COB is yet to be flushed due to the block. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + + # Unblock the command. + # LPUSH client) :1\r\n --> 4 bytes. + # BLPOP client) *2\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 24 bytes, upon unblocking. + R 0 LPUSH $key value + wait_for_blocked_clients_count 0 + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 28 ;# 4 + 24 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL +} + +start_cluster 1 1 {tags {external:skip cluster}} { + + # Define shared variables. 
+ set key "FOO" + set key_slot [R 0 CLUSTER KEYSLOT $key] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + # Setup replication. + assert {[s -1 role] eq {slave}} + wait_for_condition 1000 50 { + [s -1 master_link_status] eq {up} + } else { + fail "Instance #1 master link status is not up" + } + R 1 readonly + + test "CLUSTER SLOT-STATS network-bytes-out, replication stream egress." { + assert_equal [R 0 SET $key VALUE] {OK} + # Local client) +OK\r\n --> 5 bytes. + # Replication stream) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 38 ;# 5 + 33 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } +} + +start_cluster 1 1 {tags {external:skip cluster}} { + + # Define shared variables. + set channel "channel" + set key_slot [R 0 cluster keyslot $channel] + set channel_secondary "channel2" + set key_slot_secondary [R 0 cluster keyslot $channel_secondary] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, single channel." { + set slot [R 0 cluster keyslot $channel] + set publisher [Rn 0] + set subscriber [redis_client] + set replica [redis_deferring_client -1] + + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes + $subscriber SSUBSCRIBE $channel + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 38 + ] + ] + R 0 CONFIG RESETSTAT + + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes. 
+ assert_equal 1 [$publisher SPUBLISH $channel hello] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 46 ;# 4 + 42 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + $subscriber QUIT + R 0 FLUSHALL + R 0 CONFIG RESETSTAT + + test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, cross-slot channels." { + set slot [R 0 cluster keyslot $channel] + set publisher [Rn 0] + set subscriber [redis_client] + set replica [redis_deferring_client -1] + + # Stack multi-slot subscriptions against a single client. + # For primary channel; + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes + # For secondary channel; + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$8\r\nchannel2\r\n:1\r\n --> 39 bytes + $subscriber SSUBSCRIBE $channel + $subscriber SSUBSCRIBE $channel_secondary + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create \ + $key_slot [ \ + dict create network-bytes-out 38 + ] \ + $key_slot_secondary [ \ + dict create network-bytes-out 39 + ] + ] + R 0 CONFIG RESETSTAT + + # For primary channel; + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes. + # For secondary channel; + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$8\r\nchannel2\r\n$5\r\nhello\r\n --> 43 bytes. + assert_equal 1 [$publisher SPUBLISH $channel hello] + assert_equal 1 [$publisher SPUBLISH $channel_secondary hello] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create \ + $key_slot [ \ + dict create network-bytes-out 46 ;# 4 + 42 bytes. + ] \ + $key_slot_secondary [ \ + dict create network-bytes-out 47 ;# 4 + 43 bytes. 
+ ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } +} + +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS key-count metric correctness. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { + + # Define shared variables. + set key "FOO" + set key_slot [R 0 cluster keyslot $key] + set metrics_to_assert [list key-count] + set expected_slot_stats [ + dict create $key_slot [ + dict create key-count 1 + ] + ] + + test "CLUSTER SLOT-STATS contains default value upon redis-server startup" { + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + + test "CLUSTER SLOT-STATS contains correct metrics upon key introduction" { + R 0 SET $key TEST + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + + test "CLUSTER SLOT-STATS contains correct metrics upon key mutation" { + R 0 SET $key NEW_VALUE + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + + test "CLUSTER SLOT-STATS contains correct metrics upon key deletion" { + R 0 DEL $key + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + + test "CLUSTER SLOT-STATS slot visibility based on slot ownership changes" { + R 0 CONFIG SET cluster-require-full-coverage no + + R 0 CLUSTER DELSLOTS $key_slot + set expected_slots [initialize_expected_slots_dict] + dict unset expected_slots $key_slot + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert {[dict size $expected_slots] == 16383} + assert_slot_visibility $slot_stats 
$expected_slots + + R 0 CLUSTER ADDSLOTS $key_slot + set expected_slots [initialize_expected_slots_dict] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert {[dict size $expected_slots] == 16384} + assert_slot_visibility $slot_stats $expected_slots + } +} + +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS SLOTSRANGE sub-argument. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster}} { + + test "CLUSTER SLOT-STATS SLOTSRANGE all slots present" { + set start_slot 100 + set end_slot 102 + set expected_slots [initialize_expected_slots_dict_with_range $start_slot $end_slot] + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE $start_slot $end_slot] + assert_slot_visibility $slot_stats $expected_slots + } + + test "CLUSTER SLOT-STATS SLOTSRANGE some slots missing" { + set start_slot 100 + set end_slot 102 + set expected_slots [initialize_expected_slots_dict_with_range $start_slot $end_slot] + + R 0 CLUSTER DELSLOTS $start_slot + dict unset expected_slots $start_slot + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE $start_slot $end_slot] + assert_slot_visibility $slot_stats $expected_slots + } +} + +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS ORDERBY sub-argument. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} { + + set metrics [list "key-count" "memory-bytes" "cpu-usec" "network-bytes-in" "network-bytes-out"] + + # SET keys for target hashslots, to encourage ordering. + set hash_tags [list 0 1 2 3 4] + set num_keys 1 + foreach hash_tag $hash_tags { + for {set i 0} {$i < $num_keys} {incr i 1} { + R 0 SET "$i{$hash_tag}" VALUE + } + incr num_keys 1 + } + + # SET keys for random hashslots, for random noise. 
+    set num_keys 0
+    while {$num_keys < 1000} {
+        set random_key [randomInt 16384] ;# spread keys across slots; randomInt presumably returns an int in [0, 16384) -- TODO confirm
+        R 0 SET $random_key VALUE
+        incr num_keys 1
+    }
+
+    test "CLUSTER SLOT-STATS ORDERBY DESC correct ordering" {
+        foreach orderby $metrics {
+            set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC]
+            assert_slot_stats_monotonic_descent $slot_stats $orderby
+        }
+    }
+
+    test "CLUSTER SLOT-STATS ORDERBY ASC correct ordering" {
+        foreach orderby $metrics {
+            set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC]
+            assert_slot_stats_monotonic_ascent $slot_stats $orderby
+        }
+    }
+
+    test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is less than number of assigned slots" {
+        R 0 FLUSHALL SYNC
+        R 0 CONFIG RESETSTAT
+
+        foreach orderby $metrics {
+            set limit 5
+            set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
+            set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
+            set slot_stats_desc_length [llength $slot_stats_desc]
+            set slot_stats_asc_length [llength $slot_stats_asc]
+            assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length}
+
+            # All slot statistics were reset to 0 by the FLUSHALL / RESETSTAT above, so ties break by slot number ascending for both DESC and ASC.
+            set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0]
+            assert_slot_visibility $slot_stats_desc $expected_slots
+            assert_slot_visibility $slot_stats_asc $expected_slots
+        }
+    }
+
+    test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is greater than number of assigned slots" {
+        R 0 CONFIG SET cluster-require-full-coverage no
+        R 0 FLUSHALL SYNC
+        R 0 CLUSTER FLUSHSLOTS
+        R 0 CLUSTER ADDSLOTS 100 101 ;# only 2 assigned slots, fewer than the LIMIT of 5 used below
+
+        foreach orderby $metrics {
+            set num_assigned_slots 2
+            set limit 5
+            set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
+            set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
+            set slot_stats_desc_length [llength $slot_stats_desc]
+            set slot_stats_asc_length [llength $slot_stats_asc]
+            set expected_response_length [expr min($num_assigned_slots, $limit)]
+            assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length}
+
+            set expected_slots [dict create 100 0 101 0]
+            assert_slot_visibility $slot_stats_desc $expected_slots
+            assert_slot_visibility $slot_stats_asc $expected_slots
+        }
+    }
+
+    test "CLUSTER SLOT-STATS ORDERBY arg sanity check." {
+        # Non-existent argument.
+        assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count non-existent-arg}
+        # Negative LIMIT.
+        assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count DESC LIMIT -1}
+        # Non-existent ORDERBY metric.
+        assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY non-existent-metric}
+        # When the cluster-slot-stats-enabled config is disabled, sorting by any advanced metric must be rejected.
+        R 0 CONFIG SET cluster-slot-stats-enabled no
+        set orderby "cpu-usec"
+        assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
+        set orderby "network-bytes-in"
+        assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
+        set orderby "network-bytes-out"
+        assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
+        set orderby "memory-bytes"
+        assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
+
+        # When only cpu and net are enabled, ORDERBY memory-bytes should still fail.
+        R 0 CONFIG SET cluster-slot-stats-enabled "cpu net"
+        assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY memory-bytes}
+    }
+
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS replication.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+
+    # Define shared variables.
+    set key "key"
+    set key_slot [R 0 CLUSTER KEYSLOT $key]
+    set primary [Rn 0]
+    set replica [Rn 1]
+
+    # For replication, assertions are split between deterministic and non-deterministic metrics.
+    # * For deterministic metrics, strict equality assertions are made.
+    # * For non-deterministic metrics, non-zeroness assertions are made.
+    #   Non-zeroness as in: primary and replica should either both have some value, or both have no value at all.
+    #
+    # * key-count is deterministic between primary and its replica.
+    # * cpu-usec is non-deterministic between primary and its replica.
+    # * network-bytes-in is deterministic between primary and its replica.
+    # * network-bytes-out will remain empty in the replica, since the primary (acting as the replica's client) does not receive replies, except for replicationSendAck().
+    set deterministic_metrics [list key-count network-bytes-in]
+    set non_deterministic_metrics [list cpu-usec]
+    set empty_metrics [list network-bytes-out]
+
+    # Setup replication.
+    assert {[s -1 role] eq {slave}}
+    wait_for_condition 1000 50 {
+        [s -1 master_link_status] eq {up}
+    } else {
+        fail "Instance #1 master link status is not up"
+    }
+    R 1 readonly ;# READONLY: allow this replica connection to serve reads in cluster mode
+
+    test "CLUSTER SLOT-STATS metrics replication for new keys" {
+        # *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
+        R 0 SET $key VALUE
+
+        set expected_slot_stats [
+            dict create $key_slot [
+                dict create key-count 1 network-bytes-in 33
+            ]
+        ]
+        set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
+
+        wait_for_condition 500 10 {
+            [string match {*calls=1,*} [cmdrstat set $replica]]
+        } else {
+            fail "Replica did not receive the command."
+        }
+        set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
+        assert_empty_slot_stats $slot_stats_replica $empty_metrics
+    }
+    R 0 CONFIG RESETSTAT ;# reset stats on both nodes so each test starts from a clean slate
+    R 1 CONFIG RESETSTAT
+
+    test "CLUSTER SLOT-STATS metrics replication for existing keys" {
+        # *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$13\r\nvalue_updated\r\n --> 42 bytes.
+        R 0 SET $key VALUE_UPDATED
+
+        set expected_slot_stats [
+            dict create $key_slot [
+                dict create key-count 1 network-bytes-in 42
+            ]
+        ]
+        set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
+
+        wait_for_condition 500 10 {
+            [string match {*calls=1,*} [cmdrstat set $replica]]
+        } else {
+            fail "Replica did not receive the command."
+        }
+        set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
+        assert_empty_slot_stats $slot_stats_replica $empty_metrics
+    }
+    R 0 CONFIG RESETSTAT
+    R 1 CONFIG RESETSTAT
+
+    test "CLUSTER SLOT-STATS metrics replication for deleting keys" {
+        # *2\r\n$3\r\ndel\r\n$3\r\nkey\r\n --> 22 bytes.
+        R 0 DEL $key
+
+        set expected_slot_stats [
+            dict create $key_slot [
+                dict create key-count 0 network-bytes-in 22
+            ]
+        ]
+        set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
+
+        wait_for_condition 500 10 {
+            [string match {*calls=1,*} [cmdrstat del $replica]]
+        } else {
+            fail "Replica did not receive the command."
+        }
+        set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
+        assert_empty_slot_stats $slot_stats_replica $empty_metrics
+    }
+    R 0 CONFIG RESETSTAT
+    R 1 CONFIG RESETSTAT
+}
+
+start_cluster 2 2 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+    test "CLUSTER SLOT-STATS reset upon atomic slot migration" {
+        # key on slot-0: the {06S} hash tag is expected to map to slot 0; actual slot is computed below via CLUSTER KEYSLOT
+        set key0 "{06S}mykey0"
+        set key0_slot [R 0 CLUSTER KEYSLOT $key0]
+        R 0 SET $key0 VALUE
+
+        # Migrate slot-0 to node-1; IMPORT args are presumably <start-slot> <end-slot> -- TODO confirm
+        R 1 CLUSTER MIGRATION IMPORT 0 0
+        wait_for_condition 1000 10 {
+            [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+            [CI 1 cluster_slot_migration_active_tasks] == 0
+        } else {
+            fail "ASM tasks did not complete"
+        }
+
+        set expected_slot_stats [
+            dict create \
+                $key0_slot [ \
+                    dict create key-count 1 \
+                    dict create cpu-usec 0 \
+                    dict create network-bytes-in 0 \
+                    dict create network-bytes-out 0 \
+                ]
+        ] ;# NOTE(review): the repeated "dict create" tokens become literal dict entries; harmless here since only the metrics in metrics_to_assert are checked, but worth confirming
+        set metrics_to_assert [list key-count cpu-usec network-bytes-in network-bytes-out]
+
+        # Verify metrics are reset except key-count
+        set slot_stats [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 0]
+        assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+
+        # Migrate slot-0 back to node-0
+        R 0 CLUSTER MIGRATION IMPORT 0 0
+        wait_for_condition 1000 10 {
+            [CI 0 cluster_slot_migration_active_tasks] == 0 &&
+            [CI 1 cluster_slot_migration_active_tasks] == 0
+        } else {
+            fail "ASM tasks did not complete"
+        }
+
+        # Verify metrics are reset except key-count
+        set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 0]
+        assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
+    }
+}
+
+# -----------------------------------------------------------------------------
+# Test cases for CLUSTER SLOT-STATS memory-bytes field presence.
+# -----------------------------------------------------------------------------
+
+start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
+    # Define shared variables.
+    set key "FOO"
+    set key_slot [R 0 cluster keyslot $key]
+
+    test "CLUSTER SLOT-STATS memory-bytes field present when cluster-slot-stats-enabled set on startup" {
+        R 0 SET $key VALUE
+        set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        set slot_stats [convert_array_into_dict $slot_stats]
+
+        # Verify memory-bytes field is present
+        assert {[dict exists $slot_stats $key_slot]}
+        set stats [dict get $slot_stats $key_slot]
+        assert {[dict exists $stats memory-bytes]}
+        assert {[dict get $stats memory-bytes] > 0}
+    }
+
+    test "CLUSTER SLOT-STATS net mem combination shows only net and mem stats" {
+        R 0 CONFIG SET cluster-slot-stats-enabled "net mem"
+        set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        set slot_stats [convert_array_into_dict $slot_stats]
+
+        set stats [dict get $slot_stats $key_slot]
+        assert {[dict exists $stats memory-bytes]}
+        assert {[dict exists $stats network-bytes-in]}
+        assert {[dict exists $stats network-bytes-out]}
+        assert {![dict exists $stats cpu-usec]}
+    }
+
+    test "CLUSTER SLOT-STATS cpu mem combination shows only cpu and mem stats" {
+        R 0 CONFIG SET cluster-slot-stats-enabled "cpu mem"
+        set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        set slot_stats [convert_array_into_dict $slot_stats]
+
+        set stats [dict get $slot_stats $key_slot]
+        assert {[dict exists $stats memory-bytes]}
+        assert {[dict exists $stats cpu-usec]}
+        assert {![dict exists $stats network-bytes-in]}
+        assert {![dict exists $stats network-bytes-out]}
+
+        # Restore to yes for subsequent tests
+        R 0 CONFIG SET cluster-slot-stats-enabled yes
+    }
+
+    test "CLUSTER SLOT-STATS memory-bytes field not present after disabling cluster-slot-stats-enabled" {
+        R 0 CONFIG SET cluster-slot-stats-enabled no
+        set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        set slot_stats [convert_array_into_dict $slot_stats]
+
+        # Verify memory-bytes field is not present after disabling config
+        # (memory tracking is disabled when the MEM flag is removed)
+        assert {[dict exists $slot_stats $key_slot]}
+        set stats [dict get $slot_stats $key_slot]
+        assert {![dict exists $stats memory-bytes]}
+
+        # Verify other stats fields are not present
+        assert {![dict exists $stats cpu-usec]}
+        assert {![dict exists $stats network-bytes-in]}
+        assert {![dict exists $stats network-bytes-out]}
+    }
+
+    test "CLUSTER SLOT-STATS memory tracking cannot be re-enabled after being disabled" {
+        # Once memory tracking is disabled, it cannot be re-enabled at runtime
+        assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled yes}
+        assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled mem}
+
+        # But cpu and net can still be enabled
+        R 0 CONFIG SET cluster-slot-stats-enabled "cpu net"
+        set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        set slot_stats [convert_array_into_dict $slot_stats]
+
+        assert {[dict exists $slot_stats $key_slot]}
+        set stats [dict get $slot_stats $key_slot]
+        assert {![dict exists $stats memory-bytes]}
+        assert {[dict exists $stats cpu-usec]}
+        assert {[dict exists $stats network-bytes-in]}
+        assert {[dict exists $stats network-bytes-out]}
+    }
+}
+
+start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled no}} {
+    # Define shared variables.
+    set key "FOO"
+    set key_slot [R 0 cluster keyslot $key]
+
+    test "CLUSTER SLOT-STATS memory-bytes field not present when cluster-slot-stats-enabled not set on startup" {
+        R 0 SET $key VALUE
+        set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        set slot_stats [convert_array_into_dict $slot_stats]
+
+        # Verify memory-bytes field is not present
+        assert {[dict exists $slot_stats $key_slot]}
+        set stats [dict get $slot_stats $key_slot]
+        assert {![dict exists $stats memory-bytes]}
+
+        # Only key-count should be present
+        assert {[dict exists $stats key-count]}
+        assert {[dict get $stats key-count] == 1}
+    }
+
+    test "CLUSTER SLOT-STATS enabling mem at runtime fails when not enabled at startup" {
+        # Trying to enable memory tracking at runtime should fail
+        assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled mem}
+        assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled yes}
+        assert_error "ERR*memory tracking cannot be enabled at runtime*" {R 0 CONFIG SET cluster-slot-stats-enabled "cpu net mem"}
+    }
+
+    test "CLUSTER SLOT-STATS enabling cpu and net at runtime works" {
+        R 0 CONFIG SET cluster-slot-stats-enabled "cpu net"
+        set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
+        set slot_stats [convert_array_into_dict $slot_stats]
+
+        # Verify memory-bytes field is still not present
+        assert {[dict exists $slot_stats $key_slot]}
+        set stats [dict get $slot_stats $key_slot]
+        assert {![dict exists $stats memory-bytes]}
+
+        # Other stats fields should now be present
+        assert {[dict exists $stats cpu-usec]}
+        assert {[dict exists $stats network-bytes-in]}
+        assert {[dict exists $stats network-bytes-out]}
+    }
+}
-- cgit v1.2.3