From fcea71f99bb5fc3873539d42e1e29000fa36ec46 Mon Sep 17 00:00:00 2001 From: michael-grunder Date: Wed, 17 Jun 2015 12:45:54 -0700 Subject: [PATCH] Initial commit for cluster-enabled session handling --- cluster_library.c | 64 +++++++++++- cluster_library.h | 3 + redis.c | 2 + redis_session.c | 249 +++++++++++++++++++++++++++++++++++++++++++++- redis_session.h | 6 ++ 5 files changed, 322 insertions(+), 2 deletions(-) diff --git a/cluster_library.c b/cluster_library.c index 93885c8..b6a760e 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -97,7 +97,7 @@ void cluster_free_reply(clusterReply *reply, int free_data) { case TYPE_ERR: case TYPE_LINE: case TYPE_BULK: - if(free_data) + if(free_data && reply->str) efree(reply->str); break; case TYPE_MULTIBULK: @@ -785,6 +785,68 @@ static RedisSock *cluster_get_asking_sock(redisCluster *c TSRMLS_DC) { return cluster_get_asking_node(c TSRMLS_CC)->sock; } +/* Our context seeds will be a hash table with RedisSock* pointers */ +static void ht_free_seed(void *data) { + RedisSock *redis_sock = *(RedisSock**)data; + if(redis_sock) redis_free_socket(redis_sock); +} + +/* Free redisClusterNode objects we've stored */ +static void ht_free_node(void *data) { + redisClusterNode *node = *(redisClusterNode**)data; + cluster_free_node(node); +} + +/* Construct a redisCluster object */ +PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout, + int failover) +{ + redisCluster *c; + + /* Actual our actual cluster structure */ + c = ecalloc(1, sizeof(redisCluster)); + + /* Initialize flags and settings */ + c->flags = ecalloc(1, sizeof(RedisSock)); + c->subscribed_slot = -1; + c->clusterdown = 0; + c->timeout = timeout; + c->read_timeout = read_timeout; + + /* Set up our waitms based on timeout */ + c->waitms = (long)(1000 * timeout); + + /* Allocate our seeds hash table */ + ALLOC_HASHTABLE(c->seeds); + zend_hash_init(c->seeds, 0, NULL, ht_free_seed, 0); + + /* Allocate our nodes HashTable */ + ALLOC_HASHTABLE(c->nodes); + zend_hash_init(c->nodes, 0, NULL, ht_free_node, 0); + + return c; +} + +PHP_REDIS_API void cluster_free(redisCluster *c) { + /* Free any allocated prefix */ + if (c->flags->prefix) efree(c->flags->prefix); + efree(c->flags); + + /* Call hash table destructors */ + zend_hash_destroy(c->seeds); + zend_hash_destroy(c->nodes); + + /* Free hash tables themselves */ + efree(c->seeds); + efree(c->nodes); + + /* Free any error we've got */ + if (c->err) efree(c->err); + + /* Free structure itself */ + efree(c); +} + /* Initialize seeds */ PHP_REDIS_API int cluster_init_seeds(redisCluster *cluster, HashTable *ht_seeds) { diff --git a/cluster_library.h b/cluster_library.h index a5020dc..e83ef2f 100644 --- a/cluster_library.h +++ b/cluster_library.h @@ -358,6 +358,9 @@ PHP_REDIS_API short cluster_find_slot(redisCluster *c, const char *host, PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd, int cmd_len, REDIS_REPLY_TYPE rtype TSRMLS_DC); +PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout, + int failover); +PHP_REDIS_API void cluster_free(redisCluster *c); PHP_REDIS_API int cluster_init_seeds(redisCluster *c, HashTable *ht_seeds); PHP_REDIS_API int cluster_map_keyspace(redisCluster *c TSRMLS_DC); PHP_REDIS_API void cluster_free_node(redisClusterNode *node); diff --git a/redis.c b/redis.c index b338e07..313672c 100644 --- a/redis.c +++ b/redis.c @@ -52,6 +52,7 @@ extern int le_redis_array; #ifdef PHP_SESSION extern ps_module ps_mod_redis; +extern ps_module ps_mod_redis_cluster; #endif extern zend_class_entry *redis_array_ce; @@ -541,6 +542,7 @@ static void add_class_constants(zend_class_entry *ce, int is_cluster TSRMLS_DC) #ifdef PHP_SESSION php_session_register_module(&ps_mod_redis); + php_session_register_module(&ps_mod_redis_cluster); #endif } diff --git a/redis_session.c b/redis_session.c index 0416cdc..7183473 100644 --- a/redis_session.c +++ b/redis_session.c @@ -34,6 +34,7 @@ #include #include "library.h" +#include "cluster_library.h" #include "php.h" #include "php_ini.h" @@ -44,6 +45,9 @@ ps_module ps_mod_redis = { PS_MOD(redis) }; +ps_module ps_mod_redis_cluster = { + PS_MOD(rediscluster) +}; typedef struct redis_pool_member_ { @@ -333,7 +337,6 @@ redis_session_key(redis_pool_member *rpm, const char *key, int key_len, int *ses return session; } - /* {{{ PS_READ_FUNC */ PS_READ_FUNC(redis) @@ -457,5 +460,249 @@ PS_GC_FUNC(redis) } /* }}} */ +/** + * Redis Cluster session handler functions + */ + +/* Helper to extract timeout values */ +static void session_conf_timeout(HashTable *ht_conf, const char *key, int key_len, + double *val) +{ + zval **z_val; + + if (zend_hash_find(ht_conf, key, key_len, (void**)&z_val) == SUCCESS) { + if (Z_TYPE_PP(z_val) == IS_STRING) { + *val = atof(Z_STRVAL_PP(z_val)); + } else { + *val = Z_DVAL_PP(z_val); + } + } +} + +/* Prefix a session key */ +static char *cluster_session_key(redisCluster *c, const char *key, int keylen, + int *skeylen, short *slot) { + char *skey; + + *skeylen = keylen + c->flags->prefix_len; + skey = emalloc(*skeylen); + memcpy(skey, c->flags->prefix, c->flags->prefix_len); + memcpy(skey + c->flags->prefix_len, key, keylen); + + *slot = cluster_hash_key(skey, *skeylen); + + return skey; +} + +PS_OPEN_FUNC(rediscluster) { + redisCluster *c; + zval *z_conf, **z_val; + HashTable *ht_conf, *ht_seeds; + double timeout = 0, read_timeout = 0; + int retval, prefix_len, failover = REDIS_FAILOVER_NONE; + char *prefix; + + /* Parse configuration for session handler */ + MAKE_STD_ZVAL(z_conf); + array_init(z_conf); + sapi_module.treat_data(PARSE_STRING, estrdup(save_path), z_conf TSRMLS_CC); + + /* Sanity check that we're able to parse and have a seeds array */ + if (Z_TYPE_P(z_conf) != IS_ARRAY || + zend_hash_find(Z_ARRVAL_P(z_conf), "seed", sizeof("seed"), (void**)&z_val) == FAILURE || + Z_TYPE_PP(z_val) != IS_ARRAY) + { + zval_dtor(z_conf); + efree(z_conf); + return FAILURE; + } + + /* Grab a copy of our config hash table and keep seeds array */ + ht_conf = Z_ARRVAL_P(z_conf); + ht_seeds = Z_ARRVAL_PP(z_val); + + /* Grab timeouts if they were specified */ + session_conf_timeout(ht_conf, "timeout", sizeof("timeout"), &timeout); + session_conf_timeout(ht_conf, "read_timeout", sizeof("read_timeout"), &read_timeout); + + /* Look for a specific prefix */ + if (zend_hash_find(ht_conf, "prefix", sizeof("prefix"), (void**)&z_val) == SUCCESS && + Z_TYPE_PP(z_val) == IS_STRING && Z_STRLEN_PP(z_val) > 0) + { + prefix = Z_STRVAL_PP(z_val); + prefix_len = Z_STRLEN_PP(z_val); + } else { + prefix = "PHPREDIS_CLUSTER_SESSION:"; + prefix_len = sizeof("PHPREDIS_CLUSTER_SESSION:")-1; + } + + /* Look for a specific failover setting */ + if (zend_hash_find(ht_conf, "failover", sizeof("failover"), (void**)&z_val) == SUCCESS && + Z_TYPE_PP(z_val) == IS_STRING) + { + if (!strcasecmp(Z_STRVAL_PP(z_val), "error")) { + failover = REDIS_FAILOVER_ERROR; + } else if (!strcasecmp(Z_STRVAL_PP(z_val), "distribute")) { + failover = REDIS_FAILOVER_DISTRIBUTE; + } + } + + c = cluster_create(timeout, read_timeout, failover); + if (!cluster_init_seeds(c, ht_seeds) && !cluster_map_keyspace(c TSRMLS_CC)) { + /* Set up our prefix */ + c->flags->prefix = estrndup(prefix, prefix_len); + c->flags->prefix_len = prefix_len; + + PS_SET_MOD_DATA(c); + retval = SUCCESS; + } else { + cluster_free(c); + retval = FAILURE; + } + + /* Cleanup */ + zval_dtor(z_conf); + efree(z_conf); + + return retval; +} + +/* {{{ PS_READ_FUNC + */ +PS_READ_FUNC(rediscluster) { + redisCluster *c = PS_GET_MOD_DATA(); + clusterReply *reply; + char *cmd, *skey; + int cmdlen, skeylen; + short slot; + + /* Set up our command and slot information */ + skey = cluster_session_key(c, key, strlen(key), &skeylen, &slot); + cmdlen = redis_cmd_format_static(&cmd, "GET", "s", skey, skeylen); + efree(skey); + + /* Attempt to kick off our command */ + c->readonly = 1; + if (cluster_send_command(c,slot,cmd,cmdlen TSRMLS_CC)<0 || c->err) { + efree(cmd); + return FAILURE; + } + + /* Clean up command */ + efree(cmd); + + /* Attempt to read reply */ + reply = cluster_read_resp(c TSRMLS_CC); + if (!reply || c->err) { + if (reply) cluster_free_reply(reply, 1); + return FAILURE; + } + + /* Push reply value to caller */ + *val = reply->str; + *vallen = reply->len; + + /* Clean up */ + cluster_free_reply(reply, 0); + + /* Success! */ + return SUCCESS; +} + +/* {{{ PS_WRITE_FUNC + */ +PS_WRITE_FUNC(rediscluster) { + redisCluster *c = PS_GET_MOD_DATA(); + clusterReply *reply; + char *cmd, *skey; + int cmdlen, skeylen; + short slot; + + /* Set up command and slot info */ + skey = cluster_session_key(c, key, strlen(key), &skeylen, &slot); + cmdlen = redis_cmd_format_static(&cmd, "SETEX", "sds", skey, skeylen, + INI_INT("session.gc_maxlifetime"), + val, vallen); + efree(skey); + + /* Attempt to send command */ + c->readonly = 0; + if (cluster_send_command(c,slot,cmd,cmdlen TSRMLS_CC)<0 || c->err) { + efree(cmd); + return FAILURE; + } + + /* Clean up our command */ + efree(cmd); + + /* Attempt to read reply */ + reply = cluster_read_resp(c TSRMLS_CC); + if (!reply || c->err) { + if (reply) cluster_free_reply(reply, 1); + return FAILURE; + } + + /* Clean up*/ + cluster_free_reply(reply, 1); + + return SUCCESS; +} + +/* {{{ PS_DESTROY_FUNC(rediscluster) + */ +PS_DESTROY_FUNC(rediscluster) { + redisCluster *c = PS_GET_MOD_DATA(); + clusterReply *reply; + char *cmd, *skey; + int cmdlen, skeylen; + short slot; + + /* Set up command and slot info */ + skey = cluster_session_key(c, key, strlen(key), &skeylen, &slot); + cmdlen = redis_cmd_format_static(&cmd, "DEL", "s", skey, skeylen); + efree(skey); + + /* Attempt to send command */ + if (cluster_send_command(c,slot,cmd,cmdlen TSRMLS_CC)<0 || c->err) { + if (reply) cluster_free_reply(reply, 1); + efree(cmd); + return FAILURE; + } + + /* Clean up our command */ + efree(cmd); + + /* Attempt to read reply */ + reply = cluster_read_resp(c TSRMLS_CC); + if (!reply || c->err) { + if (reply) cluster_free_reply(reply, 1); + return FAILURE; + } + + /* Clean up our reply */ + cluster_free_reply(reply, 1); + + return SUCCESS; +} + +/* {{{ PS_CLOSE_FUNC + */ +PS_CLOSE_FUNC(rediscluster) +{ + redisCluster *c = PS_GET_MOD_DATA(); + if (c) { + cluster_free(c); + PS_SET_MOD_DATA(NULL); + } + return SUCCESS; +} + +/* {{{ PS_GC_FUNC + */ +PS_GC_FUNC(rediscluster) { + return SUCCESS; +} + #endif + /* vim: set tabstop=4 expandtab: */ diff --git a/redis_session.h b/redis_session.h index ce74e23..11f861c 100644 --- a/redis_session.h +++ b/redis_session.h @@ -10,6 +10,12 @@ PS_WRITE_FUNC(redis); PS_DESTROY_FUNC(redis); PS_GC_FUNC(redis); +PS_OPEN_FUNC(rediscluster); +PS_CLOSE_FUNC(rediscluster); +PS_READ_FUNC(rediscluster); +PS_WRITE_FUNC(rediscluster); +PS_DESTROY_FUNC(rediscluster); +PS_GC_FUNC(rediscluster); #endif #endif