summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/tests/integration/replication-iothreads.tcl
blob: 12d084526526dabeebca4d3215757c32418daa05 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
#
# Copyright (c) 2025-Present, Redis Ltd.
# All rights reserved.
#
# Licensed under your choice of (a) the Redis Source Available License 2.0
# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
# GNU Affero General Public License v3 (AGPLv3).
#

# Tests for master and slave clients in IO threads

# Helper function to get master client IO thread from INFO replication
proc get_master_client_io_thread {r} {
    return [status $r master_client_io_thread]
}

# Helper function to get slave client IO thread from INFO replication
proc get_slave_client_io_thread {r slave_idx} {
    set info [$r info replication]
    set lines [split $info "\r\n"]

    foreach line $lines {
        if {[string match "slave${slave_idx}:*" $line]} {
            # Parse the slave line to extract io-thread value
            set parts [split $line ","]
            foreach part $parts {
                if {[string match "*io-thread=*" $part]} {
                    set kv [split $part "="]
                    assert_equal [llength $kv] 2
                    return [lindex $kv 1]
                }
            }
        }
    }
    return -1
}

start_server {overrides {io-threads 4 save ""} tags {"iothreads repl network external:skip"}} {
start_server {overrides {io-threads 4 save ""}} {
    set master [srv 0 client]
    set master_host [srv 0 host]
    set master_port [srv 0 port]
    set slave [srv -1 client]

    test {Setup slave} {
        $slave slaveof $master_host $master_port
        wait_for_condition 1000 100 {
            [s -1 master_link_status] eq {up}
        } else {
            fail "Replication not started."
        }
    }

    test {Master client moves to IO thread after sync complete} {
        # Check master client thread assignment (master client is on slave side)
        wait_for_condition 100 100 {
            [get_master_client_io_thread $slave] > 0
        } else {
            fail "Master client was not assigned to IO thread"
        }
    }

    test {Slave client assignment to IO threads} {
        # Verify slave is connected and online
        wait_replica_online $master 0

        # Slave client is connected - force a write so that it's assigned to an
        # IO thread.
        assert_equal "OK" [$master set x x]

        # Check slave client thread assignment
        wait_for_condition 50 100 {
            [get_slave_client_io_thread $master 0] > 0
        } else {
            fail "Slave client was not assigned to IO thread"
        }
    }

    test {WAIT command works with master/slave in IO threads} {
        # Test basic WAIT functionality
        $master set wait_test_key1 value1
        $master set wait_test_key2 value2
        $master incr wait_counter

        assert {[$master wait 1 2000] == 1}

        # Verify data reached slave
        wait_for_condition 10 100 {
            [$slave get wait_test_key1] eq "value1" &&
            [$slave get wait_test_key2] eq "value2" &&
            [$slave get wait_counter] eq "1"
        } else {
            fail "commands not propagated to IO thread slave in time"
        }
    }

    test {Replication data integrity with IO threads} {
        # Generate significant replication traffic
        for {set i 0} {$i < 100} {incr i} {
            $master set bulk_key_$i [string repeat "data" 10]
            $master lpush bulk_list element_$i
            $master zadd bulk_zset $i member_$i
            if {$i % 20 == 0} {
                # Periodically verify WAIT works
                assert {[$master wait 1 2000] == 1}
            }
        }

        # Final verification
        wait_for_condition 50 100 {
            [$slave get bulk_key_99] eq [string repeat "data" 10] &&
            [$slave llen bulk_list] eq 100 &&
            [$slave zcard bulk_zset] eq 100
        } else {
            fail "Replication data integrity failed"
        }
    }

    test {WAIT timeout behavior with slave in IO thread} {
        set slave_pid [srv -1 pid]

        # Pause slave to test timeout
        pause_process $slave_pid

        # Should timeout and return 0 acks
        $master set timeout_test_key timeout_value
        set start_time [clock milliseconds]
        assert {[$master wait 1 2000] == 0}
        set elapsed [expr {[clock milliseconds] - $start_time}]
        assert_range $elapsed 2000 2500

        # Resume and verify recovery
        resume_process $slave_pid

        assert {[$master wait 1 2000] == 1}

        # Verify data reached slave after resume
        wait_for_condition 10 100 {
            [$slave get timeout_test_key] eq "timeout_value"
        } else {
            fail "commands not propagated to IO thread slave in time"
        }
    }

    test {Network interruption recovery with IO threads} {
        # Generate traffic before interruption
        for {set i 0} {$i < 50} {incr i} {
            $master set pre_interrupt_$i value_$i
        }

        # Simulate network interruption
        pause_process $slave_pid

        # Continue writing during interruption
        for {set i 0} {$i < 50} {incr i} {
            $master set during_interrupt_$i value_$i
        }

        # WAIT should timeout
        assert {[$master wait 1 2000] == 0}

        # Resume slave and verify recovery
        resume_process $slave_pid

        # Verify WAIT works again
        assert {[$master wait 1 2000] == 1}

        # Wait for reconnection and catch up
        wait_for_condition 100 100 {
            [$slave get during_interrupt_49] eq "value_49"
        } else {
            fail "Slave didn't catch up after network recovery"
        }

        $master set post_recovery_test recovery_value
        wait_for_condition 10 100 {
          [$slave get post_recovery_test] eq "recovery_value"
        } else {
          fail "Slave didn't receive 'set post_recovery_test' command"
        }

        # Check thread assignments after recovery
        wait_for_condition 100 100 {
            [get_master_client_io_thread $slave] > 0
        } else {
            fail "Slave client not assigned to IO thread after recovery"
        }
    }

    test {Replication reconnection cycles with IO threads} {
        # Test multiple disconnect/reconnect cycles
        for {set cycle 0} {$cycle < 3} {incr cycle} {
            # Generate traffic
            for {set i 0} {$i < 20} {incr i} {
                $master set cycle_${cycle}_key_$i value_$i
            }

            assert {[$master wait 1 2000] == 1}

            # Record thread assignments during cycle
            set master_thread [get_master_client_io_thread $slave]
            set slave_thread [get_slave_client_io_thread $master 0]
            puts "Cycle $cycle - Master thread: $master_thread, Slave thread: $slave_thread"

            # Disconnect and reconnect (except last cycle)
            if {$cycle < 2} {
                $slave replicaof no one
                after 100
                $slave replicaof $master_host $master_port
                wait_for_sync $slave
            }
        }

        # Verify final state
        wait_for_condition 10 100 {
            [$slave get cycle_2_key_19] eq "value_19"
        } else {
            fail "last command not propagated to IO thread slave in time"
        }
    }

    test {INFO replication shows correct thread information} {
        # Test INFO replication output format
        set info [$master info replication]

        # Should show master role
        assert_match "*role:master*" $info

        # Should have slave thread information
        assert_match "*slave0:*io-thread=*" $info

        # Test we can parse the thread ID
        set slave_thread [get_slave_client_io_thread $master 0]
        assert_morethan $slave_thread 0

        # Test master client thread info
        set slave_info [$slave info replication]
        assert_match "*role:slave*" $slave_info
        assert_match "*master_client_io_thread:*" $slave_info

        set master_thread [get_master_client_io_thread $slave]
        assert_morethan $master_thread 0
    }
}
}

