diff options
Diffstat (limited to 'examples/redis-unstable/src/bitops.c')
| -rw-r--r-- | examples/redis-unstable/src/bitops.c | 2037 |
1 file changed, 2037 insertions(+), 0 deletions(-)
diff --git a/examples/redis-unstable/src/bitops.c b/examples/redis-unstable/src/bitops.c new file mode 100644 index 0000000..7a3d9f9 --- /dev/null +++ b/examples/redis-unstable/src/bitops.c @@ -0,0 +1,2037 @@ +/* Bit operations. + * + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#include "server.h" +#include "ctype.h" + +#ifdef HAVE_AVX2 +/* Define __MM_MALLOC_H to prevent importing the memory aligned + * allocation functions, which we don't use. */ +#define __MM_MALLOC_H +#include <immintrin.h> +#endif + +#ifdef HAVE_AVX512 +/* Define __MM_MALLOC_H to prevent importing the memory aligned + * allocation functions, which we don't use. */ +#define __MM_MALLOC_H +#include <immintrin.h> +#endif + +#ifdef HAVE_AARCH64_NEON +#include <arm_neon.h> +#endif + +#ifdef HAVE_AVX2 +#define BITOP_USE_AVX2 (__builtin_cpu_supports("avx2")) +#else +#define BITOP_USE_AVX2 0 +#endif + +/* AArch64 NEON support is determined at compile time via HAVE_AARCH64_NEON */ +#ifdef HAVE_AVX512 +#define BITOP_USE_AVX512 (__builtin_cpu_supports("avx512f") && __builtin_cpu_supports("avx512vpopcntdq")) +#else +#define BITOP_USE_AVX512 0 +#endif + + +/* ----------------------------------------------------------------------------- + * Helpers and low level bit functions. + * -------------------------------------------------------------------------- */ + + /* Shared lookup table for bit counting - maps each byte value to its popcount */ +static const uint8_t bitsinbyte[256] = { + #define B2(n) n, n+1, n+1, n+2 + #define B4(n) B2(n), B2(n+1), B2(n+1), B2(n+2) + #define B6(n) B4(n), B4(n+1), B4(n+1), B4(n+2) + B6(0), B6(1), B6(1), B6(2) + #undef B6 + #undef B4 + #undef B2 +}; + +/* Count number of bits set in the binary array pointed by 's' and long + * 'count' bytes. 
The implementation of this function is required to
 * work with an input string length up to 512 MB or more (server.proto_max_bulk_len) */
