diff options
Diffstat (limited to 'examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl')
| -rw-r--r-- | examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl | 3063 |
1 file changed, 3063 insertions, 0 deletions
diff --git a/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl b/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl new file mode 100644 index 0000000..f04257f --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl @@ -0,0 +1,3063 @@ +set ::slot_prefixes [dict create \ + 0 "{06S}" \ + 1 "{Qi}" \ + 2 "{5L5}" \ + 3 "{4Iu}" \ + 4 "{4gY}" \ + 5 "{460}" \ + 6 "{1Y7}" \ + 7 "{1LV}" \ + 101 "{1j2}" \ + 102 "{75V}" \ + 103 "{bno}" \ + 5462 "{450}"\ + 5463 "{4dY}"\ + 6000 "{4L7}" \ + 6001 "{4YV}" \ + 6002 "{0bx}" \ + 6003 "{AJ}" \ + 6004 "{of}" \ + 16383 "{6ZJ}" \ +] + +# Helper functions +proc get_port {node_id} { + if {$::tls} { + return [lindex [R $node_id config get tls-port] 1] + } else { + return [lindex [R $node_id config get port] 1] + } +} + +# return the prefix for the given slot +proc slot_prefix {slot} { + return [dict get $::slot_prefixes $slot] +} + +# return a key for the given slot +proc slot_key {slot {suffix ""}} { + return "[slot_prefix $slot]$suffix" +} + +# Populate a slot with keys +# TODO: Consider merging with populate() +proc populate_slot {num args} { + # Default values + set prefix "key:" + set size 3 + set idx 0 + set prints false + set expires 0 + set slot -1 + + # Parse named arguments + foreach {key value} $args { + switch -- $key { + -prefix { set prefix $value } + -size { set size $value } + -idx { set idx $value } + -prints { set prints $value } + -expires { set expires $value } + -slot { set slot $value } + default { error "Unknown option: $key" } + } + } + + # If slot is specified, use slot prefix from table + if {$slot >= 0} { + if {[dict exists $::slot_prefixes $slot]} { + set prefix [dict get $::slot_prefixes $slot] + } else { + error "Slot $slot not supported in slot_prefixes table, add it manually" + } + } + + R $idx deferred 1 + if {$num > 16} {set pipeline 16} else {set pipeline $num} + set val [string repeat A $size] + for {set j 0} {$j < $pipeline} {incr j} { 
+ if {$expires > 0} { + R $idx set $prefix$j $val ex $expires + } else { + R $idx set $prefix$j $val + } + if {$prints} {puts $j} + } + for {} {$j < $num} {incr j} { + if {$expires > 0} { + R $idx set $prefix$j $val ex $expires + } else { + R $idx set $prefix$j $val + } + R $idx read + if {$prints} {puts $j} + } + for {set j 0} {$j < $pipeline} {incr j} { + R $idx read + if {$prints} {puts $j} + } + R $idx deferred 0 +} + +# Return 1 if all instances are idle +proc asm_all_instances_idle {total} { + for {set i 0} {$i < $total} {incr i} { + if {[CI $i cluster_slot_migration_active_tasks] != 0} { return 0 } + if {[CI $i cluster_slot_migration_active_trim_running] != 0} { return 0 } + } + return 1 +} + +# Wait for all ASM tasks to complete in the cluster +proc wait_for_asm_done {} { + set total_instances [expr {$::cluster_master_nodes + $::cluster_replica_nodes}] + + wait_for_condition 1000 10 { + [asm_all_instances_idle $total_instances] == 1 + } else { + # Print the number of active tasks on each instance + for {set i 0} {$i < $total_instances} {incr i} { + set migration_count [CI $i cluster_slot_migration_active_tasks] + set trim_count [CI $i cluster_slot_migration_active_trim_running] + puts "Instance $i: migration_tasks=$migration_count, trim_tasks=$trim_count" + } + fail "ASM tasks did not complete on all instances" + } + # wait all nodes to reach the same cluster config after ASM + wait_for_cluster_propagation +} + +proc failover_and_wait_for_done {node_id {failover_arg ""}} { + set max_attempts 5 + for {set attempt 1} {$attempt <= $max_attempts} {incr attempt} { + if {$failover_arg eq ""} { + R $node_id cluster failover + } else { + R $node_id cluster failover $failover_arg + } + + set completed 1 + wait_for_condition 1000 10 { + [string match "*master*" [R $node_id role]] + } else { + set completed 0 + } + + if {$completed} { + wait_for_cluster_propagation + return + } + } + fail "Failover did not complete after $max_attempts attempts for node $node_id" +} + 
+proc migration_status {node_id task_id field} { + set status [R $node_id CLUSTER MIGRATION STATUS ID $task_id] + + # STATUS ID returns single task, so get first element + if {[llength $status] == 0} { + return "" + } + + set task_status [lindex $status 0] + set field_value "" + + # Parse the key-value pairs in the task + for {set i 0} {$i < [llength $task_status]} {incr i 2} { + set key [lindex $task_status $i] + set value [lindex $task_status [expr $i + 1]] + + if {$key eq $field} { + set field_value $value + break + } + } + + return $field_value +} + +# Setup slot migration test with keys and delay, then start migration +# Returns the task_id for the migration +proc setup_slot_migration_with_delay {src_node dst_node start_slot end_slot {keys 2} {delay 1000000}} { + # Two keys on the start slot + populate_slot $keys -idx $src_node -slot $start_slot + + # we set a delay to ensure migration takes time for testing, + # with default parameters, two keys cost 2s to save + R $src_node config set rdb-key-save-delay $delay + + # migrate slot range from src_node to dst_node + set task_id [R $dst_node CLUSTER MIGRATION IMPORT $start_slot $end_slot] + wait_for_condition 2000 10 { + [string match {*send-bulk-and-stream*} [migration_status $src_node $task_id state]] + } else { + fail "ASM task did not start" + } + + return $task_id +} + +# Helper function to clear module internal event logs +proc clear_module_event_log {} { + for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} { + R $i asm.clear_event_log + } +} + +proc reset_default_trim_method {} { + for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} { + R $i debug asm-trim-method default + } +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} { + foreach trim_method {"active" "bg"} { + test "Simple slot migration (trim method: $trim_method)" { + R 0 debug asm-trim-method $trim_method + R 3 
debug asm-trim-method $trim_method + + set slot0_key [slot_key 0 mykey] + R 0 set $slot0_key "a" + set slot1_key [slot_key 1 mykey] + R 0 set $slot1_key "b" + set slot101_key [slot_key 101 mykey] + R 0 set $slot101_key "c" + # 3 keys cost 3s to save + R 0 config set rdb-key-save-delay 1000000 + + # load a function + R 0 function load {#!lua name=test1 + redis.register_function('test1', function() return 'hello1' end) + } + + # migrate slot 0-100 to R 1 + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + # migration is start, and in accumulating buffer stage + wait_for_condition 1000 50 { + [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]] && + [string match {*accumulate-buffer*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not start" + } + + # append 99 times during migration + for {set i 0} {$i < 99} {incr i} { + R 0 multi + R 0 append $slot0_key "a" + R 0 exec + R 0 append $slot1_key "b" + R 0 append $slot101_key "c" + } + + # wait until migration of 0-100 successful + wait_for_asm_done + + # verify task state became completed + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + + # the appended 99 times should also be migrated + assert_equal [string repeat a 100] [R 1 get $slot0_key] + assert_equal [string repeat b 100] [R 1 get $slot1_key] + + # function should be migrated + assert_equal [R 0 function dump] [R 1 function dump] + # the slave should also get the data + wait_for_ofs_sync [Rn 1] [Rn 4] + + R 4 readonly + assert_equal [string repeat a 100] [R 4 get $slot0_key] + assert_equal [string repeat b 100] [R 4 get $slot1_key] + assert_equal [R 0 function dump] [R 4 function dump] + + # verify key that was not in the slot range is not migrated + assert_equal [string repeat c 100] [R 0 get $slot101_key] + # verify changes in replica + wait_for_ofs_sync [Rn 0] [Rn 3] + R 3 readonly + assert_equal [string repeat c 100] [R 3 get 
$slot101_key] + + # cleanup + R 0 config set rdb-key-save-delay 0 + R 0 flushall + R 0 function flush + R 1 flushall + R 1 function flush + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } + } +} + +# Skip most of the tests when running under valgrind since it is hard to +# stabilize tests under valgrind. +if {!$::valgrind} { +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} { + test "Test CLUSTER MIGRATION IMPORT input validation" { + # invalid arguments + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT 100} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION IMPORT 100 200 300} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION UNKNOWN 1 2} + + # invalid slot range + assert_error {*greater than end slot number*} {R 0 CLUSTER MIGRATION IMPORT 200 100} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 17000 18000} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 14000 18000} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 0 16384} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 0 -1} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT -1 2} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT -2 -1} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT 10 a} + assert_error {*out of range slot*} {R 0 CLUSTER MIGRATION IMPORT sd sd} + assert_error {*already the owner of the slot*} {R 0 CLUSTER MIGRATION IMPORT 100 200} + } + + test "Test CLUSTER MIGRATION CANCEL input validation" { + # invalid arguments + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ID} + assert_error {*wrong 
number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ID 12345 EXTRAARG} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION CANCEL ALL EXTRAARG} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION CANCEL UNKNOWNARG} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION CANCEL abc def} + # empty string id should not cancel any task + assert_equal 0 [R 0 CLUSTER MIGRATION CANCEL ID ""] + } + + test "Test CLUSTER MIGRATION STATUS input validation" { + # invalid arguments + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ID} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ID id EXTRAARG} + assert_error {*wrong number of arguments*} {R 0 CLUSTER MIGRATION STATUS ALL EXTRAARG} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION STATUS ABC DEF} + assert_error {*unknown argument*} {R 0 CLUSTER MIGRATION STATUS UNKNOWNARG} + # empty string id should not list any task + assert_equal {} [R 0 CLUSTER MIGRATION STATUS ID ""] + } + + test "Test TRIMSLOTS input validation" { + # Wrong number of arguments + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 1} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 2 100} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES 17000 1} + assert_error {*wrong number of arguments*} {R 0 TRIMSLOTS RANGES abc} + + # Missing ranges argument + assert_error {*missing ranges argument*} {R 0 TRIMSLOTS UNKNOWN 1 100 200} + + # Invalid number of ranges + assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES 0 1 1} + assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES -1 2 2} + assert_error {*invalid number of ranges*} {R 0 TRIMSLOTS RANGES 17000 1 2} + assert_error {*invalid number of ranges*} {R 0 
TRIMSLOTS RANGES 2 100 200 300} + + # Invalid slot numbers + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 -1 0} + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 -2 -1} + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 0 16384} + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 abc def} + assert_error {*out of range slot*} {R 0 TRIMSLOTS RANGES 1 100 abc} + + # Start slot greater than end slot + assert_error {*greater than end slot number*} {R 0 TRIMSLOTS RANGES 1 200 100} + } + + test "Test IMPORT not allowed on replica" { + assert_error {* not allowed on replica*} {R 4 CLUSTER MIGRATION IMPORT 100 200} + } + + test "Test IMPORT not allowed during manual migration" { + set dst_id [R 1 CLUSTER MYID] + + # Set a slot to IMPORTING + R 0 CLUSTER SETSLOT 15000 IMPORTING $dst_id + assert_error {*must be STABLE to start*slot migration*} {R 0 CLUSTER MIGRATION IMPORT 100 200} + # Revert the change + R 0 CLUSTER SETSLOT 15000 STABLE + + # Same test with setting a slot to MIGRATING + R 0 CLUSTER SETSLOT 5000 MIGRATING $dst_id + assert_error {*must be STABLE to start*slot migration*} {R 0 CLUSTER MIGRATION IMPORT 100 200} + # Revert the change + R 0 CLUSTER SETSLOT 5000 STABLE + } + + test "Test IMPORT not allowed if the node is already the owner" { + assert_error {*already the owner of the slot*} {R 0 CLUSTER MIGRATION IMPORT 100 100} + } + + test "Test IMPORT not allowed for a slot without an owner" { + # Slot will have no owner + R 0 CLUSTER DELSLOTS 5000 + + assert_error {*slot has no owner: 5000*} {R 0 CLUSTER MIGRATION IMPORT 5000 5000} + + # Revert the change + R 0 CLUSTER ADDSLOTS 5000 + } + + test "Test IMPORT not allowed if slot ranges belong to different nodes" { + assert_error {*slots belong to different source nodes*} {R 0 CLUSTER MIGRATION IMPORT 7000 15000} + assert_error {*slots belong to different source nodes*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 14000 15000} + } + + test "Test IMPORT not allowed if slot is 
given multiple times" { + assert_error {*Slot*specified multiple times*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 8000 9000} + assert_error {*Slot*specified multiple times*} {R 0 CLUSTER MIGRATION IMPORT 7000 8000 7900 9000} + } + + test "Test CLUSTER MIGRATION STATUS ALL lists all tasks" { + # Create 3 completed tasks + R 0 CLUSTER MIGRATION IMPORT 7000 7001 + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 7002 7003 + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 7004 7005 + wait_for_asm_done + + # Get node IDs for verification + set node0_id [R 0 cluster myid] + set node1_id [R 1 cluster myid] + + # Verify CLUSTER MIGRATION STATUS ALL reply from both nodes + foreach node_idx {0 1} { + set tasks [R $node_idx CLUSTER MIGRATION STATUS ALL] + assert_equal 3 [llength $tasks] + + for {set i 0} {$i < 3} {incr i} { + set task [lindex $tasks $i] + + # Verify field order + set expected_fields {id slots source dest operation state + last_error retries create_time start_time + end_time write_pause_ms} + for {set j 0} {$j < [llength $expected_fields]} {incr j} { + set expected_field [lindex $expected_fields $j] + set actual_field [lindex $task [expr $j * 2]] + assert_equal $expected_field $actual_field + } + + # Verify basic fields + assert_equal "completed" [dict get $task state] + assert_equal "" [dict get $task last_error] + assert_equal 0 [dict get $task retries] + assert {[dict get $task write_pause_ms] >= 0} + + # Verify operation based on node + if {$node_idx == 0} { + assert_equal "import" [dict get $task operation] + } else { + assert_equal "migrate" [dict get $task operation] + } + + # Verify node IDs (all tasks: node1 -> node0) + assert_equal $node1_id [dict get $task source] + assert_equal $node0_id [dict get $task dest] + + # Verify timestamps exist and are reasonable + set create_time [dict get $task create_time] + set start_time [dict get $task start_time] + set end_time [dict get $task end_time] + assert {$create_time > 0} + assert {$start_time >= 
$create_time} + assert {$end_time >= $start_time} + + # Verify specific slot ranges for each task + set slots [dict get $task slots] + if {$i == 0} { + assert_equal "7004-7005" $slots + } elseif {$i == 1} { + assert_equal "7002-7003" $slots + } elseif {$i == 2} { + assert_equal "7000-7001" $slots + } + } + } + + # cleanup + R 1 CLUSTER MIGRATION IMPORT 7000 7005 + wait_for_asm_done + } + + test "Test IMPORT not allowed if there is an overlapping import" { + # Let slot migration take long time, so that we can test overlapping import + R 1 config set rdb-key-save-delay 1000000 + R 1 set tag22273 tag22273 ;# slot hash is 7000 + R 1 set tag9283 tag9283 ;# slot hash is 8000 + + set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 8000] + assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 8000 9000} + assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 7500 8500} + assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 6000 7000} + assert_error {*overlapping import exists*} {R 0 CLUSTER MIGRATION IMPORT 6500 7500} + + wait_for_condition 1000 50 { + [string match {*completed*} [migration_status 0 $task_id state]] && + [string match {*completed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not start" + } + assert_equal "tag22273" [R 0 get tag22273] + assert_equal "tag9283" [R 0 get tag9283] + R 1 config set rdb-key-save-delay 0 + + # revert the migration + R 1 CLUSTER MIGRATION IMPORT 7000 8000 + wait_for_asm_done + } + + test "Test IMPORT with unsorted and adjacent ranges" { + # Redis should sort and merge adjacent ranges + # Adjacent means: prev.end + 1 == next.start + # e.g. 
7000-7001 7002-7003 7004-7005 => 7000-7005 + + # Test with adjacent ranges + set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 7001 7002 7100] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7000-7100" [migration_status 0 $task_id slots] + assert_equal "7000-7100" [migration_status 1 $task_id slots] + + # Test with unsorted and adjacent ranges + set task_id [R 1 CLUSTER MIGRATION IMPORT 7050 7051 7010 7049 7000 7005] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7000-7005 7010-7051" [migration_status 0 $task_id slots] + assert_equal "7000-7005 7010-7051" [migration_status 1 $task_id slots] + + # Another test with unsorted and adjacent ranges + set task_id [R 1 CLUSTER MIGRATION IMPORT 7007 7007 7008 7009 7006 7006] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7006-7009" [migration_status 0 $task_id slots] + assert_equal "7006-7009" [migration_status 1 $task_id slots] + } + + test "Simple slot migration with write load" { + # Perform slot migration while traffic is on and verify data consistency. + # Trimming is disabled on source nodes so, we can compare the dbs after + # migration via DEBUG DIGEST to ensure no data loss during migration. + # Steps: + # 1. Disable trimming on both nodes + # 2. Populate slot 0 on node-0 and slot 6000 on node-1 + # 2. Start write traffic on both nodes + # 3. 
Migrate slot 0 from node-0 to node-1 + # 4. Migrate slot 6000 from node-1 to node-0 + # 5. Stop write traffic, verify db's are identical. + + # This test runs slowly under the thread sanitizer. + # 1. Increase the lag threshold from the default 1 MB to 10 MB to let the destination catch up easily. + # 2. Increase the write pause timeout from the default 10s to 60s so the source can wait longer. + set prev_config_lag [lindex [R 0 config get cluster-slot-migration-handoff-max-lag-bytes] 1] + R 0 config set cluster-slot-migration-handoff-max-lag-bytes 10mb + R 1 config set cluster-slot-migration-handoff-max-lag-bytes 10mb + set prev_config_timeout [lindex [R 0 config get cluster-slot-migration-write-pause-timeout] 1] + R 0 config set cluster-slot-migration-write-pause-timeout 60000 + R 1 config set cluster-slot-migration-write-pause-timeout 60000 + + R 0 flushall + R 0 debug asm-trim-method none + populate_slot 10000 -idx 0 -slot 0 + + R 1 flushall + R 1 debug asm-trim-method none + populate_slot 10000 -idx 1 -slot 6000 + + # Start write traffic on node-0 + # Throws -MOVED error once asm is completed, catch block will ignore it. + catch { + # Start the slot 0 write load on the R 0 + set port [get_port 0] + set key [slot_key 0 mykey] + set load_handle0 [start_write_load "127.0.0.1" $port 100 $key 0 5] + } + + # Start write traffic on node-1 + # Throws -MOVED error once asm is completed, catch block will ignore it. 
+ catch { + # Start the slot 6000 write load on the R 1 + set port [get_port 1] + set key [slot_key 6000 mykey] + set load_handle1 [start_write_load "127.0.0.1" $port 100 $key 0 5] + } + + # Migrate keys + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 6000 6100 + wait_for_asm_done + + stop_write_load $load_handle0 + stop_write_load $load_handle1 + + # verify data + assert_morethan [R 0 dbsize] 0 + assert_equal [R 0 debug digest] [R 1 debug digest] + + # cleanup + R 0 config set cluster-slot-migration-handoff-max-lag-bytes $prev_config_lag + R 0 config set cluster-slot-migration-write-pause-timeout $prev_config_timeout + R 0 debug asm-trim-method default + R 0 flushall + R 1 config set cluster-slot-migration-handoff-max-lag-bytes $prev_config_lag + R 1 config set cluster-slot-migration-write-pause-timeout $prev_config_timeout + R 1 debug asm-trim-method default + R 1 flushall + + R 1 CLUSTER MIGRATION IMPORT 6000 6100 + wait_for_asm_done + } + + test "Verify expire time is migrated correctly" { + R 0 flushall + R 1 flushall + + set string_key [slot_key 0 string_key] + set list_key [slot_key 0 list_key] + set hash_key [slot_key 0 hash_key] + set stream_key [slot_key 0 stream_key] + + for {set i 0} {$i < 20} {incr i} { + R 1 hset $hash_key $i $i + R 1 xadd $stream_key * item $i + } + for {set i 0} {$i < 2000} {incr i} { + R 1 lpush $list_key $i + } + + # set expire time of some keys + R 1 set $string_key "a" EX 1000 + R 1 EXPIRE $list_key 1000 + R 1 EXPIRE $hash_key 1000 + + # migrate slot 0-100 to R 0 + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + + # check expire times are migrated correctly + assert_range [R 0 ttl $string_key] 900 1000 + assert_range [R 0 ttl $list_key] 900 1000 + assert_range [R 0 ttl $hash_key] 900 1000 + assert_equal -1 [R 0 ttl $stream_key] + + # cleanup + R 0 flushall + R 1 flushall + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } + + test "Slot migration with complex data types 
can work well" { + R 0 flushall + R 1 flushall + + set list_key [slot_key 0 list_key] + set set_key [slot_key 0 set_key] + set zset_key [slot_key 0 zset_key] + set hash_key [slot_key 0 hash_key] + set stream_key [slot_key 0 stream_key] + + # generate big keys for each data type + for {set i 0} {$i < 1000} {incr i} { + R 1 lpush $list_key $i + R 1 sadd $set_key $i + R 1 zadd $zset_key $i $i + R 1 hset $hash_key $i $i + R 1 xadd $stream_key * item $i + } + + # migrate slot 0-100 to R 0 + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + # check the data on destination node is correct + assert_equal 1000 [R 0 llen $list_key] + assert_equal 1000 [R 0 scard $set_key] + assert_equal 1000 [R 0 zcard $zset_key] + assert_equal 1000 [R 0 hlen $hash_key] + assert_equal 1000 [R 0 xlen $stream_key] + # migrate slot 0-100 to R 1 + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } + + proc asm_basic_error_handling_test {operation channel all_states} { + foreach state $all_states { + if {$::verbose} { puts "Testing $operation $channel channel with state: $state"} + + # For states that need incremental data streaming, set a longer delay + set streaming_states [list "streaming-buffer" "accumulate-buffer" "send-bulk-and-stream" "send-stream"] + if {$state in $streaming_states} { + R 1 config set rdb-key-save-delay 1000000 + } + + # Let the destination node take time to stream buffer, so the source node will handle + # slot snapshot child process exit, and then enter "send-stream" state. 
+ if {$state == "send-stream"} { + R 0 config set key-load-delay 100000 + } + + # Start the slot 0 write load on the R 1 + set slot0_key [slot_key 0 mykey] + set load_handle [start_write_load "127.0.0.1" [get_port 1] 100 $slot0_key 500] + + # clear old fail points and set the new fail point + assert_equal {OK} [R 0 debug asm-failpoint "" ""] + assert_equal {OK} [R 1 debug asm-failpoint "" ""] + if {$operation eq "import"} { + assert_equal {OK} [R 0 debug asm-failpoint "import-$channel-channel" $state] + } elseif {$operation eq "migrate"} { + assert_equal {OK} [R 1 debug asm-failpoint "migrate-$channel-channel" $state] + } else { + fail "Unknown operation: $operation" + } + + # Start the migration + set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100] + + # The task should be failed due to the fail point + wait_for_condition 2000 10 { + [string match -nocase "*$channel*${state}*" [migration_status 0 $task_id last_error]] || + [string match -nocase "*$channel*${state}*" [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not fail with expected error - + (dst: [migration_status 0 $task_id last_error] + src: [migration_status 1 $task_id last_error] + expected: $channel $state)" + } + stop_write_load $load_handle + + # Cancel the task + R 0 CLUSTER MIGRATION CANCEL ID $task_id + R 1 CLUSTER MIGRATION CANCEL ID $task_id + + R 1 config set rdb-key-save-delay 0 + R 0 config set key-load-delay 0 + } + } + + test "Destination node main channel basic error-handling tests " { + set all_states [list \ + "connecting" \ + "auth-reply" \ + "handshake-reply" \ + "syncslots-reply" \ + "accumulate-buffer" \ + "streaming-buffer" \ + "wait-stream-eof" \ + ] + asm_basic_error_handling_test "import" "main" $all_states + } + + test "Destination node rdb channel basic error-handling tests" { + set all_states [list \ + "connecting" \ + "auth-reply" \ + "rdbchannel-reply" \ + "rdbchannel-transfer" \ + ] + asm_basic_error_handling_test "import" "rdb" $all_states + } + + test 
"Source node main channel basic error-handling tests " { + set all_states [list \ + "wait-rdbchannel" \ + "send-bulk-and-stream" \ + "send-stream" \ + "handoff" \ + ] + asm_basic_error_handling_test "migrate" "main" $all_states + } + + test "Source node rdb channel basic error-handling tests" { + set all_states [list \ + "wait-bgsave-start" \ + "send-bulk-and-stream" \ + ] + asm_basic_error_handling_test "migrate" "rdb" $all_states + } + + test "Migration will be successful after fail points are cleared" { + R 0 flushall + R 1 flushall + set slot0_key [slot_key 0 mykey] + set slot1_key [slot_key 1 mykey] + R 1 set $slot0_key "a" + R 1 set $slot1_key "b" + + # we set a delay to write incremental data + R 1 config set rdb-key-save-delay 1000000 + + # Start the slot 0 write load on the R 1 + set load_handle [start_write_load "127.0.0.1" [get_port 1] 100 $slot0_key] + + # Clear all fail points + assert_equal {OK} [R 0 debug asm-failpoint "" ""] + assert_equal {OK} [R 1 debug asm-failpoint "" ""] + + # Start the migration + set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100] + + # Wait for the migration to complete + wait_for_asm_done + + stop_write_load $load_handle + + # Verify the data is migrated, slot 0 and 1 should belong to R 1 + # slot 0 key should be changed by the write load + assert_not_equal "a" [R 0 get $slot0_key] + assert_equal "b" [R 0 get $slot1_key] + R 1 config set rdb-key-save-delay 0 + } + + test "Client output buffer limit is reached on source side" { + R 0 flushall + R 1 flushall + set r1_pid [S 1 process_id] + R 1 debug repl-pause on-streaming-repl-buf + + # Set a small output buffer limit to trigger the error + R 0 config set client-output-buffer-limit "replica 4mb 0 0" + + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # some write traffic is to have chance to enter streaming buffer state + set slot0_key [slot_key 0 mykey] + R 0 set $slot0_key "a" + + # after 3 second, the slots snapshot (costs 2s to generate) should be transferred, + 
# then start streaming buffer + after 3000 + + set loglines [count_log_lines 0] + + # Start the slot 0 write load on the R 0 + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key 1000] + + # verify the metric is accessible, it is transient, will be reset on disconnect + assert {[S 0 mem_cluster_slot_migration_output_buffer] >= 0} + + # After some time, the client output buffer limit should be reached + wait_for_log_messages 0 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 1000 10 + wait_for_condition 1000 10 { + [string match {*send*stream*} [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail as expected" + } + + stop_write_load $load_handle + + # Reset configurations + R 0 config set client-output-buffer-limit "replica 0 0 0" + R 0 config set rdb-key-save-delay 0 + + # resume server and clear pause point + resume_process $r1_pid + R 1 debug repl-pause clear + + # Wait for the migration to complete + wait_for_asm_done + } + + test "Full sync buffer limit is reached on destination side" { + # Set a small replication buffer limit to trigger the error + R 0 config set replica-full-sync-buffer-limit 1mb + + # start migration from 1 to 0, cost 4s to transfer slots snapshot + set task_id [setup_slot_migration_with_delay 1 0 0 100 2 2000000] + set loglines [count_log_lines 0] + + # Create some traffic on slot 0 + populate_slot 100 -idx 1 -slot 0 -size 100000 + + # After some time, slots sync buffer limit should be reached, but migration would not fail + # since the buffer will be accumulated on source side from now. 
+ wait_for_log_messages 0 {"*Slots sync buffer limit has been reached*"} $loglines 1000 10 + + # verify the peak value, should be greater than 1mb + assert {[S 0 mem_cluster_slot_migration_input_buffer_peak] > 1000000} + # verify the metric is accessible, it is transient, will be reset on disconnect + assert {[S 0 mem_cluster_slot_migration_input_buffer] >= 0} + + wait_for_asm_done + + # Reset configurations + R 0 config set replica-full-sync-buffer-limit 0 + R 1 config set rdb-key-save-delay 0 + R 1 cluster migration import 0 100 + wait_for_asm_done + } + + test "Expired key is not deleted and SCAN/KEYS/RANDOMKEY/CLUSTER GETKEYSINSLOT filter keys in importing slots" { + set slot0_key [slot_key 0 mykey] + set slot1_key [slot_key 1 mykey] + set slot2_key [slot_key 2 mykey] + R 1 flushall + R 0 flushall + + # we set a delay to write incremental data + R 1 config set rdb-key-save-delay 1000000 + + # set expire time 2s. Generating slot snapshot will 3s, so these + # three keys will be expired after slot snapshot is transferred + R 1 setex $slot0_key 2 "a" + R 1 setex $slot1_key 2 "b" + R 1 hset $slot2_key "f1" "1" + R 1 expire $slot2_key 2 + R 1 hexpire $slot2_key 2 FIELDS 1 "f1" + + set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100] + wait_for_condition 2000 10 { + [string match {*send-bulk-and-stream*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not start" + } + + # update expire time during mirgration + R 1 setex $slot0_key 100 "a" + R 1 expire $slot1_key 80 + R 1 expire $slot2_key 60 + R 1 hincrbyfloat $slot2_key "f1" 1 + R 1 hexpire $slot2_key 60 FIELDS 1 "f1" + + # after 2s, at least a key should be transferred, and should not be deleted + # due to expired, neither active nor lazy expiration (SCAN) takes effect, + # Besides SCAN/KEYS/RANDOMKEY/CLUSTER GETKEYSINSLOT command can not find them + after 2000 + R 3 readonly + foreach id {0 3} { ;# 0 is the master, 3 is the replica + assert_equal {0 {}} [R $id scan 0 count 10] + assert_equal {} [R 
$id keys "*"] + assert_equal {} [R $id keys "{06S}*"] + assert_equal {} [R $id randomkey] + assert_equal {} [R $id cluster getkeysinslot 0 100] + assert_equal [R $id cluster countkeysinslot 0] 0 + assert_equal [R $id dbsize] 0 + + # but we can see the number of keys is increased in INFO KEYSPACE + assert {[scan [regexp -inline {keys\=([\d]*)} [R $id info keyspace]] keys=%d] >= 1} + assert {[scan [regexp -inline {expires\=([\d]*)} [R $id info keyspace]] expires=%d] >= 1} + } + + wait_for_asm_done + + wait_for_ofs_sync [Rn 0] [Rn 3] + + foreach id {0 3} { ;# 0 is the master, 3 is the replica + # verify the keys are valid + assert_range [R $id ttl $slot0_key] 90 100 + assert_range [R $id ttl $slot1_key] 70 80 + assert_range [R $id ttl $slot2_key] 50 60 + assert_range [R $id httl $slot2_key FIELDS 1 "f1"] 50 60 + + # KEYS/SCAN/RANDOMKEY/CLUSTER GETKEYSINSLOT will find the keys after migration + assert_equal [list 0 [list $slot0_key $slot1_key $slot2_key]] [R $id scan 0 count 10] + assert_equal [list $slot0_key $slot1_key $slot2_key] [R $id keys "*"] + assert_equal [list $slot0_key] [R $id keys "{06S}*"] + assert_not_equal {} [R $id randomkey] + assert_equal [list $slot0_key] [R $id cluster getkeysinslot 0 100] + + # INFO KEYSPACE/DBSIZE/CLUSTER COUNTKEYSINSLOT will also reflect the keys + assert_equal 3 [scan [regexp -inline {keys\=([\d]*)} [R $id info keyspace]] keys=%d] + assert_equal 3 [scan [regexp -inline {expires\=([\d]*)} [R $id info keyspace]] expires=%d] + assert_equal 1 [scan [regexp -inline {subexpiry\=([\d]*)} [R $id info keyspace]] subexpiry=%d] + assert_equal 3 [R $id dbsize] + assert_equal 1 [R $id cluster countkeysinslot 0] + } + + # update expire time to 10ms, after some time, the keys should be deleted due to + # active expiration + R 0 pexpire $slot0_key 10 + R 0 pexpire $slot1_key 10 + R 0 hpexpire $slot2_key 10 FIELDS 1 "f1" ;# the last field is expired, the key will be deleted + wait_for_condition 100 50 { + [scan [regexp -inline {keys\=([\d]*)} 
[R 0 info keyspace]] keys=%d] == {} && + [scan [regexp -inline {keys\=([\d]*)} [R 3 info keyspace]] keys=%d] == {} + } else { + fail "keys did not expire" + } + + R 1 config set rdb-key-save-delay 0 + } + + test "Eviction does not evict keys in importing slots" { + set slot0_key [slot_key 0 mykey] + set slot1_key [slot_key 1 mykey] + set slot2_key [slot_key 2 mykey] + set slot5462_key [slot_key 5462 mykey] + set slot5463_key [slot_key 5463 mykey] + R 1 flushall + R 0 flushall + + # we set a delay to write incremental data + R 0 config set rdb-key-save-delay 1000000 + + set 1k_str [string repeat "a" 1024] + set 1m_str [string repeat "a" 1048576] + + # set two keys to be evicted + R 1 set $slot5462_key $1k_str + R 1 set $slot5463_key $1k_str + + # set maxmemory to 200kb more than current used memory, + # redis should evict some keys if importing some big keys + set r1_mem_used [S 1 used_memory] + set r1_max_mem [expr {$r1_mem_used + 200*1024}] + R 1 config set maxmemory $r1_max_mem + R 1 config set maxmemory-policy allkeys-lru + + # set 3 keys to be migrated + R 0 set $slot0_key $1m_str + R 0 set $slot1_key $1m_str + R 0 set $slot2_key $1m_str + + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + wait_for_condition 2000 10 { + [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not start" + } + + # after 2.2s, at least two keys should be transferred, they should not be evicted + # but other keys (slot5462_key and slot5463_key) should be evicted + after 2200 + for {set j 0} {$j < 100} {incr j} { R 1 ping } ;# trigger eviction + assert_equal 0 [R 1 exists $slot5462_key] + assert_equal 0 [R 1 exists $slot5463_key] + assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 2} + + # current used memory should be more than the maxmemory, since the big keys that + # belong importing slots can not be evicted. 
+ set r1_mem_used [S 1 used_memory] + assert {$r1_mem_used > $r1_max_mem + 1024*1024} + + wait_for_asm_done + + # after migration, these big keys should be evicted + for {set j 0} {$j < 100} {incr j} { R 1 ping } ;# trigger eviction + assert_equal {} [scan [regexp -inline {expires\=([\d]*)} [R 1 info keyspace]] expires=%d] + } + + test "Failover will cancel slot migration tasks" { + # migrate slot 0-100 from 1 to 0 + set task_id [setup_slot_migration_with_delay 1 0 0 100] + + # FAILOVER happens on the destination node, instance #3 become master, #0 become slave + failover_and_wait_for_done 3 + + # the old master will cancel the importing task, and the migrating task on + # the source node will be failed + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 0 $task_id state]] && + [string match {*failover*} [migration_status 0 $task_id last_error]] && + [string match {*failed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not cancel" + } + + # We can restart ASM tasks on new master, migrate slot 0-100 from 1 to 3 + R 1 config set rdb-key-save-delay 0 + set task_id [R 3 CLUSTER MIGRATION IMPORT 0 100] + wait_for_asm_done + + # migrate slot 0-100 from 3 to 1 + set task_id [setup_slot_migration_with_delay 3 1 0 100] + + # FAILOVER happens on the source node, instance #3 become slave, #0 become master + failover_and_wait_for_done 0 + + # the old master will cancel the migrating task, but the destination node will + # retry the importing task, and then succeed. 
+ wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 3 $task_id state]] + } else { + fail "ASM task did not cancel" + } + wait_for_asm_done + } + + test "Flush-like command can cancel slot migration task" { + # flushall, flushdb + foreach flushcmd {flushall flushdb} { + # start slot migration from 1 to 0 + set task_id [setup_slot_migration_with_delay 1 0 0 100] + + if {$::verbose} { puts "Testing flush command: $flushcmd"} + R 0 $flushcmd + + # flush-like will cancel the task + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not cancel" + } + } + + R 1 config set rdb-key-save-delay 0 + R 0 cluster migration import 0 100 + wait_for_asm_done + } + + test "CLUSTER SETSLOT command when there is a slot migration task" { + # Setup slot migration test from node 0 to node 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # Cluster SETSLOT command is not allowed when there is a slot migration task + # on the slot. #0 and #1 are having migration task now. + foreach instance {0 1} { + set node_id [R $instance cluster myid] + + catch {R $instance cluster setslot 0 migrating $node_id} err + assert_match {*in an active atomic slot migration*} $err + + catch {R $instance cluster setslot 0 importing $node_id} err + assert_match {*in an active atomic slot migration*} $err + + catch {R $instance cluster setslot 0 stable} err + assert_match {*in an active atomic slot migration*} $err + + catch {R $instance cluster setslot 0 node $node_id} err + assert_match {*in an active atomic slot migration*} $err + } + + # CLUSTER SETSLOT on other node will cancel the migration task, we update + # the owner of slot 0 (that is migrating from #0 to #1) to #2 on #2, we + # bump the config epoch to make sure the change can update #0 and #1 + # slot configuration, so #0 and #1 will cancel the migration task. 
+ # BTW, if config epoch is not bumped, the slot config of #2 may be + # updated by #0 and #1. + R 2 cluster bumpepoch + R 2 cluster setslot 0 node [R 2 cluster myid] + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 0 $task_id state]] && + [string match {*slots configuration updated*} [migration_status 0 $task_id last_error]] && + [string match {*canceled*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not cancel" + } + + # set slot 0 back to #0 + R 0 cluster bumpepoch + R 0 cluster setslot 0 node [R 0 cluster myid] + wait_for_cluster_propagation + wait_for_cluster_state "ok" + } + + test "CLUSTER DELSLOTSRANGE command cancels a slot migration task" { + # start slot migration from 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + R 0 cluster delslotsrange 0 100 + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 0 $task_id state]] && + [string match {*slots configuration updated*} [migration_status 0 $task_id last_error]] && + [string match {*failed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not cancel" + } + R 1 cluster migration cancel id $task_id + + # add the slots back + R 0 cluster addslotsrange 0 100 + wait_for_cluster_propagation + wait_for_cluster_state "ok" + } + + # NOTE: this test needs more than 60s, maybe you can skip when testing + test "CLUSTER FORGET command cancels a slot migration task" { + R 0 config set rdb-key-save-delay 0 + # Migrate all slot on #0 to #1, so we can forget #0 + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 5461] + wait_for_asm_done + + # start slot migration from 1 to 0 + set task_id [setup_slot_migration_with_delay 1 0 0 5461] + + # Forget #0 on #1, the migration task on #1 will be canceled due to node deleted, + # and the importing task on #0 will be failed + R 1 cluster forget [R 0 cluster myid] + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 1 $task_id state]] && + [string 
match {*node deleted*} [migration_status 1 $task_id last_error]] && + [string match {*failed*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not cancel" + } + + # Add #0 back into cluster + # NOTE: this will cost 60s to let #0 join the cluster since + # other nodes add #0 into black list for 60s after FORGET. + R 1 config set rdb-key-save-delay 0 + R 1 cluster meet "127.0.0.1" [lindex [R 0 config get port] 1] + + # the importing task on #0 will be retried, and eventually succeed + # since now #0 is back in the cluster + wait_for_condition 3000 50 { + [string match {*completed*} [migration_status 0 $task_id state]] && + [string match {*completed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not finish" + } + + # make sure #0 is completely back to the cluster + wait_for_cluster_propagation + wait_for_cluster_state "ok" + } + + test "CLIENT PAUSE can cancel slot migration task" { + # start slot migration from 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # CLIENT PAUSE happens on the destination node, #1 will cancel the importing task + R 1 client pause 100000 write ;# pause 100s + wait_for_condition 1000 50 { + [string match {*canceled*} [migration_status 1 $task_id state]] && + [string match {*client pause*} [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not cancel" + } + + # start task again + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + after 200 ;# give some time to have chance to schedule the task + # the task should not start since server is paused + assert {[string match {*none*} [migration_status 1 $task_id state]]} + + # unpause the server, the task should start + R 1 client unpause + wait_for_asm_done + + # migrate back to original node #0 + R 0 config set rdb-key-save-delay 0 + R 1 config set rdb-key-save-delay 0 + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } + + test "Server shutdown can cancel slot migration task, exit with success" { + # 
start slot migration from 0 to 1 + setup_slot_migration_with_delay 0 1 0 100 + + set loglines [count_log_lines -1] + + # Shutdown the server, it should cancel the migration task + restart_server -1 true false true nosave + + wait_for_log_messages -1 {"*Cancelled due to server shutdown*"} $loglines 100 100 + + wait_for_cluster_propagation + wait_for_cluster_state "ok" + } + + test "Cancel import task when streaming buffer into db" { + # set a delay to have time to cancel import task that is streaming buf to db + R 1 config set key-load-delay 50000 + # start slot migration from 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100 5] + + # start the slot 0 write load on the node 0 + set slot0_key [slot_key 0 mykey] + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key 500] + + # wait for entering streaming buffer state + wait_for_condition 1000 10 { + [string match {*streaming-buffer*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not enter streaming buffer state" + } + stop_write_load $load_handle + + # cancel the import task on #1, the destination node works fine + R 1 cluster migration cancel id $task_id + assert_match {*canceled*} [migration_status 1 $task_id state] + + # reset config + R 0 config set key-load-delay 0 + R 1 config set key-load-delay 0 + } + + test "Destination node main channel timeout when waiting stream EOF" { + set task_id [setup_slot_migration_with_delay 0 1 0 100] + R 1 config set repl-timeout 5 + + # pause the source node to make EOF wait timeout. 
Do not pause + # the child process, so it can deliver slot snapshot to destination + set r0_process_id [S 0 process_id] + pause_process $r0_process_id + + # the destination node will fail after 7s, 5s for EOF wait and 2s for slot snapshot + wait_for_condition 1000 20 { + [string match {*failed*} [migration_status 1 $task_id state]] && + [string match {*Main channel*Connection timeout*wait-stream-eof*} \ + [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + + # resume the source node + resume_process $r0_process_id + + # After the source node is resumed, the task on source node may receive + # ACKs from destination and consider the task is stream-done. In this case, + # the task on source node will be failed after several seconds + if {[string match {*stream-done*} [migration_status 0 $task_id state]]} { + wait_for_condition 1000 20 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*Server paused*} [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + } + + R 1 config set repl-timeout 60 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Destination node rdb channel timeout when transferring slots snapshot" { + # cost 10s to transfer each key + set task_id [setup_slot_migration_with_delay 0 1 0 100 2 10000000] + R 1 config set repl-timeout 3 + + # the destination node will fail after 3s + wait_for_condition 1000 20 { + [string match {*failed*} [migration_status 1 $task_id state]] && + [string match {*RDB channel*Connection timeout*rdbchannel-transfer*} \ + [migration_status 1 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + + R 1 config set repl-timeout 60 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Source node rdb channel timeout when transferring slots snapshot" { + set r1_pid [S 1 process_id] + R 0 flushall + R 0 config set save "" 
+ # generate several large keys, make sure the memory usage is more than + # socket buffer size, so the rdb channel will block and timeout if + # no data is received by destination. + set val [string repeat "a" 102400] ;# 100kb + for {set i 0} {$i < 1000} {incr i} { + set key [slot_key 0 "key$i"] + R 0 set $key $val + } + R 0 config set repl-timeout 3 ;# 3s for rdb channel timeout + R 0 config set rdb-key-save-delay 10000 ;# 1000 keys cost 10s to save + + # start migration from #0 to #1 + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + wait_for_condition 1000 20 { + [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]] + } else { + fail "ASM task did not start" + } + + # pause the destination node to make rdb channel timeout + pause_process $r1_pid + + # the source node will fail, the rdb child process can not + # write data to destination, so it will timeout + wait_for_condition 1000 30 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*RDB channel*Failed to send slots snapshot*} \ + [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + resume_process $r1_pid + + R 0 config set repl-timeout 60 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Source node main channel timeout when sending incremental stream" { + R 0 flushall + R 0 config set repl-timeout 2 ;# 2s for main channel timeout + + set r1_pid [S 1 process_id] + # in order to have time to pause the destination node + R 1 config set key-load-delay 50000 ;# 50ms each 16k data + + # start migration from #0 to #1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # Create 200 keys of 16k size traffic on slot 0, streaming buffer need 10s (200*50ms) + populate_slot 200 -idx 0 -slot 0 -size 16384 + + # wait for streaming buffer state, then pause the destination node + wait_for_condition 1000 20 { + [string match {*streaming-buffer*} [migration_status 1 
$task_id state]] + } else { + fail "ASM task did not stream buffer, state: [migration_status 1 $task_id state]" + } + pause_process $r1_pid + + # Start the slot 0 write load on the R 0 + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 [slot_key 0 mykey] 500] + + # the source node will fail after several seconds (including the time + # to fill the socket buffer of source node), the main channel can not + # write data to destination since the destination is paused + wait_for_condition 1000 30 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*Main channel*Connection timeout*} \ + [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + stop_write_load $load_handle + resume_process $r1_pid + + R 0 config set repl-timeout 60 + R 1 config set key-load-delay 0 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + R 0 flushall + } + + test "Source server paused timeout" { + # set timeout to 0, so the task will fail immediately when checking timeout + R 0 config set cluster-slot-migration-write-pause-timeout 0 + + # start migration from node 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # start the slot 0 write load on the node 0 + set slot0_key [slot_key 0 mykey] + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key] + + # node 0 will fail since server paused timeout + wait_for_condition 2000 10 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*Server paused timeout*} \ + [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + + stop_write_load $load_handle + + # reset config + R 0 config set cluster-slot-migration-write-pause-timeout 10000 + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Sync buffer drain timeout" { + # set a fail point to avoid the source node to enter handoff prep state + 
# to test the sync buffer drain timeout + R 0 debug asm-failpoint "migrate-main-channel" "handoff-prep" + R 0 config set cluster-slot-migration-sync-buffer-drain-timeout 5000 + + set r1_pid [S 1 process_id] + + # start migration from node 0 to 1 + set task_id [setup_slot_migration_with_delay 0 1 0 100] + + # start the slot 0 write load on the node 0 + set slot0_key [slot_key 0 mykey] + set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key] + + # wait for entering streaming buffer state + wait_for_condition 1000 10 { + [string match {*wait-stream-eof*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not enter wait-stream-eof state" + } + + pause_process $r1_pid ;# avoid the destination to apply commands + + # node 0 will fail since sync buffer drain timeout + wait_for_condition 2000 10 { + [string match {*failed*} [migration_status 0 $task_id state]] && + [string match {*Sync buffer drain timeout*} \ + [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + + stop_write_load $load_handle + resume_process $r1_pid + + # reset config + R 0 config set cluster-slot-migration-sync-buffer-drain-timeout 60000 + R 0 debug asm-failpoint "" "" + R 0 cluster migration cancel id $task_id + R 1 cluster migration cancel id $task_id + } + + test "Cluster implementation cannot start migrate task temporarily" { + # Inject a fail point to make the source node not ready + R 0 debug asm-failpoint "migrate-main-channel" "none" + + # start migration from node 0 to 1 + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100] + + # verify source node replies SYNCSLOTS with -NOTREADY + set loglines [count_log_lines -1] + wait_for_log_messages -1 {"*Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later*"} $loglines 100 100 + + # clear the fail point and verify the task is completed + R 0 debug asm-failpoint "" "" + wait_for_asm_done + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal 
"completed" [migration_status 1 $task_id state] + + # cleanup + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + } +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} { + test "Test bgtrim after a successful migration" { + R 0 debug asm-trim-method bg + R 3 debug asm-trim-method bg + R 0 CONFIG RESETSTAT + R 3 CONFIG RESETSTAT + + R 0 flushall + # Fill slot 0 + populate_slot 1000 -idx 0 -slot 0 + # Fill slot 1 with keys that have TTL + populate_slot 1000 -idx 0 -slot 1 -prefix "expirekey" -expires 100 + # HFE key on slot 2 + set slot2_hfekey [slot_key 2 hfekey] + R 0 HSETEX $slot2_hfekey EX 10 FIELDS 1 f1 v1 + + # Fill slot 101, these keys won't be migrated + populate_slot 1000 -idx 0 -slot 101 + # Fill slot 102 with keys that have TTL + populate_slot 1000 -idx 0 -slot 102 -prefix "expirekey" -expires 100 + # HFE key on slot 103 + set slot103_hfekey [slot_key 103 hfekey] + R 0 HSETEX $slot103_hfekey EX 10 FIELDS 1 f1 v1 + + # migrate slot 0 to node-1 + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + + # Verify the data is migrated + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal 2001 [R 0 dbsize] + assert_equal 2001 [R 3 dbsize] + wait_for_ofs_sync [Rn 1] [Rn 4] + assert_equal 2001 [R 1 dbsize] + assert_equal 2001 [R 4 dbsize] + + # Verify the keys are trimmed lazily + wait_for_condition 1000 10 { + [S 0 lazyfreed_objects] == 2001 && + [S 3 lazyfreed_objects] == 2001 + } else { + puts "lazyfreed_objects: [S 0 lazyfreed_objects] [S 3 lazyfreed_objects]" + fail "Background trim did not happen" + } + + # Cleanup + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + } + + test "Test bgtrim after a failed migration" { + R 0 debug asm-trim-method bg + R 3 debug asm-trim-method bg + R 1 CONFIG RESETSTAT + R 4 CONFIG RESETSTAT + + # Fill slot 0 on node-0 and migrate it to node-1 (with some 
delay) + R 0 flushall + set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000] + after 1000 ;# wait some time so that some keys are moved + + # Fail the migration + R 1 CLUSTER MIGRATION CANCEL ID $task_id + wait_for_asm_done + + # Verify the data is not migrated + assert_equal 10000 [R 0 dbsize] + assert_equal 10000 [R 3 dbsize] + + # Verify the keys are trimmed lazily after a failed import on dest side. + wait_for_condition 1000 20 { + [R 1 dbsize] == 0 && + [R 4 dbsize] == 0 && + [S 1 lazyfreed_objects] > 0 && + [S 4 lazyfreed_objects] > 0 + } else { + fail "Background trim did not happen" + } + + # Cleanup + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + } + + test "Test bgtrim unblocks stream client" { + # Two clients waiting for data on two different streams which are in + # different slots. We are going to migrate one slot, which will unblock + # the client. The other client should still be blocked. + R 0 debug asm-trim-method bg + + set key0 [slot_key 0 mystream] + set key1 [slot_key 1 mystream] + + # First client waits on slot-0 key + R 0 DEL $key0 + R 0 XADD $key0 666 f v + R 0 XGROUP CREATE $key0 mygroup $ + set rd0 [redis_deferring_client] + $rd0 XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS $key0 ">" + wait_for_blocked_clients_count 1 + + # Second client waits on slot-1 key + R 0 DEL $key1 + R 0 XADD $key1 666 f v + R 0 XGROUP CREATE $key1 mygroup $ + set rd1 [redis_deferring_client] + $rd1 XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS $key1 ">" + wait_for_blocked_clients_count 2 + + # Migrate slot 0 + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + + # First client should get MOVED error + assert_error "*MOVED*" {$rd0 read} + $rd0 close + + # Second client should operate normally + R 0 XADD $key1 667 f v + set res [$rd1 read] + assert_equal [lindex $res 0 1 0] {667-0 {f v}} + $rd1 close + + # cleanup + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 0 + 
wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + } + + test "Test bgtrim touches watched keys" { + R 0 debug asm-trim-method bg + + # bgtrim should touch watched keys on migrated slots + set key0 [slot_key 0 key] + R 0 set $key0 30 + R 0 watch $key0 + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + R 0 multi + R 0 ping + assert_equal {} [R 0 exec] + + # bgtrim should not touch watched keys on other slots + set key2 [slot_key 2 key] + R 0 set $key2 30 + R 0 watch $key2 + R 1 CLUSTER MIGRATION IMPORT 1 1 + wait_for_asm_done + R 0 multi + R 0 ping + assert_equal PONG [R 0 exec] + + # cleanup + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 1 + wait_for_asm_done + R 0 flushall + R 0 debug asm-trim-method default + } + + test "Test bgtrim after a FAILOVER on destination side" { + R 1 debug asm-trim-method bg + R 4 debug asm-trim-method bg + + set loglines [count_log_lines -4] + + # Fill slot 0 on node-0 and migrate it to node-1 (with some delay) + R 0 flushall + set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000] + after 1000 ;# wait some time so that some keys are moved + + # Trigger a failover with force to simulate unreachable master and + # verify unowned keys are trimmed once replica becomes master. 
+ failover_and_wait_for_done 4 force + wait_for_log_messages -4 {"*Detected keys in slots that do not belong*Scheduling trim*"} $loglines 1000 10 + wait_for_condition 1000 10 { + [R 1 dbsize] == 0 && + [R 4 dbsize] == 0 + } else { + fail "Background trim did not happen" + } + + # cleanup + wait_for_cluster_propagation + failover_and_wait_for_done 1 + R 0 config set rdb-key-save-delay 0 + R 1 debug asm-trim-method default + R 4 debug asm-trim-method default + wait_for_asm_done + } + + test "CLUSTER SETSLOT is not allowed if there is a pending trim job" { + R 0 debug asm-trim-method bg + R 3 debug asm-trim-method bg + + # Fill slot 0 on node-0 and migrate it to node-1 (with some delay) + R 0 flushall + set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000] + + # Pause will cancel the task and there will be a pending trim job + # until writes are allowed again. + R 1 client pause 100000 write ;# pause 100s + wait_for_asm_done + + # CLUSTER SETSLOT is not allowed if there is a pending trim job. 
+ assert_error {*There is a pending trim job for slot 0*} {R 1 CLUSTER SETSLOT 0 STABLE} + + # Unpause the server, trim will be triggered and SETSLOT will be allowed + R 1 client unpause + R 1 CLUSTER SETSLOT 0 STABLE + } +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no save ""}} { + test "Test active trim after a successful migration" { + R 0 debug asm-trim-method active + R 3 debug asm-trim-method active + populate_slot 500 -slot 0 + populate_slot 500 -slot 1 + populate_slot 500 -slot 3 + populate_slot 500 -slot 4 + + # Migrate 1500 keys + R 1 CLUSTER MIGRATION IMPORT 0 1 3 3 + wait_for_asm_done + + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 0 && + [CI 0 cluster_slot_migration_active_trim_current_job_trimmed] == 1500 && + [CI 3 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_active_trim_current_job_trimmed] == 1500 + } else { + fail "trim failed" + } + + assert_equal 1500 [CI 0 cluster_slot_migration_active_trim_current_job_keys] + assert_equal 1500 [CI 3 cluster_slot_migration_active_trim_current_job_keys] + + assert_equal 500 [R 0 dbsize] + assert_equal 500 [R 3 dbsize] + assert_equal 1500 [R 1 dbsize] + assert_equal 1500 [R 4 dbsize] + assert_equal 0 [R 0 cluster countkeysinslot 0] + assert_equal 0 [R 0 cluster countkeysinslot 1] + assert_equal 0 [R 0 cluster countkeysinslot 3] + assert_equal 500 [R 0 cluster countkeysinslot 4] + + # cleanup + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + R 0 CLUSTER MIGRATION IMPORT 0 1 3 3 + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + test "Test multiple active trim jobs can be scheduled" { + # Active trim will be scheduled but it won't run + R 0 debug asm-trim-method active -1 + R 3 debug asm-trim-method active -1 + + populate_slot 500 -slot 0 + populate_slot 500 -slot 1 + 
populate_slot 500 -slot 3 + populate_slot 500 -slot 4 + + # Migrate 1500 keys + R 1 CLUSTER MIGRATION IMPORT 0 1 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "migrate failed" + } + + # Migrate another slot and verify there are two trim tasks on the source + R 1 CLUSTER MIGRATION IMPORT 3 3 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 2 && + [CI 3 cluster_slot_migration_active_trim_running] == 2 + } else { + fail "migrate failed" + } + + # Enabled active trim and wait until it is completed. + R 0 debug asm-trim-method active 0 + R 3 debug asm-trim-method active 0 + wait_for_asm_done + + assert_equal 500 [R 0 dbsize] + assert_equal 500 [R 3 dbsize] + assert_equal 0 [R 0 cluster countkeysinslot 0] + assert_equal 0 [R 0 cluster countkeysinslot 1] + assert_equal 0 [R 0 cluster countkeysinslot 3] + assert_equal 500 [R 0 cluster countkeysinslot 4] + + # cleanup + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + R 0 CLUSTER MIGRATION IMPORT 0 1 3 3 + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + test "Test active-trim clears partially imported keys on cancel" { + R 1 debug asm-trim-method active + R 4 debug asm-trim-method active + + # Rdb delivery will take 10 seconds + R 0 config set rdb-key-save-delay 10000 + populate_slot 250 -slot 0 + populate_slot 250 -slot 1 + populate_slot 250 -slot 3 + populate_slot 250 -slot 4 + + R 1 CLUSTER MIGRATION IMPORT 0 100 + after 2000 + R 1 CLUSTER MIGRATION CANCEL ALL + wait_for_asm_done + + assert_morethan [CI 1 cluster_slot_migration_active_trim_current_job_keys] 0 + assert_morethan [CI 4 cluster_slot_migration_active_trim_current_job_trimmed] 0 + + assert_equal 1000 [R 0 dbsize] + assert_equal 1000 [R 3 dbsize] + assert_equal 0 [R 1 
dbsize] + assert_equal 0 [R 4 dbsize] + + # Cleanup + R 1 debug asm-trim-method default + R 4 debug asm-trim-method default + R 0 config set rdb-key-save-delay 0 + } + + test "Test active-trim clears partially imported keys on failover" { + R 1 debug asm-trim-method active + R 4 debug asm-trim-method active + + # Rdb delivery will take 10 seconds + R 0 config set rdb-key-save-delay 10000 + + populate_slot 250 -slot 0 + populate_slot 250 -slot 1 + populate_slot 250 -slot 3 + populate_slot 250 -slot 4 + + set prev_trim_started_1 [CI 1 cluster_slot_migration_stats_active_trim_started] + set prev_trim_started_4 [CI 4 cluster_slot_migration_stats_active_trim_started] + + R 1 CLUSTER MIGRATION IMPORT 0 100 + after 2000 + failover_and_wait_for_done 4 + wait_for_asm_done + + # Verify there is at least one trim job started + assert_morethan [CI 1 cluster_slot_migration_stats_active_trim_started] $prev_trim_started_1 + assert_morethan [CI 4 cluster_slot_migration_stats_active_trim_started] $prev_trim_started_4 + + assert_equal 1000 [R 0 dbsize] + assert_equal 1000 [R 3 dbsize] + assert_equal 0 [R 1 dbsize] + assert_equal 0 [R 4 dbsize] + + # Cleanup + failover_and_wait_for_done 1 + R 1 debug asm-trim-method default + R 4 debug asm-trim-method default + R 0 config set rdb-key-save-delay 0 + R 0 flushall + R 1 flushall + } + + test "Test import task does not start if active trim is in progress for the same slots" { + # Active trim will be scheduled but it won't run + R 0 flushall + R 1 flushall + R 0 debug asm-trim-method active -1 + + populate_slot 500 -slot 0 + populate_slot 500 -slot 1 + + # Migrate 1000 keys + R 1 CLUSTER MIGRATION IMPORT 0 1 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "migrate failed" + } + + # Try to migrate slots back + R 0 CLUSTER MIGRATION IMPORT 0 1 + wait_for_log_messages 0 {"*Can not start import task*trim in progress for some of the 
slots*"} 0 1000 10 + + # Enabled active trim and verify slots are imported back + R 0 debug asm-trim-method active 0 + wait_for_asm_done + + assert_equal 1000 [R 0 dbsize] + assert_equal 500 [R 0 cluster countkeysinslot 0] + assert_equal 500 [R 0 cluster countkeysinslot 1] + + # cleanup + R 0 debug asm-trim-method default + R 0 flushall + } + + test "Rdb save during active trim should skip keys in trimmed slots" { + # Insert some delay to activate trim + R 0 debug asm-trim-method active 1000 + R 0 config set repl-diskless-sync-delay 0 + R 0 flushall + + populate_slot 5000 -idx 0 -slot 0 + populate_slot 5000 -idx 0 -slot 1 + populate_slot 5000 -idx 0 -slot 2 + + # Start migration and wait until trim is in progress + R 1 CLUSTER MIGRATION IMPORT 0 1 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 && + [S 0 rdb_bgsave_in_progress] == 0 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + # Trigger save during active trim + R 0 save + # Wait until the log contains a "keys skipped" message with a non-zero value + wait_for_log_messages 0 {"*BGSAVE done, 5000 keys saved, [1-9]* keys skipped*"} 0 1000 10 + + restart_server 0 yes no yes nosave + assert_equal 5000 [R 0 dbsize] + assert_equal 0 [R 0 cluster countkeysinslot 0] + assert_equal 0 [R 0 cluster countkeysinslot 1] + assert_equal 5000 [R 0 cluster countkeysinslot 2] + + # Cleanup + wait_for_cluster_propagation + wait_for_cluster_state "ok" + R 0 flushall + R 1 flushall + R 0 save + R 0 CLUSTER MIGRATION IMPORT 0 1 + wait_for_asm_done + } + + test "AOF rewrite during active trim should skip keys in trimmed slots" { + R 0 debug asm-trim-method active 1000 + R 0 config set repl-diskless-sync-delay 0 + R 0 config set aof-use-rdb-preamble no + R 0 config set appendonly yes + R 0 config rewrite + R 0 flushall + populate_slot 5000 -idx 
0 -slot 0 + populate_slot 5000 -idx 0 -slot 1 + populate_slot 5000 -idx 0 -slot 2 + + R 1 CLUSTER MIGRATION IMPORT 0 1 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + wait_for_condition 50 100 { + [S 0 rdb_bgsave_in_progress] == 0 + } else { + fail "bgsave is in progress" + } + + R 0 bgrewriteaof + # Wait until the log contains a "keys skipped" message with a non-zero value + wait_for_log_messages 0 {"*AOF rewrite done, [1-9]* keys saved, [1-9]* keys skipped*"} 0 1000 10 + + restart_server 0 yes no yes nosave + assert_equal 5000 [R 0 dbsize] + assert_equal 0 [R 0 cluster countkeysinslot 0] + assert_equal 0 [R 0 cluster countkeysinslot 1] + assert_equal 5000 [R 0 cluster countkeysinslot 2] + + # cleanup + R 0 config set appendonly no + R 0 config rewrite + restart_server 0 yes no yes nosave + wait_for_cluster_propagation + wait_for_cluster_state "ok" + R 0 flushall + R 1 flushall + R 0 save + R 0 CLUSTER MIGRATION IMPORT 0 1 + wait_for_asm_done + } + + test "Pause actions will stop active trimming" { + R 0 debug asm-trim-method active 1000 + R 0 config set repl-diskless-sync-delay 0 + R 0 flushall + populate_slot 10000 -idx 0 -slot 0 + + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + # Pause the server and verify no keys are trimmed + R 0 client pause 100000 write ;# pause 100s + set prev [CI 0 cluster_slot_migration_active_trim_current_job_trimmed] + after 1000 ; # wait some time to see if any keys are trimmed + set curr [CI 0 
cluster_slot_migration_active_trim_current_job_trimmed] + assert_equal $prev $curr + + R 0 client unpause + R 0 debug asm-trim-method default + wait_for_asm_done + assert_equal 0 [R 0 dbsize] + + # revert + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + assert_equal 10000 [R 0 dbsize] + } + + foreach diskless_load {"disabled" "swapdb" "on-empty-db"} { + test "Test fullsync cancels active trim (repl-diskless-load $diskless_load)" { + R 3 debug asm-trim-method active -10 + R 3 config set repl-diskless-load $diskless_load + R 0 flushall + + R 0 config set repl-diskless-sync-delay 0 + populate_slot 10000 -idx 0 -slot 0 + + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + puts "[CI 3 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + set prev_cancelled [CI 3 cluster_slot_migration_stats_active_trim_cancelled] + R 0 config set client-output-buffer-limit "replica 1024 0 0" + + # Trigger a fullsync + populate_slot 1 -idx 0 -size 2000000 -slot 2 + + wait_for_condition 1000 10 { + [CI 3 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_stats_active_trim_cancelled] == $prev_cancelled + 1 + } else { + puts "[CI 3 cluster_slot_migration_active_trim_running]" + puts "[CI 3 cluster_slot_migration_stats_active_trim_cancelled]" + fail "trim failed" + } + + R 3 debug asm-trim-method active 0 + R 3 config set repl-diskless-load disabled + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal 10001 [R 0 dbsize] + assert_equal 10001 [R 3 dbsize] + assert_equal 0 [R 1 dbsize] + assert_equal 0 [R 4 dbsize] + R 0 flushall + } + } + + test "Test importing slots while 
active-trim is in progress for the same slots on replica" { + R 3 debug asm-trim-method active 10000 + R 0 flushall + populate_slot 10000 -slot 0 + wait_for_ofs_sync [Rn 0] [Rn 3] + + # Wait until active trim is in progress on replica + R 1 CLUSTER MIGRATION IMPORT 0 100 + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + puts "[CI 0 cluster_slot_migration_active_tasks]" + puts "[CI 0 cluster_slot_migration_active_trim_running]" + puts "[CI 3 cluster_slot_migration_active_trim_running]" + fail "trim failed" + } + + set loglines [count_log_lines -3] + + # Get slots back + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_condition 1000 20 { + [CI 0 cluster_slot_migration_active_tasks] == 1 && + [CI 0 cluster_slot_migration_active_trim_running] == 0 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "trim failed" + } + + # Verify replica blocks master until trim is done + wait_for_log_messages -3 {"*Blocking master client until trim job is done*"} $loglines 1000 30 + R 3 debug asm-trim-method active 0 + wait_for_log_messages -3 {"*Unblocking master client after active trim*"} $loglines 1000 30 + + wait_for_asm_done + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal 10000 [R 0 dbsize] + assert_equal 10000 [R 3 dbsize] + assert_equal 0 [R 1 dbsize] + assert_equal 0 [R 4 dbsize] + } + + test "TRIMSLOTS should not trim slots that this node is serving" { + assert_error {*the slot 0 is served by this node*} {R 0 trimslots ranges 1 0 0} + assert_error {*READONLY*} {R 3 trimslots ranges 1 0 100} + assert_equal {OK} [R 0 trimslots ranges 1 16383 16383] + assert_error {*READONLY*} {R 3 trimslots ranges 1 16383 16383} + } + + test "Trigger multiple active trim jobs at the same time" { + R 1 debug asm-trim-method active 0 + R 1 flushall + + set prev_trim_done [CI 1 
cluster_slot_migration_stats_active_trim_completed] + + R 1 debug populate 1000 [slot_prefix 0] 100 + R 1 debug populate 1000 [slot_prefix 1] 100 + R 1 debug populate 1000 [slot_prefix 2] 100 + + R 1 multi + R 1 trimslots ranges 1 0 0 + R 1 trimslots ranges 1 1 1 + R 1 trimslots ranges 1 2 2 + R 1 exec + + wait_for_condition 1000 10 { + [CI 1 cluster_slot_migration_stats_active_trim_completed] == $prev_trim_done + 3 + } else { + fail "active trim failed" + } + + R 1 flushall + R 1 debug asm-trim-method default + } + + test "Restart will clean up unowned slot keys" { + R 1 flushall + + # generate 1000 keys belonging to slot 0 + R 1 debug populate 1000 [slot_prefix 0] 100 + assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1000} + + # restart node-1 + restart_server -1 true false true save + wait_for_cluster_propagation + wait_for_cluster_state "ok" + + # Node-1 has no keys since unowned slot 0 keys were cleaned up during restart + assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] == {}} + + R 1 flushall + } + + test "Test active trim is used when client tracking is used" { + R 0 flushall + R 1 flushall + R 0 debug asm-trim-method default + R 1 debug asm-trim-method default + + set prev_active_trim [CI 0 cluster_slot_migration_stats_active_trim_completed] + + # Setup a tracking client that is redirected to a pubsub client + set rd_redirection [redis_deferring_client] + $rd_redirection client id + set redir_id [$rd_redirection read] + $rd_redirection subscribe __redis__:invalidate + $rd_redirection read ; # Consume the SUBSCRIBE reply. 
+ + # setup tracking + set key0 [slot_key 0 key] + R 0 CLIENT TRACKING on REDIRECT $redir_id + R 0 SET $key0 1 + R 0 GET $key0 + R 1 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_stats_active_trim_completed] == [expr $prev_active_trim + 1] + } else { + fail "active trim did not happen" + } + + # Verify the tracking client received the invalidation message + set msg [$rd_redirection read] + set head [lindex $msg 0] + + if {$head eq "message"} { + # RESP 2 + set got_key [lindex [lindex $msg 2] 0] + } elseif {$head eq "invalidate"} { + # RESP 3 + set got_key [lindex $msg 1 0] + } else { + fail "unexpected invalidation message: $msg" + } + assert_equal $got_key $key0 + + # cleanup + $rd_redirection close + wait_for_asm_done + R 0 CLUSTER MIGRATION IMPORT 0 0 + wait_for_asm_done + R 0 flushall + } +} + +set testmodule [file normalize tests/modules/atomicslotmigration.so] + +start_cluster 3 6 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no]] { + test "Module api sanity" { + R 0 asm.sanity ;# on master + R 3 asm.sanity ;# on replica + } + + test "Module replicate cross slot command" { + set task_id [setup_slot_migration_with_delay 0 1 0 100] + set listkey [slot_key 0 "asmlist"] + # replicate cross slot command during migrating + R 0 asm.lpush_replicate_crossslot_command $listkey "item1" + + # node 0 will fail due to cross slot + wait_for_condition 2000 10 { + [string match {*canceled*} [migration_status 0 $task_id state]] && + [string match {*cross slot*} [migration_status 0 $task_id last_error]] + } else { + fail "ASM task did not fail" + } + R 1 CLUSTER MIGRATION CANCEL ID $task_id + + # sanity check if lpush replicated correctly to the replica + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal {item1} [R 0 lrange $listkey 0 -1] + R 3 readonly + assert_equal {item1} [R 3 lrange $listkey 0 -1] + } + + test 
"Test RM_ClusterCanAccessKeysInSlot" { + # Test invalid slots + assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot -1] + assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 20000] + assert_equal 0 [R 2 asm.cluster_can_access_keys_in_slot 16384] + assert_equal 0 [R 5 asm.cluster_can_access_keys_in_slot 16384] + + # Test on a master-replica pair + assert_equal 1 [R 0 asm.cluster_can_access_keys_in_slot 0] + assert_equal 1 [R 0 asm.cluster_can_access_keys_in_slot 100] + assert_equal 1 [R 3 asm.cluster_can_access_keys_in_slot 0] + assert_equal 1 [R 3 asm.cluster_can_access_keys_in_slot 100] + + # Test on a master-replica pair + assert_equal 1 [R 2 asm.cluster_can_access_keys_in_slot 16383] + assert_equal 1 [R 5 asm.cluster_can_access_keys_in_slot 16383] + } + + test "Test RM_ClusterCanAccessKeysInSlot returns false for unowned slots" { + # Active trim will be scheduled but it won't run + R 0 debug asm-trim-method active -1 + R 3 debug asm-trim-method active -1 + + setup_slot_migration_with_delay 0 1 0 100 3 1000000 + + # Verify importing slots are not local + assert_equal 0 [R 1 asm.cluster_can_access_keys_in_slot 0] + assert_equal 0 [R 1 asm.cluster_can_access_keys_in_slot 100] + assert_equal 0 [R 4 asm.cluster_can_access_keys_in_slot 0] + assert_equal 0 [R 4 asm.cluster_can_access_keys_in_slot 100] + + wait_for_condition 1000 10 { + [CI 0 cluster_slot_migration_active_tasks] == 0 && + [CI 0 cluster_slot_migration_active_trim_running] == 1 && + [CI 3 cluster_slot_migration_active_trim_running] == 1 + } else { + fail "migrate failed" + } + + # Wait for config propagation before checking the slot ownership on replica + wait_for_cluster_propagation + + # Verify slots that are being trimmed are not local + assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 0] + assert_equal 0 [R 0 asm.cluster_can_access_keys_in_slot 100] + assert_equal 0 [R 3 asm.cluster_can_access_keys_in_slot 0] + assert_equal 0 [R 3 asm.cluster_can_access_keys_in_slot 100] + + # Enabled 
active trim and wait until it is completed. + R 0 debug asm-trim-method active 0 + R 3 debug asm-trim-method active 0 + wait_for_asm_done + wait_for_ofs_sync [Rn 0] [Rn 3] + + # Verify slots are local after migration + assert_equal 1 [R 1 asm.cluster_can_access_keys_in_slot 0] + assert_equal 1 [R 1 asm.cluster_can_access_keys_in_slot 100] + assert_equal 1 [R 4 asm.cluster_can_access_keys_in_slot 0] + assert_equal 1 [R 4 asm.cluster_can_access_keys_in_slot 100] + + # cleanup + R 0 debug asm-trim-method default + R 3 debug asm-trim-method default + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + foreach trim_method {"active" "bg"} { + test "Test cluster module notifications on a successful migration ($trim_method-trim)" { + clear_module_event_log + R 0 debug asm-trim-method $trim_method + R 3 debug asm-trim-method $trim_method + R 6 debug asm-trim-method $trim_method + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Migrate the slot ranges + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100 200 300] + wait_for_asm_done + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Verify the events on source, both master and replica + set migrate_event_log [list \ + "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \ + "sub: cluster-slot-migration-migrate-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \ + ] + assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log + assert_equal [R 3 asm.get_cluster_event_log] {} + assert_equal [R 6 asm.get_cluster_event_log] {} + + # Verify the events on destination, both master and replica + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \ + "sub: 
cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \ + ] + wait_for_condition 500 20 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + puts "R1: [R 1 asm.get_cluster_event_log]" + puts "R4: [R 4 asm.get_cluster_event_log]" + puts "R7: [R 7 asm.get_cluster_event_log]" + fail "ASM import event not received" + } + + # Verify the trim events + if {$trim_method eq "active"} { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-started, slots:0-100,200-300" \ + "keyspace: key_trimmed, key: $key" \ + "sub: cluster-slot-migration-trim-completed, slots:0-100,200-300" \ + ] + } else { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-background, slots:0-100,200-300" \ + ] + } + wait_for_condition 500 10 { + [R 0 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 3 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 6 asm.get_cluster_trim_event_log] eq $trim_event_log + } else { + fail "ASM source trim event not received" + } + + # cleanup + R 0 CLUSTER MIGRATION IMPORT 0 100 200 300 + wait_for_asm_done + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + + test "Test cluster module notifications on a failed migration ($trim_method-trim)" { + clear_module_event_log + R 1 debug asm-trim-method $trim_method + R 4 debug asm-trim-method $trim_method + R 7 debug asm-trim-method $trim_method + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Start migration and cancel it + set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000] + # Wait until at least one key is moved to destination + wait_for_condition 1000 10 { + [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1 + } else { + fail "Key not moved to destination" + } 
+ R 1 CLUSTER MIGRATION CANCEL ID $task_id + wait_for_asm_done + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Verify the events on source, both master and replica + set migrate_event_log [list \ + "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-migrate-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log + assert_equal [R 3 asm.get_cluster_event_log] {} + assert_equal [R 6 asm.get_cluster_event_log] {} + + # Verify the events on destination, both master and replica + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-import-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + wait_for_condition 500 10 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + fail "ASM import event not received" + } + + # Verify the trim events on destination (partially imported keys are trimmed) + if {$trim_method eq "active"} { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-started, slots:0-100" \ + "keyspace: key_trimmed, key: $key" \ + "sub: cluster-slot-migration-trim-completed, slots:0-100" \ + ] + } else { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-background, slots:0-100" \ + ] + } + wait_for_condition 500 10 { + [R 1 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 4 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 7 asm.get_cluster_trim_event_log] eq $trim_event_log + } else { + fail "ASM destination trim event not received" + } + + # cleanup + 
clear_module_event_log + reset_default_trim_method + wait_for_asm_done + R 0 flushall + R 1 flushall + } + + test "Test cluster module notifications on failover ($trim_method-trim)" { + # NOTE: cluster legacy may have a bug, multiple manual failover will fail, + # so only perform one round of failover test, fix it later + if {$trim_method eq "bg"} { + clear_module_event_log + R 1 debug asm-trim-method $trim_method + R 4 debug asm-trim-method $trim_method + R 7 debug asm-trim-method $trim_method + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Start migration + set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000] + # Wait until at least one key is moved to destination + wait_for_condition 1000 10 { + [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1 + } else { + fail "Key not moved to destination" + } + + failover_and_wait_for_done 4 + wait_for_asm_done + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Verify the events on source, both master and replica + set migrate_event_log [list \ + "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-migrate-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log + assert_equal [R 3 asm.get_cluster_event_log] {} + assert_equal [R 6 asm.get_cluster_event_log] {} + + # Verify the events on destination, both master and replica + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-import-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + wait_for_condition 500 20 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 
4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + puts "R1: [R 1 asm.get_cluster_event_log]" + puts "R4: [R 4 asm.get_cluster_event_log]" + puts "R7: [R 7 asm.get_cluster_event_log]" + fail "ASM import event not received" + } + + # Verify the trim events on destination (partially imported keys are trimmed) + # NOTE: after failover, the new master will initiate the slot trimming, + # and only slot 0 has data, so only slot 0 is trimmed + if {$trim_method eq "active"} { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-started, slots:0-0" \ + "keyspace: key_trimmed, key: $key" \ + "sub: cluster-slot-migration-trim-completed, slots:0-0" \ + ] + } else { + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-background, slots:0-0" \ + ] + } + wait_for_condition 500 20 { + [R 1 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 4 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 7 asm.get_cluster_trim_event_log] eq $trim_event_log + } else { + puts "R1: [R 1 asm.get_cluster_trim_event_log]" + puts "R4: [R 4 asm.get_cluster_trim_event_log]" + puts "R7: [R 7 asm.get_cluster_trim_event_log]" + fail "ASM destination trim event not received" + } + + # cleanup + failover_and_wait_for_done 1 + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + } + } + + foreach with_rdb {"with" "without"} { + test "Test cluster module notifications when replica restart $with_rdb RDB during importing" { + clear_module_event_log + R 1 debug asm-trim-method $trim_method + R 4 debug asm-trim-method $trim_method + R 7 debug asm-trim-method $trim_method + R 4 config set save "" + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Start migration, 2s delay + set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000] + # Wait until at least one 
key is moved to destination + wait_for_condition 1000 10 { + [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1 + } else { + fail "Key not moved to destination" + } + wait_for_ofs_sync [Rn 1] [Rn 4] + + # restart node 4 + if {$with_rdb eq "with"} { + restart_server -4 true false true save ;# rdb save + } else { + restart_server -4 true false true nosave ;# no rdb saved + } + wait_for_cluster_propagation + + wait_for_asm_done + + # started and completed are paired, and not duplicated + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + wait_for_condition 500 10 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + fail "ASM import event not received" + } + + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + R 4 save ;# save an empty rdb to override previous one + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + } + + test "Test cluster module notifications when replica is disconnected and full resync after importing" { + clear_module_event_log + R 1 debug asm-trim-method $trim_method + R 4 debug asm-trim-method $trim_method + R 7 debug asm-trim-method $trim_method + + set src_id [R 0 cluster myid] + set dest_id [R 1 cluster myid] + + # Set a key in the slot range + set key [slot_key 0 mykey] + R 0 set $key "value" + + # Start migration, 2s delay + set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000] + # Wait until at least one key is moved to destination + wait_for_condition 1000 10 { + [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1 + } else { + fail "Key not moved to destination" + 
} + wait_for_ofs_sync [Rn 1] [Rn 4] + + # puase node-4 + set r4_pid [S 4 process_id] + pause_process $r4_pid + + # set a small repl-backlog-size and write some commands to make node-4 + # full resync when reconnecting after waking up + set r1_full_sync [S 1 sync_full] + R 1 config set repl-backlog-size 16kb + R 1 client kill type replica + set 1k_str [string repeat "a" 1024] + for {set i 0} {$i < 2000} {incr i} { + R 1 set [slot_key 6000] $1k_str + } + + # after ASM task is completed, wake up node-4 + wait_for_condition 1000 10 { + [CI 1 cluster_slot_migration_active_tasks] == 0 && + [CI 1 cluster_slot_migration_active_trim_running] == 0 + } else { + fail "ASM tasks did not completed" + } + resume_process $r4_pid + + # make sure full resync happens + wait_for_sync [Rn 4] + wait_for_ofs_sync [Rn 1] [Rn 4] + assert_morethan [S 1 sync_full] $r1_full_sync + + # started and completed are paired, and not duplicated + set import_event_log [list \ + "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \ + ] + wait_for_condition 500 10 { + [R 1 asm.get_cluster_event_log] eq $import_event_log && + [R 4 asm.get_cluster_event_log] eq $import_event_log && + [R 7 asm.get_cluster_event_log] eq $import_event_log + } else { + fail "ASM import event not received" + } + + # since ASM task is completed on node-1 before node-4 reconnects, + # no trim event should be received on node-4 + assert_equal {} [R 4 asm.get_cluster_trim_event_log] + + R 0 CLUSTER MIGRATION IMPORT 0 100 + wait_for_asm_done + clear_module_event_log + reset_default_trim_method + R 0 flushall + R 1 flushall + } + + test "Test new master can trim slots when migration is completed and failover occurs on source side" { + R 0 asm.disable_trim ;# can not start slot trimming on source side + set slot0_key 
[slot_key 0 mykey] + R 0 set $slot0_key "value" + + # migrate slot 0 from #0 to #1, and wait it completed, but not allow to trim slots + # on source node + set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0] + wait_for_condition 1000 10 { + [string match {*completed*} [migration_status 0 $task_id state]] && + [string match {*completed*} [migration_status 1 $task_id state]] + } else { + fail "ASM task did not complete" + } + # verify trim is not allowed on source node, and replica node doesn't have trim job either + wait_for_ofs_sync [Rn 0] [Rn 3] + assert_equal 1 [R 0 asm.trim_in_progress] + assert_equal "value" [R 0 asm.read_pending_trim_key $slot0_key] + assert_equal 0 [R 3 asm.trim_in_progress] + assert_equal "value" [R 3 asm.read_pending_trim_key $slot0_key] + + set loglines [count_log_lines 0] + + # failover happens on source node, instance #3 become slave, #0 become master + failover_and_wait_for_done 3 + R 0 asm.enable_trim ;# enable trim on old master + + # old master should cancel the pending trim job + wait_for_log_messages 0 {"*Cancelling the pending trim job*"} $loglines 1000 10 + + wait_for_ofs_sync [Rn 3] [Rn 0] + # verify trim is allowed on new master, and the key is trimmed + wait_for_condition 1000 10 { + [R 3 asm.trim_in_progress] == 0 && + [R 3 asm.read_pending_trim_key $slot0_key] eq "" && + [R 0 asm.trim_in_progress] == 0 && + [R 0 asm.read_pending_trim_key $slot0_key] eq "" + } else { + fail "Trim did not complete" + } + + # verify the trim events, use active trim since module is subscribed to trimmed event + set trim_event_log [list \ + "sub: cluster-slot-migration-trim-started, slots:0-0" \ + "keyspace: key_trimmed, key: $slot0_key" \ + "sub: cluster-slot-migration-trim-completed, slots:0-0" \ + ] + wait_for_condition 500 20 { + [R 0 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 3 asm.get_cluster_trim_event_log] eq $trim_event_log && + [R 6 asm.get_cluster_trim_event_log] eq $trim_event_log + } else { + fail "ASM destination trim 
event not received"
        }

        # cleanup
        failover_and_wait_for_done 0
        R 0 CLUSTER MIGRATION IMPORT 0 0
        wait_for_asm_done
        clear_module_event_log
        reset_default_trim_method
        R 0 flushall
        R 1 flushall
    }

    test "Test module replicates commands at the beginning of slot migration " {
        R 0 flushall
        R 1 flushall

        # Sanity check
        assert_equal 0 [R 1 asm.read_keyless_cmd_val]
        assert_equal 0 [R 4 asm.read_keyless_cmd_val]

        # Enable module command replication and set a key to be replicated
        # Module will replicate two commands:
        # 1- A keyless command: asm.keyless_cmd
        # 2- SET command for the given key and value
        set keyname [slot_key 0 modulekey]
        R 0 asm.replicate_module_command 1 $keyname "value"

        # Migrate slot range 0-100 from node 0 to node 1; the injected delay
        # keeps the migration task alive long enough for the module-replicated
        # commands to travel over the migration link.
        setup_slot_migration_with_delay 0 1 0 100
        wait_for_asm_done
        wait_for_ofs_sync [Rn 1] [Rn 4]

        # Verify the commands are replicated
        assert_equal 1 [R 1 asm.read_keyless_cmd_val]
        assert_equal value [R 1 get $keyname]

        # Verify the commands are replicated to replica
        # NOTE(review): node 4 appears to be node 1's replica, judging from the
        # wait_for_ofs_sync call above and the READONLY read below - confirm.
        R 4 readonly
        assert_equal 1 [R 4 asm.read_keyless_cmd_val]
        assert_equal value [R 4 get $keyname]

        # cleanup
        R 0 asm.replicate_module_command 0 "" ""
        R 0 CLUSTER MIGRATION IMPORT 0 100
        wait_for_asm_done
        R 0 flushall
        R 1 flushall
    }

    # A module "parent" command dispatches to a subcommand; make sure the
    # subcommand's effect reaches the target node during slot migration.
    test "Test subcommand propagation during slot migration" {
        R 0 flushall
        R 1 flushall
        set task_id [setup_slot_migration_with_delay 0 1 0 100]

        set key [slot_key 0 mykey]
        R 0 asm.parent set $key "value" ;# execute a module subcommand
        wait_for_asm_done
        assert_equal "value" [R 1 GET $key]

        # cleanup
        R 0 cluster migration import 0 100
        wait_for_asm_done
    }

    test "Test trim method selection based on module keyspace subscription" {
        R 0 debug asm-trim-method default
        R 1 debug asm-trim-method default

        R 0 flushall
        R 1 flushall

        populate_slot 10 -idx 0 -slot 0

        # Make sure module is subscribed to NOTIFY_KEY_TRIMMED event. In this
        # case, active trim must be used.
        R 0 asm.subscribe_trimmed_event 1
        set loglines [count_log_lines 0]
        R 1 CLUSTER MIGRATION IMPORT 0 15
        wait_for_asm_done
        wait_for_log_messages 0 {"*Active trim scheduled for slots: 0-15*"} $loglines 1000 10

        # Move slots back to node-0. Make sure module is not subscribed to
        # NOTIFY_KEY_TRIMMED event. In this case, background trim must be used.
        R 1 asm.subscribe_trimmed_event 0
        set loglines [count_log_lines -1]
        R 0 CLUSTER MIGRATION IMPORT 0 15
        wait_for_asm_done
        wait_for_log_messages -1 {"*Background trim started for slots: 0-15*"} $loglines 1000 10

        # cleanup
        wait_for_asm_done
        R 0 asm.subscribe_trimmed_event 1
        R 1 asm.subscribe_trimmed_event 1
        R 0 flushall
        R 1 flushall
    }

    test "Verify trimmed key value can be read in the server event callback" {
        R 0 flushall
        set key [slot_key 0]
        set value "value123random"
        R 0 set $key $value

        R 1 CLUSTER MIGRATION IMPORT 0 0
        wait_for_asm_done
        # The module records the last trimmed key and its value from inside its
        # server event callback; poll until it has observed our key.
        wait_for_condition 1000 10 {
            [R 0 asm.get_last_deleted_key] eq "keyevent: key: $key, value: $value"
        } else {
            fail "Last deleted key event not received"
        }

        # cleanup
        R 0 CLUSTER MIGRATION IMPORT 0 0
        wait_for_asm_done
    }

    test "Verify module cannot open a key in a slot that is being trimmed" {
        R 0 flushall
        R 0 debug asm-trim-method active -1 ;# disable active trim

        set key [slot_key 0]
        R 0 set $key value

        # Wait for the migration itself to finish while the trim of the moved
        # slot is still reported as running on the source node.
        R 1 CLUSTER MIGRATION IMPORT 0 0
        wait_for_condition 1000 10 {
            [CI 0 cluster_slot_migration_active_tasks] == 0 &&
            [CI 1 cluster_slot_migration_active_tasks] == 0 &&
            [CI 0 cluster_slot_migration_active_trim_running] == 1
        } else {
            fail "migrate failed"
        }

        # We cannot open the key since it is in a slot being trimmed
        assert_equal {} [R 0 asm.get $key]

        # cleanup
        R 0 debug asm-trim-method default
        R 0 CLUSTER MIGRATION IMPORT 0 0
        wait_for_asm_done
    }

    test "Test RM_ClusterGetLocalSlotRanges" {
        # NOTE(review): node 3 seems to mirror node 0's slot view throughout
        # this test (presumably its replica) - confirm against cluster layout.
        assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 5461}}
        assert_equal [R 3 asm.cluster_get_local_slot_ranges] {{0 5461}}

        R 0 cluster migration import 5463 6000
        wait_for_asm_done
        wait_for_cluster_propagation
        assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 5461} {5463 6000}}
        assert_equal [R 3 asm.cluster_get_local_slot_ranges] {{0 5461} {5463 6000}}

        # Importing the gap slot 5462 plus 6001-10922 should merge everything
        # into a single contiguous range in the reported list.
        R 0 cluster migration import 5462 5462 6001 10922
        wait_for_asm_done
        wait_for_cluster_propagation
        assert_equal [R 0 asm.cluster_get_local_slot_ranges] {{0 10922}}
        assert_equal [R 3 asm.cluster_get_local_slot_ranges] {{0 10922}}
        assert_equal [R 1 asm.cluster_get_local_slot_ranges] {}
        assert_equal [R 4 asm.cluster_get_local_slot_ranges] {}
    }
}

