Initial commit for cluster-enabled session handling

This commit is contained in:
michael-grunder
2015-06-17 12:45:54 -07:00
parent 4ce910ad38
commit fcea71f99b
5 changed files with 322 additions and 2 deletions

View File

@@ -97,7 +97,7 @@ void cluster_free_reply(clusterReply *reply, int free_data) {
case TYPE_ERR:
case TYPE_LINE:
case TYPE_BULK:
if(free_data)
if(free_data && reply->str)
efree(reply->str);
break;
case TYPE_MULTIBULK:
@@ -785,6 +785,68 @@ static RedisSock *cluster_get_asking_sock(redisCluster *c TSRMLS_DC) {
return cluster_get_asking_node(c TSRMLS_CC)->sock;
}
/* Our context seeds will be a hash table with RedisSock* pointers */
static void ht_free_seed(void *data) {
RedisSock *redis_sock = *(RedisSock**)data;
if(redis_sock) redis_free_socket(redis_sock);
}
/* Free redisClusterNode objects we've stored */
static void ht_free_node(void *data) {
redisClusterNode *node = *(redisClusterNode**)data;
cluster_free_node(node);
}
/* Construct a redisCluster object */
PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout,
int failover)
{
redisCluster *c;
/* Actual our actual cluster structure */
c = ecalloc(1, sizeof(redisCluster));
/* Initialize flags and settings */
c->flags = ecalloc(1, sizeof(RedisSock));
c->subscribed_slot = -1;
c->clusterdown = 0;
c->timeout = timeout;
c->read_timeout = read_timeout;
/* Set up our waitms based on timeout */
c->waitms = (long)(1000 * timeout);
/* Allocate our seeds hash table */
ALLOC_HASHTABLE(c->seeds);
zend_hash_init(c->seeds, 0, NULL, ht_free_seed, 0);
/* Allocate our nodes HashTable */
ALLOC_HASHTABLE(c->nodes);
zend_hash_init(c->nodes, 0, NULL, ht_free_node, 0);
return c;
}
PHP_REDIS_API void cluster_free(redisCluster *c) {
/* Free any allocated prefix */
if (c->flags->prefix) efree(c->flags->prefix);
efree(c->flags);
/* Call hash table destructors */
zend_hash_destroy(c->seeds);
zend_hash_destroy(c->nodes);
/* Free hash tables themselves */
efree(c->seeds);
efree(c->nodes);
/* Free any error we've got */
if (c->err) efree(c->err);
/* Free structure itself */
efree(c);
}
/* Initialize seeds */
PHP_REDIS_API int
cluster_init_seeds(redisCluster *cluster, HashTable *ht_seeds) {

View File

@@ -358,6 +358,9 @@ PHP_REDIS_API short cluster_find_slot(redisCluster *c, const char *host,
PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd,
int cmd_len, REDIS_REPLY_TYPE rtype TSRMLS_DC);
PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout,
int failover);
PHP_REDIS_API void cluster_free(redisCluster *c);
PHP_REDIS_API int cluster_init_seeds(redisCluster *c, HashTable *ht_seeds);
PHP_REDIS_API int cluster_map_keyspace(redisCluster *c TSRMLS_DC);
PHP_REDIS_API void cluster_free_node(redisClusterNode *node);

View File

@@ -52,6 +52,7 @@ extern int le_redis_array;
#ifdef PHP_SESSION
extern ps_module ps_mod_redis;
extern ps_module ps_mod_redis_cluster;
#endif
extern zend_class_entry *redis_array_ce;
@@ -541,6 +542,7 @@ static void add_class_constants(zend_class_entry *ce, int is_cluster TSRMLS_DC)
#ifdef PHP_SESSION
php_session_register_module(&ps_mod_redis);
php_session_register_module(&ps_mod_redis_cluster);
#endif
}

View File