ATTRIBUTE_TARGET_POPCNT
long long redisPopcount(void *s, long count) {
    long long bits = 0;
    unsigned char *p = s;
    uint32_t *p4;
#if defined(HAVE_POPCNT)
    int use_popcnt = __builtin_cpu_supports("popcnt"); /* Check if CPU supports POPCNT instruction. */
#else
    int use_popcnt = 0; /* Assume CPU does not support POPCNT if
                         * __builtin_cpu_supports() is not available. */
#endif
    /* Count initial bytes not aligned to 64-bit when using the POPCNT instruction,
     * otherwise align to 32-bit. */
    int align = use_popcnt ? 7 : 3;
    while ((unsigned long)p & align && count) {
        bits += bitsinbyte[*p++];
        count--;
    }

    if (likely(use_popcnt)) {
        /* Use separate counters to make the CPU think there are no
         * dependencies between these popcnt operations. */
        uint64_t cnt[4];
        memset(cnt, 0, sizeof(cnt));

        /* Count bits 32 bytes at a time by using popcnt.
         * Unroll the loop to avoid the overhead of a single popcnt per iteration,
         * allowing the CPU to extract more instruction-level parallelism.
         * Reference: https://danluu.com/assembly-intrinsics/ */
        while (count >= 32) {
            cnt[0] += __builtin_popcountll(*(uint64_t*)(p));
            cnt[1] += __builtin_popcountll(*(uint64_t*)(p + 8));
            cnt[2] += __builtin_popcountll(*(uint64_t*)(p + 16));
            cnt[3] += __builtin_popcountll(*(uint64_t*)(p + 24));
            count -= 32;
            p += 32;
            /* Prefetch with 2K stride is just enough to overlap L3 miss latency effectively
             * without causing pressure on lower memory hierarchy or polluting L1/L2 */
            redis_prefetch_read(p + 2048);
        }
        bits += cnt[0] + cnt[1] + cnt[2] + cnt[3];
        goto remain;
    }

    /* Count bits 28 bytes at a time. This is a SWAR (SIMD within a register)
     * popcount: each 32-bit word is reduced with the classic
     * 0x55555555 / 0x33333333 / 0x0F0F0F0F masking steps, then the seven
     * per-word byte sums are added together and multiplied by 0x01010101 so
     * that the grand total accumulates into the most significant byte,
     * extracted by the final ">> 24". */
    p4 = (uint32_t*)p;
    while(count>=28) {
        uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7;

        aux1 = *p4++;
        aux2 = *p4++;
        aux3 = *p4++;
        aux4 = *p4++;
        aux5 = *p4++;
        aux6 = *p4++;
        aux7 = *p4++;
        count -= 28;

        aux1 = aux1 - ((aux1 >> 1) & 0x55555555);
        aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333);
        aux2 = aux2 - ((aux2 >> 1) & 0x55555555);
        aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333);
        aux3 = aux3 - ((aux3 >> 1) & 0x55555555);
        aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333);
        aux4 = aux4 - ((aux4 >> 1) & 0x55555555);
        aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333);
        aux5 = aux5 - ((aux5 >> 1) & 0x55555555);
        aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333);
        aux6 = aux6 - ((aux6 >> 1) & 0x55555555);
        aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333);
        aux7 = aux7 - ((aux7 >> 1) & 0x55555555);
        aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333);
        bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) +
                    ((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) +
                    ((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) +
                    ((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) +
                    ((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) +
                    ((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) +
                    ((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24;
    }
    p = (unsigned char*)p4;

remain:
    /* Count the remaining bytes one at a time via the lookup table. */
    while(count--) bits += bitsinbyte[*p++];
    return bits;
}

#ifdef HAVE_AARCH64_NEON
/* AArch64 optimized popcount implementation.
 * Processes the input bitmap using four NEON vector accumulators in parallel
 * to improve instruction-level parallelism and reduce the frequency of
 * scalar reductions. Each accumulator holds 16-bit partial sums that are
 * combined only once per large block (128 bytes), minimizing data movement.
 *
 * Benchmark results show this approach outperforms 2-lane implementations
 * and matches or exceeds 8-lane versions in throughput, while avoiding
 * register pressure and keeping the backend pipeline fully utilized.
 *
 * This function is now memory bound on large bitmaps, as confirmed by perf
 * profiling, with backend stalls dominated by L1/L2 data cache refills.
 */
long long redisPopCountAarch64(void *s, long count) {
    long long bits = 0;
    const uint8_t *p = (const uint8_t*)s;

    /* Align to a 16-byte boundary, counting the leading bytes via the
     * lookup table. */
    while (((uintptr_t)p & 15) && count) {
        bits += bitsinbyte[*p++];
        count--;
    }

    /* Four vector accumulators of u16 (pairwise-accumulated byte counts). */
    uint16x8_t acc0 = vdupq_n_u16(0);
    uint16x8_t acc1 = vdupq_n_u16(0);
    uint16x8_t acc2 = vdupq_n_u16(0);
    uint16x8_t acc3 = vdupq_n_u16(0);

    /* Process 128B per loop to amortize reductions. */
    while (count >= 128) {
        uint8x16_t d0 = vld1q_u8(p + 0);
        uint8x16_t d1 = vld1q_u8(p + 16);
        uint8x16_t d2 = vld1q_u8(p + 32);
        uint8x16_t d3 = vld1q_u8(p + 48);
        uint8x16_t d4 = vld1q_u8(p + 64);
        uint8x16_t d5 = vld1q_u8(p + 80);
        uint8x16_t d6 = vld1q_u8(p + 96);
        uint8x16_t d7 = vld1q_u8(p +112);

        /* Per-byte popcount */
        uint8x16_t c0 = vcntq_u8(d0);
        uint8x16_t c1 = vcntq_u8(d1);
        uint8x16_t c2 = vcntq_u8(d2);
        uint8x16_t c3 = vcntq_u8(d3);
        uint8x16_t c4 = vcntq_u8(d4);
        uint8x16_t c5 = vcntq_u8(d5);
        uint8x16_t c6 = vcntq_u8(d6);
        uint8x16_t c7 = vcntq_u8(d7);

        /* Pairwise widen-add with accumulation: u8 -> u16, stay in vectors.
         * Note: u16 lanes cannot overflow here since each vpadalq_u8 adds at
         * most 16 per lane and the accumulators are reduced right after this
         * loop. */
        acc0 = vpadalq_u8(acc0, c0);
        acc1 = vpadalq_u8(acc1, c1);
        acc2 = vpadalq_u8(acc2, c2);
        acc3 = vpadalq_u8(acc3, c3);

        acc0 = vpadalq_u8(acc0, c4);
        acc1 = vpadalq_u8(acc1, c5);
        acc2 = vpadalq_u8(acc2, c6);
        acc3 = vpadalq_u8(acc3, c7);

        p += 128;
        count -= 128;
    }

    /* Reduce vector accumulators to scalar once. */
    uint32x4_t s0 = vpaddlq_u16(acc0);
    uint32x4_t s1 = vpaddlq_u16(acc1);
    uint32x4_t s2 = vpaddlq_u16(acc2);
    uint32x4_t s3 = vpaddlq_u16(acc3);
    uint32x4_t s01 = vaddq_u32(s0, s1);
    uint32x4_t s23 = vaddq_u32(s2, s3);
    uint32x4_t st = vaddq_u32(s01, s23);
    uint64x2_t s64 = vpaddlq_u32(st);
    bits += (long long)(vgetq_lane_u64(s64, 0) + vgetq_lane_u64(s64, 1));

    /* Remaining 64B blocks (keep vector domain) */
    while (count >= 64) {
        uint8x16_t d0 = vld1q_u8(p + 0);
        uint8x16_t d1 = vld1q_u8(p + 16);
        uint8x16_t d2 = vld1q_u8(p + 32);
        uint8x16_t d3 = vld1q_u8(p + 48);

        uint8x16_t c0 = vcntq_u8(d0);
        uint8x16_t c1 = vcntq_u8(d1);
        uint8x16_t c2 = vcntq_u8(d2);
        uint8x16_t c3 = vcntq_u8(d3);

        uint64x2_t t0 = vpaddlq_u32(vpaddlq_u16(vpaddlq_u8(c0)));
        uint64x2_t t1 = vpaddlq_u32(vpaddlq_u16(vpaddlq_u8(c1)));
        uint64x2_t t2 = vpaddlq_u32(vpaddlq_u16(vpaddlq_u8(c2)));
        uint64x2_t t3 = vpaddlq_u32(vpaddlq_u16(vpaddlq_u8(c3)));

        uint64x2_t s = vaddq_u64(vaddq_u64(t0, t1), vaddq_u64(t2, t3));
        bits += (long long)(vgetq_lane_u64(s, 0) + vgetq_lane_u64(s, 1));

        p += 64;
        count -= 64;
    }

    /* 16B chunks */
    while (count >= 16) {
        uint8x16_t d = vld1q_u8(p);
        uint64x2_t s = vpaddlq_u32(vpaddlq_u16(vpaddlq_u8(vcntq_u8(d))));
        bits += (long long)(vgetq_lane_u64(s, 0) + vgetq_lane_u64(s, 1));
        p += 16;
        count -= 16;
    }

    /* Tail: count remaining bytes via the lookup table. */
    while (count--) bits += bitsinbyte[*p++];

    return bits;
}
#endif

#ifdef HAVE_AVX512
/* AVX512 optimized version of redisPopcount using VPOPCNTDQ instruction.
 * This function requires AVX512F and AVX512VPOPCNTDQ support. */
ATTRIBUTE_TARGET_AVX512_POPCOUNT
long long redisPopCountAvx512(void *s, long count) {
    long long bits = 0;
    unsigned char *p = s;

    /* Align to 64-byte boundary for optimal AVX512 performance */
    while ((unsigned long)p & 63 && count) {
        bits += bitsinbyte[*p++];
        count--;
    }

    /* Process 64 bytes at a time using AVX512 */
    while (count >= 64) {
        __m512i data = _mm512_loadu_si512((__m512i*)p);
        __m512i popcnt = _mm512_popcnt_epi64(data);

        /* Sum all 8 64-bit popcount results */
        bits += _mm512_reduce_add_epi64(popcnt);

        p += 64;
        count -= 64;

        /* Prefetch 2K ahead, same stride rationale as redisPopcount(). */
        redis_prefetch_read(p + 2048);
    }

    /* Handle remaining bytes with scalar popcount */
    while (count >= 8) {
        bits += __builtin_popcountll(*(uint64_t*)p);
        p += 8;
        count -= 8;
    }

    /* Handle final bytes */
    while (count--) {
        bits += bitsinbyte[*p++];
    }

    return bits;
}
#endif

#ifdef HAVE_AVX2
/* AVX2 optimized version of redisPopcount.
 * This function requires AVX2 and POPCNT support. */
ATTRIBUTE_TARGET_AVX2_POPCOUNT
long long redisPopCountAvx2(void *s, long count) {
    long long bits = 0;
    unsigned char *p = s;

    /* Align to 8-byte boundary for 64-bit operations */
    while ((unsigned long)p & 7 && count) {
        bits += bitsinbyte[*p++];
        count--;
    }

    /* Use separate counters to avoid dependencies, similar to regular redisPopcount */
    uint64_t cnt[4];
    memset(cnt, 0, sizeof(cnt));

    /* Process 32 bytes at a time using POPCNT on 64-bit chunks */
    while (count >= 32) {
        cnt[0] += __builtin_popcountll(*(uint64_t*)(p));
        cnt[1] += __builtin_popcountll(*(uint64_t*)(p + 8));
        cnt[2] += __builtin_popcountll(*(uint64_t*)(p + 16));
        cnt[3] += __builtin_popcountll(*(uint64_t*)(p + 24));

        p += 32;
        count -= 32;

        /* Prefetch next cache line */
        redis_prefetch_read(p + 2048);
    }

    bits += cnt[0] + cnt[1] + cnt[2] + cnt[3];

    /* Handle remaining bytes with scalar popcount */
    while (count >= 8) {
        bits += __builtin_popcountll(*(uint64_t*)p);
        p += 8;
        count -= 8;
    }

    /* Handle final bytes */
    while (count--) {
        bits += bitsinbyte[*p++];
    }

    return bits;
}
#endif

/* Automatically select the best available popcount implementation.
 * Preference order: AVX512 (runtime-checked), AVX2 (runtime-checked),
 * NEON (compile-time on AArch64), then the portable redisPopcount().
 * Note that on AArch64 builds the NEON path is taken unconditionally,
 * since NEON is architecturally guaranteed there. */
static inline long long redisPopcountAuto(const unsigned char *p, long count) {
#ifdef HAVE_AVX512
    if (BITOP_USE_AVX512) {
        return redisPopCountAvx512((void*)p, count);
    }
#endif
#ifdef HAVE_AVX2
    if (BITOP_USE_AVX2) {
        return redisPopCountAvx2((void*)p, count);
    }
#endif
#ifdef HAVE_AARCH64_NEON
    return redisPopCountAarch64((void*)p, count);
#else
    return redisPopcount((void*)p, count);
#endif
}

/* Return the position of the first bit set to one (if 'bit' is 1) or
 * zero (if 'bit' is 0) in the bitmap starting at 's' and long 'count' bytes.
 *
 * The function is guaranteed to return a value >= 0 if 'bit' is 0 since if
 * no zero bit is found, it returns count*8 assuming the string is zero
 * padded on the right. However if 'bit' is 1 it is possible that there is
 * not a single set bit in the bitmap. In this special case -1 is returned. */
long long redisBitpos(void *s, unsigned long count, int bit) {
    unsigned long *l;
    unsigned char *c;
    unsigned long skipval, word = 0, one;
    long long pos = 0; /* Position of bit, to return to the caller. */
    unsigned long j;
    int found;

    /* Process whole words first, seeking for first word that is not
     * all ones or all zeros respectively if we are looking for zeros
     * or ones. This is much faster with large strings having contiguous
     * blocks of 1 or 0 bits compared to the vanilla bit per bit processing.
     *
     * Note that if we start from an address that is not aligned
     * to sizeof(unsigned long) we consume it byte by byte until it is
     * aligned. */

    /* Skip initial bits not aligned to sizeof(unsigned long) byte by byte. */
    skipval = bit ? 0 : UCHAR_MAX;
    c = (unsigned char*) s;
    found = 0;
    while((unsigned long)c & (sizeof(*l)-1) && count) {
        if (*c != skipval) {
            found = 1;
            break;
        }
        c++;
        count--;
        pos += 8;
    }

    /* Skip bits with full word step. */
    l = (unsigned long*) c;
    if (!found) {
        skipval = bit ? 0 : ULONG_MAX;
        while (count >= sizeof(*l)) {
            if (*l != skipval) break;
            l++;
            count -= sizeof(*l);
            pos += sizeof(*l)*8;
        }
    }

    /* Load bytes into "word" considering the first byte as the most significant
     * (we basically consider it as written in big endian, since we consider the
     * string as a set of bits from left to right, with the first bit at position
     * zero.
     *
     * Note that the loading is designed to work even when the bytes left
     * (count) are less than a full word. We pad it with zero on the right. */
    c = (unsigned char*)l;
    for (j = 0; j < sizeof(*l); j++) {
        word <<= 8;
        if (count) {
            word |= *c;
            c++;
            count--;
        }
    }

    /* Special case:
     * If bits in the string are all zero and we are looking for one,
     * return -1 to signal that there is not a single "1" in the whole
     * string. This can't happen when we are looking for "0" as we assume
     * that the right of the string is zero padded. */
    if (bit == 1 && word == 0) return -1;

    /* Last word left, scan bit by bit. The first thing we need is to
     * have a single "1" set in the most significant position in an
     * unsigned long. We don't know the size of the long so we use a
     * simple trick. */
    one = ULONG_MAX; /* All bits set to 1.*/
    one >>= 1;       /* All bits set to 1 but the MSB. */
    one = ~one;      /* All bits set to 0 but the MSB. */

    /* Walk the mask from MSB to LSB; the first position where the tested
     * bit matches 'bit' is the answer. */
    while(one) {
        if (((one & word) != 0) == bit) return pos;
        pos++;
        one >>= 1;
    }

    /* If we reached this point, there is a bug in the algorithm, since
     * the case of no match is handled as a special case before. */
    serverPanic("End of redisBitpos() reached.");
    return 0; /* Just to avoid warnings. */
}

/* The following set.*Bitfield and get.*Bitfield functions implement setting
 * and getting arbitrary size (up to 64 bits) signed and unsigned integers
 * at arbitrary positions into a bitmap.
 *
 * The representation considers the bitmap as having the bit number 0 to be
 * the most significant bit of the first byte, and so forth, so for example
 * setting a 5 bits unsigned integer to value 23 at offset 7 into a bitmap
 * previously set to all zeroes, will produce the following representation:
 *
 * +--------+--------+
 * |00000001|01110000|
 * +--------+--------+
 *
 * When offsets and integer sizes are aligned to bytes boundaries, this is the
 * same as big endian, however when such alignment does not exist, it's important
 * to also understand how the bits inside a byte are ordered.
+ * + * Note that this format follows the same convention as SETBIT and related + * commands. + */ + +void setUnsignedBitfield(unsigned char *p, uint64_t offset, uint64_t bits, uint64_t value) { + uint64_t byte, bit, byteval, bitval, j; + + for (j = 0; j < bits; j++) { + bitval = (value & ((uint64_t)1<<(bits-1-j))) != 0; + byte = offset >> 3; + bit = 7 - (offset & 0x7); + byteval = p[byte]; + byteval &= ~(1 << bit); + byteval |= bitval << bit; + p[byte] = byteval & 0xff; + offset++; + } +} + +void setSignedBitfield(unsigned char *p, uint64_t offset, uint64_t bits, int64_t value) { + uint64_t uv = value; /* Casting will add UINT64_MAX + 1 if v is negative. */ + setUnsignedBitfield(p,offset,bits,uv); +} + +uint64_t getUnsignedBitfield(unsigned char *p, uint64_t offset, uint64_t bits) { + uint64_t byte, bit, byteval, bitval, j, value = 0; + + for (j = 0; j < bits; j++) { + byte = offset >> 3; + bit = 7 - (offset & 0x7); + byteval = p[byte]; + bitval = (byteval >> bit) & 1; + value = (value<<1) | bitval; + offset++; + } + return value; +} + +int64_t getSignedBitfield(unsigned char *p, uint64_t offset, uint64_t bits) { + int64_t value; + union {uint64_t u; int64_t i;} conv; + + /* Converting from unsigned to signed is undefined when the value does + * not fit, however here we assume two's complement and the original value + * was obtained from signed -> unsigned conversion, so we'll find the + * most significant bit set if the original value was negative. + * + * Note that two's complement is mandatory for exact-width types + * according to the C99 standard. */ + conv.u = getUnsignedBitfield(p,offset,bits); + value = conv.i; + + /* If the top significant bit is 1, propagate it to all the + * higher bits for two's complement representation of signed + * integers. 
*/ + if (bits < 64 && (value & ((uint64_t)1 << (bits-1)))) + value |= ((uint64_t)-1) << bits; + return value; +} + +/* The following two functions detect overflow of a value in the context + * of storing it as an unsigned or signed integer with the specified + * number of bits. The functions both take the value and a possible increment. + * If no overflow could happen and the value+increment fit inside the limits, + * then zero is returned, otherwise in case of overflow, 1 is returned, + * otherwise in case of underflow, -1 is returned. + * + * When non-zero is returned (overflow or underflow), if not NULL, *limit is + * set to the value the operation should result when an overflow happens, + * depending on the specified overflow semantics: + * + * For BFOVERFLOW_SAT if 1 is returned, *limit it is set maximum value that + * you can store in that integer. when -1 is returned, *limit is set to the + * minimum value that an integer of that size can represent. + * + * For BFOVERFLOW_WRAP *limit is set by performing the operation in order to + * "wrap" around towards zero for unsigned integers, or towards the most + * negative number that is possible to represent for signed integers. */ + +#define BFOVERFLOW_WRAP 0 +#define BFOVERFLOW_SAT 1 +#define BFOVERFLOW_FAIL 2 /* Used by the BITFIELD command implementation. */ + +int checkUnsignedBitfieldOverflow(uint64_t value, int64_t incr, uint64_t bits, int owtype, uint64_t *limit) { + uint64_t max = (bits == 64) ? 
UINT64_MAX : (((uint64_t)1<<bits)-1); + int64_t maxincr = max-value; + int64_t minincr = -value; + + if (value > max || (incr > 0 && incr > maxincr)) { + if (limit) { + if (owtype == BFOVERFLOW_WRAP) { + goto handle_wrap; + } else if (owtype == BFOVERFLOW_SAT) { + *limit = max; + } + } + return 1; + } else if (incr < 0 && incr < minincr) { + if (limit) { + if (owtype == BFOVERFLOW_WRAP) { + goto handle_wrap; + } else if (owtype == BFOVERFLOW_SAT) { + *limit = 0; + } + } + return -1; + } + return 0; + +handle_wrap: + { + uint64_t mask = ((uint64_t)-1) << bits; + uint64_t res = value+incr; + + res &= ~mask; + *limit = res; + } + return 1; +} + +int checkSignedBitfieldOverflow(int64_t value, int64_t incr, uint64_t bits, int owtype, int64_t *limit) { + int64_t max = (bits == 64) ? INT64_MAX : (((int64_t)1<<(bits-1))-1); + int64_t min = (-max)-1; + + /* Note that maxincr and minincr could overflow, but we use the values + * only after checking 'value' range, so when we use it no overflow + * happens. 'uint64_t' cast is there just to prevent undefined behavior on + * overflow */ + int64_t maxincr = (uint64_t)max-value; + int64_t minincr = min-value; + + if (value > max || (bits != 64 && incr > maxincr) || (value >= 0 && incr > 0 && incr > maxincr)) + { + if (limit) { + if (owtype == BFOVERFLOW_WRAP) { + goto handle_wrap; + } else if (owtype == BFOVERFLOW_SAT) { + *limit = max; + } + } + return 1; + } else if (value < min || (bits != 64 && incr < minincr) || (value < 0 && incr < 0 && incr < minincr)) { + if (limit) { + if (owtype == BFOVERFLOW_WRAP) { + goto handle_wrap; + } else if (owtype == BFOVERFLOW_SAT) { + *limit = min; + } + } + return -1; + } + return 0; + +handle_wrap: + { + uint64_t msb = (uint64_t)1 << (bits-1); + uint64_t a = value, b = incr, c; + c = a+b; /* Perform addition as unsigned so that's defined. */ + + /* If the sign bit is set, propagate to all the higher order + * bits, to cap the negative value. 
If it's clear, mask to + * the positive integer limit. */ + if (bits < 64) { + uint64_t mask = ((uint64_t)-1) << bits; + if (c & msb) { + c |= mask; + } else { + c &= ~mask; + } + } + *limit = c; + } + return 1; +} + +/* Debugging function. Just show bits in the specified bitmap. Not used + * but here for not having to rewrite it when debugging is needed. */ +void printBits(unsigned char *p, unsigned long count) { + unsigned long j, i, byte; + + for (j = 0; j < count; j++) { + byte = p[j]; + for (i = 0x80; i > 0; i /= 2) + printf("%c", (byte & i) ? '1' : '0'); + printf("|"); + } + printf("\n"); +} + +/* ----------------------------------------------------------------------------- + * Bits related string commands: GETBIT, SETBIT, BITCOUNT, BITOP. + * -------------------------------------------------------------------------- */ + +#define BITOP_AND 0 +#define BITOP_OR 1 +#define BITOP_XOR 2 +#define BITOP_NOT 3 +#define BITOP_DIFF 4 /* DIFF(X, A1, A2, ..., An) = X & !(A1 | A2 | ... | An) */ +#define BITOP_DIFF1 5 /* DIFF1(X, A1, A2, ..., An) = !X & (A1 | A2 | ... | An) */ +#define BITOP_ANDOR 6 /* ANDOR(X, A1, A2, ..., An) = X & (A1 | A2 | ... | An) */ + +/* ONE(A1, A2, ..., An) = X. + * If X[i] is the i-th bit of X then: + * X[i] == 1 if and only if there is m such that: + * Am[i] == 1 and Al[i] == 0 for all l != m. */ +#define BITOP_ONE 7 + +#define BITFIELDOP_GET 0 +#define BITFIELDOP_SET 1 +#define BITFIELDOP_INCRBY 2 + +/* This helper function used by GETBIT / SETBIT parses the bit offset argument + * making sure an error is returned if it is negative or if it overflows + * Redis 512 MB limit for the string value or more (server.proto_max_bulk_len). + * + * If the 'hash' argument is true, and 'bits is positive, then the command + * will also parse bit offsets prefixed by "#". In such a case the offset + * is multiplied by 'bits'. This is useful for the BITFIELD command. 
 */
int getBitOffsetFromArgument(client *c, robj *o, uint64_t *offset, int hash, int bits) {
    long long loffset;
    char *err = "bit offset is not an integer or out of range";
    char *p = o->ptr;
    size_t plen = sdslen(p);
    int usehash = 0;

    /* Handle #<offset> form. */
    if (p[0] == '#' && hash && bits > 0) usehash = 1;

    if (string2ll(p+usehash,plen-usehash,&loffset) == 0) {
        addReplyError(c,err);
        return C_ERR;
    }

    /* Adjust the offset by 'bits' for #<offset> form.
     * NOTE(review): this signed multiplication looks like it could overflow
     * for huge offsets before the range check below — confirm upstream
     * callers bound it. */
    if (usehash) loffset *= bits;

    /* Limit offset to server.proto_max_bulk_len (512MB in bytes by default) */
    if (loffset < 0 || (!mustObeyClient(c) && (loffset >> 3) >= server.proto_max_bulk_len))
    {
        addReplyError(c,err);
        return C_ERR;
    }

    *offset = loffset;
    return C_OK;
}

/* This helper function for BITFIELD parses a bitfield type in the form
 * <sign><bits> where sign is 'u' or 'i' for unsigned and signed, and
 * the bits is a value between 1 and 64. However 64 bits unsigned integers
 * are reported as an error because of current limitations of Redis protocol
 * to return unsigned integer values greater than INT64_MAX.
 *
 * On error C_ERR is returned and an error is sent to the client. */
int getBitfieldTypeFromArgument(client *c, robj *o, int *sign, int *bits) {
    char *p = o->ptr;
    char *err = "Invalid bitfield type. Use something like i16 u8. Note that u64 is not supported but i64 is.";
    long long llbits;

    if (p[0] == 'i') {
        *sign = 1;
    } else if (p[0] == 'u') {
        *sign = 0;
    } else {
        addReplyError(c,err);
        return C_ERR;
    }

    if ((string2ll(p+1,strlen(p+1),&llbits)) == 0 ||
        llbits < 1 ||
        (*sign == 1 && llbits > 64) ||
        (*sign == 0 && llbits > 63))
    {
        addReplyError(c,err);
        return C_ERR;
    }
    *bits = llbits;
    return C_OK;
}

/* This is a helper function for commands implementations that need to write
 * bits to a string object. The command creates or pad with zeroes the string
 * so that the 'maxbit' bit can be addressed. The object is finally
 * returned. Otherwise if the key holds a wrong type NULL is returned and
 * an error is sent to the client.
 *
 * On success *strOldSize is set to the previous string length (0 for a newly
 * created key) and *strGrowSize to how many bytes the string grew.
 * (Must provide all the arguments to the function)
 */
static kvobj *lookupStringForBitCommand(client *c, uint64_t maxbit,
                                        size_t *strOldSize, size_t *strGrowSize)
{
    dictEntryLink link;
    size_t byte = maxbit >> 3;
    size_t oldAllocSize = 0;
    kvobj *o = lookupKeyWriteWithLink(c->db,c->argv[1],&link);
    if (checkType(c,o,OBJ_STRING)) return NULL;

    if (o == NULL) {
        /* Missing key: create a zeroed string able to address 'maxbit'. */
        o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1));
        dbAddByLink(c->db,c->argv[1],&o,&link);
        *strGrowSize = byte + 1;
        *strOldSize = 0;
    } else {
        /* Existing key: unshare before in-place modification, then grow
         * (zero padded) if 'maxbit' is beyond the current length. */
        o = dbUnshareStringValue(c->db,c->argv[1],o);
        *strOldSize = sdslen(o->ptr);
        if (server.memory_tracking_per_slot)
            oldAllocSize = stringObjectAllocSize(o);
        o->ptr = sdsgrowzero(o->ptr,byte+1);
        if (server.memory_tracking_per_slot)
            updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldAllocSize, stringObjectAllocSize(o));
        *strGrowSize = sdslen(o->ptr) - *strOldSize;
    }
    return o;
}

/* Return a pointer to the string object content, and stores its length
 * in 'len'. The user is required to pass (likely stack allocated) buffer
 * 'llbuf' of at least LONG_STR_SIZE bytes. Such a buffer is used in the case
 * the object is integer encoded in order to provide the representation
 * without using heap allocation.
 *
 * The function returns the pointer to the object array of bytes representing
 * the string it contains, that may be a pointer to 'llbuf' or to the
 * internal object representation. As a side effect 'len' is filled with
 * the length of such buffer.
 *
 * If the source object is NULL the function is guaranteed to return NULL
 * and set 'len' to 0. */
unsigned char *getObjectReadOnlyString(robj *o, long *len, char *llbuf) {
    serverAssert(!o || o->type == OBJ_STRING);
    unsigned char *p = NULL;

    /* Set the 'p' pointer to the string, that can be just a stack allocated
     * array if our string was integer encoded. */
    if (o && o->encoding == OBJ_ENCODING_INT) {
        p = (unsigned char*) llbuf;
        if (len) *len = ll2string(llbuf,LONG_STR_SIZE,(long)o->ptr);
    } else if (o) {
        p = (unsigned char*) o->ptr;
        if (len) *len = sdslen(o->ptr);
    } else {
        if (len) *len = 0;
    }
    return p;
}

/* SETBIT key offset bitvalue */
void setbitCommand(client *c) {
    char *err = "bit is not an integer or out of range";
    uint64_t bitoffset;
    ssize_t byte, bit;
    int byteval, bitval;
    long on;

    if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
        return;

    if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != C_OK)
        return;

    /* Bits can only be set or cleared... */
    if (on & ~1) {
        addReplyError(c,err);
        return;
    }

    size_t strOldSize, strGrowSize;
    kvobj *o = lookupStringForBitCommand(c, bitoffset, &strOldSize, &strGrowSize);
    if (o == NULL) return;

    /* Get current values */
    byte = bitoffset >> 3;
    byteval = ((uint8_t*)o->ptr)[byte];
    bit = 7 - (bitoffset & 0x7);
    bitval = byteval & (1 << bit);

    /* Either it is newly created, changed length, or the bit changes before and after.
     * Note that the bitval here is actually a decimal number.
     * So we need to use `!!` to convert it to 0 or 1 for comparison. */
    if (strGrowSize || (!!bitval != on)) {
        /* Update byte with new bit value. */
        byteval &= ~(1 << bit);
        byteval |= ((on & 0x1) << bit);
        ((uint8_t*)o->ptr)[byte] = byteval;
        keyModified(c,c->db,c->argv[1],o,1);
        notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
        server.dirty++;

        /* If this is not a new key (old size not 0) and size changed, then
         * update the keysizes histogram. Otherwise, the histogram already
         * updated in lookupStringForBitCommand() by calling dbAdd(). */
        if ((strOldSize > 0) && (strGrowSize != 0))
            updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING,
                               strOldSize, strOldSize + strGrowSize);
    }

    /* Return original value. */
    addReply(c, bitval ? shared.cone : shared.czero);
}