start_server {overrides {io-threads 4 save ""} tags {"iothreads repl network external:skip"}} {
start_server {overrides {io-threads 4 save ""}} {
start_server {overrides {io-threads 4 save ""}} {
start_server {overrides {io-threads 4 save ""}} {
    set master [srv 0 client]
    set master_host [srv 0 host]
    set master_port [srv 0 port]
    set slave1 [srv -1 client]
    set slave2 [srv -2 client]
    set slave3 [srv -3 client]

    test {Multiple slaves across IO threads} {
        # Setup replication for all slaves
        $slave1 replicaof $master_host $master_port
        $slave2 replicaof $master_host $master_port
        $slave3 replicaof $master_host $master_port

        # Wait for all slaves to be online
        wait_replica_online $master 0
        wait_replica_online $master 1
        wait_replica_online $master 2

        set iterations 5
        while {[incr iterations -1] >= 0} {
            # Slave clients are connected - force a write so that they are assigned
            # to IO threads.
            assert_equal "OK" [$master set x x]

            wait_for_condition 10 100 {
                ([get_slave_client_io_thread $master 0] > 0) &&
                ([get_slave_client_io_thread $master 1] > 0) &&
                ([get_slave_client_io_thread $master 2] > 0)
            } else {
                continue
            }

            break
        }
        if {$iterations < 0} {
            fail "Replicas failed to be assigned to IO threads in time"
        }

        # Test concurrent replication to all slaves
        for {set i 0} {$i < 200} {incr i} {
            $master set multi_key_$i value_$i
            if {$i % 50 == 0} {
                assert {[$master wait 3 2000] == 3}
            }
        }

        # Final verification all slaves got data
        wait_for_condition 50 100 {
            [$slave1 get multi_key_199] eq "value_199" &&
            [$slave2 get multi_key_199] eq "value_199" &&
            [$slave3 get multi_key_199] eq "value_199"
        } else {
            fail "Multi-slave replication failed"
        }
    }

    test {WAIT with multiple slaves in IO threads} {
        # Test various WAIT scenarios
        $master set wait_multi_test1 value1
        assert {[$master wait 3 2000] == 3}

        $master set wait_multi_test2 value2
        assert {[$master wait 2 2000] >= 2}

        $master set wait_multi_test3 value3
        assert {[$master wait 1 2000] >= 1}

        # Verify all slaves have the data
        wait_for_condition 10 100 {
            [$slave1 get wait_multi_test3] eq "value3" &&
            [$slave2 get wait_multi_test3] eq "value3" &&
            [$slave3 get wait_multi_test3] eq "value3"
        } else {
            fail "commands not propagated to io thread slaves in time"
        }
    }
}
}
}
}