set testmodule [file normalize tests/modules/atomicslotmigration.so]

# Fresh 2-node cluster (no replicas) with the test module loaded and AOF
# enabled, for the AOF/trim interaction tests below.
start_cluster 2 0 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no appendonly yes]] {
    test "TRIMSLOTS in AOF will work synchronously on restart" {
        # When TRIMSLOTS is replayed from AOF during restart, it must execute
        # synchronously rather than using active trim. This prevents race
        # conditions where subsequent AOF commands might operate on keys
        # that should have been trimmed.

        # Subscribe to key trimmed event to force active trim
        R 0 asm.subscribe_trimmed_event 1
        populate_slot 1000 -slot 0
        populate_slot 1000 -slot 1
        R 1 CLUSTER MIGRATION IMPORT 0 0
        wait_for_asm_done

        # verify active trim is used
        assert_equal 1 [CI 0 cluster_slot_migration_stats_active_trim_completed]

        # restart server and verify aof is loaded
        restart_server 0 yes no yes nosave
        assert {[scan [regexp -inline {aof_current_size:([\d]*)} [R 0 info persistence]] aof_current_size=%d] > 0}
        wait_for_cluster_state "ok"

        # verify TRIMSLOTS in AOF is executed synchronously
        # (stat counter is 0 after restart, so no active trim ran; slot 0's
        # 1000 keys are gone and only slot 1's 1000 keys remain)
        assert_equal 0 [CI 0 cluster_slot_migration_stats_active_trim_completed]
        assert_equal 1000 [R 0 dbsize]

        # cleanup
        R 0 CLUSTER MIGRATION IMPORT 0 0
        wait_for_asm_done
        assert_equal 2000 [R 0 dbsize]
        R 0 flushall
        R 1 flushall
        clear_module_event_log

    }

    test "Test trim is disabled when module requests it" {
        R 0 asm.disable_trim

        set slot0_key [slot_key 0 mykey]
        R 0 set $slot0_key "value"
        set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0]
        wait_for_condition 1000 10 {
            [string match {*completed*} [migration_status 0 $task_id state]]
        } else {
            fail "ASM task did not complete"
        }
        # since we disable trim, the key should still exist on source,
        # we can read it with REDISMODULE_OPEN_KEY_ACCESS_TRIMMED flag
        assert_equal "value" [R 0 asm.read_pending_trim_key $slot0_key]
        assert_equal 1 [R 0 asm.trim_in_progress]

        # enable trim and verify the key is trimmed
        R 0 asm.enable_trim
        wait_for_condition 1000 10 {
            [R 0 asm.read_pending_trim_key $slot0_key] eq "" &&
            [R 0 asm.trim_in_progress] == 0
        } else {
            fail "Trim did not complete"
        }
        wait_for_asm_done
        R 0 CLUSTER MIGRATION IMPORT 0 0
        wait_for_asm_done
        clear_module_event_log
    }

    test "Can not start new asm task when trim is not allowed" {
        # start a migration task, wait it completed but not allow to trim slots
        R 0 asm.disable_trim
        set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0]
        wait_for_condition 1000 10 {
            [string match {*completed*} [migration_status 0 $task_id state]]
        } else {
            fail "ASM task did not complete"
        }
        # Can not start new migrating task since trim is disabled
        set task_id [R 1 CLUSTER MIGRATION IMPORT 1 1]
        wait_for_condition 1000 10 {
            [string match {*fail*} [migration_status 1 $task_id state]] &&
            [string match {*Trim is disabled by module*} [migration_status 1 $task_id last_error]]
        } else {
            fail "ASM task did not fail"
        }
        R 0 asm.enable_trim
        wait_for_asm_done

        # start a migration task, wait it completed but not allow to trim slots
        R 0 asm.disable_trim
        set task_id [R 1 CLUSTER MIGRATION IMPORT 2 2]
        wait_for_condition 1000 10 {
            [string match {*completed*} [migration_status 0 $task_id state]]
        } else {
            fail "ASM task did not complete"
        }
        set logline [count_log_lines 0]
        # Can not start new importing task since trim is disabled
        set task_id [R 0 CLUSTER MIGRATION IMPORT 0 1]
        wait_for_log_messages 0 {"*Can not start import task*trim is disabled by module*"} $logline 1000 10
        R 0 asm.enable_trim
        wait_for_asm_done
    }
}

start_server {tags "cluster external:skip"} {
    # Outside cluster mode the module API should report the full keyspace,
    # slots 0-16383, as locally owned.
    test "Test RM_ClusterGetLocalSlotRanges without cluster" {
        r module load $testmodule
        assert_equal [r asm.cluster_get_local_slot_ranges] {{0 16383}}
    }
}
}