/* GETBIT key offset */
void getbitCommand(client *c) {
    char llbuf[32];
    uint64_t bitoffset;
    size_t byte, bit;
    size_t bitval = 0;

    if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
        return;

    kvobj *kv = lookupKeyReadOrReply(c, c->argv[1], shared.czero);
    if (kv == NULL || checkType(c,kv,OBJ_STRING)) return;

    /* Offsets past the end of the value are reported as 0 (bitval stays 0). */
    byte = bitoffset >> 3;
    bit = 7 - (bitoffset & 0x7);
    if (sdsEncodedObject(kv)) {
        if (byte < sdslen(kv->ptr))
            bitval = ((uint8_t*)kv->ptr)[byte] & (1 << bit);
    } else {
        /* Integer encoded value: test the bit on its string representation. */
        if (byte < (size_t)ll2string(llbuf,sizeof(llbuf),(long)kv->ptr))
            bitval = llbuf[byte] & (1 << bit);
    }

    addReply(c, bitval ? shared.cone : shared.czero);
}

#ifdef HAVE_AVX2
/* Compute the given bitop operation using AVX2 intrinsics.
 * Return how many bytes were successfully processed, as AVX2 operates on
 * 256-bit registers so if `minlen` is not a multiple of 32 some of the bytes
 * will be skipped. They will be taken care for in the unoptimized loop in the
 * main bitopCommand function. */
