mirror of
https://github.com/php-win-ext/phpredis.git
synced 2026-03-24 00:52:16 +01:00
Merge pull request #2123 from phpredis/pubsub
Refactor subscribe/unsubscribe
This commit is contained in:
@@ -1857,8 +1857,8 @@ PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *
|
||||
|
||||
// Set up our callback pointers
|
||||
zval z_ret, z_args[4];
|
||||
sctx->cb.retval = &z_ret;
|
||||
sctx->cb.params = z_args;
|
||||
sctx->cb.fci.retval = &z_ret;
|
||||
sctx->cb.fci.params = z_args;
|
||||
|
||||
/* We're in a subscribe loop */
|
||||
c->subscribed_slot = c->cmd_slot;
|
||||
@@ -1911,12 +1911,10 @@ PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *
|
||||
}
|
||||
|
||||
// Set arg count
|
||||
sctx->cb.param_count = tab_idx;
|
||||
sctx->cb.fci.param_count = tab_idx;
|
||||
|
||||
// Execute our callback
|
||||
if (zend_call_function(&(sctx->cb), &(sctx->cb_cache)) !=
|
||||
SUCCESS)
|
||||
{
|
||||
if (zend_call_function(&sctx->cb.fci, &sctx->cb.fci_cache) != SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
2
common.h
2
common.h
@@ -289,7 +289,7 @@ typedef struct {
|
||||
int persistent;
|
||||
int watching;
|
||||
zend_string *persistent_id;
|
||||
|
||||
HashTable *subs;
|
||||
redis_serializer serializer;
|
||||
int compression;
|
||||
int compression_level;
|
||||
|
||||
136
library.c
136
library.c
@@ -438,58 +438,87 @@ redis_sock_read_scan_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
ht_free_subs(zval *data)
|
||||
{
|
||||
efree(Z_PTR_P(data));
|
||||
}
|
||||
|
||||
PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
|
||||
RedisSock *redis_sock, zval *z_tab,
|
||||
void *ctx)
|
||||
{
|
||||
HashTable *subs;
|
||||
subscribeCallback *cb;
|
||||
subscribeContext *sctx = (subscribeContext*)ctx;
|
||||
zval *z_tmp, z_resp;
|
||||
|
||||
ALLOC_HASHTABLE(subs);
|
||||
zend_hash_init(subs, 0, NULL, ht_free_subs, 0);
|
||||
// Consume response(s) from subscribe, which will vary on argc
|
||||
while(sctx->argc--) {
|
||||
ZVAL_NULL(&z_resp);
|
||||
if (!redis_sock_read_multibulk_reply_zval(
|
||||
INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp)
|
||||
) {
|
||||
efree(sctx);
|
||||
return -1;
|
||||
goto error;
|
||||
}
|
||||
|
||||
// We'll need to find the command response
|
||||
if ((z_tmp = zend_hash_index_find(Z_ARRVAL(z_resp), 0)) == NULL) {
|
||||
zval_dtor(&z_resp);
|
||||
efree(sctx);
|
||||
return -1;
|
||||
goto error;
|
||||
}
|
||||
|
||||
// Make sure the command response matches the command we called
|
||||
if(strcasecmp(Z_STRVAL_P(z_tmp), sctx->kw) !=0) {
|
||||
zval_dtor(&z_resp);
|
||||
efree(sctx);
|
||||
return -1;
|
||||
goto error;
|
||||
}
|
||||
|
||||
if ((z_tmp = zend_hash_index_find(Z_ARRVAL(z_resp), 1)) == NULL) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
zend_hash_str_update_mem(subs, Z_STRVAL_P(z_tmp), Z_STRLEN_P(z_tmp),
|
||||
&sctx->cb, sizeof(sctx->cb));
|
||||
|
||||
zval_dtor(&z_resp);
|
||||
}
|
||||
|
||||
zval z_ret, z_args[4];
|
||||
sctx->cb.retval = &z_ret;
|
||||
sctx->cb.params = z_args;
|
||||
efree(sctx);
|
||||
|
||||
if (redis_sock->subs) {
|
||||
zend_string *zkey;
|
||||
|
||||
ZEND_HASH_FOREACH_STR_KEY_PTR(subs, zkey, cb) {
|
||||
zend_hash_update_mem(redis_sock->subs, zkey, cb, sizeof(*cb));
|
||||
} ZEND_HASH_FOREACH_END();
|
||||
zend_hash_destroy(subs);
|
||||
efree(subs);
|
||||
|
||||
RETVAL_TRUE;
|
||||
return SUCCESS;
|
||||
}
|
||||
|
||||
redis_sock->subs = subs;
|
||||
/* Multibulk response, {[pattern], type, channel, payload } */
|
||||
while(1) {
|
||||
zval *z_type, *z_chan, *z_pat = NULL, *z_data;
|
||||
while (redis_sock->subs) {
|
||||
zval z_ret, z_args[4], *z_type, *z_chan, *z_pat = NULL, *z_data;
|
||||
HashTable *ht_tab;
|
||||
int tab_idx=1, is_pmsg;
|
||||
int tab_idx = 1, is_pmsg = 0;
|
||||
|
||||
ZVAL_NULL(&z_resp);
|
||||
if (!redis_sock_read_multibulk_reply_zval(
|
||||
INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp)) break;
|
||||
INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp)
|
||||
) {
|
||||
goto failure;
|
||||
}
|
||||
|
||||
ht_tab = Z_ARRVAL(z_resp);
|
||||
|
||||
if ((z_type = zend_hash_index_find(ht_tab, 0)) == NULL ||
|
||||
Z_TYPE_P(z_type) != IS_STRING
|
||||
) {
|
||||
break;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
// Check for message or pmessage
|
||||
@@ -498,13 +527,14 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
|
||||
{
|
||||
is_pmsg = *Z_STRVAL_P(z_type)=='p';
|
||||
} else {
|
||||
break;
|
||||
zval_dtor(&z_resp);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Extract pattern if it's a pmessage
|
||||
if(is_pmsg) {
|
||||
if ((z_pat = zend_hash_index_find(ht_tab, tab_idx++)) == NULL) {
|
||||
break;
|
||||
goto failure;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -512,7 +542,11 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
|
||||
if ((z_chan = zend_hash_index_find(ht_tab, tab_idx++)) == NULL ||
|
||||
(z_data = zend_hash_index_find(ht_tab, tab_idx++)) == NULL
|
||||
) {
|
||||
break;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
if ((cb = zend_hash_str_find_ptr(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan))) == NULL) {
|
||||
goto failure;
|
||||
}
|
||||
|
||||
// Different args for SUBSCRIBE and PSUBSCRIBE
|
||||
@@ -527,13 +561,13 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
|
||||
}
|
||||
|
||||
// Set arg count
|
||||
sctx->cb.param_count = tab_idx;
|
||||
cb->fci.param_count = tab_idx;
|
||||
cb->fci.retval = &z_ret;
|
||||
cb->fci.params = z_args;
|
||||
|
||||
// Execute callback
|
||||
if(zend_call_function(&(sctx->cb), &(sctx->cb_cache))
|
||||
==FAILURE)
|
||||
{
|
||||
break;
|
||||
if (zend_call_function(&cb->fci, &cb->fci_cache) != SUCCESS) {
|
||||
goto failure;
|
||||
}
|
||||
|
||||
// If we have a return value free it
|
||||
@@ -541,11 +575,18 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
|
||||
zval_dtor(&z_resp);
|
||||
}
|
||||
|
||||
// This is an error state, clean up
|
||||
zval_dtor(&z_resp);
|
||||
efree(sctx);
|
||||
RETVAL_TRUE;
|
||||
return SUCCESS;
|
||||
|
||||
return -1;
|
||||
// This is an error state, clean up
|
||||
error:
|
||||
efree(sctx);
|
||||
zend_hash_destroy(subs);
|
||||
efree(subs);
|
||||
failure:
|
||||
zval_dtor(&z_resp);
|
||||
RETVAL_FALSE;
|
||||
return FAILURE;
|
||||
}
|
||||
|
||||
PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS,
|
||||
@@ -553,31 +594,45 @@ PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS,
|
||||
void *ctx)
|
||||
{
|
||||
subscribeContext *sctx = (subscribeContext*)ctx;
|
||||
zval *z_chan, zv, *z_ret = &zv, z_resp;
|
||||
int i;
|
||||
zval *z_chan, z_ret, z_resp;
|
||||
|
||||
array_init(z_ret);
|
||||
array_init(&z_ret);
|
||||
|
||||
for (i = 0; i < sctx->argc; i++) {
|
||||
while (sctx->argc--) {
|
||||
ZVAL_NULL(&z_resp);
|
||||
if (!redis_sock_read_multibulk_reply_zval(
|
||||
INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, &z_resp) ||
|
||||
(z_chan = zend_hash_index_find(Z_ARRVAL(z_resp), 1)) == NULL
|
||||
) {
|
||||
zval_dtor(z_ret);
|
||||
return -1;
|
||||
efree(sctx);
|
||||
zval_dtor(&z_resp);
|
||||
zval_dtor(&z_ret);
|
||||
RETVAL_FALSE;
|
||||
return FAILURE;
|
||||
}
|
||||
|
||||
add_assoc_bool(z_ret, Z_STRVAL_P(z_chan), 1);
|
||||
if (!redis_sock->subs ||
|
||||
!zend_hash_str_exists(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan))
|
||||
) {
|
||||
add_assoc_bool_ex(&z_ret, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan), 0);
|
||||
} else {
|
||||
zend_hash_str_del(redis_sock->subs, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan));
|
||||
add_assoc_bool_ex(&z_ret, Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan), 1);
|
||||
}
|
||||
|
||||
zval_dtor(&z_resp);
|
||||
}
|
||||
|
||||
efree(sctx);
|
||||
|
||||
RETVAL_ZVAL(z_ret, 0, 1);
|
||||
if (redis_sock->subs && !zend_hash_num_elements(redis_sock->subs)) {
|
||||
zend_hash_destroy(redis_sock->subs);
|
||||
efree(redis_sock->subs);
|
||||
redis_sock->subs = NULL;
|
||||
}
|
||||
|
||||
// Success
|
||||
return 0;
|
||||
RETVAL_ZVAL(&z_ret, 0, 1);
|
||||
return SUCCESS;
|
||||
}
|
||||
|
||||
PHP_REDIS_API zval *
|
||||
@@ -2851,6 +2906,11 @@ PHP_REDIS_API void redis_free_socket(RedisSock *redis_sock)
|
||||
if (redis_sock->host) {
|
||||
zend_string_release(redis_sock->host);
|
||||
}
|
||||
if (redis_sock->subs) {
|
||||
zend_hash_destroy(redis_sock->subs);
|
||||
efree(redis_sock->subs);
|
||||
redis_sock->subs = NULL;
|
||||
}
|
||||
redis_sock_free_auth(redis_sock);
|
||||
free_reply_callbacks(redis_sock);
|
||||
efree(redis_sock);
|
||||
|
||||
@@ -319,7 +319,7 @@ public function persist(string $key): bool;
|
||||
/** @return bool|Redis */
|
||||
public function psetex(string $key, int $expire, mixed $value);
|
||||
|
||||
public function psubscribe(array $patterns): void;
|
||||
public function psubscribe(array $patterns, callable $cb): bool;
|
||||
|
||||
public function pttl(string $key): int;
|
||||
|
||||
@@ -327,7 +327,7 @@ public function persist(string $key): bool;
|
||||
|
||||
public function pubsub(string $command, mixed $arg = null): mixed;
|
||||
|
||||
public function punsubscribe(array $patterns): array;
|
||||
public function punsubscribe(array $patterns): bool|array;
|
||||
|
||||
public function rPop(string $key, int $count = 0): bool|string|array;
|
||||
|
||||
@@ -439,7 +439,7 @@ public function persist(string $key): bool;
|
||||
/** @return int|Redis */
|
||||
public function strlen(string $key);
|
||||
|
||||
public function subscribe(string $channel, string ...$other_channels): array;
|
||||
public function subscribe(array $channels, callable $cb): bool;
|
||||
|
||||
public function swapdb(string $src, string $dst): bool;
|
||||
|
||||
@@ -455,7 +455,7 @@ public function persist(string $key): bool;
|
||||
*/
|
||||
public function unlink(array|string $key, string ...$other_keys);
|
||||
|
||||
public function unsubscribe(string $channel, string ...$other_channels): array;
|
||||
public function unsubscribe(array $channels): bool|array;
|
||||
|
||||
/** @return bool|Redis */
|
||||
public function unwatch();
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/* This is a generated file, edit the .stub.php file instead.
|
||||
* Stub hash: 9671c30926e8d581a126833360b123c8ae2dd913 */
|
||||
* Stub hash: efcda1ed028d65d0b4848d32133dc0e32f17871f */
|
||||
|
||||
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis___construct, 0, 0, 0)
|
||||
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, options, IS_ARRAY, 0, "null")
|
||||
@@ -541,8 +541,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psetex, 0, 0, 3)
|
||||
ZEND_ARG_TYPE_INFO(0, value, IS_MIXED, 0)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_psubscribe, 0, 1, IS_VOID, 0)
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_psubscribe, 0, 2, _IS_BOOL, 0)
|
||||
ZEND_ARG_TYPE_INFO(0, patterns, IS_ARRAY, 0)
|
||||
ZEND_ARG_TYPE_INFO(0, cb, IS_CALLABLE, 0)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_pttl arginfo_class_Redis_hLen
|
||||
@@ -557,7 +558,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_pubsub, 0, 1, IS_MIX
|
||||
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, arg, IS_MIXED, 0, "null")
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_punsubscribe, 0, 1, IS_ARRAY, 0)
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_MASK_EX(arginfo_class_Redis_punsubscribe, 0, 1, MAY_BE_BOOL|MAY_BE_ARRAY)
|
||||
ZEND_ARG_TYPE_INFO(0, patterns, IS_ARRAY, 0)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
@@ -732,9 +733,9 @@ ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_strlen arginfo_class_Redis_decr
|
||||
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_subscribe, 0, 1, IS_ARRAY, 0)
|
||||
ZEND_ARG_TYPE_INFO(0, channel, IS_STRING, 0)
|
||||
ZEND_ARG_VARIADIC_TYPE_INFO(0, other_channels, IS_STRING, 0)
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_subscribe, 0, 2, _IS_BOOL, 0)
|
||||
ZEND_ARG_TYPE_INFO(0, channels, IS_ARRAY, 0)
|
||||
ZEND_ARG_TYPE_INFO(0, cb, IS_CALLABLE, 0)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Redis_swapdb, 0, 2, _IS_BOOL, 0)
|
||||
@@ -750,7 +751,9 @@ ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_unlink arginfo_class_Redis_del
|
||||
|
||||
#define arginfo_class_Redis_unsubscribe arginfo_class_Redis_subscribe
|
||||
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_MASK_EX(arginfo_class_Redis_unsubscribe, 0, 1, MAY_BE_BOOL|MAY_BE_ARRAY)
|
||||
ZEND_ARG_TYPE_INFO(0, channels, IS_ARRAY, 0)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_unwatch arginfo_class_Redis___destruct
|
||||
|
||||
|
||||
@@ -1101,7 +1101,7 @@ int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
|
||||
char *key;
|
||||
|
||||
if (zend_parse_parameters(ZEND_NUM_ARGS(), "af", &z_arr,
|
||||
&(sctx->cb), &(sctx->cb_cache)) == FAILURE)
|
||||
&sctx->cb.fci, &sctx->cb.fci_cache) == FAILURE)
|
||||
{
|
||||
efree(sctx);
|
||||
return FAILURE;
|
||||
|
||||
@@ -14,11 +14,15 @@
|
||||
if (slot) *slot = cluster_hash_key(key,key_len);
|
||||
|
||||
/* Simple container so we can push subscribe context out */
|
||||
typedef struct {
|
||||
zend_fcall_info fci;
|
||||
zend_fcall_info_cache fci_cache;
|
||||
} subscribeCallback;
|
||||
|
||||
typedef struct subscribeContext {
|
||||
char *kw;
|
||||
int argc;
|
||||
zend_fcall_info cb;
|
||||
zend_fcall_info_cache cb_cache;
|
||||
subscribeCallback cb;
|
||||
} subscribeContext;
|
||||
|
||||
/* Construct a raw command */
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/* This is a generated file, edit the .stub.php file instead.
|
||||
* Stub hash: 9671c30926e8d581a126833360b123c8ae2dd913 */
|
||||
* Stub hash: efcda1ed028d65d0b4848d32133dc0e32f17871f */
|
||||
|
||||
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis___construct, 0, 0, 0)
|
||||
ZEND_ARG_INFO(0, options)
|
||||
@@ -472,8 +472,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psetex, 0, 0, 3)
|
||||
ZEND_ARG_INFO(0, value)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psubscribe, 0, 0, 1)
|
||||
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_psubscribe, 0, 0, 2)
|
||||
ZEND_ARG_INFO(0, patterns)
|
||||
ZEND_ARG_INFO(0, cb)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_pttl arginfo_class_Redis__prefix
|
||||
@@ -488,7 +489,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_pubsub, 0, 0, 1)
|
||||
ZEND_ARG_INFO(0, arg)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_punsubscribe arginfo_class_Redis_psubscribe
|
||||
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_punsubscribe, 0, 0, 1)
|
||||
ZEND_ARG_INFO(0, patterns)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_rPop arginfo_class_Redis_lPop
|
||||
|
||||
@@ -640,9 +643,9 @@ ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_strlen arginfo_class_Redis__prefix
|
||||
|
||||
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_subscribe, 0, 0, 1)
|
||||
ZEND_ARG_INFO(0, channel)
|
||||
ZEND_ARG_VARIADIC_INFO(0, other_channels)
|
||||
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_subscribe, 0, 0, 2)
|
||||
ZEND_ARG_INFO(0, channels)
|
||||
ZEND_ARG_INFO(0, cb)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_swapdb arginfo_class_Redis_rpoplpush
|
||||
@@ -655,7 +658,9 @@ ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_unlink arginfo_class_Redis_del
|
||||
|
||||
#define arginfo_class_Redis_unsubscribe arginfo_class_Redis_subscribe
|
||||
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Redis_unsubscribe, 0, 0, 1)
|
||||
ZEND_ARG_INFO(0, channels)
|
||||
ZEND_END_ARG_INFO()
|
||||
|
||||
#define arginfo_class_Redis_unwatch arginfo_class_Redis___destruct
|
||||
|
||||
|
||||
Reference in New Issue
Block a user