Issue.1765 (#1774)

Various improvements and fixes to cluster slot caching.

* Improves slot caching so any unique set of seeds all hash to the same key

* Fix a couple of memory leaks.

* Fixes a segfault when executing a multiple key command such as `MGET` or `MSET` while the cluster is resharding.
This commit is contained in:
Michael Grunder
2020-06-07 13:50:22 -07:00
committed by GitHub
parent a0c53e0b30
commit 5ca4141c72
4 changed files with 259 additions and 180 deletions

View File

@@ -865,12 +865,14 @@ cluster_free(redisCluster *c, int free_ctx)
/* Free any error we've got */
if (c->err) zend_string_release(c->err);
/* Invalidate our cache if we were redirected during operation */
if (c->cache_key) {
/* Invalidate persistent cache if the cluster has changed */
if (c->redirections) {
zend_hash_del(&EG(persistent_list), c->cache_key);
c->cache_key = NULL;
}
/* Release our hold on the cache key */
zend_string_release(c->cache_key);
}
/* Free structure itself */
@@ -921,34 +923,6 @@ redisCachedCluster *cluster_cache_create(zend_string *hash, HashTable *nodes) {
return cc;
}
/* Takes our input hash table and returns a straight C array with elements,
* which have been randomized. The return value needs to be freed. */
static zval **cluster_shuffle_seeds(HashTable *seeds, int *len) {
zval **z_seeds, *z_ele;
int *map, i, count, index = 0;
/* How many */
count = zend_hash_num_elements(seeds);
/* Allocate our return value and map */
z_seeds = ecalloc(count, sizeof(zval*));
map = emalloc(sizeof(int)*count);
/* Fill in and shuffle our map */
for (i = 0; i < count; i++) map[i] = i;
fyshuffle(map, count);
/* Iterate over our source array and use our map to create a random list */
ZEND_HASH_FOREACH_VAL(seeds, z_ele) {
z_seeds[map[index++]] = z_ele;
} ZEND_HASH_FOREACH_END();
efree(map);
*len = count;
return z_seeds;
}
static void cluster_free_cached_master(redisCachedMaster *cm) {
size_t i;
@@ -988,7 +962,6 @@ PHP_REDIS_API void cluster_cache_free(redisCachedCluster *rcc) {
cluster_free_cached_master(&rcc->master[i]);
}
/* Free hash key */
zend_string_release(rcc->hash);
pefree(rcc->master, 1);
pefree(rcc, 1);
@@ -1009,17 +982,16 @@ void cluster_init_cache(redisCluster *c, redisCachedCluster *cc) {
for (i = 0; i < cc->count; i++) map[i] = i;
fyshuffle(map, cc->count);
/* Duplicate the hash key so we can invalidate when redirected */
c->cache_key = zend_string_copy(cc->hash);
/* Iterate over masters */
for (i = 0; i < cc->count; i++) {
/* Attach cache key */
c->cache_key = cc->hash;
/* Grab the next master */
cm = &cc->master[map[i]];
/* Hash our host and port */
keylen = snprintf(key, sizeof(key), "%s:%u", ZSTR_VAL(cm->host.addr),
cm->host.port);
keylen = snprintf(key, sizeof(key), "%s:%u", ZSTR_VAL(cm->host.addr), cm->host.port);
/* Create socket */
sock = redis_sock_create(ZSTR_VAL(cm->host.addr), ZSTR_LEN(cm->host.addr), cm->host.port,
@@ -1053,56 +1025,45 @@ void cluster_init_cache(redisCluster *c, redisCachedCluster *cc) {
efree(map);
}
/* Initialize seeds */
PHP_REDIS_API int
cluster_init_seeds(redisCluster *cluster, HashTable *ht_seeds) {
RedisSock *redis_sock;
char *str, *psep, key[1024];
int key_len, count, i;
zval **z_seeds, *z_seed;
/* Initialize seeds. By the time we get here we've already validated our
* seeds array and know we have a non-empty array of strings all in
* host:port format. */
PHP_REDIS_API void
cluster_init_seeds(redisCluster *cluster, zend_string **seeds, uint32_t nseeds) {
RedisSock *sock;
char *seed, *sep, key[1024];
int key_len, i, *map;
/* Get our seeds in a randomized array */
z_seeds = cluster_shuffle_seeds(ht_seeds, &count);
/* Get a randomized order to hit our seeds */
map = ecalloc(nseeds, sizeof(*map));
for (i = 0; i < nseeds; i++) map[i] = i;
fyshuffle(map, nseeds);
// Iterate our seeds array
for (i = 0; i < count; i++) {
if ((z_seed = z_seeds[i]) == NULL) continue;
for (i = 0; i < nseeds; i++) {
seed = ZSTR_VAL(seeds[map[i]]);
ZVAL_DEREF(z_seed);
/* Has to be a string */
if (Z_TYPE_P(z_seed) != IS_STRING) continue;
// Grab a copy of the string
str = Z_STRVAL_P(z_seed);
/* Make sure we have a colon for host:port. Search right to left in the
* case of IPv6 */
if ((psep = strrchr(str, ':')) == NULL)
continue;
sep = strrchr(seed, ':');
ZEND_ASSERT(sep != NULL);
// Allocate a structure for this seed
redis_sock = redis_sock_create(str, psep-str,
(unsigned short)atoi(psep+1), cluster->timeout,
sock = redis_sock_create(seed, sep - seed,
(unsigned short)atoi(sep+1), cluster->timeout,
cluster->read_timeout, cluster->persistent, NULL, 0);
// Set auth information if specified
if (cluster->flags->auth) {
redis_sock->auth = zend_string_copy(cluster->flags->auth);
sock->auth = zend_string_copy(cluster->flags->auth);
}
// Index this seed by host/port
key_len = snprintf(key, sizeof(key), "%s:%u", ZSTR_VAL(redis_sock->host),
redis_sock->port);
key_len = snprintf(key, sizeof(key), "%s:%u", ZSTR_VAL(sock->host),
sock->port);
// Add to our seed HashTable
zend_hash_str_update_ptr(cluster->seeds, key, key_len, redis_sock);
zend_hash_str_update_ptr(cluster->seeds, key, key_len, sock);
}
efree(z_seeds);
// Success if at least one seed seems valid
return zend_hash_num_elements(cluster->seeds) > 0 ? SUCCESS : FAILURE;
efree(map);
}
/* Initial mapping of our cluster keyspace */
@@ -1137,7 +1098,7 @@ PHP_REDIS_API int cluster_map_keyspace(redisCluster *c) {
// Throw an exception if we couldn't map
if (!mapped) {
CLUSTER_THROW_EXCEPTION("Couldn't map cluster keyspace using any provided seed", 0);
return -1;
return FAILURE;
}
return SUCCESS;
@@ -1626,11 +1587,9 @@ PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char
redis_sock_disconnect(c->cmd_sock, 1);
if (timedout) {
CLUSTER_THROW_EXCEPTION(
"Timed out attempting to find data in the correct node!", 0);
CLUSTER_THROW_EXCEPTION("Timed out attempting to find data in the correct node!", 0);
} else {
CLUSTER_THROW_EXCEPTION(
"Error processing response from Redis node!", 0);
CLUSTER_THROW_EXCEPTION("Error processing response from Redis node!", 0);
}
return -1;
@@ -2460,10 +2419,11 @@ PHP_REDIS_API void cluster_msetnx_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluste
// Set return value if it's our last response
if (mctx->last) {
if (CLUSTER_IS_ATOMIC(c)) {
RETVAL_ZVAL(mctx->z_multi, 0, 1);
RETVAL_ZVAL(mctx->z_multi, 0, 0);
} else {
add_next_index_zval(&c->multi_resp, mctx->z_multi);
}
efree(mctx->z_multi);
}
// Free multi context
@@ -2745,42 +2705,145 @@ int mbulk_resp_loop_assoc(RedisSock *redis_sock, zval *z_result,
return SUCCESS;
}
/* Turn a seed array into a zend_string we can use to look up a slot cache */
zend_string *cluster_hash_seeds(HashTable *ht) {
smart_str hash = {0};
zend_string *zstr;
zval *z_seed;
/* Free an array of zend_string seeds */
void free_seed_array(zend_string **seeds, uint32_t nseeds) {
int i;
ZEND_HASH_FOREACH_VAL(ht, z_seed) {
zstr = zval_get_string(z_seed);
smart_str_appendc(&hash, '[');
smart_str_appendl(&hash, ZSTR_VAL(zstr), ZSTR_LEN(zstr));
smart_str_appendc(&hash, ']');
zend_string_release(zstr);
if (seeds == NULL)
return;
for (i = 0; i < nseeds; i++)
zend_string_release(seeds[i]);
efree(seeds);
}
static zend_string **get_valid_seeds(HashTable *input, uint32_t *nseeds) {
HashTable *valid;
uint32_t count, idx = 0;
zval *z_seed;
zend_string *zkey, **seeds = NULL;
/* Short circuit if we don't have any sees */
count = zend_hash_num_elements(input);
if (count == 0)
return NULL;
ALLOC_HASHTABLE(valid);
zend_hash_init(valid, count, NULL, NULL, 0);
ZEND_HASH_FOREACH_VAL(input, z_seed) {
ZVAL_DEREF(z_seed);
if (Z_TYPE_P(z_seed) != IS_STRING) {
php_error_docref(NULL, E_WARNING, "Skipping non-string entry in seeds array");
continue;
} else if (strrchr(Z_STRVAL_P(z_seed), ':') == NULL) {
php_error_docref(NULL, E_WARNING,
"Seed '%s' not in host:port format, ignoring", Z_STRVAL_P(z_seed));
continue;
}
/* Add as a key to avoid duplicates */
zend_hash_str_update_ptr(valid, Z_STRVAL_P(z_seed), Z_STRLEN_P(z_seed), NULL);
} ZEND_HASH_FOREACH_END();
/* Not strictly needed but null terminate anyway */
/* We need at least one valid seed */
count = zend_hash_num_elements(valid);
if (count == 0)
goto cleanup;
/* Populate our return array */
seeds = ecalloc(count, sizeof(*seeds));
ZEND_HASH_FOREACH_STR_KEY(valid, zkey) {
seeds[idx++] = zend_string_copy(zkey);
} ZEND_HASH_FOREACH_END();
*nseeds = idx;
cleanup:
zend_hash_destroy(valid);
FREE_HASHTABLE(valid);
return seeds;
}
/* Validate cluster construction arguments and return a sanitized and validated
* array of seeds */
zend_string**
cluster_validate_args(double timeout, double read_timeout, HashTable *seeds,
uint32_t *nseeds, char **errstr)
{
zend_string **retval;
if (timeout < 0L || timeout > INT_MAX) {
if (errstr) *errstr = "Invalid timeout";
return NULL;
}
if (read_timeout < 0L || read_timeout > INT_MAX) {
if (errstr) *errstr = "Invalid read timeout";
return NULL;
}
retval = get_valid_seeds(seeds, nseeds);
if (retval == NULL && errstr)
*errstr = "No valid seeds detected";
return retval;
}
/* Helper function to compare to host:port seeds */
static int cluster_cmp_seeds(const void *a, const void *b) {
zend_string *za = *(zend_string **)a;
zend_string *zb = *(zend_string **)b;
return strcmp(ZSTR_VAL(za), ZSTR_VAL(zb));
}
static void cluster_swap_seeds(void *a, void *b) {
zend_string **za, **zb, *tmp;
za = a;
zb = b;
tmp = *za;
*za = *zb;
*zb = tmp;
}
/* Turn an array of cluster seeds into a string we can cache. If we get here we know
* we have at least one entry and that every entry is a string in the form host:port */
#define SLOT_CACHE_PREFIX "phpredis_slots:"
zend_string *cluster_hash_seeds(zend_string **seeds, uint32_t count) {
smart_str hash = {0};
size_t i;
/* Sort our seeds so any any array with identical seeds hashes to the same key
* regardless of what order the user gives them to us in. */
zend_sort(seeds, count, sizeof(*seeds), cluster_cmp_seeds, cluster_swap_seeds);
/* Global phpredis hash prefix */
smart_str_appendl(&hash, SLOT_CACHE_PREFIX, sizeof(SLOT_CACHE_PREFIX) - 1);
/* Construct our actual hash */
for (i = 0; i < count; i++) {
smart_str_appendc(&hash, '[');
smart_str_append_ex(&hash, seeds[i], 0);
smart_str_appendc(&hash, ']');
}
/* Null terminate */
smart_str_0(&hash);
/* smart_str is a zend_string internally */
/* Return the internal zend_string */
return hash.s;
}
#define SLOT_CACHING_ENABLED() (INI_INT("redis.clusters.cache_slots") == 1)
PHP_REDIS_API redisCachedCluster *cluster_cache_load(HashTable *ht_seeds) {
PHP_REDIS_API redisCachedCluster *cluster_cache_load(zend_string *hash) {
zend_resource *le;
zend_string *h;
/* Short circuit if we're not caching slots or if our seeds don't have any
* elements, since it doesn't make sense to cache an empty string */
if (!SLOT_CACHING_ENABLED() || zend_hash_num_elements(ht_seeds) == 0)
return NULL;
/* Look for cached slot information */
h = cluster_hash_seeds(ht_seeds);
le = zend_hash_find_ptr(&EG(persistent_list), h);
zend_string_release(h);
le = zend_hash_find_ptr(&EG(persistent_list), hash);
if (le != NULL) {
/* Sanity check on our list type */
@@ -2798,21 +2861,11 @@ PHP_REDIS_API redisCachedCluster *cluster_cache_load(HashTable *ht_seeds) {
}
/* Cache a cluster's slot information in persistent_list if it's enabled */
PHP_REDIS_API int cluster_cache_store(HashTable *ht_seeds, HashTable *nodes) {
PHP_REDIS_API int cluster_cache_store(zend_string *hash, HashTable *nodes) {
redisCachedCluster *cc;
zend_string *hash;
/* Short circuit if caching is disabled or there aren't any seeds */
if (!SLOT_CACHING_ENABLED()) {
return SUCCESS;
} else if (zend_hash_num_elements(ht_seeds) == 0) {
return FAILURE;
}
/* Construct our cache */
hash = cluster_hash_seeds(ht_seeds);
cc = cluster_cache_create(hash, nodes);
zend_string_release(hash);
/* Set up our resource */
#if PHP_VERSION_ID < 70300

View File

@@ -129,6 +129,8 @@
mc.kw = keyword; \
mc.kw_len = keyword_len; \
#define CLUSTER_CACHING_ENABLED() (INI_INT("redis.clusters.cache_slots") == 1)
/* Cluster redirection enum */
typedef enum CLUSTER_REDIR_TYPE {
REDIR_NONE,
@@ -358,6 +360,15 @@ void cluster_multi_fini(clusterMultiCmd *mc);
unsigned short cluster_hash_key_zval(zval *key);
unsigned short cluster_hash_key(const char *key, int len);
/* Validate and sanitize cluster construction args */
zend_string** cluster_validate_args(double timeout, double read_timeout,
HashTable *seeds, uint32_t *nseeds, char **errstr);
void free_seed_array(zend_string **seeds, uint32_t nseeds);
/* Generate a unique hash string from seeds array */
zend_string *cluster_hash_seeds(zend_string **seeds, uint32_t nseeds);
/* Get the current time in milliseconds */
long long mstime(void);
@@ -379,7 +390,7 @@ PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd,
PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout,
int failover, int persistent);
PHP_REDIS_API void cluster_free(redisCluster *c, int free_ctx);
PHP_REDIS_API int cluster_init_seeds(redisCluster *c, HashTable *ht_seeds);
PHP_REDIS_API void cluster_init_seeds(redisCluster *c, zend_string **seeds, uint32_t nseeds);
PHP_REDIS_API int cluster_map_keyspace(redisCluster *c);
PHP_REDIS_API void cluster_free_node(redisClusterNode *node);
@@ -390,10 +401,10 @@ PHP_REDIS_API void cluster_init_cache(redisCluster *c, redisCachedCluster *rcc);
/* Functions to facilitate cluster slot caching */
PHP_REDIS_API char **cluster_sock_read_multibulk_reply(RedisSock *redis_sock,
int *len);
PHP_REDIS_API int cluster_cache_store(HashTable *ht_seeds, HashTable *nodes);
PHP_REDIS_API redisCachedCluster *cluster_cache_load(HashTable *ht_seeds);
PHP_REDIS_API char **cluster_sock_read_multibulk_reply(RedisSock *redis_sock, int *len);
PHP_REDIS_API int cluster_cache_store(zend_string *hash, HashTable *nodes);
PHP_REDIS_API redisCachedCluster *cluster_cache_load(zend_string *hash);
/*
* Redis Cluster response handlers. Our response handlers generally take the

View File

@@ -340,55 +340,51 @@ void free_cluster_context(zend_object *object) {
zend_object_std_dtor(&cluster->std);
}
/* Validate redis cluster construction arguments */
static int
cluster_validate_args(double timeout, double read_timeout, HashTable *seeds) {
if (timeout < 0L || timeout > INT_MAX) {
CLUSTER_THROW_EXCEPTION("Invalid timeout", 0);
return FAILURE;
}
if (read_timeout < 0L || read_timeout > INT_MAX) {
CLUSTER_THROW_EXCEPTION("Invalid read timeout", 0);
return FAILURE;
}
/* Make sure there are some seeds */
if (zend_hash_num_elements(seeds) == 0) {
CLUSTER_THROW_EXCEPTION("Must pass seeds", 0);
return FAILURE;
}
return SUCCESS;
}
/* Take user provided seeds and return unique and valid ones */
/* Attempt to connect to a Redis cluster provided seeds and timeout options */
static void redis_cluster_init(redisCluster *c, HashTable *ht_seeds, double timeout,
double read_timeout, int persistent, char *auth,
size_t auth_len)
{
zend_string *hash = NULL, **seeds;
redisCachedCluster *cc;
uint32_t nseeds;
char *err;
cluster_validate_args(timeout, read_timeout, ht_seeds);
/* Validate our arguments and get a sanitized seed array */
seeds = cluster_validate_args(timeout, read_timeout, ht_seeds, &nseeds, &err);
if (seeds == NULL) {
CLUSTER_THROW_EXCEPTION(err, 0);
return;
}
if (auth && auth_len > 0) {
if (auth && auth_len) {
c->flags->auth = zend_string_init(auth, auth_len, 0);
}
c->timeout = timeout;
c->read_timeout = read_timeout;
c->persistent = persistent;
c->waitms = timeout * 1000L;
/* Calculate the number of milliseconds we will wait when bouncing around,
* (e.g. a node goes down), which is not the same as a standard timeout. */
c->waitms = (long)(timeout * 1000);
/* Attempt to load from cache */
if ((cc = cluster_cache_load(ht_seeds))) {
cluster_init_cache(c, cc);
} else if (cluster_init_seeds(c, ht_seeds) == SUCCESS &&
cluster_map_keyspace(c) == SUCCESS)
{
cluster_cache_store(ht_seeds, c->nodes);
/* Attempt to load slots from cache if caching is enabled */
if (CLUSTER_CACHING_ENABLED()) {
/* Exit early if we can load from cache */
hash = cluster_hash_seeds(seeds, nseeds);
if ((cc = cluster_cache_load(hash))) {
cluster_init_cache(c, cc);
goto cleanup;
}
}
/* Initialize seeds and attempt to map keyspace */
cluster_init_seeds(c, seeds, nseeds);
if (cluster_map_keyspace(c) == SUCCESS && hash)
cluster_cache_store(hash, c->nodes);
cleanup:
if (hash) zend_string_release(hash);
free_seed_array(seeds, nseeds);
}
@@ -556,11 +552,7 @@ distcmd_resp_handler(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, short slot,
ctx->last = last;
// Attempt to send the command
if (cluster_send_command(c,slot,mc->cmd.c,mc->cmd.len) < 0 ||
c->err != NULL)
{
cluster_multi_free(mc);
zval_dtor(z_ret);
if (cluster_send_command(c,slot,mc->cmd.c,mc->cmd.len) < 0 || c->err != NULL) {
efree(ctx);
return -1;
}
@@ -865,6 +857,7 @@ static int cluster_mset_cmd(INTERNAL_FUNCTION_PARAMETERS, char *kw, int kw_len,
if (distcmd_resp_handler(INTERNAL_FUNCTION_PARAM_PASSTHRU, c,
slot, &mc, z_ret, i == argc, cb) < 0)
{
cluster_multi_free(&mc);
return -1;
}
}
@@ -890,6 +883,7 @@ static int cluster_mset_cmd(INTERNAL_FUNCTION_PARAMETERS, char *kw, int kw_len,
if (distcmd_resp_handler(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, slot, &mc,
z_ret, 1, cb) < 0)
{
cluster_multi_free(&mc);
return -1;
}
}

View File

@@ -894,7 +894,7 @@ PS_OPEN_FUNC(rediscluster) {
zval z_conf, *z_val;
HashTable *ht_conf, *ht_seeds;
double timeout = 0, read_timeout = 0;
int retval, persistent = 0, failover = REDIS_FAILOVER_NONE;
int persistent = 0, failover = REDIS_FAILOVER_NONE;
size_t prefix_len, auth_len = 0;
char *prefix, *auth = NULL;
@@ -960,35 +960,56 @@ PS_OPEN_FUNC(rediscluster) {
auth_len = Z_STRLEN_P(z_val);
}
c = cluster_create(timeout, read_timeout, failover, persistent);
if (auth && auth_len > 0) {
c->flags->auth = zend_string_init(auth, auth_len, 0);
}
redisCachedCluster *cc;
zend_string **seeds, *hash = NULL;
uint32_t nseeds;
/* Attempt to load from cache */
if ((cc = cluster_cache_load(ht_seeds))) {
cluster_init_cache(c, cc);
/* Set up our prefix */
c->flags->prefix = zend_string_init(prefix, prefix_len, 0);
PS_SET_MOD_DATA(c);
retval = SUCCESS;
} else if (!cluster_init_seeds(c, ht_seeds) && !cluster_map_keyspace(c)) {
/* Set up our prefix */
c->flags->prefix = zend_string_init(prefix, prefix_len, 0);
cluster_cache_store(ht_seeds, c->nodes);
PS_SET_MOD_DATA(c);
retval = SUCCESS;
} else {
cluster_free(c, 1);
retval = FAILURE;
/* Extract at least one valid seed or abort */
seeds = cluster_validate_args(timeout, read_timeout, ht_seeds, &nseeds, NULL);
if (seeds == NULL) {
php_error_docref(NULL, E_WARNING, "No valid seeds detected");
zval_dtor(&z_conf);
return FAILURE;
}
/* Cleanup */
zval_dtor(&z_conf);
#define CLUSTER_SESSION_CLEANUP() \
if (hash) zend_string_release(hash); \
free_seed_array(seeds, nseeds); \
zval_dtor(&z_conf); \
return retval;
c = cluster_create(timeout, read_timeout, failover, persistent);
c->flags->prefix = zend_string_init(prefix, prefix_len, 0);
if (auth && auth_len)
c->flags->auth = zend_string_init(auth, auth_len, 0);
/* First attempt to load from cache */
if (CLUSTER_CACHING_ENABLED()) {
hash = cluster_hash_seeds(seeds, nseeds);
if ((cc = cluster_cache_load(hash))) {
cluster_init_cache(c, cc);
goto success;
}
}
/* Initialize seed array, and attempt to map keyspace */
cluster_init_seeds(c, seeds, nseeds);
if (cluster_map_keyspace(c) != SUCCESS)
goto failure;
/* Now cache our cluster if caching is enabled */
if (hash)
cluster_cache_store(hash, c->nodes);
success:
CLUSTER_SESSION_CLEANUP();
PS_SET_MOD_DATA(c);
return SUCCESS;
failure:
CLUSTER_SESSION_CLEANUP();
cluster_free(c, 1);
return FAILURE;
}
/* {{{ PS_READ_FUNC