ATTRIBUTE_TARGET_AVX2_POPCOUNT
unsigned long bitopCommandAVX(unsigned char **keys, unsigned char *res,
                              unsigned long op, unsigned long numkeys,
                              unsigned long minlen)
{
    const unsigned long step = sizeof(__m256i);

    unsigned long i;
    unsigned long processed = 0;
    unsigned char *res_start = res;
    unsigned char *fst_key = keys[0];

    if (minlen < step) {
        return 0;
    }

    /* Unlike other operations that do the same with all source keys
     * DIFF, DIFF1 and ANDOR all compute the disjunction of all the source keys
     * but the first one. We first store that disjunction in `lres` and later
     * compute the final operation using the first source key. */
    if (op != BITOP_DIFF && op != BITOP_DIFF1 && op != BITOP_ANDOR) {
        memcpy(res, keys[0], minlen);
    }

    const __m256i max256 = _mm256_set1_epi64x(-1);
    const __m256i zero256 = _mm256_set1_epi64x(0);

    switch (op) {
    case BITOP_AND:
        while (minlen >= step) {
            __m256i lres = _mm256_lddqu_si256((__m256i*)res);

            for (i = 1; i < numkeys; i++) {
                __m256i lkey = _mm256_lddqu_si256((__m256i*)(keys[i]+processed));
                lres = _mm256_and_si256(lres, lkey);
            }
            _mm256_storeu_si256((__m256i*)res, lres);
            res += step;
            processed += step;
            minlen -= step;
        }
        break;
    case BITOP_DIFF:
    case BITOP_DIFF1:
    case BITOP_ANDOR:
    case BITOP_OR:
        /* For DIFF/DIFF1/ANDOR this pass only accumulates the OR of
         * keys[1..n-1] into 'res'; the first key is applied in the second
         * switch below. */
        while (minlen >= step) {
            __m256i lres = _mm256_lddqu_si256((__m256i*)res);

            for (i = 1; i < numkeys; i++) {
                __m256i lkey = _mm256_lddqu_si256((__m256i*)(keys[i]+processed));
                lres = _mm256_or_si256(lres, lkey);
            }
            _mm256_storeu_si256((__m256i*)res, lres);
            res += step;
            processed += step;
            minlen -= step;
        }
        break;
    case BITOP_XOR:
        while (minlen >= step) {
            __m256i lres = _mm256_lddqu_si256((__m256i*)res);

            for (i = 1; i < numkeys; i++) {
                __m256i lkey = _mm256_lddqu_si256((__m256i*)(keys[i]+processed));
                lres = _mm256_xor_si256(lres, lkey);
            }
            _mm256_storeu_si256((__m256i*)res, lres);
            res += step;
            processed += step;
            minlen -= step;
        }
        break;
    case BITOP_NOT:
        while (minlen >= step) {
            __m256i lres = _mm256_lddqu_si256((__m256i*)res);
            lres = _mm256_xor_si256(lres, max256);
            _mm256_storeu_si256((__m256i*)res, lres);
            res += step;
            processed += step;
            minlen -= step;
        }
        break;
    case BITOP_ONE:
        /* ONE: XOR accumulates parity while 'common_bits' remembers any bit
         * seen in two or more keys; such bits are cleared at the end so only
         * bits set in exactly one key survive. */
        while (minlen >= step) {
            __m256i lres = _mm256_lddqu_si256((__m256i*)res);
            __m256i common_bits = zero256;

            for (i = 1; i < numkeys; i++) {
                __m256i lkey = _mm256_lddqu_si256((__m256i*)(keys[i]+processed));
                __m256i common = _mm256_and_si256(lres, lkey);

                common_bits = _mm256_or_si256(common_bits, common);

                lres = _mm256_xor_si256(lres, lkey);
            }
            lres = _mm256_andnot_si256(common_bits, lres);
            _mm256_storeu_si256((__m256i*)res, lres);
            res += step;
            processed += step;
            minlen -= step;
        }
        break;
    default:
        break;
    }

    /* Second pass for the operations that combine the accumulated
     * disjunction with the first source key. */
    res = res_start;
    switch (op) {
    case BITOP_DIFF:
        for (i = 0; i < processed; i += step) {
            __m256i lres = _mm256_lddqu_si256((__m256i*)res);
            __m256i fkey = _mm256_lddqu_si256((__m256i*)fst_key);

            lres = _mm256_andnot_si256(lres, fkey);
            _mm256_storeu_si256((__m256i*)res, lres);

            res += step;
            fst_key += step;
        }
        break;
    case BITOP_DIFF1:
        for (i = 0; i < processed; i += step) {
            __m256i lres = _mm256_lddqu_si256((__m256i*)res);
            __m256i fkey = _mm256_lddqu_si256((__m256i*)fst_key);

            lres = _mm256_andnot_si256(fkey, lres);
            _mm256_storeu_si256((__m256i*)res, lres);

            res += step;
            fst_key += step;
        }
        break;
    case BITOP_ANDOR:
        for (i = 0; i < processed; i += step) {
            __m256i lres = _mm256_lddqu_si256((__m256i*)res);
            __m256i fkey = _mm256_lddqu_si256((__m256i*)fst_key);

            lres = _mm256_and_si256(fkey, lres);
            _mm256_storeu_si256((__m256i*)res, lres);

            res += step;
            fst_key += step;
        }
        break;
    default:
        break;
    }

    return processed;
}
#endif /* HAVE_AVX2 */

