aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
commit5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch)
tree1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl
parentc7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff)
downloadcrep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl')
-rw-r--r--examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl3063
1 files changed, 3063 insertions, 0 deletions
diff --git a/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl b/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl
new file mode 100644
index 0000000..f04257f
--- /dev/null
+++ b/examples/redis-unstable/tests/unit/cluster/atomic-slot-migration.tcl
@@ -0,0 +1,3063 @@
# Table mapping a slot number to the hash-tag prefix used for keys of that
# slot. Only the slots listed here can be used with slot_prefix, slot_key and
# populate_slot -slot. The table is written as a literal dict; the doubled
# braces make each value keep its own pair of braces.
set ::slot_prefixes {
    0     {{06S}}
    1     {{Qi}}
    2     {{5L5}}
    3     {{4Iu}}
    4     {{4gY}}
    5     {{460}}
    6     {{1Y7}}
    7     {{1LV}}
    101   {{1j2}}
    102   {{75V}}
    103   {{bno}}
    5462  {{450}}
    5463  {{4dY}}
    6000  {{4L7}}
    6001  {{4YV}}
    6002  {{0bx}}
    6003  {{AJ}}
    6004  {{of}}
    16383 {{6ZJ}}
}
22
23# Helper functions
# Return the client port of node $node_id: the value of the "tls-port" config
# when the suite runs with TLS ($::tls is true), otherwise the "port" config.
proc get_port {node_id} {
    set option "port"
    if {$::tls} {
        set option "tls-port"
    }
    # CONFIG GET replies with a name/value pair; the value is element 1
    return [lindex [R $node_id config get $option] 1]
}
31
# Look up the key prefix registered for $slot in the ::slot_prefixes table.
# Raises the usual "dict get" error when the slot is not in the table.
proc slot_prefix {slot} {
    global slot_prefixes
    dict get $slot_prefixes $slot
}
36
# Build a key name for $slot: the slot's prefix followed by the optional
# $suffix (empty by default).
proc slot_key {slot {suffix ""}} {
    set key [slot_prefix $slot]
    append key $suffix
    return $key
}
41
# Populate a slot with keys
# TODO: Consider merging with populate()
#
# Writes $num string keys named "<prefix><n>" (n counting up from 0) through
# a deferred connection to one node, keeping at most 16 requests in flight.
#
# Options (name/value pairs after $num):
#   -prefix   key name prefix (default "key:")
#   -size     value length; the value is that many "A" characters (default 3)
#   -idx      node index passed to R (default 0)
#   -prints   when true, print the counter after every send and every read
#   -expires  when > 0, keys are set with "ex <expires>"
#   -slot     when >= 0, overrides -prefix with the ::slot_prefixes entry
# Raises an error for an unknown option or a slot missing from the table.
proc populate_slot {num args} {
    # Defaults; the array also defines which option names are accepted
    array set opt {
        -prefix  "key:"
        -size    3
        -idx     0
        -prints  false
        -expires 0
        -slot    -1
    }
    foreach {name value} $args {
        if {![info exists opt($name)]} {
            error "Unknown option: $name"
        }
        set opt($name) $value
    }

    set node $opt(-idx)
    set prints $opt(-prints)
    set prefix $opt(-prefix)
    set slot $opt(-slot)

    # A requested slot replaces the prefix with the one from the table
    if {$slot >= 0} {
        if {![dict exists $::slot_prefixes $slot]} {
            error "Slot $slot not supported in slot_prefixes table, add it manually"
        }
        set prefix [dict get $::slot_prefixes $slot]
    }

    R $node deferred 1

    # Number of requests sent before the first reply is read
    set window $num
    if {$num > 16} {set window 16}
    set payload [string repeat A $opt(-size)]

    for {set j 0} {$j < $num} {incr j} {
        if {$opt(-expires) > 0} {
            R $node set $prefix$j $payload ex $opt(-expires)
        } else {
            R $node set $prefix$j $payload
        }
        # Once the window is full, read one reply per request sent
        if {$j >= $window} {
            R $node read
        }
        if {$prints} {puts $j}
    }

    # Drain the replies that are still outstanding
    for {set j 0} {$j < $window} {incr j} {
        R $node read
        if {$prints} {puts $j}
    }
    R $node deferred 0
}
101
# Return 1 when instances 0 .. total-1 all report zero for both
# cluster_slot_migration_active_tasks and
# cluster_slot_migration_active_trim_running (read via CI); 0 otherwise.
proc asm_all_instances_idle {total} {
    set counters {
        cluster_slot_migration_active_tasks
        cluster_slot_migration_active_trim_running
    }
    for {set node 0} {$node < $total} {incr node} {
        foreach counter $counters {
            if {[CI $node $counter] != 0} {
                return 0
            }
        }
    }
    return 1
}
110
# Wait until no instance in the cluster (masters plus replicas) reports an
# active ASM migration or trim task, then wait for the cluster config to
# propagate. Fails the test when the instances do not become idle in time.
proc wait_for_asm_done {} {
    set node_count [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]

    wait_for_condition 1000 10 {
        [asm_all_instances_idle $node_count] == 1
    } else {
        # Dump the counters of every instance to show which one is still busy
        set node 0
        while {$node < $node_count} {
            set migration_count [CI $node cluster_slot_migration_active_tasks]
            set trim_count [CI $node cluster_slot_migration_active_trim_running]
            puts "Instance $node: migration_tasks=$migration_count, trim_tasks=$trim_count"
            incr node
        }
        fail "ASM tasks did not complete on all instances"
    }

    # All nodes must agree on the cluster config once ASM is finished
    wait_for_cluster_propagation
}
129
# Issue CLUSTER FAILOVER (with the optional $failover_arg) on $node_id and
# wait until its ROLE reply matches "*master*". The command is re-issued up
# to 5 times when the wait times out. On success the cluster config is
# allowed to propagate before returning; otherwise the test fails.
proc failover_and_wait_for_done {node_id {failover_arg ""}} {
    set max_attempts 5

    # The extra argument is appended only when one was given
    set failover_cmd [list cluster failover]
    if {$failover_arg ne ""} {
        lappend failover_cmd $failover_arg
    }

    for {set attempt 1} {$attempt <= $max_attempts} {incr attempt} {
        R $node_id {*}$failover_cmd

        set completed 1
        wait_for_condition 1000 10 {
            [string match "*master*" [R $node_id role]]
        } else {
            set completed 0
        }
        if {!$completed} {
            continue
        }

        wait_for_cluster_propagation
        return
    }
    fail "Failover did not complete after $max_attempts attempts for node $node_id"
}
153
# Return the value of $field for migration task $task_id as reported by
# "CLUSTER MIGRATION STATUS ID <task_id>" on node $node_id.
#
# The reply is a list of tasks, each one a flat list of field/value pairs;
# only the first task is inspected. Returns an empty string when the reply
# is empty or the field is absent.
proc migration_status {node_id task_id field} {
    set status [R $node_id CLUSTER MIGRATION STATUS ID $task_id]

    # STATUS ID returns a single task, so an empty reply means "no such task"
    if {[llength $status] == 0} {
        return ""
    }

    # Walk the field/value pairs of the task; the first match wins
    foreach {key value} [lindex $status 0] {
        if {$key eq $field} {
            return $value
        }
    }
    return ""
}
178
# Setup slot migration test with keys and delay, then start migration.
#
# Populates $start_slot on $src_node with $keys keys, sets rdb-key-save-delay
# on the source to $delay so the migration takes time, and asks $dst_node to
# import the range $start_slot..$end_slot. Returns the task id once the
# source reports a state matching "send-bulk-and-stream"; fails the test if
# that state is not reached.
proc setup_slot_migration_with_delay {src_node dst_node start_slot end_slot {keys 2} {delay 1000000}} {
    # Keys on the start slot (two by default)
    populate_slot $keys -idx $src_node -slot $start_slot

    # Slow down the snapshot so the migration takes time for testing;
    # with the default parameters, two keys cost 2s to save
    R $src_node config set rdb-key-save-delay $delay

    # Migrate the slot range from src_node to dst_node
    set import_cmd [list CLUSTER MIGRATION IMPORT $start_slot $end_slot]
    set import_id [R $dst_node {*}$import_cmd]

    wait_for_condition 2000 10 {
        [string match {*send-bulk-and-stream*} [migration_status $src_node $import_id state]]
    } else {
        fail "ASM task did not start"
    }
    return $import_id
}
199
# Helper function to clear module internal event logs: runs
# asm.clear_event_log on every instance (masters plus replicas).
proc clear_module_event_log {} {
    set node_count [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]
    for {set node 0} {$node < $node_count} {incr node} {
        R $node asm.clear_event_log
    }
}
206
# Restore the default ASM trim method on every instance (masters plus
# replicas) via "debug asm-trim-method default".
proc reset_default_trim_method {} {
    set node_count [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]
    for {set node 0} {$node < $node_count} {incr node} {
        R $node debug asm-trim-method default
    }
}
212
start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} {
    # The same scenario is run once per trim method
    foreach trim_method {"active" "bg"} {
        test "Simple slot migration (trim method: $trim_method)" {
            foreach node {0 3} {
                R $node debug asm-trim-method $trim_method
            }

            # One key in each of slots 0, 1 and 101, all written on node 0
            set slot0_key [slot_key 0 mykey]
            R 0 set $slot0_key "a"
            set slot1_key [slot_key 1 mykey]
            R 0 set $slot1_key "b"
            set slot101_key [slot_key 101 mykey]
            R 0 set $slot101_key "c"
            # 3 keys cost 3s to save
            R 0 config set rdb-key-save-delay 1000000

            # load a function
            R 0 function load {#!lua name=test1
                redis.register_function('test1', function() return 'hello1' end)
            }

            # migrate slot 0-100 to R 1
            set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100]
            # the migration has started and the destination is accumulating the buffer
            wait_for_condition 1000 50 {
                [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]] &&
                [string match {*accumulate-buffer*} [migration_status 1 $task_id state]]
            } else {
                fail "ASM task did not start"
            }

            # append 99 times during migration
            for {set i 0} {$i < 99} {incr i} {
                R 0 multi
                R 0 append $slot0_key "a"
                R 0 exec
                R 0 append $slot1_key "b"
                R 0 append $slot101_key "c"
            }

            # wait until migration of 0-100 successful
            wait_for_asm_done

            # the task must be reported as completed by source and destination
            foreach node {0 1} {
                assert_equal "completed" [migration_status $node $task_id state]
            }

            # initial value plus the 99 appends
            set expect_a [string repeat a 100]
            set expect_b [string repeat b 100]
            set expect_c [string repeat c 100]

            # the appended 99 times should also be migrated
            assert_equal $expect_a [R 1 get $slot0_key]
            assert_equal $expect_b [R 1 get $slot1_key]

            # function should be migrated
            assert_equal [R 0 function dump] [R 1 function dump]
            # the replica of the destination should also get the data
            wait_for_ofs_sync [Rn 1] [Rn 4]

            R 4 readonly
            assert_equal $expect_a [R 4 get $slot0_key]
            assert_equal $expect_b [R 4 get $slot1_key]
            assert_equal [R 0 function dump] [R 4 function dump]

            # verify key that was not in the slot range is not migrated
            assert_equal $expect_c [R 0 get $slot101_key]
            # verify changes in replica
            wait_for_ofs_sync [Rn 0] [Rn 3]
            R 3 readonly
            assert_equal $expect_c [R 3 get $slot101_key]

            # cleanup
            R 0 config set rdb-key-save-delay 0
            R 0 flushall
            R 0 function flush
            R 1 flushall
            R 1 function flush
            R 0 CLUSTER MIGRATION IMPORT 0 100
            wait_for_asm_done
        }
    }
}
291
292# Skip most of the tests when running under valgrind since it is hard to
293# stabilize tests under valgrind.
294if {!$::valgrind} {
295start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} {
test "Test CLUSTER MIGRATION IMPORT input validation" {
    # Each row is: expected error pattern, then the arguments that follow
    # "CLUSTER MIGRATION" on node 0. Rows run top to bottom and cover
    # invalid arguments first, then invalid slot ranges.
    set cases {
        {*wrong number of arguments*}     {}
        {*wrong number of arguments*}     {IMPORT}
        {*wrong number of arguments*}     {IMPORT 100}
        {*wrong number of arguments*}     {IMPORT 100 200 300}
        {*unknown argument*}              {UNKNOWN 1 2}
        {*greater than end slot number*}  {IMPORT 200 100}
        {*out of range slot*}             {IMPORT 17000 18000}
        {*out of range slot*}             {IMPORT 14000 18000}
        {*out of range slot*}             {IMPORT 0 16384}
        {*out of range slot*}             {IMPORT 0 -1}
        {*out of range slot*}             {IMPORT -1 2}
        {*out of range slot*}             {IMPORT -2 -1}
        {*out of range slot*}             {IMPORT 10 a}
        {*out of range slot*}             {IMPORT sd sd}
        {*already the owner of the slot*} {IMPORT 100 200}
    }
    foreach {pattern migration_args} $cases {
        assert_error $pattern [list R 0 CLUSTER MIGRATION {*}$migration_args]
    }
}
316
test "Test CLUSTER MIGRATION CANCEL input validation" {
    # Each row is: expected error pattern, then the arguments that follow
    # "CLUSTER MIGRATION CANCEL" on node 0.
    set cases {
        {*wrong number of arguments*} {}
        {*wrong number of arguments*} {ID}
        {*wrong number of arguments*} {ID 12345 EXTRAARG}
        {*wrong number of arguments*} {ALL EXTRAARG}
        {*unknown argument*}          {UNKNOWNARG}
        {*unknown argument*}          {abc def}
    }
    foreach {pattern cancel_args} $cases {
        assert_error $pattern [list R 0 CLUSTER MIGRATION CANCEL {*}$cancel_args]
    }

    # empty string id should not cancel any task
    assert_equal 0 [R 0 CLUSTER MIGRATION CANCEL ID ""]
}
328
test "Test CLUSTER MIGRATION STATUS input validation" {
    # Each row is: expected error pattern, then the arguments that follow
    # "CLUSTER MIGRATION STATUS" on node 0.
    set cases {
        {*wrong number of arguments*} {}
        {*wrong number of arguments*} {ID}
        {*wrong number of arguments*} {ID id EXTRAARG}
        {*wrong number of arguments*} {ALL EXTRAARG}
        {*unknown argument*}          {ABC DEF}
        {*unknown argument*}          {UNKNOWNARG}
    }
    foreach {pattern status_args} $cases {
        assert_error $pattern [list R 0 CLUSTER MIGRATION STATUS {*}$status_args]
    }

    # empty string id should not list any task
    assert_equal {} [R 0 CLUSTER MIGRATION STATUS ID ""]
}
340
test "Test TRIMSLOTS input validation" {
    # Each row is: expected error pattern, then the arguments that follow
    # "TRIMSLOTS" on node 0. Rows are grouped in this order: wrong number
    # of arguments, missing RANGES keyword, invalid number of ranges,
    # invalid slot numbers, start slot greater than end slot.
    set cases {
        {*wrong number of arguments*}    {}
        {*wrong number of arguments*}    {RANGES}
        {*wrong number of arguments*}    {RANGES 1}
        {*wrong number of arguments*}    {RANGES 2 100}
        {*wrong number of arguments*}    {RANGES 17000 1}
        {*wrong number of arguments*}    {RANGES abc}

        {*missing ranges argument*}      {UNKNOWN 1 100 200}

        {*invalid number of ranges*}     {RANGES 0 1 1}
        {*invalid number of ranges*}     {RANGES -1 2 2}
        {*invalid number of ranges*}     {RANGES 17000 1 2}
        {*invalid number of ranges*}     {RANGES 2 100 200 300}

        {*out of range slot*}            {RANGES 1 -1 0}
        {*out of range slot*}            {RANGES 1 -2 -1}
        {*out of range slot*}            {RANGES 1 0 16384}
        {*out of range slot*}            {RANGES 1 abc def}
        {*out of range slot*}            {RANGES 1 100 abc}

        {*greater than end slot number*} {RANGES 1 200 100}
    }
    foreach {pattern trim_args} $cases {
        assert_error $pattern [list R 0 TRIMSLOTS {*}$trim_args]
    }
}
369
test "Test IMPORT not allowed on replica" {
    # Node 4 is addressed here as a replica; the import must be rejected
    set import_on_replica {R 4 CLUSTER MIGRATION IMPORT 100 200}
    assert_error {* not allowed on replica*} $import_on_replica
}
373
test "Test IMPORT not allowed during manual migration" {
    set dst_id [R 1 CLUSTER MYID]

    # For each manual state: put one slot of node 0 into that state, check
    # that an ASM import is refused, then revert the slot to STABLE.
    foreach {slot manual_state} {15000 IMPORTING 5000 MIGRATING} {
        R 0 CLUSTER SETSLOT $slot $manual_state $dst_id
        assert_error {*must be STABLE to start*slot migration*} {R 0 CLUSTER MIGRATION IMPORT 100 200}
        R 0 CLUSTER SETSLOT $slot STABLE
    }
}
389
test "Test IMPORT not allowed if the node is already the owner" {
    # Single-slot range (start equals end) on node 0
    set import_own_slot {R 0 CLUSTER MIGRATION IMPORT 100 100}
    assert_error {*already the owner of the slot*} $import_own_slot
}
393
test "Test IMPORT not allowed for a slot without an owner" {
    set orphan_slot 5000

    # After DELSLOTS the slot will have no owner
    R 0 CLUSTER DELSLOTS $orphan_slot

    assert_error {*slot has no owner: 5000*} [list R 0 CLUSTER MIGRATION IMPORT $orphan_slot $orphan_slot]

    # Revert the change
    R 0 CLUSTER ADDSLOTS $orphan_slot
}
403
test "Test IMPORT not allowed if slot ranges belong to different nodes" {
    # First a single range, then two separate ranges in one request
    foreach ranges {
        {7000 15000}
        {7000 8000 14000 15000}
    } {
        assert_error {*slots belong to different source nodes*} [list R 0 CLUSTER MIGRATION IMPORT {*}$ranges]
    }
}
408
test "Test IMPORT not allowed if slot is given multiple times" {
    # Ranges sharing the boundary slot 8000, then ranges overlapping on 7900-8000
    foreach ranges {
        {7000 8000 8000 9000}
        {7000 8000 7900 9000}
    } {
        assert_error {*Slot*specified multiple times*} [list R 0 CLUSTER MIGRATION IMPORT {*}$ranges]
    }
}
413
test "Test CLUSTER MIGRATION STATUS ALL lists all tasks" {
    # Create 3 completed tasks, each importing a two-slot range into node 0
    foreach {first_slot last_slot} {7000 7001 7002 7003 7004 7005} {
        R 0 CLUSTER MIGRATION IMPORT $first_slot $last_slot
        wait_for_asm_done
    }

    # Get node IDs for verification
    set node0_id [R 0 cluster myid]
    set node1_id [R 1 cluster myid]

    # Field names expected at the even positions of every task reply, in order
    set expected_fields {id slots source dest operation state
                         last_error retries create_time start_time
                         end_time write_pause_ms}
    # Slot range expected at each position of the reply; the assertions
    # expect the most recently created task first
    set expected_slots {7004-7005 7002-7003 7000-7001}

    # Verify CLUSTER MIGRATION STATUS ALL reply from both nodes
    foreach node_idx {0 1} {
        set tasks [R $node_idx CLUSTER MIGRATION STATUS ALL]
        assert_equal 3 [llength $tasks]

        for {set i 0} {$i < 3} {incr i} {
            set task [lindex $tasks $i]

            # Verify field order: names sit at positions 0, 2, 4, ...
            set pos 0
            foreach expected_field $expected_fields {
                assert_equal $expected_field [lindex $task $pos]
                incr pos 2
            }

            # Verify basic fields
            assert_equal "completed" [dict get $task state]
            assert_equal "" [dict get $task last_error]
            assert_equal 0 [dict get $task retries]
            assert {[dict get $task write_pause_ms] >= 0}

            # Verify operation based on node: node 0 imported, node 1 migrated
            if {$node_idx == 0} {
                assert_equal "import" [dict get $task operation]
            } else {
                assert_equal "migrate" [dict get $task operation]
            }

            # Verify node IDs (all tasks: node1 -> node0)
            assert_equal $node1_id [dict get $task source]
            assert_equal $node0_id [dict get $task dest]

            # Verify timestamps exist and are reasonable
            set create_time [dict get $task create_time]
            set start_time [dict get $task start_time]
            set end_time [dict get $task end_time]
            assert {$create_time > 0}
            assert {$start_time >= $create_time}
            assert {$end_time >= $start_time}

            # Verify specific slot ranges for each task
            assert_equal [lindex $expected_slots $i] [dict get $task slots]
        }
    }

    # cleanup
    R 1 CLUSTER MIGRATION IMPORT 7000 7005
    wait_for_asm_done
}
486
test "Test IMPORT not allowed if there is an overlapping import" {
    # Let slot migration take long time, so that we can test overlapping import
    R 1 config set rdb-key-save-delay 1000000
    R 1 set tag22273 tag22273 ;# slot hash is 7000
    R 1 set tag9283 tag9283 ;# slot hash is 8000

    set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 8000]

    # While 7000-8000 is being imported, any range touching it is refused:
    # shared end slot, overlap on the right, shared start slot, overlap on the left
    foreach {first_slot last_slot} {8000 9000 7500 8500 6000 7000 6500 7500} {
        assert_error {*overlapping import exists*} [list R 0 CLUSTER MIGRATION IMPORT $first_slot $last_slot]
    }

    # The original import must still run to completion on both nodes
    wait_for_condition 1000 50 {
        [string match {*completed*} [migration_status 0 $task_id state]] &&
        [string match {*completed*} [migration_status 1 $task_id state]]
    } else {
        fail "ASM task did not complete"
    }
    assert_equal "tag22273" [R 0 get tag22273]
    assert_equal "tag9283" [R 0 get tag9283]
    R 1 config set rdb-key-save-delay 0

    # revert the migration
    R 1 CLUSTER MIGRATION IMPORT 7000 8000
    wait_for_asm_done
}
513
test "Test IMPORT with unsorted and adjacent ranges" {
    # Redis should sort and merge adjacent ranges
    # Adjacent means: prev.end + 1 == next.start
    # e.g. 7000-7001 7002-7003 7004-7005 => 7000-7005

    # Each row is: importing node, ranges as given, slots expected in the
    # task status. Row 1 uses adjacent ranges; rows 2 and 3 use unsorted
    # and adjacent ranges.
    set cases {
        0 {7000 7001 7002 7100}           {7000-7100}
        1 {7050 7051 7010 7049 7000 7005} {7000-7005 7010-7051}
        1 {7007 7007 7008 7009 7006 7006} {7006-7009}
    }
    foreach {importer ranges merged_slots} $cases {
        set task_id [R $importer CLUSTER MIGRATION IMPORT {*}$ranges]
        wait_for_asm_done
        # verify migration is successfully completed on both nodes
        foreach node {0 1} {
            assert_equal "completed" [migration_status $node $task_id state]
        }
        # verify slot ranges are merged correctly
        foreach node {0 1} {
            assert_equal $merged_slots [migration_status $node $task_id slots]
        }
    }
}
549
test "Simple slot migration with write load" {
    # Perform slot migration while traffic is on and verify data consistency.
    # Trimming is disabled on source nodes so, we can compare the dbs after
    # migration via DEBUG DIGEST to ensure no data loss during migration.
    # Steps:
    # 1. Disable trimming on both nodes
    # 2. Populate slot 0 on node-0 and slot 6000 on node-1
    # 3. Start write traffic on both nodes
    # 4. Migrate slot 0 from node-0 to node-1
    # 5. Migrate slot 6000 from node-1 to node-0
    # 6. Stop write traffic, verify db's are identical.

    # This test runs slowly under the thread sanitizer.
    # 1. Increase the lag threshold from the default 1 MB to 10 MB to let the destination catch up easily.
    # 2. Increase the write pause timeout from the default 10s to 60s so the source can wait longer.
    set lag_option cluster-slot-migration-handoff-max-lag-bytes
    set pause_option cluster-slot-migration-write-pause-timeout

    # Previous values are read from node 0 only and restored on both nodes later
    set prev_config_lag [lindex [R 0 config get $lag_option] 1]
    foreach node {0 1} {
        R $node config set $lag_option 10mb
    }
    set prev_config_timeout [lindex [R 0 config get $pause_option] 1]
    foreach node {0 1} {
        R $node config set $pause_option 60000
    }

    # node-0 gets slot 0, node-1 gets slot 6000
    foreach {node slot} {0 0 1 6000} {
        R $node flushall
        R $node debug asm-trim-method none
        populate_slot 10000 -idx $node -slot $slot
    }

    # Start write traffic on each node against the slot it currently holds.
    # Throws -MOVED error once asm is completed, catch block will ignore it.
    foreach {node slot} {0 0 1 6000} {
        catch {
            set port [get_port $node]
            set key [slot_key $slot mykey]
            set load_handle$node [start_write_load "127.0.0.1" $port 100 $key 0 5]
        }
    }

    # Migrate keys
    R 1 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
    R 0 CLUSTER MIGRATION IMPORT 6000 6100
    wait_for_asm_done

    stop_write_load $load_handle0
    stop_write_load $load_handle1

    # verify data
    assert_morethan [R 0 dbsize] 0
    assert_equal [R 0 debug digest] [R 1 debug digest]

    # cleanup
    foreach node {0 1} {
        R $node config set $lag_option $prev_config_lag
        R $node config set $pause_option $prev_config_timeout
        R $node debug asm-trim-method default
        R $node flushall
    }

    R 1 CLUSTER MIGRATION IMPORT 6000 6100
    wait_for_asm_done
}
624
test "Verify expire time is migrated correctly" {
    foreach node {0 1} {
        R $node flushall
    }

    # Four keys of different types, all in slot 0
    set string_key [slot_key 0 string_key]
    set list_key [slot_key 0 list_key]
    set hash_key [slot_key 0 hash_key]
    set stream_key [slot_key 0 stream_key]

    for {set i 0} {$i < 20} {incr i} {
        R 1 hset $hash_key $i $i
        R 1 xadd $stream_key * item $i
    }
    for {set i 0} {$i < 2000} {incr i} {
        R 1 lpush $list_key $i
    }

    # set expire time of some keys; the stream key is left without a TTL
    R 1 set $string_key "a" EX 1000
    foreach key [list $list_key $hash_key] {
        R 1 EXPIRE $key 1000
    }

    # migrate slot 0-100 to R 0
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done

    # check expire times are migrated correctly
    foreach key [list $string_key $list_key $hash_key] {
        assert_range [R 0 ttl $key] 900 1000
    }
    assert_equal -1 [R 0 ttl $stream_key]

    # cleanup
    foreach node {0 1} {
        R $node flushall
    }
    R 1 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
}
663
test "Slot migration with complex data types can work well" {
    foreach node {0 1} {
        R $node flushall
    }

    set list_key [slot_key 0 list_key]
    set set_key [slot_key 0 set_key]
    set zset_key [slot_key 0 zset_key]
    set hash_key [slot_key 0 hash_key]
    set stream_key [slot_key 0 stream_key]

    # generate big keys for each data type
    set elements 1000
    for {set i 0} {$i < $elements} {incr i} {
        R 1 lpush $list_key $i
        R 1 sadd $set_key $i
        R 1 zadd $zset_key $i $i
        R 1 hset $hash_key $i $i
        R 1 xadd $stream_key * item $i
    }

    # migrate slot 0-100 to R 0
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done

    # check the data on destination node is correct: every key must report
    # the full element count through its type's length command
    foreach {length_cmd key} [list \
        llen  $list_key \
        scard $set_key \
        zcard $zset_key \
        hlen  $hash_key \
        xlen  $stream_key] {
        assert_equal 1000 [R 0 $length_cmd $key]
    }

    # migrate slot 0-100 to R 1
    R 1 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
}
696
# Run one failing migration per state in $all_states.
#
# For every state a fail point is armed, node 0 imports slots 0-100 from
# node 1 under write load, and the task is expected to report a last_error
# mentioning both $channel and the state on either node. The task is then
# cancelled on both nodes and the delays are reset.
#
#   operation   "import"  - fail point "import-<channel>-channel" on node 0
#               "migrate" - fail point "migrate-<channel>-channel" on node 1
#   channel     channel name used in the fail point and matched in the error
#   all_states  list of state names to test, in order
#
# An unknown operation fails immediately, before any config is changed or
# any write load is started.
proc asm_basic_error_handling_test {operation channel all_states} {
    # Resolve the node that receives the fail point up front, so that a bad
    # operation cannot leave a write load or a save delay behind
    switch -- $operation {
        import  { set failpoint_node 0 }
        migrate { set failpoint_node 1 }
        default { fail "Unknown operation: $operation" }
    }

    # States that need incremental data streaming
    set streaming_states [list "streaming-buffer" "accumulate-buffer" "send-bulk-and-stream" "send-stream"]

    foreach state $all_states {
        if {$::verbose} { puts "Testing $operation $channel channel with state: $state"}

        # For states that need incremental data streaming, set a longer delay
        if {$state in $streaming_states} {
            R 1 config set rdb-key-save-delay 1000000
        }

        # Let the destination node take time to stream buffer, so the source node will handle
        # slot snapshot child process exit, and then enter "send-stream" state.
        if {$state eq "send-stream"} {
            R 0 config set key-load-delay 100000
        }

        # Start the slot 0 write load on the R 1
        set slot0_key [slot_key 0 mykey]
        set load_handle [start_write_load "127.0.0.1" [get_port 1] 100 $slot0_key 500]

        # clear old fail points and set the new fail point
        assert_equal {OK} [R 0 debug asm-failpoint "" ""]
        assert_equal {OK} [R 1 debug asm-failpoint "" ""]
        assert_equal {OK} [R $failpoint_node debug asm-failpoint "$operation-$channel-channel" $state]

        # Start the migration
        set task_id [R 0 CLUSTER MIGRATION IMPORT 0 100]

        # The task should be failed due to the fail point
        wait_for_condition 2000 10 {
            [string match -nocase "*$channel*${state}*" [migration_status 0 $task_id last_error]] ||
            [string match -nocase "*$channel*${state}*" [migration_status 1 $task_id last_error]]
        } else {
            fail "ASM task did not fail with expected error -
                  (dst: [migration_status 0 $task_id last_error]
                  src: [migration_status 1 $task_id last_error]
                  expected: $channel $state)"
        }
        stop_write_load $load_handle

        # Cancel the task
        R 0 CLUSTER MIGRATION CANCEL ID $task_id
        R 1 CLUSTER MIGRATION CANCEL ID $task_id

        R 1 config set rdb-key-save-delay 0
        R 0 config set key-load-delay 0
    }
}
751
test "Destination node main channel basic error-handling tests " {
    # States at which the "import" side of the main channel is made to fail
    set all_states {
        connecting
        auth-reply
        handshake-reply
        syncslots-reply
        accumulate-buffer
        streaming-buffer
        wait-stream-eof
    }
    asm_basic_error_handling_test "import" "main" $all_states
}
764
test "Destination node rdb channel basic error-handling tests" {
    # States at which the "import" side of the rdb channel is made to fail
    set all_states {
        connecting
        auth-reply
        rdbchannel-reply
        rdbchannel-transfer
    }
    asm_basic_error_handling_test "import" "rdb" $all_states
}
774
test "Source node main channel basic error-handling tests " {
    # States at which the "migrate" side of the main channel is made to fail
    set all_states {
        wait-rdbchannel
        send-bulk-and-stream
        send-stream
        handoff
    }
    asm_basic_error_handling_test "migrate" "main" $all_states
}
784
test "Source node rdb channel basic error-handling tests" {
    # States at which the "migrate" side of the rdb channel is made to fail
    set all_states {
        wait-bgsave-start
        send-bulk-and-stream
    }
    asm_basic_error_handling_test "migrate" "rdb" $all_states
}
792
test "Migration will be successful after fail points are cleared" {
    R 0 flushall
    R 1 flushall
    set slot0_key [slot_key 0 mykey]
    set slot1_key [slot_key 1 mykey]
    R 1 set $slot0_key "a"
    R 1 set $slot1_key "b"

    # we set a delay to write incremental data
    R 1 config set rdb-key-save-delay 1000000

    # Start the slot 0 write load on the R 1
    set load_handle [start_write_load "127.0.0.1" [get_port 1] 100 $slot0_key]

    # Clear all fail points
    assert_equal {OK} [R 0 debug asm-failpoint "" ""]
    assert_equal {OK} [R 1 debug asm-failpoint "" ""]

    # Start the migration; the task id is not needed because completion is
    # observed through wait_for_asm_done
    R 0 CLUSTER MIGRATION IMPORT 0 100

    # Wait for the migration to complete
    wait_for_asm_done

    stop_write_load $load_handle

    # Verify the data is migrated: R 0 imported the range, so the keys of
    # slot 0 and slot 1 are read from R 0.
    # slot 0 key should be changed by the write load
    assert_not_equal "a" [R 0 get $slot0_key]
    assert_equal "b" [R 0 get $slot1_key]
    R 1 config set rdb-key-save-delay 0
}
825
test "Client output buffer limit is reached on source side" {
    foreach node {0 1} {
        R $node flushall
    }
    # Remember the destination pid so the process can be resumed at the end
    set r1_pid [S 1 process_id]
    R 1 debug repl-pause on-streaming-repl-buf

    # Set a small output buffer limit to trigger the error
    R 0 config set client-output-buffer-limit "replica 4mb 0 0"

    # Node 0 is the source, node 1 imports slots 0-100
    set task_id [setup_slot_migration_with_delay 0 1 0 100]

    # some write traffic is to have chance to enter streaming buffer state
    set slot0_key [slot_key 0 mykey]
    R 0 set $slot0_key "a"

    # after 3 seconds, the slots snapshot (costs 2s to generate) should be transferred,
    # then start streaming buffer
    after 3000

    # Only log lines written from this point on are searched below
    set loglines [count_log_lines 0]

    # Start the slot 0 write load on the R 0
    set load_handle [start_write_load "127.0.0.1" [get_port 0] 100 $slot0_key 1000]

    # verify the metric is accessible, it is transient, will be reset on disconnect
    assert {[S 0 mem_cluster_slot_migration_output_buffer] >= 0}

    # After some time, the client output buffer limit should be reached
    set limit_message {"*Client * closed * for overcoming of output buffer limits.*"}
    wait_for_log_messages 0 $limit_message $loglines 1000 10
    wait_for_condition 1000 10 {
        [string match {*send*stream*} [migration_status 0 $task_id last_error]]
    } else {
        fail "ASM task did not fail as expected"
    }

    stop_write_load $load_handle

    # Reset configurations
    R 0 config set client-output-buffer-limit "replica 0 0 0"
    R 0 config set rdb-key-save-delay 0

    # resume server and clear pause point
    resume_process $r1_pid
    R 1 debug repl-pause clear

    # Wait for the migration to complete
    wait_for_asm_done
}
874
test "Full sync buffer limit is reached on destination side" {
    # Shrink the full sync buffer limit on #0 so that it gets hit
    R 0 config set replica-full-sync-buffer-limit 1mb

    # Migrate from #1 to #0; transferring the slots snapshot costs 4s
    set task [setup_slot_migration_with_delay 1 0 0 100 2 2000000]
    set log_offset [count_log_lines 0]

    # Generate traffic on slot 0 meanwhile
    populate_slot 100 -idx 1 -slot 0 -size 100000

    # Reaching the slots sync buffer limit must not fail the migration:
    # from then on the buffer is accumulated on the source side.
    wait_for_log_messages 0 {"*Slots sync buffer limit has been reached*"} $log_offset 1000 10

    # The peak value should be greater than 1mb
    assert {[S 0 mem_cluster_slot_migration_input_buffer_peak] > 1000000}
    # The current value is transient (reset on disconnect); only check it is accessible
    assert {[S 0 mem_cluster_slot_migration_input_buffer] >= 0}

    wait_for_asm_done

    # Restore the configuration and migrate the slots back to #1
    R 0 config set replica-full-sync-buffer-limit 0
    R 1 config set rdb-key-save-delay 0
    R 1 cluster migration import 0 100
    wait_for_asm_done
}
903
test "Expired key is not deleted and SCAN/KEYS/RANDOMKEY/CLUSTER GETKEYSINSLOT filter keys in importing slots" {
    # One key in each of the slots 0, 1 and 2
    foreach slot {0 1 2} {
        set slot${slot}_key [slot_key $slot mykey]
    }
    R 1 flushall
    R 0 flushall

    # Delay the snapshot on #1 to leave time for writing incremental data
    R 1 config set rdb-key-save-delay 1000000

    # Give every key a 2s TTL. Generating the slot snapshot takes 3s, so
    # all three keys are expired once the snapshot has been transferred
    R 1 setex $slot0_key 2 "a"
    R 1 setex $slot1_key 2 "b"
    R 1 hset $slot2_key "f1" "1"
    R 1 expire $slot2_key 2
    R 1 hexpire $slot2_key 2 FIELDS 1 "f1"

    set task [R 0 CLUSTER MIGRATION IMPORT 0 100]
    wait_for_condition 2000 10 {
        [string match {*send-bulk-and-stream*} [migration_status 1 $task state]]
    } else {
        fail "ASM task did not start"
    }

    # Extend the expire times while the migration is running
    R 1 setex $slot0_key 100 "a"
    R 1 expire $slot1_key 80
    R 1 expire $slot2_key 60
    R 1 hincrbyfloat $slot2_key "f1" 1
    R 1 hexpire $slot2_key 60 FIELDS 1 "f1"

    # After 2s at least one key should have been transferred. It must not
    # be deleted because of expiration (neither active nor lazy expiration,
    # e.g. via SCAN, takes effect), and SCAN/KEYS/RANDOMKEY/CLUSTER
    # GETKEYSINSLOT must not find it.
    after 2000
    R 3 readonly
    foreach id {0 3} { ;# 0 is the master, 3 is the replica
        assert_equal {0 {}} [R $id scan 0 count 10]
        assert_equal {} [R $id keys "*"]
        assert_equal {} [R $id keys "{06S}*"]
        assert_equal {} [R $id randomkey]
        assert_equal {} [R $id cluster getkeysinslot 0 100]
        assert_equal [R $id cluster countkeysinslot 0] 0
        assert_equal [R $id dbsize] 0

        # INFO KEYSPACE, however, already shows the increased number of keys
        assert {[scan [regexp -inline {keys\=([\d]*)} [R $id info keyspace]] keys=%d] >= 1}
        assert {[scan [regexp -inline {expires\=([\d]*)} [R $id info keyspace]] expires=%d] >= 1}
    }

    wait_for_asm_done

    wait_for_ofs_sync [Rn 0] [Rn 3]

    foreach id {0 3} { ;# 0 is the master, 3 is the replica
        # The keys are valid and carry the updated TTLs
        assert_range [R $id ttl $slot0_key] 90 100
        assert_range [R $id ttl $slot1_key] 70 80
        assert_range [R $id ttl $slot2_key] 50 60
        assert_range [R $id httl $slot2_key FIELDS 1 "f1"] 50 60

        # Once the migration is done KEYS/SCAN/RANDOMKEY/CLUSTER GETKEYSINSLOT find the keys
        assert_equal [list 0 [list $slot0_key $slot1_key $slot2_key]] [R $id scan 0 count 10]
        assert_equal [list $slot0_key $slot1_key $slot2_key] [R $id keys "*"]
        assert_equal [list $slot0_key] [R $id keys "{06S}*"]
        assert_not_equal {} [R $id randomkey]
        assert_equal [list $slot0_key] [R $id cluster getkeysinslot 0 100]

        # ... and INFO KEYSPACE/DBSIZE/CLUSTER COUNTKEYSINSLOT reflect them as well
        assert_equal 3 [scan [regexp -inline {keys\=([\d]*)} [R $id info keyspace]] keys=%d]
        assert_equal 3 [scan [regexp -inline {expires\=([\d]*)} [R $id info keyspace]] expires=%d]
        assert_equal 1 [scan [regexp -inline {subexpiry\=([\d]*)} [R $id info keyspace]] subexpiry=%d]
        assert_equal 3 [R $id dbsize]
        assert_equal 1 [R $id cluster countkeysinslot 0]
    }

    # Shorten the expire time to 10ms; active expiration should then delete
    # the keys after a while
    R 0 pexpire $slot0_key 10
    R 0 pexpire $slot1_key 10
    R 0 hpexpire $slot2_key 10 FIELDS 1 "f1" ;# the last field is expired, the key will be deleted
    wait_for_condition 100 50 {
        [scan [regexp -inline {keys\=([\d]*)} [R 0 info keyspace]] keys=%d] == {} &&
        [scan [regexp -inline {keys\=([\d]*)} [R 3 info keyspace]] keys=%d] == {}
    } else {
        fail "keys did not expire"
    }

    R 1 config set rdb-key-save-delay 0
}
995
test "Eviction does not evict keys in importing slots" {
    set slot0_key [slot_key 0 mykey]
    set slot1_key [slot_key 1 mykey]
    set slot2_key [slot_key 2 mykey]
    set slot5462_key [slot_key 5462 mykey]
    set slot5463_key [slot_key 5463 mykey]
    R 1 flushall
    R 0 flushall

    # Remember the eviction settings of #1 so they can be restored at the
    # end; otherwise the memory limit set below leaks into the next tests.
    set orig_maxmemory [lindex [R 1 config get maxmemory] 1]
    set orig_policy [lindex [R 1 config get maxmemory-policy] 1]

    # we set a delay to write incremental data
    R 0 config set rdb-key-save-delay 1000000

    set 1k_str [string repeat "a" 1024]
    set 1m_str [string repeat "a" 1048576]

    # set two keys to be evicted
    R 1 set $slot5462_key $1k_str
    R 1 set $slot5463_key $1k_str

    # set maxmemory to 200kb more than current used memory,
    # redis should evict some keys if importing some big keys
    set r1_mem_used [S 1 used_memory]
    set r1_max_mem [expr {$r1_mem_used + 200*1024}]
    R 1 config set maxmemory $r1_max_mem
    R 1 config set maxmemory-policy allkeys-lru

    # set 3 keys to be migrated
    R 0 set $slot0_key $1m_str
    R 0 set $slot1_key $1m_str
    R 0 set $slot2_key $1m_str

    set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100]
    wait_for_condition 2000 10 {
        [string match {*send-bulk-and-stream*} [migration_status 0 $task_id state]]
    } else {
        fail "ASM task did not start"
    }

    # after 2.2s, at least two keys should be transferred, they should not be evicted
    # but other keys (slot5462_key and slot5463_key) should be evicted
    after 2200
    for {set j 0} {$j < 100} {incr j} { R 1 ping } ;# trigger eviction
    assert_equal 0 [R 1 exists $slot5462_key]
    assert_equal 0 [R 1 exists $slot5463_key]
    assert {[scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 2}

    # current used memory should be more than the maxmemory, since the big keys that
    # belong to importing slots can not be evicted.
    set r1_mem_used [S 1 used_memory]
    assert {$r1_mem_used > $r1_max_mem + 1024*1024}

    wait_for_asm_done

    # after migration, these big keys should be evicted; once nothing is
    # left INFO KEYSPACE has no db line at all, so the scan yields {}
    for {set j 0} {$j < 100} {incr j} { R 1 ping } ;# trigger eviction
    assert_equal {} [scan [regexp -inline {expires\=([\d]*)} [R 1 info keyspace]] expires=%d]

    # Restore the settings changed by this test so that later tests do not
    # run with a tight memory limit on #1 or a slow snapshot on #0.
    R 1 config set maxmemory $orig_maxmemory
    R 1 config set maxmemory-policy $orig_policy
    R 0 config set rdb-key-save-delay 0
}
1053
test "Failover will cancel slot migration tasks" {
    # Part 1: failover on the destination while slots 0-100 move from #1 to #0
    set task [setup_slot_migration_with_delay 1 0 0 100]

    # Instance #3 becomes master, #0 becomes its replica
    failover_and_wait_for_done 3

    # The old master (#0) cancels its importing task, which in turn makes
    # the migrating task on the source node (#1) fail
    wait_for_condition 1000 50 {
        [string match {*canceled*} [migration_status 0 $task state]] &&
        [string match {*failover*} [migration_status 0 $task last_error]] &&
        [string match {*failed*} [migration_status 1 $task state]]
    } else {
        fail "ASM task did not cancel"
    }

    # ASM tasks can be started again on the new master: move slots 0-100
    # from #1 to #3
    R 1 config set rdb-key-save-delay 0
    set task [R 3 CLUSTER MIGRATION IMPORT 0 100]
    wait_for_asm_done

    # Part 2: failover on the source while slots 0-100 move from #3 to #1
    set task [setup_slot_migration_with_delay 3 1 0 100]

    # Instance #3 becomes a replica again, #0 becomes master
    failover_and_wait_for_done 0

    # The old master (#3) cancels the migrating task, while the destination
    # node retries the importing task and eventually succeeds
    wait_for_condition 1000 50 {
        [string match {*canceled*} [migration_status 3 $task state]]
    } else {
        fail "ASM task did not cancel"
    }
    wait_for_asm_done
}
1091
test "Flush-like command can cancel slot migration task" {
    # Exercise both FLUSHALL and FLUSHDB on the destination node
    foreach flush_variant {flushall flushdb} {
        # Begin a slot migration from #1 to #0
        set task [setup_slot_migration_with_delay 1 0 0 100]

        if {$::verbose} { puts "Testing flush command: $flush_variant"}
        R 0 $flush_variant

        # The flush-like command cancels the task on #0
        wait_for_condition 1000 50 {
            [string match {*canceled*} [migration_status 0 $task state]]
        } else {
            fail "ASM task did not cancel"
        }
    }

    # Drop the delay on #1 and let #0 import the slots without interruption
    R 1 config set rdb-key-save-delay 0
    R 0 cluster migration import 0 100
    wait_for_asm_done
}
1113
test "CLUSTER SETSLOT command when there is a slot migration task" {
    # Start migrating slots 0-100 from node 0 to node 1
    set task [setup_slot_migration_with_delay 0 1 0 100]

    # While a slot migration task covers the slot, every CLUSTER SETSLOT
    # flavour is rejected. Both #0 and #1 have a task on slot 0 right now.
    foreach instance {0 1} {
        set myid [R $instance cluster myid]

        catch {R $instance cluster setslot 0 migrating $myid} reply
        assert_match {*in an active atomic slot migration*} $reply

        catch {R $instance cluster setslot 0 importing $myid} reply
        assert_match {*in an active atomic slot migration*} $reply

        catch {R $instance cluster setslot 0 stable} reply
        assert_match {*in an active atomic slot migration*} $reply

        catch {R $instance cluster setslot 0 node $myid} reply
        assert_match {*in an active atomic slot migration*} $reply
    }

    # CLUSTER SETSLOT issued on another node cancels the migration task.
    # On #2 we make #2 the owner of slot 0 (which is migrating from #0 to
    # #1). The config epoch is bumped first so that the change is able to
    # update the slot configuration of #0 and #1, which then cancel the
    # migration task. Without the bump, the slot config of #2 might be
    # updated by #0 and #1 instead.
    R 2 cluster bumpepoch
    R 2 cluster setslot 0 node [R 2 cluster myid]
    wait_for_condition 1000 50 {
        [string match {*canceled*} [migration_status 0 $task state]] &&
        [string match {*slots configuration updated*} [migration_status 0 $task last_error]] &&
        [string match {*canceled*} [migration_status 1 $task state]]
    } else {
        fail "ASM task did not cancel"
    }

    # Hand slot 0 back to #0
    R 0 cluster bumpepoch
    R 0 cluster setslot 0 node [R 0 cluster myid]
    wait_for_cluster_propagation
    wait_for_cluster_state "ok"
}
1158
test "CLUSTER DELSLOTSRANGE command cancels a slot migration task" {
    # Migrate slots 0-100 from #0 to #1
    set task [setup_slot_migration_with_delay 0 1 0 100]

    # Deleting the slots on the source cancels its task with a "slots
    # configuration updated" error; the task on the destination fails
    R 0 cluster delslotsrange 0 100
    wait_for_condition 1000 50 {
        [string match {*canceled*} [migration_status 0 $task state]] &&
        [string match {*slots configuration updated*} [migration_status 0 $task last_error]] &&
        [string match {*failed*} [migration_status 1 $task state]]
    } else {
        fail "ASM task did not cancel"
    }
    R 1 cluster migration cancel id $task

    # Give the slots back to #0
    R 0 cluster addslotsrange 0 100
    wait_for_cluster_propagation
    wait_for_cluster_state "ok"
}
1178
# NOTE: this test needs more than 60s to run (nodes keep a forgotten node
# in their black list for 60s), so you may want to skip it when testing
test "CLUSTER FORGET command cancels a slot migration task" {
    R 0 config set rdb-key-save-delay 0
    # Migrate all slots on #0 to #1, so we can forget #0
    set task_id [R 1 CLUSTER MIGRATION IMPORT 0 5461]
    wait_for_asm_done

    # start slot migration from 1 to 0
    set task_id [setup_slot_migration_with_delay 1 0 0 5461]

    # Forget #0 on #1, the migration task on #1 will be canceled due to node deleted,
    # and the importing task on #0 will be failed
    R 1 cluster forget [R 0 cluster myid]
    wait_for_condition 1000 50 {
        [string match {*canceled*} [migration_status 1 $task_id state]] &&
        [string match {*node deleted*} [migration_status 1 $task_id last_error]] &&
        [string match {*failed*} [migration_status 0 $task_id state]]
    } else {
        fail "ASM task did not cancel"
    }

    # Add #0 back into cluster
    # NOTE: this will cost 60s to let #0 join the cluster since
    # other nodes add #0 into black list for 60s after FORGET.
    R 1 config set rdb-key-save-delay 0
    # Use get_port rather than reading the "port" config directly, so the
    # TLS port is used when the suite runs with TLS (as the other tests in
    # this file do when they need the port of a node).
    R 1 cluster meet "127.0.0.1" [get_port 0]

    # the importing task on #0 will be retried, and eventually succeed
    # since now #0 is back in the cluster
    wait_for_condition 3000 50 {
        [string match {*completed*} [migration_status 0 $task_id state]] &&
        [string match {*completed*} [migration_status 1 $task_id state]]
    } else {
        fail "ASM task did not finish"
    }

    # make sure #0 is completely back to the cluster
    wait_for_cluster_propagation
    wait_for_cluster_state "ok"
}
1219
test "CLIENT PAUSE can cancel slot migration task" {
    # Migrate slots 0-100 from #0 to #1
    set task [setup_slot_migration_with_delay 0 1 0 100]

    # Pausing writes on the destination makes #1 cancel its importing task
    R 1 client pause 100000 write ;# pause 100s
    wait_for_condition 1000 50 {
        [string match {*canceled*} [migration_status 1 $task state]] &&
        [string match {*client pause*} [migration_status 1 $task last_error]]
    } else {
        fail "ASM task did not cancel"
    }

    # Submit the import again while the server is still paused
    set task [R 1 CLUSTER MIGRATION IMPORT 0 100]
    after 200 ;# give some time to have chance to schedule the task
    # It must stay in the "none" state as long as the server is paused
    assert_match {*none*} [migration_status 1 $task state]

    # Once the server is unpaused the task starts and completes
    R 1 client unpause
    wait_for_asm_done

    # Move the slots back to their original owner #0
    R 0 config set rdb-key-save-delay 0
    R 1 config set rdb-key-save-delay 0
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
}
1249
test "Server shutdown can cancel slot migration task, exit with success" {
    # Migrate slots 0-100 from #0 to #1
    setup_slot_migration_with_delay 0 1 0 100

    # Log position of server -1 before it is restarted
    set log_offset [count_log_lines -1]

    # Restarting shuts the server down, which should cancel the migration task
    restart_server -1 true false true nosave

    wait_for_log_messages -1 {"*Cancelled due to server shutdown*"} $log_offset 100 100

    # The cluster must be healthy again afterwards
    wait_for_cluster_propagation
    wait_for_cluster_state "ok"
}
1264
test "Cancel import task when streaming buffer into db" {
    # A load delay on #1 leaves time to cancel the import task while it is
    # streaming the buffer into the db
    R 1 config set key-load-delay 50000
    # Migrate slots 0-100 from #0 to #1
    set task [setup_slot_migration_with_delay 0 1 0 100 5]

    # Write load against slot 0 on node 0
    set key [slot_key 0 mykey]
    set writer [start_write_load "127.0.0.1" [get_port 0] 100 $key 500]

    # Wait until the destination reaches the streaming buffer state
    wait_for_condition 1000 10 {
        [string match {*streaming-buffer*} [migration_status 1 $task state]]
    } else {
        fail "ASM task did not enter streaming buffer state"
    }
    stop_write_load $writer

    # Cancelling the import task on #1 must leave the destination node working
    R 1 cluster migration cancel id $task
    assert_match {*canceled*} [migration_status 1 $task state]

    # Reset the load delay on both nodes
    foreach node {0 1} {
        R $node config set key-load-delay 0
    }
}
1291
test "Destination node main channel timeout when waiting stream EOF" {
    set task [setup_slot_migration_with_delay 0 1 0 100]
    R 1 config set repl-timeout 5

    # Pause the source node so that waiting for EOF times out. The child
    # process is left running, so it can still deliver the slot snapshot
    # to the destination
    set src_pid [S 0 process_id]
    pause_process $src_pid

    # The destination fails after about 7s: 2s for the slot snapshot plus
    # 5s of waiting for EOF
    wait_for_condition 1000 20 {
        [string match {*failed*} [migration_status 1 $task state]] &&
        [string match {*Main channel*Connection timeout*wait-stream-eof*} \
            [migration_status 1 $task last_error]]
    } else {
        fail "ASM task did not fail"
    }

    # Let the source node run again
    resume_process $src_pid

    # Once resumed, the task on the source node may receive ACKs from the
    # destination and consider itself stream-done. If so, it fails a few
    # seconds later
    if {[string match {*stream-done*} [migration_status 0 $task state]]} {
        wait_for_condition 1000 20 {
            [string match {*failed*} [migration_status 0 $task state]] &&
            [string match {*Server paused*} [migration_status 0 $task last_error]]
        } else {
            fail "ASM task did not fail"
        }
    }

    # Restore the timeout and cancel the task on both sides
    R 1 config set repl-timeout 60
    foreach node {0 1} {
        R $node cluster migration cancel id $task
    }
}
1329
test "Destination node rdb channel timeout when transferring slots snapshot" {
    # Each key costs 10s to transfer
    set task [setup_slot_migration_with_delay 0 1 0 100 2 10000000]
    R 1 config set repl-timeout 3

    # With a 3s timeout the destination node gives up on the rdb channel
    wait_for_condition 1000 20 {
        [string match {*failed*} [migration_status 1 $task state]] &&
        [string match {*RDB channel*Connection timeout*rdbchannel-transfer*} \
            [migration_status 1 $task last_error]]
    } else {
        fail "ASM task did not fail"
    }

    # Restore the timeout and cancel the task on both sides
    R 1 config set repl-timeout 60
    foreach node {0 1} {
        R $node cluster migration cancel id $task
    }
}
1348
test "Source node rdb channel timeout when transferring slots snapshot" {
    set dst_pid [S 1 process_id]
    R 0 flushall
    R 0 config set save ""
    # Create several large keys so that the memory usage exceeds the socket
    # buffer size: the rdb channel then blocks and times out when the
    # destination does not receive any data.
    set payload [string repeat "a" 102400] ;# 100kb
    for {set n 0} {$n < 1000} {incr n} {
        R 0 set [slot_key 0 "key$n"] $payload
    }
    R 0 config set repl-timeout 3 ;# 3s for rdb channel timeout
    R 0 config set rdb-key-save-delay 10000 ;# 1000 keys cost 10s to save

    # Migrate slots 0-100 from #0 to #1
    set task [R 1 CLUSTER MIGRATION IMPORT 0 100]
    wait_for_condition 1000 20 {
        [string match {*send-bulk-and-stream*} [migration_status 0 $task state]]
    } else {
        fail "ASM task did not start"
    }

    # Pausing the destination node makes the rdb channel time out
    pause_process $dst_pid

    # The rdb child process cannot write data to the destination anymore,
    # so the task on the source node fails with a timeout
    wait_for_condition 1000 30 {
        [string match {*failed*} [migration_status 0 $task state]] &&
        [string match {*RDB channel*Failed to send slots snapshot*} \
            [migration_status 0 $task last_error]]
    } else {
        fail "ASM task did not fail"
    }
    resume_process $dst_pid

    # Restore the timeout and cancel the task on both sides
    R 0 config set repl-timeout 60
    foreach node {0 1} {
        R $node cluster migration cancel id $task
    }
}
1390
test "Source node main channel timeout when sending incremental stream" {
    R 0 flushall
    R 0 config set repl-timeout 2 ;# 2s for main channel timeout

    set dst_pid [S 1 process_id]
    # Slow down loading on #1 so there is time to pause the destination node
    R 1 config set key-load-delay 50000 ;# 50ms each 16k data

    # Migrate slots 0-100 from #0 to #1
    set task [setup_slot_migration_with_delay 0 1 0 100]

    # 200 keys of 16k each on slot 0: streaming the buffer needs 10s (200*50ms)
    populate_slot 200 -idx 0 -slot 0 -size 16384

    # As soon as the destination is streaming the buffer, pause it
    wait_for_condition 1000 20 {
        [string match {*streaming-buffer*} [migration_status 1 $task state]]
    } else {
        fail "ASM task did not stream buffer, state: [migration_status 1 $task state]"
    }
    pause_process $dst_pid

    # Write load against slot 0 on #0
    set writer [start_write_load "127.0.0.1" [get_port 0] 100 [slot_key 0 mykey] 500]

    # The main channel cannot write to the paused destination, so the
    # source node fails after several seconds (this includes the time
    # needed to fill the socket buffer of the source node)
    wait_for_condition 1000 30 {
        [string match {*failed*} [migration_status 0 $task state]] &&
        [string match {*Main channel*Connection timeout*} \
            [migration_status 0 $task last_error]]
    } else {
        fail "ASM task did not fail"
    }
    stop_write_load $writer
    resume_process $dst_pid

    # Restore the configuration, cancel the task on both sides and clean up
    R 0 config set repl-timeout 60
    R 1 config set key-load-delay 0
    foreach node {0 1} {
        R $node cluster migration cancel id $task
    }
    R 0 flushall
}
1435
test "Source server paused timeout" {
    # With a timeout of 0 the task fails right away when the timeout is checked
    R 0 config set cluster-slot-migration-write-pause-timeout 0

    # Migrate slots 0-100 from node 0 to node 1
    set task [setup_slot_migration_with_delay 0 1 0 100]

    # Write load against slot 0 on node 0
    set key [slot_key 0 mykey]
    set writer [start_write_load "127.0.0.1" [get_port 0] 100 $key]

    # The task on node 0 fails because of the server paused timeout
    wait_for_condition 2000 10 {
        [string match {*failed*} [migration_status 0 $task state]] &&
        [string match {*Server paused timeout*} \
            [migration_status 0 $task last_error]]
    } else {
        fail "ASM task did not fail"
    }

    stop_write_load $writer

    # Restore the timeout and cancel the task on both sides
    R 0 config set cluster-slot-migration-write-pause-timeout 10000
    foreach node {0 1} {
        R $node cluster migration cancel id $task
    }
}
1463
test "Sync buffer drain timeout" {
    # A fail point keeps the source node from entering the handoff prep
    # state, which is what lets the sync buffer drain timeout kick in
    R 0 debug asm-failpoint "migrate-main-channel" "handoff-prep"
    R 0 config set cluster-slot-migration-sync-buffer-drain-timeout 5000

    set dst_pid [S 1 process_id]

    # Migrate slots 0-100 from node 0 to node 1
    set task [setup_slot_migration_with_delay 0 1 0 100]

    # Write load against slot 0 on node 0
    set key [slot_key 0 mykey]
    set writer [start_write_load "127.0.0.1" [get_port 0] 100 $key]

    # Wait until the destination reaches the wait-stream-eof state
    wait_for_condition 1000 10 {
        [string match {*wait-stream-eof*} [migration_status 1 $task state]]
    } else {
        fail "ASM task did not enter wait-stream-eof state"
    }

    pause_process $dst_pid ;# avoid the destination to apply commands

    # The task on node 0 fails because of the sync buffer drain timeout
    wait_for_condition 2000 10 {
        [string match {*failed*} [migration_status 0 $task state]] &&
        [string match {*Sync buffer drain timeout*} \
            [migration_status 0 $task last_error]]
    } else {
        fail "ASM task did not fail"
    }

    stop_write_load $writer
    resume_process $dst_pid

    # Restore the timeout, clear the fail point and cancel the task on both sides
    R 0 config set cluster-slot-migration-sync-buffer-drain-timeout 60000
    R 0 debug asm-failpoint "" ""
    foreach node {0 1} {
        R $node cluster migration cancel id $task
    }
}
1506
test "Cluster implementation cannot start migrate task temporarily" {
    # Inject a fail point to make the source node not ready
    R 0 debug asm-failpoint "migrate-main-channel" "none"

    # Record the log position BEFORE starting the task, so that a NOTREADY
    # message logged right after the import is issued cannot fall in front
    # of the offset and be missed by wait_for_log_messages below.
    set loglines [count_log_lines -1]

    # start migration from node 0 to 1
    set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100]

    # verify source node replies SYNCSLOTS with -NOTREADY
    wait_for_log_messages -1 {"*Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later*"} $loglines 100 100

    # clear the fail point and verify the task is completed
    R 0 debug asm-failpoint "" ""
    wait_for_asm_done
    assert_equal "completed" [migration_status 0 $task_id state]
    assert_equal "completed" [migration_status 1 $task_id state]

    # cleanup: migrate the slots back to node 0
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
}
1528}
1529
1530start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} {
test "Test bgtrim after a successful migration" {
    # Force background trim on #0 and #3 and start from clean stats
    foreach node {0 3} {
        R $node debug asm-trim-method bg
    }
    foreach node {0 3} {
        R $node CONFIG RESETSTAT
    }

    R 0 flushall
    # Slots that will be migrated: plain keys in slot 0 ...
    populate_slot 1000 -idx 0 -slot 0
    # ... keys with a TTL in slot 1 ...
    populate_slot 1000 -idx 0 -slot 1 -prefix "expirekey" -expires 100
    # ... and a hash with a field expiration in slot 2
    set slot2_hfekey [slot_key 2 hfekey]
    R 0 HSETEX $slot2_hfekey EX 10 FIELDS 1 f1 v1

    # Slots that stay on node 0: plain keys in slot 101 ...
    populate_slot 1000 -idx 0 -slot 101
    # ... keys with a TTL in slot 102 ...
    populate_slot 1000 -idx 0 -slot 102 -prefix "expirekey" -expires 100
    # ... and a hash with a field expiration in slot 103
    set slot103_hfekey [slot_key 103 hfekey]
    R 0 HSETEX $slot103_hfekey EX 10 FIELDS 1 f1 v1

    # Migrate slots 0-100 to node 1
    R 1 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done

    # 2001 keys were migrated and 2001 keys stayed behind
    wait_for_ofs_sync [Rn 0] [Rn 3]
    assert_equal 2001 [R 0 dbsize]
    assert_equal 2001 [R 3 dbsize]
    wait_for_ofs_sync [Rn 1] [Rn 4]
    assert_equal 2001 [R 1 dbsize]
    assert_equal 2001 [R 4 dbsize]

    # The migrated keys must have been trimmed lazily on #0 and #3
    wait_for_condition 1000 10 {
        [S 0 lazyfreed_objects] == 2001 &&
        [S 3 lazyfreed_objects] == 2001
    } else {
        puts "lazyfreed_objects: [S 0 lazyfreed_objects] [S 3 lazyfreed_objects]"
        fail "Background trim did not happen"
    }

    # Cleanup: move the slots back, drop the data, restore the trim method
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
    R 0 flushall
    foreach node {0 3} {
        R $node debug asm-trim-method default
    }
}
1582
test "Test bgtrim after a failed migration" {
    # The trim verified below happens on the destination shard (#1 and its
    # replica #4) after the import is cancelled, so the bg trim method has
    # to be forced there. #0 and #3 keep getting the setting as before.
    foreach node {0 3 1 4} {
        R $node debug asm-trim-method bg
    }
    R 1 CONFIG RESETSTAT
    R 4 CONFIG RESETSTAT

    # Fill slot 0 on node-0 and migrate it to node-1 (with some delay)
    R 0 flushall
    set task_id [setup_slot_migration_with_delay 0 1 0 100 10000 1000]
    after 1000 ;# wait some time so that some keys are moved

    # Fail the migration
    R 1 CLUSTER MIGRATION CANCEL ID $task_id
    wait_for_asm_done

    # Verify the data is not migrated
    assert_equal 10000 [R 0 dbsize]
    assert_equal 10000 [R 3 dbsize]

    # Verify the keys are trimmed lazily after a failed import on dest side.
    wait_for_condition 1000 20 {
        [R 1 dbsize] == 0 &&
        [R 4 dbsize] == 0 &&
        [S 1 lazyfreed_objects] > 0 &&
        [S 4 lazyfreed_objects] > 0
    } else {
        fail "Background trim did not happen"
    }

    # Cleanup: restore the trim method on every node touched above
    wait_for_asm_done
    R 0 flushall
    foreach node {0 3 1 4} {
        R $node debug asm-trim-method default
    }
}
1618
test "Test bgtrim unblocks stream client" {
    # Two clients block on two streams that live in different slots. One
    # of the slots is migrated, which must unblock the client waiting on
    # it, while the other client has to stay blocked.
    R 0 debug asm-trim-method bg

    set stream0 [slot_key 0 mystream]
    set stream1 [slot_key 1 mystream]

    # Client A blocks on the slot 0 stream
    R 0 DEL $stream0
    R 0 XADD $stream0 666 f v
    R 0 XGROUP CREATE $stream0 mygroup $
    set reader0 [redis_deferring_client]
    $reader0 XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS $stream0 ">"
    wait_for_blocked_clients_count 1

    # Client B blocks on the slot 1 stream
    R 0 DEL $stream1
    R 0 XADD $stream1 666 f v
    R 0 XGROUP CREATE $stream1 mygroup $
    set reader1 [redis_deferring_client]
    $reader1 XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS $stream1 ">"
    wait_for_blocked_clients_count 2

    # Move slot 0 to node 1
    R 1 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done

    # Client A is expected to receive a MOVED error
    assert_error "*MOVED*" {$reader0 read}
    $reader0 close

    # Client B keeps working: it receives the entry added now
    R 0 XADD $stream1 667 f v
    set reply [$reader1 read]
    assert_equal [lindex $reply 0 1 0] {667-0 {f v}}
    $reader1 close

    # Cleanup: move slot 0 back, drop the data, restore the trim method
    wait_for_asm_done
    R 0 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
    R 0 flushall
    R 0 debug asm-trim-method default
}
1665
test "Test bgtrim touches watched keys" {
    R 0 debug asm-trim-method bg

    # A watched key in a migrated slot is touched by bgtrim, so the
    # transaction that follows is aborted
    set watched_migrated [slot_key 0 key]
    R 0 set $watched_migrated 30
    R 0 watch $watched_migrated
    R 1 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
    R 0 multi
    R 0 ping
    assert_equal {} [R 0 exec]

    # A watched key in another slot (slot 2 here, while slot 1 is
    # migrated) is left alone, so the transaction goes through
    set watched_other [slot_key 2 key]
    R 0 set $watched_other 30
    R 0 watch $watched_other
    R 1 CLUSTER MIGRATION IMPORT 1 1
    wait_for_asm_done
    R 0 multi
    R 0 ping
    assert_equal PONG [R 0 exec]

    # Cleanup: move slots 0-1 back, drop the data, restore the trim method
    wait_for_asm_done
    R 0 CLUSTER MIGRATION IMPORT 0 1
    wait_for_asm_done
    R 0 flushall
    R 0 debug asm-trim-method default
}
1696
test "Test bgtrim after a FAILOVER on destination side" {
    # Force background trim on the destination master and its replica
    foreach node {1 4} {
        R $node debug asm-trim-method bg
    }

    set log_offset [count_log_lines -4]

    # Populate node 0 and start a (slowed down) migration of slots 0-100
    # to node 1
    R 0 flushall
    set task [setup_slot_migration_with_delay 0 1 0 100 10000 1000]
    after 1000 ;# wait some time so that some keys are moved

    # A forced failover simulates an unreachable master. As soon as the
    # replica has become master it must trim the keys it does not own.
    failover_and_wait_for_done 4 force
    wait_for_log_messages -4 {"*Detected keys in slots that do not belong*Scheduling trim*"} $log_offset 1000 10
    wait_for_condition 1000 10 {
        [R 1 dbsize] == 0 &&
        [R 4 dbsize] == 0
    } else {
        fail "Background trim did not happen"
    }

    # Cleanup: fail back to node 1 and restore the configuration
    wait_for_cluster_propagation
    failover_and_wait_for_done 1
    R 0 config set rdb-key-save-delay 0
    foreach node {1 4} {
        R $node debug asm-trim-method default
    }
    wait_for_asm_done
}
1727
test "CLUSTER SETSLOT is not allowed if there is a pending trim job" {
    foreach node {0 3} {
        R $node debug asm-trim-method bg
    }

    # Populate node 0 and start a (slowed down) migration of slots 0-100
    # to node 1
    R 0 flushall
    set task [setup_slot_migration_with_delay 0 1 0 100 10000 1000]

    # The pause cancels the task, and the resulting trim job stays pending
    # until writes are allowed again.
    R 1 client pause 100000 write ;# pause 100s
    wait_for_asm_done

    # As long as the trim job is pending CLUSTER SETSLOT is refused
    assert_error {*There is a pending trim job for slot 0*} {R 1 CLUSTER SETSLOT 0 STABLE}

    # Unpausing triggers the trim, after which SETSLOT is accepted
    R 1 client unpause
    R 1 CLUSTER SETSLOT 0 STABLE
}
1748}
1749
1750start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no save ""}} {
test "Test active trim after a successful migration" {
    foreach node {0 3} {
        R $node debug asm-trim-method active
    }
    # 500 keys in each of the slots 0, 1, 3 and 4
    foreach slot {0 1 3 4} {
        populate_slot 500 -slot $slot
    }

    # Migrate slots 0-1 and 3, i.e. 1500 keys; slot 4 stays behind
    R 1 CLUSTER MIGRATION IMPORT 0 1 3 3
    wait_for_asm_done

    # Both node-0 and node-3 must finish a trim job covering all 1500 keys
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_current_job_trimmed] == 1500 &&
        [CI 3 cluster_slot_migration_active_trim_running] == 0 &&
        [CI 3 cluster_slot_migration_active_trim_current_job_trimmed] == 1500
    } else {
        fail "trim failed"
    }

    foreach node {0 3} {
        assert_equal 1500 [CI $node cluster_slot_migration_active_trim_current_job_keys]
    }

    # Only the 500 keys of slot 4 remain on node-0/node-3
    assert_equal 500 [R 0 dbsize]
    assert_equal 500 [R 3 dbsize]
    assert_equal 1500 [R 1 dbsize]
    assert_equal 1500 [R 4 dbsize]
    foreach slot {0 1 3} {
        assert_equal 0 [R 0 cluster countkeysinslot $slot]
    }
    assert_equal 500 [R 0 cluster countkeysinslot 4]

    # cleanup: restore trim method, move the slots back and empty the nodes
    foreach node {0 3} {
        R $node debug asm-trim-method default
    }
    R 0 CLUSTER MIGRATION IMPORT 0 1 3 3
    wait_for_asm_done
    R 0 flushall
    R 1 flushall
}
1793
test "Test multiple active trim jobs can be scheduled" {
    # With this setting active trim is scheduled but does not run
    foreach node {0 3} {
        R $node debug asm-trim-method active -1
    }

    foreach slot {0 1 3 4} {
        populate_slot 500 -slot $slot
    }

    # Migrate slots 0-1 first and wait until one trim job is reported on
    # both node-0 and node-3
    R 1 CLUSTER MIGRATION IMPORT 0 1
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 1 &&
        [CI 3 cluster_slot_migration_active_trim_running] == 1
    } else {
        fail "migrate failed"
    }

    # Migrate slot 3 as well: now two trim jobs must be reported
    R 1 CLUSTER MIGRATION IMPORT 3 3
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 2 &&
        [CI 3 cluster_slot_migration_active_trim_running] == 2
    } else {
        fail "migrate failed"
    }

    # Enable active trim and wait until it is completed
    foreach node {0 3} {
        R $node debug asm-trim-method active 0
    }
    wait_for_asm_done

    # Only slot 4 is left on node-0/node-3
    assert_equal 500 [R 0 dbsize]
    assert_equal 500 [R 3 dbsize]
    foreach slot {0 1 3} {
        assert_equal 0 [R 0 cluster countkeysinslot $slot]
    }
    assert_equal 500 [R 0 cluster countkeysinslot 4]

    # cleanup
    foreach node {0 3} {
        R $node debug asm-trim-method default
    }
    R 0 CLUSTER MIGRATION IMPORT 0 1 3 3
    wait_for_asm_done
    R 0 flushall
    R 1 flushall
}
1844
test "Test active-trim clears partially imported keys on cancel" {
    foreach node {1 4} {
        R $node debug asm-trim-method active
    }

    # Slow down RDB delivery on node-0: it will take 10 seconds
    R 0 config set rdb-key-save-delay 10000
    foreach slot {0 1 3 4} {
        populate_slot 250 -slot $slot
    }

    # Start the import, let it run for two seconds, then cancel it
    R 1 CLUSTER MIGRATION IMPORT 0 100
    after 2000
    R 1 CLUSTER MIGRATION CANCEL ALL
    wait_for_asm_done

    # The trim stats on node-1 and node-4 must be non-zero
    assert_morethan [CI 1 cluster_slot_migration_active_trim_current_job_keys] 0
    assert_morethan [CI 4 cluster_slot_migration_active_trim_current_job_trimmed] 0

    # node-0/node-3 keep all keys, node-1/node-4 end up empty
    foreach node {0 3} {
        assert_equal 1000 [R $node dbsize]
    }
    foreach node {1 4} {
        assert_equal 0 [R $node dbsize]
    }

    # Cleanup
    foreach node {1 4} {
        R $node debug asm-trim-method default
    }
    R 0 config set rdb-key-save-delay 0
}
1874
test "Test active-trim clears partially imported keys on failover" {
    foreach node {1 4} {
        R $node debug asm-trim-method active
    }

    # Slow down RDB delivery on node-0: it will take 10 seconds
    R 0 config set rdb-key-save-delay 10000

    foreach slot {0 1 3 4} {
        populate_slot 250 -slot $slot
    }

    # Baseline of the "trim started" counters on node-1 and node-4
    set prev_trim_started_1 [CI 1 cluster_slot_migration_stats_active_trim_started]
    set prev_trim_started_4 [CI 4 cluster_slot_migration_stats_active_trim_started]

    # Start the import, let it run for two seconds, then fail over to node-4
    R 1 CLUSTER MIGRATION IMPORT 0 100
    after 2000
    failover_and_wait_for_done 4
    wait_for_asm_done

    # Verify there is at least one trim job started on each of them
    assert_morethan [CI 1 cluster_slot_migration_stats_active_trim_started] $prev_trim_started_1
    assert_morethan [CI 4 cluster_slot_migration_stats_active_trim_started] $prev_trim_started_4

    # node-0/node-3 keep all keys, node-1/node-4 end up empty
    foreach node {0 3} {
        assert_equal 1000 [R $node dbsize]
    }
    foreach node {1 4} {
        assert_equal 0 [R $node dbsize]
    }

    # Cleanup: fail back to node-1, restore settings, empty the nodes
    failover_and_wait_for_done 1
    foreach node {1 4} {
        R $node debug asm-trim-method default
    }
    R 0 config set rdb-key-save-delay 0
    R 0 flushall
    R 1 flushall
}
1912
test "Test import task does not start if active trim is in progress for the same slots" {
    # Active trim will be scheduled but it won't run
    R 0 flushall
    R 1 flushall
    R 0 debug asm-trim-method active -1

    populate_slot 500 -slot 0
    populate_slot 500 -slot 1

    # Migrate 1000 keys and wait until the trim job is reported on node-0
    R 1 CLUSTER MIGRATION IMPORT 0 1
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 1
    } else {
        fail "migrate failed"
    }

    # Try to migrate slots back. Only log lines written after this point
    # are considered, so that a matching line left in node-0's log by
    # anything that ran earlier cannot satisfy the wait by accident.
    set loglines [count_log_lines 0]
    R 0 CLUSTER MIGRATION IMPORT 0 1
    wait_for_log_messages 0 {"*Can not start import task*trim in progress for some of the slots*"} $loglines 1000 10

    # Enable active trim and verify slots are imported back
    R 0 debug asm-trim-method active 0
    wait_for_asm_done

    assert_equal 1000 [R 0 dbsize]
    assert_equal 500 [R 0 cluster countkeysinslot 0]
    assert_equal 500 [R 0 cluster countkeysinslot 1]

    # cleanup
    R 0 debug asm-trim-method default
    R 0 flushall
}
1947
test "Rdb save during active trim should skip keys in trimmed slots" {
    # Insert some delay to activate trim
    R 0 debug asm-trim-method active 1000
    R 0 config set repl-diskless-sync-delay 0
    R 0 flushall

    populate_slot 5000 -idx 0 -slot 0
    populate_slot 5000 -idx 0 -slot 1
    populate_slot 5000 -idx 0 -slot 2

    # Start migration and wait until trim is in progress (and no bgsave
    # is running anymore on node-0)
    R 1 CLUSTER MIGRATION IMPORT 0 1
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 1 &&
        [S 0 rdb_bgsave_in_progress] == 0
    } else {
        puts "[CI 0 cluster_slot_migration_active_tasks]"
        puts "[CI 0 cluster_slot_migration_active_trim_running]"
        fail "trim failed"
    }

    # Trigger save during active trim. The log position is captured first
    # so that only the message produced by this save is matched, not a
    # line left behind by an earlier save.
    set loglines [count_log_lines 0]
    R 0 save
    # Wait until the log contains a "keys skipped" message with a non-zero value
    wait_for_log_messages 0 {"*BGSAVE done, 5000 keys saved, [1-9]* keys skipped*"} $loglines 1000 10

    # After a restart only the 5000 keys of slot 2 must be present
    restart_server 0 yes no yes nosave
    assert_equal 5000 [R 0 dbsize]
    assert_equal 0 [R 0 cluster countkeysinslot 0]
    assert_equal 0 [R 0 cluster countkeysinslot 1]
    assert_equal 5000 [R 0 cluster countkeysinslot 2]

    # Cleanup
    wait_for_cluster_propagation
    wait_for_cluster_state "ok"
    R 0 flushall
    R 1 flushall
    R 0 save
    R 0 CLUSTER MIGRATION IMPORT 0 1
    wait_for_asm_done
}
1990
test "AOF rewrite during active trim should skip keys in trimmed slots" {
    R 0 debug asm-trim-method active 1000
    R 0 config set repl-diskless-sync-delay 0
    R 0 config set aof-use-rdb-preamble no
    R 0 config set appendonly yes
    R 0 config rewrite
    R 0 flushall
    populate_slot 5000 -idx 0 -slot 0
    populate_slot 5000 -idx 0 -slot 1
    populate_slot 5000 -idx 0 -slot 2

    # Start migration and wait until trim is in progress on node-0
    R 1 CLUSTER MIGRATION IMPORT 0 1
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 1
    } else {
        puts "[CI 0 cluster_slot_migration_active_tasks]"
        puts "[CI 0 cluster_slot_migration_active_trim_running]"
        fail "trim failed"
    }

    # Make sure no bgsave is running before asking for the rewrite
    wait_for_condition 50 100 {
        [S 0 rdb_bgsave_in_progress] == 0
    } else {
        fail "bgsave is in progress"
    }

    # Capture the log position first so that only the message of the
    # rewrite requested here is matched, not one logged by an earlier
    # rewrite on this node.
    set loglines [count_log_lines 0]
    R 0 bgrewriteaof
    # Wait until the log contains a "keys skipped" message with a non-zero value
    wait_for_log_messages 0 {"*AOF rewrite done, [1-9]* keys saved, [1-9]* keys skipped*"} $loglines 1000 10

    # After a restart only the 5000 keys of slot 2 must be present
    restart_server 0 yes no yes nosave
    assert_equal 5000 [R 0 dbsize]
    assert_equal 0 [R 0 cluster countkeysinslot 0]
    assert_equal 0 [R 0 cluster countkeysinslot 1]
    assert_equal 5000 [R 0 cluster countkeysinslot 2]

    # cleanup: turn AOF off again, persist that and restart
    R 0 config set appendonly no
    R 0 config rewrite
    restart_server 0 yes no yes nosave
    wait_for_cluster_propagation
    wait_for_cluster_state "ok"
    R 0 flushall
    R 1 flushall
    R 0 save
    R 0 CLUSTER MIGRATION IMPORT 0 1
    wait_for_asm_done
}
2040
test "Pause actions will stop active trimming" {
    R 0 debug asm-trim-method active 1000
    R 0 config set repl-diskless-sync-delay 0
    R 0 flushall
    populate_slot 10000 -idx 0 -slot 0

    # Migrate and wait until the trim job is running on node-0
    R 1 CLUSTER MIGRATION IMPORT 0 100
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 1
    } else {
        puts "[CI 0 cluster_slot_migration_active_tasks]"
        puts "[CI 0 cluster_slot_migration_active_trim_running]"
        fail "trim failed"
    }

    # Pause writes for 100s, then sample the trimmed-keys counter twice,
    # one second apart: it must not advance while the server is paused.
    R 0 client pause 100000 write
    set prev [CI 0 cluster_slot_migration_active_trim_current_job_trimmed]
    after 1000
    set curr [CI 0 cluster_slot_migration_active_trim_current_job_trimmed]
    assert_equal $prev $curr

    # Unpause and let the trim finish with the default method
    R 0 client unpause
    R 0 debug asm-trim-method default
    wait_for_asm_done
    assert_equal 0 [R 0 dbsize]

    # revert: bring the slots (and their keys) back to node-0
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
    assert_equal 10000 [R 0 dbsize]
}
2074
# Run the fullsync/active-trim interaction once per repl-diskless-load mode.
foreach diskless_load {"disabled" "swapdb" "on-empty-db"} {
    test "Test fullsync cancels active trim (repl-diskless-load $diskless_load)" {
        R 3 debug asm-trim-method active -10
        R 3 config set repl-diskless-load $diskless_load
        R 0 flushall

        R 0 config set repl-diskless-sync-delay 0
        populate_slot 10000 -idx 0 -slot 0

        # Migrate slot 0 and wait until the trim is done on node-0 while
        # node-3 still reports a running trim job.
        R 1 CLUSTER MIGRATION IMPORT 0 0
        wait_for_condition 1000 10 {
            [CI 0 cluster_slot_migration_active_tasks] == 0 &&
            [CI 0 cluster_slot_migration_active_trim_running] == 0 &&
            [CI 3 cluster_slot_migration_active_trim_running] == 1
        } else {
            puts "[CI 0 cluster_slot_migration_active_tasks]"
            puts "[CI 0 cluster_slot_migration_active_trim_running]"
            puts "[CI 3 cluster_slot_migration_active_trim_running]"
            fail "trim failed"
        }

        set prev_cancelled [CI 3 cluster_slot_migration_stats_active_trim_cancelled]

        # Shrink the replica output buffer limit on node-0, remembering
        # the previous value so it can be restored once the fullsync has
        # been observed.
        set prev_cob_limit [lindex [R 0 config get client-output-buffer-limit] 1]
        R 0 config set client-output-buffer-limit "replica 1024 0 0"

        # Trigger a fullsync by writing a single 2000000-byte value
        populate_slot 1 -idx 0 -size 2000000 -slot 2

        wait_for_condition 1000 10 {
            [CI 3 cluster_slot_migration_active_trim_running] == 0 &&
            [CI 3 cluster_slot_migration_stats_active_trim_cancelled] == $prev_cancelled + 1
        } else {
            puts "[CI 3 cluster_slot_migration_active_trim_running]"
            puts "[CI 3 cluster_slot_migration_stats_active_trim_cancelled]"
            fail "trim failed"
        }

        # Restore the output buffer limit so the tiny limit does not leak
        # into the rest of this test, later iterations or later tests.
        R 0 config set client-output-buffer-limit $prev_cob_limit

        R 3 debug asm-trim-method active 0
        R 3 config set repl-diskless-load disabled
        R 0 CLUSTER MIGRATION IMPORT 0 0
        wait_for_asm_done
        wait_for_ofs_sync [Rn 0] [Rn 3]
        assert_equal 10001 [R 0 dbsize]
        assert_equal 10001 [R 3 dbsize]
        assert_equal 0 [R 1 dbsize]
        assert_equal 0 [R 4 dbsize]
        R 0 flushall
    }
}
2123
test "Test importing slots while active-trim is in progress for the same slots on replica" {
    R 3 debug asm-trim-method active 10000
    R 0 flushall
    populate_slot 10000 -slot 0
    wait_for_ofs_sync [Rn 0] [Rn 3]

    # Migrate the slots away and wait until node-0 has finished trimming
    # while the trim is still in progress on node-3
    R 1 CLUSTER MIGRATION IMPORT 0 100
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 0 &&
        [CI 3 cluster_slot_migration_active_trim_running] == 1
    } else {
        puts "[CI 0 cluster_slot_migration_active_tasks]"
        puts "[CI 0 cluster_slot_migration_active_trim_running]"
        puts "[CI 3 cluster_slot_migration_active_trim_running]"
        fail "trim failed"
    }

    # Only node-3 log lines written from here on are matched below
    set loglines [count_log_lines -3]

    # Get the slots back: node-0 reports an active task while node-3 is
    # still trimming
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_condition 1000 20 {
        [CI 0 cluster_slot_migration_active_tasks] == 1 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 0 &&
        [CI 3 cluster_slot_migration_active_trim_running] == 1
    } else {
        fail "trim failed"
    }

    # Verify replica blocks master until trim is done: first the "blocking"
    # message, then, after the trim delay is removed, the "unblocking" one
    wait_for_log_messages -3 {"*Blocking master client until trim job is done*"} $loglines 1000 30
    R 3 debug asm-trim-method active 0
    wait_for_log_messages -3 {"*Unblocking master client after active trim*"} $loglines 1000 30

    wait_for_asm_done
    wait_for_ofs_sync [Rn 0] [Rn 3]
    foreach node {0 3} {
        assert_equal 10000 [R $node dbsize]
    }
    foreach node {1 4} {
        assert_equal 0 [R $node dbsize]
    }
}
2167
test "TRIMSLOTS should not trim slots that this node is serving" {
    # On node-0: slot 0 is rejected as served by this node, slot 16383 is accepted
    assert_error {*the slot 0 is served by this node*} {R 0 trimslots ranges 1 0 0}
    assert_equal {OK} [R 0 trimslots ranges 1 16383 16383]

    # On node-3 both requests are refused with READONLY
    assert_error {*READONLY*} {R 3 trimslots ranges 1 0 100}
    assert_error {*READONLY*} {R 3 trimslots ranges 1 16383 16383}
}
2174
test "Trigger multiple active trim jobs at the same time" {
    R 1 debug asm-trim-method active 0
    R 1 flushall

    # Baseline of the "trim completed" counter
    set prev_trim_done [CI 1 cluster_slot_migration_stats_active_trim_completed]

    # Put 1000 keys into each of the slots 0, 1 and 2 on node-1
    foreach slot {0 1 2} {
        R 1 debug populate 1000 [slot_prefix $slot] 100
    }

    # Queue one TRIMSLOTS per slot inside a single transaction
    R 1 multi
    foreach slot {0 1 2} {
        R 1 trimslots ranges 1 $slot $slot
    }
    R 1 exec

    # All three trim jobs must complete
    wait_for_condition 1000 10 {
        [CI 1 cluster_slot_migration_stats_active_trim_completed] == $prev_trim_done + 3
    } else {
        fail "active trim failed"
    }

    R 1 flushall
    R 1 debug asm-trim-method default
}
2200
test "Restart will clean up unowned slot keys" {
    R 1 flushall

    # Create 1000 keys belonging to slot 0 directly on node-1 and check
    # that INFO keyspace reports at least that many keys
    R 1 debug populate 1000 [slot_prefix 0] 100
    set keyspace_before [R 1 info keyspace]
    assert {[scan [regexp -inline {keys\=([\d]*)} $keyspace_before] keys=%d] >= 1000}

    # restart node-1
    restart_server -1 true false true save
    wait_for_cluster_propagation
    wait_for_cluster_state "ok"

    # Node-1 has no keys since unowned slot 0 keys were cleaned up during
    # restart: INFO keyspace has no "keys=" entry, so the scan yields nothing
    set keyspace_after [R 1 info keyspace]
    assert {[scan [regexp -inline {keys\=([\d]*)} $keyspace_after] keys=%d] == {}}

    R 1 flushall
}
2218
test "Test active trim is used when client tracking is used" {
    R 0 flushall
    R 1 flushall
    R 0 debug asm-trim-method default
    R 1 debug asm-trim-method default

    # Baseline of the "active trim completed" counter on node-0
    set prev_active_trim [CI 0 cluster_slot_migration_stats_active_trim_completed]

    # Setup a tracking client that is redirected to a pubsub client
    set rd_redirection [redis_deferring_client]
    $rd_redirection client id
    set redir_id [$rd_redirection read]
    $rd_redirection subscribe __redis__:invalidate
    $rd_redirection read ; # Consume the SUBSCRIBE reply.

    # setup tracking: read the key once, then migrate its slot away
    set key0 [slot_key 0 key]
    R 0 CLIENT TRACKING on REDIRECT $redir_id
    R 0 SET $key0 1
    R 0 GET $key0
    R 1 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done

    # The condition is already evaluated as an expression, so no nested
    # (and unbraced) expr is needed for the addition.
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_stats_active_trim_completed] == $prev_active_trim + 1
    } else {
        fail "active trim did not happen"
    }

    # Verify the tracking client received the invalidation message
    set msg [$rd_redirection read]
    set head [lindex $msg 0]

    if {$head eq "message"} {
        # RESP 2
        set got_key [lindex [lindex $msg 2] 0]
    } elseif {$head eq "invalidate"} {
        # RESP 3
        set got_key [lindex $msg 1 0]
    } else {
        fail "unexpected invalidation message: $msg"
    }
    assert_equal $got_key $key0

    # cleanup: disable tracking on the shared node-0 connection before the
    # redirect target is closed, so it is not left enabled for later use
    R 0 CLIENT TRACKING off
    $rd_redirection close
    wait_for_asm_done
    R 0 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
    R 0 flushall
}
2270}
2271
2272set testmodule [file normalize tests/modules/atomicslotmigration.so]
2273
2274start_cluster 3 6 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no]] {
test "Module api sanity" {
    # Run the module's sanity command on a master (node-0) and on a
    # replica (node-3)
    foreach node {0 3} {
        R $node asm.sanity
    }
}
2279
test "Module replicate cross slot command" {
    set task_id [setup_slot_migration_with_delay 0 1 0 100]
    set listkey [slot_key 0 "asmlist"]

    # While the migration is running, have the module replicate a cross
    # slot command
    R 0 asm.lpush_replicate_crossslot_command $listkey "item1"

    # The task on node 0 must end up canceled with a cross slot error
    wait_for_condition 2000 10 {
        [string match {*canceled*} [migration_status 0 $task_id state]] &&
        [string match {*cross slot*} [migration_status 0 $task_id last_error]]
    } else {
        fail "ASM task did not fail"
    }
    R 1 CLUSTER MIGRATION CANCEL ID $task_id

    # sanity check: the lpush itself is visible on node-0 and, once the
    # offsets are in sync, on node-3 as well
    wait_for_ofs_sync [Rn 0] [Rn 3]
    assert_equal {item1} [R 0 lrange $listkey 0 -1]
    R 3 readonly
    assert_equal {item1} [R 3 lrange $listkey 0 -1]
}
2301
test "Test RM_ClusterCanAccessKeysInSlot" {
    # Invalid slot numbers are never accessible
    foreach {node slot} {0 -1 0 20000 2 16384 5 16384} {
        assert_equal 0 [R $node asm.cluster_can_access_keys_in_slot $slot]
    }

    # Master-replica pair node-0/node-3: slots 0 and 100
    foreach node {0 3} {
        foreach slot {0 100} {
            assert_equal 1 [R $node asm.cluster_can_access_keys_in_slot $slot]
        }
    }

    # Another master-replica pair, node-2/node-5: slot 16383
    foreach node {2 5} {
        assert_equal 1 [R $node asm.cluster_can_access_keys_in_slot 16383]
    }
}
2319
test "Test RM_ClusterCanAccessKeysInSlot returns false for unowned slots" {
    # Active trim will be scheduled but it won't run
    foreach node {0 3} {
        R $node debug asm-trim-method active -1
    }

    setup_slot_migration_with_delay 0 1 0 100 3 1000000

    # While the import is in progress the slots are not local on
    # node-1/node-4
    foreach node {1 4} {
        foreach slot {0 100} {
            assert_equal 0 [R $node asm.cluster_can_access_keys_in_slot $slot]
        }
    }

    # Wait until the migration is over and the trim jobs are pending on
    # node-0 and node-3
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 1 &&
        [CI 3 cluster_slot_migration_active_trim_running] == 1
    } else {
        fail "migrate failed"
    }

    # Wait for config propagation before checking the slot ownership on replica
    wait_for_cluster_propagation

    # Slots that are being trimmed are not local on node-0/node-3
    foreach node {0 3} {
        foreach slot {0 100} {
            assert_equal 0 [R $node asm.cluster_can_access_keys_in_slot $slot]
        }
    }

    # Enable active trim and wait until it is completed
    foreach node {0 3} {
        R $node debug asm-trim-method active 0
    }
    wait_for_asm_done
    wait_for_ofs_sync [Rn 0] [Rn 3]

    # After the migration the slots are local on node-1/node-4
    foreach node {1 4} {
        foreach slot {0 100} {
            assert_equal 1 [R $node asm.cluster_can_access_keys_in_slot $slot]
        }
    }

    # cleanup
    foreach node {0 3} {
        R $node debug asm-trim-method default
    }
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
    R 0 flushall
    R 1 flushall
}
2370
2371 foreach trim_method {"active" "bg"} {
test "Test cluster module notifications on a successful migration ($trim_method-trim)" {
    clear_module_event_log
    # Apply the trim method under test to node-0, node-3 and node-6
    foreach node {0 3 6} {
        R $node debug asm-trim-method $trim_method
    }

    # Set a key in the slot range
    set key [slot_key 0 mykey]
    R 0 set $key "value"

    # Migrate the slot ranges; the reply is the task id
    set task_id [R 1 CLUSTER MIGRATION IMPORT 0 100 200 300]
    wait_for_asm_done

    set src_id [R 0 cluster myid]
    set dest_id [R 1 cluster myid]

    # Source side: node-0 logs started+completed, node-3 and node-6 log nothing
    set migrate_event_log [list \
        "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \
        "sub: cluster-slot-migration-migrate-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \
    ]
    assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log
    foreach node {3 6} {
        assert_equal [R $node asm.get_cluster_event_log] {}
    }

    # Destination side: node-1, node-4 and node-7 all log started+completed
    set import_event_log [list \
        "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \
        "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100,200-300" \
    ]
    wait_for_condition 500 20 {
        [R 1 asm.get_cluster_event_log] eq $import_event_log &&
        [R 4 asm.get_cluster_event_log] eq $import_event_log &&
        [R 7 asm.get_cluster_event_log] eq $import_event_log
    } else {
        puts "R1: [R 1 asm.get_cluster_event_log]"
        puts "R4: [R 4 asm.get_cluster_event_log]"
        puts "R7: [R 7 asm.get_cluster_event_log]"
        fail "ASM import event not received"
    }

    # Expected trim events depend on the trim method: active trim reports
    # start, the trimmed key and completion; otherwise a single
    # "background" event is expected
    if {$trim_method eq "active"} {
        set trim_event_log [list \
            "sub: cluster-slot-migration-trim-started, slots:0-100,200-300" \
            "keyspace: key_trimmed, key: $key" \
            "sub: cluster-slot-migration-trim-completed, slots:0-100,200-300" \
        ]
    } else {
        set trim_event_log [list \
            "sub: cluster-slot-migration-trim-background, slots:0-100,200-300" \
        ]
    }
    wait_for_condition 500 10 {
        [R 0 asm.get_cluster_trim_event_log] eq $trim_event_log &&
        [R 3 asm.get_cluster_trim_event_log] eq $trim_event_log &&
        [R 6 asm.get_cluster_trim_event_log] eq $trim_event_log
    } else {
        fail "ASM source trim event not received"
    }

    # cleanup
    R 0 CLUSTER MIGRATION IMPORT 0 100 200 300
    wait_for_asm_done
    clear_module_event_log
    reset_default_trim_method
    R 0 flushall
    R 1 flushall
}
2442
test "Test cluster module notifications on a failed migration ($trim_method-trim)" {
    clear_module_event_log
    # Apply the trim method under test to node-1, node-4 and node-7
    foreach node {1 4 7} {
        R $node debug asm-trim-method $trim_method
    }

    # Set a key in the slot range
    set key [slot_key 0 mykey]
    R 0 set $key "value"

    # Start a delayed migration, wait until at least one key has reached
    # node-1, then cancel the task
    set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000]
    wait_for_condition 1000 10 {
        [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1
    } else {
        fail "Key not moved to destination"
    }
    R 1 CLUSTER MIGRATION CANCEL ID $task_id
    wait_for_asm_done

    set src_id [R 0 cluster myid]
    set dest_id [R 1 cluster myid]

    # Source side: node-0 logs started+failed, node-3 and node-6 log nothing
    set migrate_event_log [list \
        "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
        "sub: cluster-slot-migration-migrate-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
    ]
    assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log
    foreach node {3 6} {
        assert_equal [R $node asm.get_cluster_event_log] {}
    }

    # Destination side: node-1, node-4 and node-7 all log started+failed
    set import_event_log [list \
        "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
        "sub: cluster-slot-migration-import-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
    ]
    wait_for_condition 500 10 {
        [R 1 asm.get_cluster_event_log] eq $import_event_log &&
        [R 4 asm.get_cluster_event_log] eq $import_event_log &&
        [R 7 asm.get_cluster_event_log] eq $import_event_log
    } else {
        fail "ASM import event not received"
    }

    # Trim events on the destination (partially imported keys are trimmed)
    if {$trim_method eq "active"} {
        set trim_event_log [list \
            "sub: cluster-slot-migration-trim-started, slots:0-100" \
            "keyspace: key_trimmed, key: $key" \
            "sub: cluster-slot-migration-trim-completed, slots:0-100" \
        ]
    } else {
        set trim_event_log [list \
            "sub: cluster-slot-migration-trim-background, slots:0-100" \
        ]
    }
    wait_for_condition 500 10 {
        [R 1 asm.get_cluster_trim_event_log] eq $trim_event_log &&
        [R 4 asm.get_cluster_trim_event_log] eq $trim_event_log &&
        [R 7 asm.get_cluster_trim_event_log] eq $trim_event_log
    } else {
        fail "ASM destination trim event not received"
    }

    # cleanup
    clear_module_event_log
    reset_default_trim_method
    wait_for_asm_done
    R 0 flushall
    R 1 flushall
}
2516
test "Test cluster module notifications on failover ($trim_method-trim)" {
    # NOTE: cluster legacy may have a bug, multiple manual failover will fail,
    # so only perform one round of failover test, fix it later.
    # For any trim method other than "bg" this test body does nothing.
    if {$trim_method eq "bg"} {
        clear_module_event_log
        foreach node {1 4 7} {
            R $node debug asm-trim-method $trim_method
        }

        # Set a key in the slot range
        set key [slot_key 0 mykey]
        R 0 set $key "value"

        # Start a delayed migration and wait until at least one key has
        # reached node-1
        set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000]
        wait_for_condition 1000 10 {
            [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1
        } else {
            fail "Key not moved to destination"
        }

        # Fail over to node-4 while the import is in progress
        failover_and_wait_for_done 4
        wait_for_asm_done

        set src_id [R 0 cluster myid]
        set dest_id [R 1 cluster myid]

        # Source side: node-0 logs started+failed, node-3 and node-6 log nothing
        set migrate_event_log [list \
            "sub: cluster-slot-migration-migrate-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
            "sub: cluster-slot-migration-migrate-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
        ]
        assert_equal [R 0 asm.get_cluster_event_log] $migrate_event_log
        foreach node {3 6} {
            assert_equal [R $node asm.get_cluster_event_log] {}
        }

        # Destination side: node-1, node-4 and node-7 all log started+failed
        set import_event_log [list \
            "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
            "sub: cluster-slot-migration-import-failed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
        ]
        wait_for_condition 500 20 {
            [R 1 asm.get_cluster_event_log] eq $import_event_log &&
            [R 4 asm.get_cluster_event_log] eq $import_event_log &&
            [R 7 asm.get_cluster_event_log] eq $import_event_log
        } else {
            puts "R1: [R 1 asm.get_cluster_event_log]"
            puts "R4: [R 4 asm.get_cluster_event_log]"
            puts "R7: [R 7 asm.get_cluster_event_log]"
            fail "ASM import event not received"
        }

        # Trim events on the destination (partially imported keys are trimmed).
        # NOTE: after failover, the new master will initiate the slot trimming,
        # and only slot 0 has data, so only slot 0 is trimmed
        if {$trim_method eq "active"} {
            set trim_event_log [list \
                "sub: cluster-slot-migration-trim-started, slots:0-0" \
                "keyspace: key_trimmed, key: $key" \
                "sub: cluster-slot-migration-trim-completed, slots:0-0" \
            ]
        } else {
            set trim_event_log [list \
                "sub: cluster-slot-migration-trim-background, slots:0-0" \
            ]
        }
        wait_for_condition 500 20 {
            [R 1 asm.get_cluster_trim_event_log] eq $trim_event_log &&
            [R 4 asm.get_cluster_trim_event_log] eq $trim_event_log &&
            [R 7 asm.get_cluster_trim_event_log] eq $trim_event_log
        } else {
            puts "R1: [R 1 asm.get_cluster_trim_event_log]"
            puts "R4: [R 4 asm.get_cluster_trim_event_log]"
            puts "R7: [R 7 asm.get_cluster_trim_event_log]"
            fail "ASM destination trim event not received"
        }

        # cleanup: fail back to node-1 and reset everything
        failover_and_wait_for_done 1
        clear_module_event_log
        reset_default_trim_method
        R 0 flushall
        R 1 flushall
    }
}
2603 }
2604
# Run the replica-restart scenario twice: once saving an RDB on restart and
# once without.
foreach with_rdb {"with" "without"} {
    test "Test cluster module notifications when replica restart $with_rdb RDB during importing" {
        clear_module_event_log
        # This test is outside the `foreach trim_method {"active" "bg"}`
        # loop. It used to read $trim_method, i.e. whatever value that
        # loop left behind ("bg"). The value is spelled out here so the
        # test no longer depends on the order of that loop's list.
        foreach node {1 4 7} {
            R $node debug asm-trim-method bg
        }
        R 4 config set save ""

        set src_id [R 0 cluster myid]
        set dest_id [R 1 cluster myid]

        # Set a key in the slot range
        set key [slot_key 0 mykey]
        R 0 set $key "value"

        # Start migration, 2s delay
        set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000]
        # Wait until at least one key is moved to destination
        wait_for_condition 1000 10 {
            [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1
        } else {
            fail "Key not moved to destination"
        }
        wait_for_ofs_sync [Rn 1] [Rn 4]

        # restart node 4
        if {$with_rdb eq "with"} {
            restart_server -4 true false true save ;# rdb save
        } else {
            restart_server -4 true false true nosave ;# no rdb saved
        }
        wait_for_cluster_propagation

        wait_for_asm_done

        # started and completed are paired, and not duplicated
        set import_event_log [list \
            "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
            "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
        ]
        wait_for_condition 500 10 {
            [R 1 asm.get_cluster_event_log] eq $import_event_log &&
            [R 4 asm.get_cluster_event_log] eq $import_event_log &&
            [R 7 asm.get_cluster_event_log] eq $import_event_log
        } else {
            fail "ASM import event not received"
        }

        # cleanup: move the slots back and reset logs/settings/data
        R 0 CLUSTER MIGRATION IMPORT 0 100
        wait_for_asm_done
        R 4 save ;# save an empty rdb to override previous one
        clear_module_event_log
        reset_default_trim_method
        R 0 flushall
        R 1 flushall
    }
}
2662
test "Test cluster module notifications when replica is disconnected and full resync after importing" {
    # Scenario: node 4 (replica of node 1) is paused while slots 0-100 are
    # imported into node 1. Once the import has finished, node 4 is resumed
    # and has to full resync. It must still end up with exactly one
    # started/completed event pair and no trim events.
    clear_module_event_log
    R 1 debug asm-trim-method $trim_method
    R 4 debug asm-trim-method $trim_method
    R 7 debug asm-trim-method $trim_method

    set src_id [R 0 cluster myid]
    set dest_id [R 1 cluster myid]

    # Set a key in the slot range
    set key [slot_key 0 mykey]
    R 0 set $key "value"

    # Start migration, 2s delay
    set task_id [setup_slot_migration_with_delay 0 1 0 100 0 2000000]
    # Wait until at least one key is moved to destination
    wait_for_condition 1000 10 {
        [scan [regexp -inline {keys\=([\d]*)} [R 1 info keyspace]] keys=%d] >= 1
    } else {
        fail "Key not moved to destination"
    }
    wait_for_ofs_sync [Rn 1] [Rn 4]

    # pause node-4
    set r4_pid [S 4 process_id]
    pause_process $r4_pid

    # set a small repl-backlog-size and write some commands to make node-4
    # full resync when reconnecting after waking up. The previous backlog
    # size is remembered so that it can be restored during cleanup instead
    # of leaking into the following tests.
    set r1_full_sync [S 1 sync_full]
    set r1_backlog_size [lindex [R 1 config get repl-backlog-size] 1]
    R 1 config set repl-backlog-size 16kb
    R 1 client kill type replica
    set 1k_str [string repeat "a" 1024]
    for {set i 0} {$i < 2000} {incr i} {
        R 1 set [slot_key 6000] $1k_str
    }

    # after ASM task is completed, wake up node-4
    wait_for_condition 1000 10 {
        [CI 1 cluster_slot_migration_active_tasks] == 0 &&
        [CI 1 cluster_slot_migration_active_trim_running] == 0
    } else {
        fail "ASM tasks did not complete"
    }
    resume_process $r4_pid

    # make sure full resync happens
    wait_for_sync [Rn 4]
    wait_for_ofs_sync [Rn 1] [Rn 4]
    assert_morethan [S 1 sync_full] $r1_full_sync

    # started and completed are paired, and not duplicated
    set import_event_log [list \
        "sub: cluster-slot-migration-import-started, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
        "sub: cluster-slot-migration-import-completed, source_node_id:$src_id, destination_node_id:$dest_id, task_id:$task_id, slots:0-100" \
    ]
    wait_for_condition 500 10 {
        [R 1 asm.get_cluster_event_log] eq $import_event_log &&
        [R 4 asm.get_cluster_event_log] eq $import_event_log &&
        [R 7 asm.get_cluster_event_log] eq $import_event_log
    } else {
        # Dump what each node actually recorded to make a timeout debuggable
        puts "R1: [R 1 asm.get_cluster_event_log]"
        puts "R4: [R 4 asm.get_cluster_event_log]"
        puts "R7: [R 7 asm.get_cluster_event_log]"
        fail "ASM import event not received"
    }

    # since ASM task is completed on node-1 before node-4 reconnects,
    # no trim event should be received on node-4
    assert_equal {} [R 4 asm.get_cluster_trim_event_log]

    # cleanup: restore the backlog size, move the slots back to node-0 and
    # reset the test state
    R 1 config set repl-backlog-size $r1_backlog_size
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
    clear_module_event_log
    reset_default_trim_method
    R 0 flushall
    R 1 flushall
}
2738
test "Test new master can trim slots when migration is completed and failover occurs on source side" {
    # Scenario: slot 0 is migrated from #0 to #1 while trimming is disabled
    # on #0, so the trim job stays pending. A failover then promotes #3,
    # which has to perform the trim itself.
    R 0 asm.disable_trim ;# can not start slot trimming on source side
    set slot0_key [slot_key 0 mykey]
    R 0 set $slot0_key "value"

    # migrate slot 0 from #0 to #1, and wait it completed, but not allow to trim slots
    # on source node
    set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0]
    wait_for_condition 1000 10 {
        [string match {*completed*} [migration_status 0 $task_id state]] &&
        [string match {*completed*} [migration_status 1 $task_id state]]
    } else {
        fail "ASM task did not complete"
    }
    # verify trim is not allowed on source node, and replica node doesn't have trim job either
    wait_for_ofs_sync [Rn 0] [Rn 3]
    assert_equal 1 [R 0 asm.trim_in_progress]
    assert_equal "value" [R 0 asm.read_pending_trim_key $slot0_key]
    assert_equal 0 [R 3 asm.trim_in_progress]
    assert_equal "value" [R 3 asm.read_pending_trim_key $slot0_key]

    set loglines [count_log_lines 0]

    # failover happens on source side: instance #3 becomes master and
    # #0 (the old master) becomes its replica
    failover_and_wait_for_done 3
    R 0 asm.enable_trim ;# enable trim on old master

    # old master should cancel the pending trim job
    wait_for_log_messages 0 {"*Cancelling the pending trim job*"} $loglines 1000 10

    wait_for_ofs_sync [Rn 3] [Rn 0]
    # verify trim is allowed on new master, and the key is trimmed
    wait_for_condition 1000 10 {
        [R 3 asm.trim_in_progress] == 0 &&
        [R 3 asm.read_pending_trim_key $slot0_key] eq "" &&
        [R 0 asm.trim_in_progress] == 0 &&
        [R 0 asm.read_pending_trim_key $slot0_key] eq ""
    } else {
        fail "Trim did not complete"
    }

    # verify the trim events, use active trim since module is subscribed to trimmed event
    set trim_event_log [list \
        "sub: cluster-slot-migration-trim-started, slots:0-0" \
        "keyspace: key_trimmed, key: $slot0_key" \
        "sub: cluster-slot-migration-trim-completed, slots:0-0" \
    ]
    wait_for_condition 500 20 {
        [R 0 asm.get_cluster_trim_event_log] eq $trim_event_log &&
        [R 3 asm.get_cluster_trim_event_log] eq $trim_event_log &&
        [R 6 asm.get_cluster_trim_event_log] eq $trim_event_log
    } else {
        # Dump what each node actually recorded to make a timeout debuggable
        puts "R0: [R 0 asm.get_cluster_trim_event_log]"
        puts "R3: [R 3 asm.get_cluster_trim_event_log]"
        puts "R6: [R 6 asm.get_cluster_trim_event_log]"
        # The nodes checked here belong to the source side of the migration
        fail "ASM source trim event not received"
    }

    # cleanup: make #0 master again, move slot 0 back and reset the state
    failover_and_wait_for_done 0
    R 0 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
    clear_module_event_log
    reset_default_trim_method
    R 0 flushall
    R 1 flushall
}
2803
test "Test module replicates commands at the beginning of slot migration " {
    foreach node {0 1} {
        R $node flushall
    }

    # Sanity check: nothing has been replicated to the destination master
    # (node 1) or its replica (node 4) yet
    foreach node {1 4} {
        assert_equal 0 [R $node asm.read_keyless_cmd_val]
    }

    # Turn on module command replication on the source. The module will
    # replicate two commands:
    # 1- A keyless command: asm.keyless_cmd
    # 2- SET command for the given key and value
    set keyname [slot_key 0 modulekey]
    R 0 asm.replicate_module_command 1 $keyname "value"

    setup_slot_migration_with_delay 0 1 0 100
    wait_for_asm_done
    wait_for_ofs_sync [Rn 1] [Rn 4]

    # Both commands must have reached the destination master first and
    # then its replica. The replica connection is switched to readonly
    # before it is queried.
    foreach node {1 4} {
        if {$node == 4} {
            R $node readonly
        }
        assert_equal 1 [R $node asm.read_keyless_cmd_val]
        assert_equal value [R $node get $keyname]
    }

    # cleanup: switch replication off, move the slots back, drop the data
    R 0 asm.replicate_module_command 0 "" ""
    R 0 CLUSTER MIGRATION IMPORT 0 100
    wait_for_asm_done
    foreach node {0 1} {
        R $node flushall
    }
}
2839
test "Test subcommand propagation during slot migration" {
    foreach node {0 1} {
        R $node flushall
    }
    set task_id [setup_slot_migration_with_delay 0 1 0 100]

    # Write through a module subcommand on the source after the migration
    # has been started
    set key [slot_key 0 mykey]
    R 0 asm.parent set $key "value"
    wait_for_asm_done

    # The write must be visible on the destination
    assert_equal "value" [R 1 GET $key]

    # cleanup: move the slots back to node 0
    R 0 cluster migration import 0 100
    wait_for_asm_done
}
2854
test "Test trim method selection based on module keyspace subscription" {
    foreach node {0 1} {
        R $node debug asm-trim-method default
    }
    foreach node {0 1} {
        R $node flushall
    }

    populate_slot 10 -idx 0 -slot 0

    # Phase 1: the module on node-0 is subscribed to NOTIFY_KEY_TRIMMED, so
    # node-0 must use active trim after handing over slots 0-15.
    R 0 asm.subscribe_trimmed_event 1
    set loglines [count_log_lines 0]
    R 1 CLUSTER MIGRATION IMPORT 0 15
    wait_for_asm_done
    wait_for_log_messages 0 {"*Active trim scheduled for slots: 0-15*"} $loglines 1000 10

    # Phase 2: move the slots back to node-0 with the subscription removed
    # on node-1, so background trim must be used this time.
    R 1 asm.subscribe_trimmed_event 0
    set loglines [count_log_lines -1]
    R 0 CLUSTER MIGRATION IMPORT 0 15
    wait_for_asm_done
    wait_for_log_messages -1 {"*Background trim started for slots: 0-15*"} $loglines 1000 10

    # cleanup: re-subscribe on both nodes and drop the data
    wait_for_asm_done
    foreach node {0 1} {
        R $node asm.subscribe_trimmed_event 1
    }
    foreach node {0 1} {
        R $node flushall
    }
}
2887
test "Verify trimmed key value can be read in the server event callback" {
    R 0 flushall
    set key [slot_key 0]
    set value "value123random"
    R 0 set $key $value

    # What the module is expected to report for the key once slot 0 has
    # been handed over to node 1
    set expected_event "keyevent: key: $key, value: $value"

    R 1 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
    wait_for_condition 1000 10 {
        [R 0 asm.get_last_deleted_key] eq $expected_event
    } else {
        fail "Last deleted key event not received"
    }

    # cleanup: move slot 0 back to node 0
    R 0 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
}
2906
test "Verify module cannot open a key in a slot that is being trimmed" {
    R 0 flushall
    # Disable active trim so that the trim job stays in the running state
    R 0 debug asm-trim-method active -1

    set trimmed_key [slot_key 0]
    R 0 set $trimmed_key value

    # Hand slot 0 over to node 1 and wait until both tasks are finished
    # while node 0 still reports a running trim
    R 1 CLUSTER MIGRATION IMPORT 0 0
    wait_for_condition 1000 10 {
        [CI 0 cluster_slot_migration_active_tasks] == 0 &&
        [CI 1 cluster_slot_migration_active_tasks] == 0 &&
        [CI 0 cluster_slot_migration_active_trim_running] == 1
    } else {
        fail "migrate failed"
    }

    # The key lives in a slot that is being trimmed, so the module must
    # not be able to open it
    assert_equal "" [R 0 asm.get $trimmed_key]

    # cleanup: restore the default trim method and move slot 0 back
    R 0 debug asm-trim-method default
    R 0 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
}
2931
test "Test RM_ClusterGetLocalSlotRanges" {
    # Nodes 0 and 3 are expected to report the same local ranges at every
    # step below.

    # Initial layout: a single range
    foreach node {0 3} {
        assert_equal [R $node asm.cluster_get_local_slot_ranges] {{0 5461}}
    }

    # Importing a range that is not adjacent yields a second range
    R 0 cluster migration import 5463 6000
    wait_for_asm_done
    wait_for_cluster_propagation
    foreach node {0 3} {
        assert_equal [R $node asm.cluster_get_local_slot_ranges] {{0 5461} {5463 6000}}
    }

    # Filling the gap and importing the remainder merges everything into
    # one range
    R 0 cluster migration import 5462 5462 6001 10922
    wait_for_asm_done
    wait_for_cluster_propagation
    foreach node {0 3} {
        assert_equal [R $node asm.cluster_get_local_slot_ranges] {{0 10922}}
    }
    # Nodes 1 and 4 are left without any slot
    foreach node {1 4} {
        assert_equal [R $node asm.cluster_get_local_slot_ranges] {}
    }
}
2950}
2951
2952set testmodule [file normalize tests/modules/atomicslotmigration.so]
2953
2954start_cluster 2 0 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no appendonly yes]] {
test "TRIMSLOTS in AOF will work synchronously on restart" {
    # When TRIMSLOTS is replayed from AOF during restart, it must execute
    # synchronously rather than using active trim. This prevents race
    # conditions where subsequent AOF commands might operate on keys
    # that should have been trimmed.

    # Subscribe to key trimmed event to force active trim
    R 0 asm.subscribe_trimmed_event 1
    populate_slot 1000 -slot 0
    populate_slot 1000 -slot 1
    R 1 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done

    # verify active trim is used
    assert_equal 1 [CI 0 cluster_slot_migration_stats_active_trim_completed]

    # restart server and verify aof is loaded
    restart_server 0 yes no yes nosave
    # INFO separates field and value with a colon, so the scan format has to
    # use a colon as well. With any other separator scan converts nothing
    # and the comparison below is no longer a numeric one.
    assert {[scan [regexp -inline {aof_current_size:([\d]*)} [R 0 info persistence]] aof_current_size:%d] > 0}
    wait_for_cluster_state "ok"

    # verify TRIMSLOTS in AOF is executed synchronously: no active trim is
    # counted after the restart and only the keys of slot 1 are left
    assert_equal 0 [CI 0 cluster_slot_migration_stats_active_trim_completed]
    assert_equal 1000 [R 0 dbsize]

    # cleanup: move slot 0 back, check that all keys are there, reset state
    R 0 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
    assert_equal 2000 [R 0 dbsize]
    R 0 flushall
    R 1 flushall
    clear_module_event_log

}
2989
test "Test trim is disabled when module requests it" {
    R 0 asm.disable_trim

    set pending_key [slot_key 0 mykey]
    R 0 set $pending_key "value"

    # Hand slot 0 over to node 1 and wait until the source reports the
    # task as completed
    set import_task [R 1 CLUSTER MIGRATION IMPORT 0 0]
    wait_for_condition 1000 10 {
        [string match {*completed*} [migration_status 0 $import_task state]]
    } else {
        fail "ASM task did not complete"
    }

    # Trimming is disabled, so the key must still exist on the source and
    # be readable with the REDISMODULE_OPEN_KEY_ACCESS_TRIMMED flag
    assert_equal "value" [R 0 asm.read_pending_trim_key $pending_key]
    assert_equal 1 [R 0 asm.trim_in_progress]

    # Once trimming is enabled again the key has to go away
    R 0 asm.enable_trim
    wait_for_condition 1000 10 {
        [R 0 asm.read_pending_trim_key $pending_key] eq "" &&
        [R 0 asm.trim_in_progress] == 0
    } else {
        fail "Trim did not complete"
    }
    wait_for_asm_done

    # cleanup: move slot 0 back and reset the module event log
    R 0 CLUSTER MIGRATION IMPORT 0 0
    wait_for_asm_done
    clear_module_event_log
}
3019
test "Can not start new asm task when trim is not allowed" {
    # Part 1: a new migrating task must be refused.
    # Complete a migration of slot 0 while node 0 is not allowed to trim.
    R 0 asm.disable_trim
    set first_task [R 1 CLUSTER MIGRATION IMPORT 0 0]
    wait_for_condition 1000 10 {
        [string match {*completed*} [migration_status 0 $first_task state]]
    } else {
        fail "ASM task did not complete"
    }
    # The next import requested on node 1 has to fail because trim is
    # disabled
    set refused_task [R 1 CLUSTER MIGRATION IMPORT 1 1]
    wait_for_condition 1000 10 {
        [string match {*fail*} [migration_status 1 $refused_task state]] &&
        [string match {*Trim is disabled by module*} [migration_status 1 $refused_task last_error]]
    } else {
        fail "ASM task did not fail"
    }
    R 0 asm.enable_trim
    wait_for_asm_done

    # Part 2: a new importing task must be refused.
    # Again complete a migration (slot 2) with trim disallowed on node 0.
    R 0 asm.disable_trim
    set second_task [R 1 CLUSTER MIGRATION IMPORT 2 2]
    wait_for_condition 1000 10 {
        [string match {*completed*} [migration_status 0 $second_task state]]
    } else {
        fail "ASM task did not complete"
    }
    set log_offset [count_log_lines 0]
    # An import requested on node 0 itself must be rejected, which is
    # checked through the log message
    set rejected_import [R 0 CLUSTER MIGRATION IMPORT 0 1]
    wait_for_log_messages 0 {"*Can not start import task*trim is disabled by module*"} $log_offset 1000 10
    R 0 asm.enable_trim
    wait_for_asm_done
}
3055}
3056
start_server {tags "cluster external:skip"} {
    # Runs the module on a server started without the cluster helpers; the
    # whole slot space is expected to be reported as a single range.
    test "Test RM_ClusterGetLocalSlotRanges without cluster" {
        r module load $testmodule
        set local_ranges [r asm.cluster_get_local_slot_ranges]
        assert_equal $local_ranges {{0 16383}}
    }
}
3063}