diff options
Diffstat (limited to 'examples/redis-unstable/tests/unit/cluster/links.tcl')
| -rw-r--r-- | examples/redis-unstable/tests/unit/cluster/links.tcl | 292 |
1 file changed, 292 insertions, 0 deletions
diff --git a/examples/redis-unstable/tests/unit/cluster/links.tcl b/examples/redis-unstable/tests/unit/cluster/links.tcl new file mode 100644 index 0000000..a202c37 --- /dev/null +++ b/examples/redis-unstable/tests/unit/cluster/links.tcl | |||
| @@ -0,0 +1,292 @@ | |||
proc get_links_with_peer {this_instance_id peer_nodename} {
    # Return every CLUSTER LINKS entry reported by the node identified by
    # `this_instance_id` that refers to the peer named `peer_nodename`.
    # A healthy peer is expected to contribute two entries: one "to" link
    # and one "from" link.
    set matching {}
    foreach entry [R $this_instance_id cluster links] {
        if {[dict get $entry node] ne $peer_nodename} {
            continue
        }
        lappend matching $entry
    }
    return $matching
}
| 11 | |||
# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that
# corresponds to the link established toward a peer identified by `peer_nodename`
proc get_link_to_peer {this_instance_id peer_nodename} {
    foreach candidate [get_links_with_peer $this_instance_id $peer_nodename] {
        if {[dict get $candidate direction] eq "to"} {
            return $candidate
        }
    }
    # No outbound link to that peer currently exists.
    return {}
}
| 23 | |||
# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that
# corresponds to the link accepted from a peer identified by `peer_nodename`
proc get_link_from_peer {this_instance_id peer_nodename} {
    foreach candidate [get_links_with_peer $this_instance_id $peer_nodename] {
        if {[dict get $candidate direction] eq "from"} {
            return $candidate
        }
    }
    # No inbound link from that peer currently exists.
    return {}
}
| 35 | |||
# Reset cluster links to their original state: force every link on node `id`
# to be dropped, then wait for them all to be re-established.
proc reset_links {id} {
    # Remember the configured send-buffer limit so it can be restored below.
    set saved_limit [lindex [R $id CONFIG get cluster-link-sendbuf-limit] 1]

    # Set a 1 byte limit and wait for cluster cron to run
    # (executes every 100ms) and terminate links
    R $id CONFIG SET cluster-link-sendbuf-limit 1
    after 150

    # Reset limit so that new links are created with the original setting.
    R $id CONFIG SET cluster-link-sendbuf-limit $saved_limit

    # Wait until the cluster links come back up for each node.
    # (Two links per peer: one "to" and one "from".)
    # NOTE: expr is braced to avoid Tcl double substitution and allow
    # byte-compilation; the numeric result is unchanged.
    wait_for_condition 50 100 {
        [number_of_links $id] == [expr {[number_of_peers $id] * 2}]
    } else {
        fail "Cluster links did not come back up"
    }
}
| 55 | |||
# Number of peers any node has: every other node in the test cluster.
# `id` is unused but retained so the call shape matches number_of_links.
# Braced expr (idiomatic Tcl) avoids double substitution; explicit return
# makes the result intentional rather than relying on last-command value.
proc number_of_peers {id} {
    return [expr {[llength $::servers] - 1}]
}
| 59 | |||
# Total number of CLUSTER LINKS entries reported by node `id`.
proc number_of_links {id} {
    return [llength [R $id cluster links]]
}
| 63 | |||
# Publish `num_msgs` messages of `msg_size` bytes each on channel "channel"
# via the client handle `server`. The payload is identical for every message,
# so it is built once outside the loop instead of on each iteration.
proc publish_messages {server num_msgs msg_size} {
    set payload [string repeat "x" $msg_size]
    for {set i 0} {$i < $num_msgs} {incr i} {
        $server PUBLISH channel $payload
    }
}
| 69 | |||
# Shard pub/sub delivery across a cluster link failure: messages published
# while the link to a replica is down are lost for that replica only; other
# shard members still receive them.
start_cluster 1 2 {tags {external:skip cluster}} {
    set primary_id 0
    set replica1_id 1

    set primary [Rn $primary_id]
    set replica1 [Rn $replica1_id]

    test "Broadcast message across a cluster shard while a cluster link is down" {
        set replica1_node_id [$replica1 CLUSTER MYID]

        set channelname ch3

        # subscribe on replica1
        set subscribeclient1 [redis_deferring_client -1]
        $subscribeclient1 deferred 1
        $subscribeclient1 SSUBSCRIBE $channelname
        $subscribeclient1 read

        # subscribe on replica2
        # NOTE(review): -1/-2 appear to index replica1/replica2 relative to the
        # last started server — confirm against redis_deferring_client's contract.
        set subscribeclient2 [redis_deferring_client -2]
        $subscribeclient2 deferred 1
        $subscribeclient2 SSUBSCRIBE $channelname
        $subscribeclient2 read

        # Verify number of links with cluster stable state
        assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id]

        # Disconnect the cluster between primary and replica1 and publish a message.
        # MULTI/EXEC makes the kill and the publish atomic so no gossip can
        # re-establish the link in between.
        $primary MULTI
        $primary DEBUG CLUSTERLINK KILL TO $replica1_node_id
        $primary SPUBLISH $channelname hello
        set res [$primary EXEC]

        # Verify no client exists on the primary to receive the published message.
        # {OK 0}: OK from the KILL, 0 receivers for the SPUBLISH.
        assert_equal $res {OK 0}

        # Wait until all the cluster links are healthy again
        wait_for_condition 50 100 {
            [number_of_peers $primary_id]*2 == [number_of_links $primary_id]
        } else {
            fail "All peer links couldn't be established"
        }

        # Publish a message afterwards.
        $primary SPUBLISH $channelname world

        # Verify replica1 has received only (world) / hello is lost.
        assert_equal "smessage ch3 world" [$subscribeclient1 read]

        # Verify replica2 has received both messages (hello/world)
        assert_equal "smessage ch3 hello" [$subscribeclient2 read]
        assert_equal "smessage ch3 world" [$subscribeclient2 read]
    } {} {needs:debug}
}
| 124 | |||
# CLUSTER LINKS structural and resource tests on a 3-primary cluster:
# link topology, output format, send-buffer-limit disconnects, and link
# memory accounting.
start_cluster 3 0 {tags {external:skip cluster}} {
    test "Each node has two links with each peer" {
        for {set id 0} {$id < [llength $::servers]} {incr id} {
            # Assert that from point of view of each node, there are two links for
            # each peer. It might take a while for cluster to stabilize so wait up
            # to 5 seconds.
            wait_for_condition 50 100 {
                [number_of_peers $id]*2 == [number_of_links $id]
            } else {
                # Re-assert on timeout so the failure message shows both counts.
                assert_equal [expr [number_of_peers $id]*2] [number_of_links $id]
            }

            set nodes [get_cluster_nodes $id]
            set links [R $id cluster links]

            # For each peer there should be exactly one
            # link "to" it and one link "from" it.
            foreach n $nodes {
                # Skip this node's own CLUSTER NODES entry.
                if {[cluster_has_flag $n myself]} continue
                set peer [dict get $n id]
                set to 0
                set from 0
                foreach l $links {
                    if {[dict get $l node] eq $peer} {
                        if {[dict get $l direction] eq "to"} {
                            incr to
                        } elseif {[dict get $l direction] eq "from"} {
                            incr from
                        }
                    }
                }
                assert {$to eq 1}
                assert {$from eq 1}
            }
        }
    }

    test {Validate cluster links format} {
        # Each entry is a flat dict of 6 key/value pairs (12 list elements).
        set lines [R 0 cluster links]
        foreach l $lines {
            if {$l eq {}} continue
            assert_equal [llength $l] 12
            assert_equal 1 [dict exists $l "direction"]
            assert_equal 1 [dict exists $l "node"]
            assert_equal 1 [dict exists $l "create-time"]
            assert_equal 1 [dict exists $l "events"]
            assert_equal 1 [dict exists $l "send-buffer-allocated"]
            assert_equal 1 [dict exists $l "send-buffer-used"]
        }
    }

    set primary1_id 0
    set primary2_id 1

    set primary1 [Rn $primary1_id]
    set primary2 [Rn $primary2_id]

    test "Disconnect link when send buffer limit reached" {
        # On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts
        set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
        $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]

        # Get primary1's links with primary2
        set primary2_name [dict get [cluster_get_myself $primary2_id] id]
        set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
        set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]

        # On primary1, set cluster link send buffer limit to 256KB, which is large enough to not be
        # overflowed by regular gossip messages but also small enough that it doesn't take too much
        # memory to overflow it. If it is set too high, Redis may get OOM killed by kernel before this
        # limit is overflowed in some RAM-limited test environments.
        set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
        $primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024]
        assert {[CI $primary1_id total_cluster_links_buffer_limit_exceeded] eq 0}

        # To manufacture an ever-growing send buffer from primary1 to primary2,
        # make primary2 unresponsive.
        set primary2_pid [srv [expr -1*$primary2_id] pid]
        pause_process $primary2_pid

        # On primary1, send 128KB Pubsub messages in a loop until the send buffer of the link from
        # primary1 to primary2 exceeds buffer limit therefore be dropped.
        # For the send buffer to grow, we need to first exhaust TCP send buffer of primary1 and TCP
        # receive buffer of primary2 first. The sizes of these two buffers vary by OS, but 100 128KB
        # messages should be sufficient.
        # NOTE: the catch wrappers always yield 0, so each retry of the
        # condition publishes one more message and sleeps 500ms before the
        # stat is re-checked — the side effects are part of the retry loop.
        set i 0
        wait_for_condition 100 0 {
            [catch {incr i} e] == 0 &&
            [catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 &&
            [catch {after 500} e] == 0 &&
            [CI $primary1_id total_cluster_links_buffer_limit_exceeded] >= 1
        } else {
            fail "Cluster link not freed as expected"
        }

        # A new link to primary2 should have been recreated
        set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
        assert {[dict get $new_link_p1_to_p2 create-time] > [dict get $orig_link_p1_to_p2 create-time]}

        # Link from primary2 should not be affected
        set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
        assert {[dict get $same_link_p1_from_p2 create-time] eq [dict get $orig_link_p1_from_p2 create-time]}

        # Revive primary2
        resume_process $primary2_pid

        # Reset configs on primary1 so config changes don't leak out to other tests
        $primary1 CONFIG set cluster-node-timeout $oldtimeout
        $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit

        reset_links $primary1_id
    }

    test "Link memory increases with publishes" {
        set server_id 0
        set server [Rn $server_id]
        set msg_size 10000
        set num_msgs 10

        # Remove any sendbuf limit
        $primary1 CONFIG set cluster-link-sendbuf-limit 0

        # Publish ~100KB to one of the servers. INFO memory is sampled inside
        # the same MULTI/EXEC, immediately before and after the publishes.
        $server MULTI
        $server INFO memory
        publish_messages $server $num_msgs $msg_size
        $server INFO memory
        set res [$server EXEC]

        set link_mem_before_pubs [getInfoProperty $res mem_cluster_links]

        # Remove the first half of the response string which contains the
        # first "INFO memory" results and search for the property again
        set res [string range $res [expr [string length $res] / 2] end]
        set link_mem_after_pubs [getInfoProperty $res mem_cluster_links]

        # We expect the memory to have increased by more than
        # the cumulative size of the publish messages
        set mem_diff_floor [expr $msg_size * $num_msgs]
        set mem_diff [expr $link_mem_after_pubs - $link_mem_before_pubs]
        assert {$mem_diff > $mem_diff_floor}

        # Reset links to ensure no leftover data for the next test
        reset_links $server_id
    }

    test "Link memory resets after publish messages flush" {
        set server [Rn 0]
        set msg_size 100000
        set num_msgs 10

        set link_mem_before [status $server mem_cluster_links]

        # Publish ~1MB to one of the servers
        $server MULTI
        publish_messages $server $num_msgs $msg_size
        $server EXEC

        # Wait until the cluster link memory has returned to below the pre-publish value.
        # We can't guarantee it returns to the exact same value since gossip messages
        # can cause the values to fluctuate.
        wait_for_condition 1000 500 {
            [status $server mem_cluster_links] <= $link_mem_before
        } else {
            fail "Cluster link memory did not settle back to expected range"
        }
    }
}