/* BITOP op_name target_key src_key1 src_key2 src_key3 ... src_keyN */
REDIS_NO_SANITIZE("alignment")
void bitopCommand(client *c) {
    char *opname = c->argv[1]->ptr;
    robj *targetkey = c->argv[2];
    unsigned long op, j, numkeys;
    robj **objects;      /* Array of source objects. */
    unsigned char **src; /* Array of source strings pointers. */
    unsigned long *len, maxlen = 0; /* Array of length of src strings,
                                       and max len. */
    unsigned long minlen = 0;    /* Min len among the input keys. */
    unsigned char *res = NULL; /* Resulting string. */

    /* Parse the operation name.
*/ + if ((opname[0] == 'a' || opname[0] == 'A') && !strcasecmp(opname,"and")) + op = BITOP_AND; + else if((opname[0] == 'o' || opname[0] == 'O') && !strcasecmp(opname,"or")) + op = BITOP_OR; + else if((opname[0] == 'x' || opname[0] == 'X') && !strcasecmp(opname,"xor")) + op = BITOP_XOR; + else if((opname[0] == 'n' || opname[0] == 'N') && !strcasecmp(opname,"not")) + op = BITOP_NOT; + else if ((opname[0] == 'd' || opname[0] == 'D') && !strcasecmp(opname,"diff")) + op = BITOP_DIFF; + else if ((opname[0] == 'd' || opname[0] == 'D') && !strcasecmp(opname,"diff1")) + op = BITOP_DIFF1; + else if ((opname[0] == 'a' || opname[0] == 'A') && !strcasecmp(opname,"andor")) + op = BITOP_ANDOR; + else if ((opname[0] == 'o' || opname[0] == 'O') && !strcasecmp(opname,"one")) + op = BITOP_ONE; + else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + /* Sanity check: NOT accepts only a single key argument. */ + if (op == BITOP_NOT && c->argc != 4) { + addReplyError(c,"BITOP NOT must be called with a single source key."); + return; + } + + if ((op == BITOP_DIFF || op == BITOP_DIFF1 || op == BITOP_ANDOR) && c->argc < 5) { + sds opname_upper = sdsnew(opname); + sdstoupper(opname_upper); + addReplyErrorFormat(c,"BITOP %s must be called with at least two source keys.", opname_upper); + sdsfree(opname_upper); + return; + } + + /* Lookup keys, and store pointers to the string objects into an array. */ + numkeys = c->argc - 3; + src = zmalloc(sizeof(unsigned char*) * numkeys); + len = zmalloc(sizeof(long) * numkeys); + objects = zmalloc(sizeof(robj*) * numkeys); + for (j = 0; j < numkeys; j++) { + kvobj *kv = lookupKeyRead(c->db, c->argv[j + 3]); + /* Handle non-existing keys as empty strings. */ + if (kv == NULL) { + objects[j] = NULL; + src[j] = NULL; + len[j] = 0; + minlen = 0; + continue; + } + /* Return an error if one of the keys is not a string. 
*/ + if (checkType(c, kv, OBJ_STRING)) { + unsigned long i; + for (i = 0; i < j; i++) { + if (objects[i]) + decrRefCount(objects[i]); + } + zfree(src); + zfree(len); + zfree(objects); + return; + } + objects[j] = getDecodedObject(kv); + src[j] = objects[j]->ptr; + len[j] = sdslen(objects[j]->ptr); + if (len[j] > maxlen) maxlen = len[j]; + if (j == 0 || len[j] < minlen) minlen = len[j]; + } + + /* Compute the bit operation, if at least one string is not empty. */ + if (maxlen) { + res = (unsigned char*) sdsnewlen(NULL,maxlen); + unsigned char output, byte, disjunction, common_bits; + unsigned long i; + int useAVX2 = 0; + + /* Number of bytes processed from each source key */ + j = 0; + +#if defined(HAVE_AVX2) + if (BITOP_USE_AVX2) { + j = bitopCommandAVX(src, res, op, numkeys, minlen); + + serverAssert(minlen >= j); + minlen -= j; + + useAVX2 = 1; + } +#endif + +#if !defined(USE_ALIGNED_ACCESS) + /* We don't have AVX2 but we still have fast path: + * as far as we have data for all the input bitmaps we + * can take a fast path that performs much better than the + * vanilla algorithm. On ARM we skip the fast path since it will + * result in GCC compiling the code using multiple-words load/store + * operations that are not supported even in ARM >= v6. */ + if (minlen >= sizeof(unsigned long)*4) { + /* We can't have entered the AVX2 path since minlen >= sizeof(unsigned long)*4 + * AVX2 path operates on steps of sizeof(__m256i) which for 64-bit + * machines (the only ones supporting AVX2) is equal to + * sizeof(unsigned long)*4. That means after the AVX2 + * path minlen will necessarily be < sizeof(unsigned long)*4. 
*/ + serverAssert(!useAVX2); + + unsigned long **lp = (unsigned long**)src; + unsigned long *lres = (unsigned long*) res; + + /* Index over the unsigned long version of the source keys */ + size_t k = 0; + + /* Unlike other operations that do the same with all source keys + * DIFF, DIFF1 and ANDOR all compute the disjunction of all the + * source keys but the first one. We first store that disjunction + * in `lres` and later compute the final operation using the first + * source key. */ + if (op != BITOP_DIFF && op != BITOP_DIFF1 && op != BITOP_ANDOR) + memcpy(lres,src[0],minlen); + + /* Different branches per different operations for speed (sorry). */ + if (op == BITOP_AND) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] &= lp[i][k+0]; + lres[1] &= lp[i][k+1]; + lres[2] &= lp[i][k+2]; + lres[3] &= lp[i][k+3]; + } + k+=4; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_OR) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] |= lp[i][k+0]; + lres[1] |= lp[i][k+1]; + lres[2] |= lp[i][k+2]; + lres[3] |= lp[i][k+3]; + } + k+=4; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_XOR) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] ^= lp[i][k+0]; + lres[1] ^= lp[i][k+1]; + lres[2] ^= lp[i][k+2]; + lres[3] ^= lp[i][k+3]; + } + k+=4; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_NOT) { + while(minlen >= sizeof(unsigned long)*4) { + lres[0] = ~lres[0]; + lres[1] = ~lres[1]; + lres[2] = ~lres[2]; + lres[3] = ~lres[3]; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_DIFF || op == BITOP_DIFF1 || op == BITOP_ANDOR) { + size_t processed = 0; + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) 
{ + lres[0] |= lp[i][k+0]; + lres[1] |= lp[i][k+1]; + lres[2] |= lp[i][k+2]; + lres[3] |= lp[i][k+3]; + } + k+=4; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + processed += sizeof(unsigned long)*4; + } + + lres = (unsigned long*) res; + unsigned long *first_key = (unsigned long*)src[0]; + switch (op) { + case BITOP_DIFF: + for (i = 0; i < processed; i += sizeof(unsigned long)*4) { + lres[0] = (first_key[0] & ~lres[0]); + lres[1] = (first_key[1] & ~lres[1]); + lres[2] = (first_key[2] & ~lres[2]); + lres[3] = (first_key[3] & ~lres[3]); + lres+=4; + first_key += 4; + } + break; + case BITOP_DIFF1: + for (i = 0; i < processed; i += sizeof(unsigned long)*4) { + lres[0] = (~first_key[0] & lres[0]); + lres[1] = (~first_key[1] & lres[1]); + lres[2] = (~first_key[2] & lres[2]); + lres[3] = (~first_key[3] & lres[3]); + lres+=4; + first_key += 4; + } + break; + case BITOP_ANDOR: + for (i = 0; i < processed; i += sizeof(unsigned long)*4) { + lres[0] = (first_key[0] & lres[0]); + lres[1] = (first_key[1] & lres[1]); + lres[2] = (first_key[2] & lres[2]); + lres[3] = (first_key[3] & lres[3]); + lres+=4; + first_key += 4; + } + break; + } + } else if (op == BITOP_ONE) { + unsigned long lcommon_bits[4]; + + while(minlen >= sizeof(unsigned long)*4) { + memset(lcommon_bits, 0, sizeof(lcommon_bits)); + + for (i = 1; i < numkeys; i++) { + lcommon_bits[0] |= (lres[0] & lp[i][k+0]); + lcommon_bits[1] |= (lres[1] & lp[i][k+1]); + lcommon_bits[2] |= (lres[2] & lp[i][k+2]); + lcommon_bits[3] |= (lres[3] & lp[i][k+3]); + + lres[0] ^= lp[i][k+0]; + lres[1] ^= lp[i][k+1]; + lres[2] ^= lp[i][k+2]; + lres[3] ^= lp[i][k+3]; + } + + lres[0] &= ~lcommon_bits[0]; + lres[1] &= ~lcommon_bits[1]; + lres[2] &= ~lcommon_bits[2]; + lres[3] &= ~lcommon_bits[3]; + + k+=4; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } + } +#endif /* !defined(USE_ALIGNED_ACCESS) */ + + /* j is set to the next byte to process by the previous loop. 
*/ + for (; j < maxlen; j++) { + output = (len[0] <= j) ? 0 : src[0][j]; + if (op == BITOP_NOT) output = ~output; + disjunction = 0; + common_bits = 0; + + for (i = 1; i < numkeys; i++) { + int skip = 0; + byte = (len[i] <= j) ? 0 : src[i][j]; + switch(op) { + case BITOP_AND: + output &= byte; + skip = (output == 0); + break; + case BITOP_OR: + output |= byte; + skip = (output == 0xff); + break; + case BITOP_XOR: output ^= byte; break; + + /* For DIFF, DIFF1 and ANDOR we compute the disjunction of all + * key arguments except the first one. After that we do their + * respective bit op on said first arg and that disjunction. + * */ + case BITOP_DIFF: + case BITOP_DIFF1: + case BITOP_ANDOR: + disjunction |= byte; + skip = (disjunction == 0xff); + break; + + /* BITOP ONE dest key_1 [key_2...] + * If dest[i] is the i-th bit of dest then: + * dest[i] == 1 if and only if there is j such that key_j[i] == 1 + * and key_n[i] == 0 for all n != j. + * + * In order to compute that on each step we track which bits + * were seen in more than one key and store that in a helper + * variable. Then the operation is just XOR but on each step we + * nullify the bits that are set in the helper. + * Logically, this operation is the same as nullifying the + * helper bits only once at the end, but performance-wise it had + * no significant benefit and makes the code only more unclear. 
+ * + * e.g: + * 0001 0111 # key1 + * 0010 0110 # key2 + * + * 0011 0001 # intermediate1 + * 0000 0110 # helper + * 0011 0001 # intermediate1 & ~helper + * + * 0100 1101 # key3 + * + * 0111 1100 # intermediate2 + * 0000 0111 # helper + * 0111 1000 # intermediate2 & ~helper + * --------- + * 0111 1000 # result + * */ + case BITOP_ONE: + common_bits |= (output & byte); + output ^= byte; + output &= ~common_bits; + skip = (common_bits == 0xff); + break; + default: + break; + } + + if (skip) { + break; + } + } + + switch(op) { + case BITOP_DIFF: + res[j] = (output & ~disjunction); + break; + case BITOP_DIFF1: + res[j] = (~output & disjunction); + break; + case BITOP_ANDOR: + res[j] = (output & disjunction); + break; + default: + res[j] = output; + break; + } + } + } + for (j = 0; j < numkeys; j++) { + if (objects[j]) + decrRefCount(objects[j]); + } + zfree(src); + zfree(len); + zfree(objects); + + /* Store the computed value into the target key */ + if (maxlen) { + robj *o = createObject(OBJ_STRING, res); + setKey(c, c->db, targetkey, &o, 0); + notifyKeyspaceEvent(NOTIFY_STRING,"set",targetkey,c->db->id); + server.dirty++; + } else if (dbDelete(c->db,targetkey)) { + keyModified(c,c->db,targetkey,NULL,1); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",targetkey,c->db->id); + server.dirty++; + } + addReplyLongLong(c,maxlen); /* Return the output string length in bytes. */ +} + +/* BITCOUNT key [start end [BIT|BYTE]] */ +void bitcountCommand(client *c) { + kvobj *o; + long long start, end; + long strlen; + unsigned char *p; + char llbuf[LONG_STR_SIZE]; + int isbit = 0; + unsigned char first_byte_neg_mask = 0, last_byte_neg_mask = 0; + + /* Parse start/end range if any. 
*/ + if (c->argc == 4 || c->argc == 5) { + if (getLongLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) + return; + if (getLongLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK) + return; + if (c->argc == 5) { + if (!strcasecmp(c->argv[4]->ptr,"bit")) isbit = 1; + else if (!strcasecmp(c->argv[4]->ptr,"byte")) isbit = 0; + else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + /* Lookup, check for type. */ + o = lookupKeyRead(c->db, c->argv[1]); + if (checkType(c, o, OBJ_STRING)) return; + p = getObjectReadOnlyString(o,&strlen,llbuf); + long long totlen = strlen; + + /* Make sure we will not overflow */ + serverAssert(totlen <= LLONG_MAX >> 3); + + /* Convert negative indexes */ + if (start < 0 && end < 0 && start > end) { + addReply(c,shared.czero); + return; + } + if (isbit) totlen <<= 3; + if (start < 0) start = totlen+start; + if (end < 0) end = totlen+end; + if (start < 0) start = 0; + if (end < 0) end = 0; + if (end >= totlen) end = totlen-1; + if (isbit && start <= end) { + /* Before converting bit offset to byte offset, create negative masks + * for the edges. */ + first_byte_neg_mask = ~((1<<(8-(start&7)))-1) & 0xFF; + last_byte_neg_mask = (1<<(7-(end&7)))-1; + start >>= 3; + end >>= 3; + } + } else if (c->argc == 2) { + /* Lookup, check for type. */ + o = lookupKeyRead(c->db, c->argv[1]); + if (checkType(c, o, OBJ_STRING)) return; + p = getObjectReadOnlyString(o,&strlen,llbuf); + /* The whole string. */ + start = 0; + end = strlen-1; + } else { + /* Syntax error. */ + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + /* Return 0 for non existing keys. */ + if (o == NULL) { + addReply(c, shared.czero); + return; + } + + /* Precondition: end >= 0 && end < strlen, so the only condition where + * zero can be returned is: start > end. 
*/ + if (start > end) { + addReply(c,shared.czero); + } else { + long bytes = (long)(end-start+1); + long long count; + + /* Use the best available popcount implementation */ + count = redisPopcountAuto(p+start, bytes); + + if (first_byte_neg_mask != 0 || last_byte_neg_mask != 0) { + unsigned char firstlast[2] = {0, 0}; + /* We may count bits of first byte and last byte which are out of + * range. So we need to subtract them. Here we use a trick. We set + * bits in the range to zero. So these bit will not be excluded. */ + if (first_byte_neg_mask != 0) firstlast[0] = p[start] & first_byte_neg_mask; + if (last_byte_neg_mask != 0) firstlast[1] = p[end] & last_byte_neg_mask; + + /* Use the same popcount implementation for consistency */ + count -= redisPopcountAuto(firstlast, 2); + } + addReplyLongLong(c,count); + } +} + +/* BITPOS key bit [start [end [BIT|BYTE]]] */ +void bitposCommand(client *c) { + kvobj *o; + long long start, end; + long bit, strlen; + unsigned char *p; + char llbuf[LONG_STR_SIZE]; + int isbit = 0, end_given = 0; + unsigned char first_byte_neg_mask = 0, last_byte_neg_mask = 0; + + /* Parse the bit argument to understand what we are looking for, set + * or clear bits. */ + if (getLongFromObjectOrReply(c,c->argv[2],&bit,NULL) != C_OK) + return; + if (bit != 0 && bit != 1) { + addReplyError(c, "The bit argument must be 1 or 0."); + return; + } + + /* Parse start/end range if any. */ + if (c->argc == 4 || c->argc == 5 || c->argc == 6) { + if (getLongLongFromObjectOrReply(c,c->argv[3],&start,NULL) != C_OK) + return; + if (c->argc == 6) { + if (!strcasecmp(c->argv[5]->ptr,"bit")) isbit = 1; + else if (!strcasecmp(c->argv[5]->ptr,"byte")) isbit = 0; + else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + if (c->argc >= 5) { + if (getLongLongFromObjectOrReply(c,c->argv[4],&end,NULL) != C_OK) + return; + end_given = 1; + } + + /* Lookup, check for type. 
*/ + o = lookupKeyRead(c->db, c->argv[1]); + if (checkType(c, o, OBJ_STRING)) return; + p = getObjectReadOnlyString(o, &strlen, llbuf); + + /* Make sure we will not overflow */ + long long totlen = strlen; + serverAssert(totlen <= LLONG_MAX >> 3); + + if (c->argc < 5) { + if (isbit) end = (totlen<<3) + 7; + else end = totlen-1; + } + + if (isbit) totlen <<= 3; + /* Convert negative indexes */ + if (start < 0) start = totlen+start; + if (end < 0) end = totlen+end; + if (start < 0) start = 0; + if (end < 0) end = 0; + if (end >= totlen) end = totlen-1; + if (isbit && start <= end) { + /* Before converting bit offset to byte offset, create negative masks + * for the edges. */ + first_byte_neg_mask = ~((1<<(8-(start&7)))-1) & 0xFF; + last_byte_neg_mask = (1<<(7-(end&7)))-1; + start >>= 3; + end >>= 3; + } + } else if (c->argc == 3) { + /* Lookup, check for type. */ + o = lookupKeyRead(c->db, c->argv[1]); + if (checkType(c,o,OBJ_STRING)) return; + p = getObjectReadOnlyString(o,&strlen,llbuf); + + /* The whole string. */ + start = 0; + end = strlen-1; + } else { + /* Syntax error. */ + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + /* If the key does not exist, from our point of view it is an infinite + * array of 0 bits. If the user is looking for the first clear bit return 0, + * If the user is looking for the first set bit, return -1. */ + if (o == NULL) { + addReplyLongLong(c, bit ? -1 : 0); + return; + } + + /* For empty ranges (start > end) we return -1 as an empty range does + * not contain a 0 nor a 1. 
*/ + if (start > end) { + addReplyLongLong(c, -1); + } else { + long bytes = end-start+1; + long long pos; + unsigned char tmpchar; + if (first_byte_neg_mask) { + if (bit) tmpchar = p[start] & ~first_byte_neg_mask; + else tmpchar = p[start] | first_byte_neg_mask; + /* Special case, there is only one byte */ + if (last_byte_neg_mask && bytes == 1) { + if (bit) tmpchar = tmpchar & ~last_byte_neg_mask; + else tmpchar = tmpchar | last_byte_neg_mask; + } + pos = redisBitpos(&tmpchar,1,bit); + /* If there are no more bytes or we get valid pos, we can exit early */ + if (bytes == 1 || (pos != -1 && pos != 8)) goto result; + start++; + bytes--; + } + /* If the last byte has not bits in the range, we should exclude it */ + long curbytes = bytes - (last_byte_neg_mask ? 1 : 0); + if (curbytes > 0) { + pos = redisBitpos(p+start,curbytes,bit); + /* If there is no more bytes or we get valid pos, we can exit early */ + if (bytes == curbytes || (pos != -1 && pos != (long long)curbytes<<3)) goto result; + start += curbytes; + bytes -= curbytes; + } + if (bit) tmpchar = p[end] & ~last_byte_neg_mask; + else tmpchar = p[end] | last_byte_neg_mask; + pos = redisBitpos(&tmpchar,1,bit); + + result: + /* If we are looking for clear bits, and the user specified an exact + * range with start-end, we can't consider the right of the range as + * zero padded (as we do when no explicit end is given). + * + * So if redisBitpos() returns the first bit outside the range, + * we return -1 to the caller, to mean, in the specified range there + * is not a single "0" bit. */ + if (end_given && bit == 0 && pos == (long long)bytes<<3) { + addReplyLongLong(c,-1); + return; + } + if (pos != -1) pos += (long long)start<<3; /* Adjust for the bytes we skipped. */ + addReplyLongLong(c,pos); + } +} + +/* BITFIELD key subcommand-1 arg ... subcommand-2 arg ... subcommand-N ... 
+ * + * Supported subcommands: + * + * GET <type> <offset> + * SET <type> <offset> <value> + * INCRBY <type> <offset> <increment> + * OVERFLOW [WRAP|SAT|FAIL] + */ + +#define BITFIELD_FLAG_NONE 0 +#define BITFIELD_FLAG_READONLY (1<<0) + +struct bitfieldOp { + uint64_t offset; /* Bitfield offset. */ + int64_t i64; /* Increment amount (INCRBY) or SET value */ + int opcode; /* Operation id. */ + int owtype; /* Overflow type to use. */ + int bits; /* Integer bitfield bits width. */ + int sign; /* True if signed, otherwise unsigned op. */ +}; + +/* This implements both the BITFIELD command and the BITFIELD_RO command + * when flags is set to BITFIELD_FLAG_READONLY: in this case only the + * GET subcommand is allowed, other subcommands will return an error. */ +void bitfieldGeneric(client *c, int flags) { + kvobj *o; + uint64_t bitoffset; + int j, numops = 0, changes = 0; + size_t strOldSize, strGrowSize = 0; + struct bitfieldOp *ops = NULL; /* Array of ops to execute at end. */ + int owtype = BFOVERFLOW_WRAP; /* Overflow type. */ + int readonly = 1; + uint64_t highest_write_offset = 0; + + for (j = 2; j < c->argc; j++) { + int remargs = c->argc-j-1; /* Remaining args other than current. */ + char *subcmd = c->argv[j]->ptr; /* Current command name. */ + int opcode; /* Current operation code. */ + long long i64 = 0; /* Signed SET value. */ + int sign = 0; /* Signed or unsigned type? */ + int bits = 0; /* Bitfield width in bits. 
*/ + + if (!strcasecmp(subcmd,"get") && remargs >= 2) + opcode = BITFIELDOP_GET; + else if (!strcasecmp(subcmd,"set") && remargs >= 3) + opcode = BITFIELDOP_SET; + else if (!strcasecmp(subcmd,"incrby") && remargs >= 3) + opcode = BITFIELDOP_INCRBY; + else if (!strcasecmp(subcmd,"overflow") && remargs >= 1) { + char *owtypename = c->argv[j+1]->ptr; + j++; + if (!strcasecmp(owtypename,"wrap")) + owtype = BFOVERFLOW_WRAP; + else if (!strcasecmp(owtypename,"sat")) + owtype = BFOVERFLOW_SAT; + else if (!strcasecmp(owtypename,"fail")) + owtype = BFOVERFLOW_FAIL; + else { + addReplyError(c,"Invalid OVERFLOW type specified"); + zfree(ops); + return; + } + continue; + } else { + addReplyErrorObject(c,shared.syntaxerr); + zfree(ops); + return; + } + + /* Get the type and offset arguments, common to all the ops. */ + if (getBitfieldTypeFromArgument(c,c->argv[j+1],&sign,&bits) != C_OK) { + zfree(ops); + return; + } + + if (getBitOffsetFromArgument(c,c->argv[j+2],&bitoffset,1,bits) != C_OK){ + zfree(ops); + return; + } + + if (opcode != BITFIELDOP_GET) { + readonly = 0; + if (highest_write_offset < bitoffset + bits - 1) + highest_write_offset = bitoffset + bits - 1; + /* INCRBY and SET require another argument. */ + if (getLongLongFromObjectOrReply(c,c->argv[j+3],&i64,NULL) != C_OK){ + zfree(ops); + return; + } + } + + /* Populate the array of operations we'll process. */ + ops = zrealloc(ops,sizeof(*ops)*(numops+1)); + ops[numops].offset = bitoffset; + ops[numops].i64 = i64; + ops[numops].opcode = opcode; + ops[numops].owtype = owtype; + ops[numops].bits = bits; + ops[numops].sign = sign; + numops++; + + j += 3 - (opcode == BITFIELDOP_GET); + } + + if (readonly) { + /* Lookup for read is ok if key doesn't exit, but errors + * if it's not a string. 
*/ + o = lookupKeyRead(c->db,c->argv[1]); + if (o != NULL && checkType(c,o,OBJ_STRING)) { + zfree(ops); + return; + } + } else { + if (flags & BITFIELD_FLAG_READONLY) { + zfree(ops); + addReplyError(c, "BITFIELD_RO only supports the GET subcommand"); + return; + } + + /* Lookup by making room up to the farthest bit reached by + * this operation. */ + if ((o = lookupStringForBitCommand(c, + highest_write_offset,&strOldSize,&strGrowSize)) == NULL) { + zfree(ops); + return; + } + } + + addReplyArrayLen(c,numops); + + /* Actually process the operations. */ + for (j = 0; j < numops; j++) { + struct bitfieldOp *thisop = ops+j; + + /* Execute the operation. */ + if (thisop->opcode == BITFIELDOP_SET || + thisop->opcode == BITFIELDOP_INCRBY) + { + /* SET and INCRBY: We handle both with the same code path + * for simplicity. SET return value is the previous value so + * we need fetch & store as well. */ + + /* We need two different but very similar code paths for signed + * and unsigned operations, since the set of functions to get/set + * the integers and the used variables types are different. */ + if (thisop->sign) { + int64_t oldval, newval, wrapped, retval; + int overflow; + + oldval = getSignedBitfield(o->ptr,thisop->offset, + thisop->bits); + + if (thisop->opcode == BITFIELDOP_INCRBY) { + overflow = checkSignedBitfieldOverflow(oldval, + thisop->i64,thisop->bits,thisop->owtype,&wrapped); + newval = overflow ? wrapped : oldval + thisop->i64; + retval = newval; + } else { + newval = thisop->i64; + overflow = checkSignedBitfieldOverflow(newval, + 0,thisop->bits,thisop->owtype,&wrapped); + if (overflow) newval = wrapped; + retval = oldval; + } + + /* On overflow of type is "FAIL", don't write and return + * NULL to signal the condition. 
*/ + if (!(overflow && thisop->owtype == BFOVERFLOW_FAIL)) { + addReplyLongLong(c,retval); + setSignedBitfield(o->ptr,thisop->offset, + thisop->bits,newval); + + if (strGrowSize || (oldval != newval)) + changes++; + } else { + addReplyNull(c); + } + } else { + /* Initialization of 'wrapped' is required to avoid + * false-positive warning "-Wmaybe-uninitialized" */ + uint64_t oldval, newval, retval, wrapped = 0; + int overflow; + + oldval = getUnsignedBitfield(o->ptr,thisop->offset, + thisop->bits); + + if (thisop->opcode == BITFIELDOP_INCRBY) { + newval = oldval + thisop->i64; + overflow = checkUnsignedBitfieldOverflow(oldval, + thisop->i64,thisop->bits,thisop->owtype,&wrapped); + if (overflow) newval = wrapped; + retval = newval; + } else { + newval = thisop->i64; + overflow = checkUnsignedBitfieldOverflow(newval, + 0,thisop->bits,thisop->owtype,&wrapped); + if (overflow) newval = wrapped; + retval = oldval; + } + /* On overflow of type is "FAIL", don't write and return + * NULL to signal the condition. */ + if (!(overflow && thisop->owtype == BFOVERFLOW_FAIL)) { + addReplyLongLong(c,retval); + setUnsignedBitfield(o->ptr,thisop->offset, + thisop->bits,newval); + + if (strGrowSize || (oldval != newval)) + changes++; + } else { + addReplyNull(c); + } + } + } else { + /* GET */ + unsigned char buf[9]; + long strlen = 0; + unsigned char *src = NULL; + char llbuf[LONG_STR_SIZE]; + + if (o != NULL) + src = getObjectReadOnlyString(o,&strlen,llbuf); + + /* For GET we use a trick: before executing the operation + * copy up to 9 bytes to a local buffer, so that we can easily + * execute up to 64 bit operations that are at actual string + * object boundaries. */ + memset(buf,0,9); + int i; + uint64_t byte = thisop->offset >> 3; + for (i = 0; i < 9; i++) { + if (src == NULL || i+byte >= (uint64_t)strlen) break; + buf[i] = src[i+byte]; + } + + /* Now operate on the copied buffer which is guaranteed + * to be zero-padded. 
*/ + if (thisop->sign) { + int64_t val = getSignedBitfield(buf,thisop->offset-(byte*8), + thisop->bits); + addReplyLongLong(c,val); + } else { + uint64_t val = getUnsignedBitfield(buf,thisop->offset-(byte*8), + thisop->bits); + addReplyLongLong(c,val); + } + } + } + + if (changes) { + + /* If this is not a new key (old size not 0) and size changed, then + * update the keysizes histogram. Otherwise, the histogram already + * updated in lookupStringForBitCommand() by calling dbAdd(). */ + if ((strOldSize > 0) && (strGrowSize != 0)) + updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, + strOldSize, strOldSize + strGrowSize); + + keyModified(c,c->db,c->argv[1],o,1); + notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); + server.dirty += changes; + } + zfree(ops); +} + +void bitfieldCommand(client *c) { + bitfieldGeneric(c, BITFIELD_FLAG_NONE); +} + +void bitfieldroCommand(client *c) { + bitfieldGeneric(c, BITFIELD_FLAG_READONLY); +} + +#ifdef REDIS_TEST +/* Test function to verify popcount implementations */ +int bitopsTest(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + /* Test data with known popcount values */ + unsigned char test_data[] = {0xFF, 0x00, 0xAA, 0x55, 0xF0, 0x0F, 0x33, 0xCC}; + int expected_bits = 8 + 0 + 4 + 4 + 4 + 4 + 4 + 4; /* = 32 bits */ + + long long result_regular = redisPopcount(test_data, sizeof(test_data)); + + printf("Regular popcount: %lld (expected: %d)\n", result_regular, expected_bits); + + if (result_regular != expected_bits) { + printf("FAIL: Regular popcount mismatch\n"); + return 1; + } + +#ifdef HAVE_AVX2 + if (BITOP_USE_AVX2) { + long long result_avx2 = redisPopCountAvx2(test_data, sizeof(test_data)); + printf("AVX2 popcount: %lld (expected: %d)\n", result_avx2, expected_bits); + + if (result_avx2 != expected_bits) { + printf("FAIL: AVX2 popcount mismatch\n"); + return 1; + } + } else { + printf("AVX2 not supported on this CPU\n"); + } +#else + printf("AVX2 not 
compiled in\n"); +#endif + +#ifdef HAVE_AVX512 + if (BITOP_USE_AVX512) { + long long result_avx512 = redisPopCountAvx512(test_data, sizeof(test_data)); + printf("AVX512 popcount: %lld (expected: %d)\n", result_avx512, expected_bits); + + if (result_avx512 != expected_bits) { + printf("FAIL: AVX512 popcount mismatch\n"); + return 1; + } + } else { + printf("AVX512 not supported on this CPU\n"); + } +#else + printf("AVX512 not compiled in\n"); +#endif + +#ifdef HAVE_AARCH64_NEON + { + long long result_aarch64 = redisPopCountAarch64(test_data, sizeof(test_data)); + printf("AArch64 NEON popcount: %lld (expected: %d)\n", result_aarch64, expected_bits); + + if (result_aarch64 != expected_bits) { + printf("FAIL: AArch64 NEON popcount mismatch\n"); + return 1; + } + } +#else + printf("AArch64 NEON not available\n"); +#endif + printf("All popcount tests passed!\n"); + return 0; +} +#endif |