@@ -34,6 +34,7 @@
#include <zend_exceptions.h>
#include "library.h"
#include "cluster_library.h"
#include "php.h"
#include "php_ini.h"
@@ -44,6 +45,9 @@
ps_module ps_mod_redis = {
PS_MOD(redis)
};
ps_module ps_mod_redis_cluster = {
PS_MOD(rediscluster)
};
typedef struct redis_pool_member_ {
@@ -333,7 +337,6 @@ redis_session_key(redis_pool_member *rpm, const char *key, int key_len, int *ses
return session;
}
/* {{{ PS_READ_FUNC
*/
PS_READ_FUNC(redis)
@@ -457,5 +460,249 @@ PS_GC_FUNC(redis)
}
/* }}} */
/**
* Redis Cluster session handler functions
*/
/* Helper to extract timeout values */
static void session_conf_timeout(HashTable *ht_conf, const char *key, int key_len,
double *val)
{
zval **z_val;
if (zend_hash_find(ht_conf, key, key_len, (void**)&z_val) == SUCCESS) {
if (Z_TYPE_PP(z_val) == IS_STRING) {
*val = atof(Z_STRVAL_PP(z_val));
} else {
*val = Z_DVAL_PP(z_val);
}
}
}
/* Prefix a session key */
static char *cluster_session_key(redisCluster *c, const char *key, int keylen,
int *skeylen, short *slot) {
char *skey;
*skeylen = keylen + c->flags->prefix_len;
skey = emalloc(*skeylen);
memcpy(skey, c->flags->prefix, c->flags->prefix_len);
memcpy(skey + c->flags->prefix_len, key, keylen);
*slot = cluster_hash_key(skey, *skeylen);
return skey;
}
PS_OPEN_FUNC(rediscluster) {
redisCluster *c;
zval *z_conf, **z_val;
HashTable *ht_conf, *ht_seeds;
double timeout = 0, read_timeout = 0;
int retval, prefix_len, failover = REDIS_FAILOVER_NONE;
char *prefix;
/* Parse configuration for session handler */
MAKE_STD_ZVAL(z_conf);
array_init(z_conf);
sapi_module.treat_data(PARSE_STRING, estrdup(save_path), z_conf TSRMLS_CC);
/* Sanity check that we're able to parse and have a seeds array */
if (Z_TYPE_P(z_conf) != IS_ARRAY ||
zend_hash_find(Z_ARRVAL_P(z_conf), "seed", sizeof("seed"), (void**)&z_val) == FAILURE ||
Z_TYPE_PP(z_val) != IS_ARRAY)
{
zval_dtor(z_conf);
efree(z_conf);
return FAILURE;
}
/* Grab a copy of our config hash table and keep seeds array */
ht_conf = Z_ARRVAL_P(z_conf);
ht_seeds = Z_ARRVAL_PP(z_val);
/* Grab timeouts if they were specified */
session_conf_timeout(ht_conf, "timeout", sizeof("timeout"), &timeout);
session_conf_timeout(ht_conf, "read_timeout", sizeof("read_timeout"), &read_timeout);
/* Look for a specific prefix */
if (zend_hash_find(ht_conf, "prefix", sizeof("prefix"), (void**)&z_val) == SUCCESS &&
Z_TYPE_PP(z_val) == IS_STRING && Z_STRLEN_PP(z_val) > 0)
{
prefix = Z_STRVAL_PP(z_val);
prefix_len = Z_STRLEN_PP(z_val);
} else {
prefix = "PHPREDIS_CLUSTER_SESSION:";
prefix_len = sizeof("PHPREDIS_CLUSTER_SESSION:")-1;
}
/* Look for a specific failover setting */
if (zend_hash_find(ht_conf, "failover", sizeof("failover"), (void**)&z_val) == SUCCESS &&
Z_TYPE_PP(z_val) == IS_STRING)
{
if (!strcasecmp(Z_STRVAL_PP(z_val), "error")) {
failover = REDIS_FAILOVER_ERROR;
} else if (!strcasecmp(Z_STRVAL_PP(z_val), "distribute")) {
failover = REDIS_FAILOVER_DISTRIBUTE;
}
}
c = cluster_create(timeout, read_timeout, failover);
if (!cluster_init_seeds(c, ht_seeds) && !cluster_map_keyspace(c TSRMLS_CC)) {
/* Set up our prefix */
c->flags->prefix = estrndup(prefix, prefix_len);
c->flags->prefix_len = prefix_len;
PS_SET_MOD_DATA(c);
retval = SUCCESS;
} else {
cluster_free(c);
retval = FAILURE;
}
/* Cleanup */
zval_dtor(z_conf);
efree(z_conf);
return retval;
}
/* {{{ PS_READ_FUNC
*/
PS_READ_FUNC(rediscluster) {
redisCluster *c = PS_GET_MOD_DATA();
clusterReply *reply;
char *cmd, *skey;
int cmdlen, skeylen;
short slot;
/* Set up our command and slot information */
skey = cluster_session_key(c, key, strlen(key), &skeylen, &slot);
cmdlen = redis_cmd_format_static(&cmd, "GET", "s", skey, skeylen);
efree(skey);
/* Attempt to kick off our command */
c->readonly = 1;
if (cluster_send_command(c,slot,cmd,cmdlen TSRMLS_CC)<0 || c->err) {
efree(cmd);
return FAILURE;
}
/* Clean up command */
efree(cmd);
/* Attempt to read reply */
reply = cluster_read_resp(c TSRMLS_CC);
if (!reply || c->err) {
if (reply) cluster_free_reply(reply, 1);
return FAILURE;
}
/* Push reply value to caller */
*val = reply->str;
*vallen = reply->len;
/* Clean up */
cluster_free_reply(reply, 0);
/* Success! */
return SUCCESS;
}
/* {{{ PS_WRITE_FUNC
*/
PS_WRITE_FUNC(rediscluster) {
redisCluster *c = PS_GET_MOD_DATA();
clusterReply *reply;
char *cmd, *skey;
int cmdlen, skeylen;
short slot;
/* Set up command and slot info */
skey = cluster_session_key(c, key, strlen(key), &skeylen, &slot);
cmdlen = redis_cmd_format_static(&cmd, "SETEX", "sds", skey, skeylen,
INI_INT("session.gc_maxlifetime"),
val, vallen);
efree(skey);
/* Attempt to send command */
c->readonly = 0;
if (cluster_send_command(c,slot,cmd,cmdlen TSRMLS_CC)<0 || c->err) {
efree(cmd);
return FAILURE;
}
/* Clean up our command */
efree(cmd);
/* Attempt to read reply */
reply = cluster_read_resp(c TSRMLS_CC);
if (!reply || c->err) {
if (reply) cluster_free_reply(reply, 1);
return FAILURE;
}
/* Clean up*/
cluster_free_reply(reply, 1);
return SUCCESS;
}
/* {{{ PS_DESTROY_FUNC(rediscluster)
*/
PS_DESTROY_FUNC(rediscluster) {
redisCluster *c = PS_GET_MOD_DATA();
clusterReply *reply;
char *cmd, *skey;
int cmdlen, skeylen;
short slot;
/* Set up command and slot info */
skey = cluster_session_key(c, key, strlen(key), &skeylen, &slot);
cmdlen = redis_cmd_format_static(&cmd, "DEL", "s", skey, skeylen);
efree(skey);
/* Attempt to send command */
if (cluster_send_command(c,slot,cmd,cmdlen TSRMLS_CC)<0 || c->err) {
if (reply) cluster_free_reply(reply, 1);
efree(cmd);
return FAILURE;
}
/* Clean up our command */
efree(cmd);
/* Attempt to read reply */
reply = cluster_read_resp(c TSRMLS_CC);
if (!reply || c->err) {
if (reply) cluster_free_reply(reply, 1);
return FAILURE;
}
/* Clean up our reply */
cluster_free_reply(reply, 1);
return SUCCESS;
}
/* {{{ PS_CLOSE_FUNC
*/
PS_CLOSE_FUNC(rediscluster)
{
redisCluster *c = PS_GET_MOD_DATA();
if (c) {
cluster_free(c);
PS_SET_MOD_DATA(NULL);
}
return SUCCESS;
}
/* {{{ PS_GC_FUNC
*/
PS_GC_FUNC(rediscluster) {
return SUCCESS;
}
#endif
/* vim: set tabstop=4 expandtab: */

View File

@@ -10,6 +10,12 @@ PS_WRITE_FUNC(redis);
PS_DESTROY_FUNC(redis);
PS_GC_FUNC(redis);
PS_OPEN_FUNC(rediscluster);
PS_CLOSE_FUNC(rediscluster);
PS_READ_FUNC(rediscluster);
PS_WRITE_FUNC(rediscluster);
PS_DESTROY_FUNC(rediscluster);
PS_GC_FUNC(rediscluster);
#endif
#endif