mirror of
https://github.com/php-win-ext/phpredis.git
synced 2026-03-24 00:52:16 +01:00
Add support for exponential backoff on retry
This commit is contained in:
90
backoff.c
Normal file
90
backoff.c
Normal file
@@ -0,0 +1,90 @@
|
||||
#include "common.h"
|
||||
|
||||
#include <ext/standard/php_rand.h>
|
||||
|
||||
#if PHP_VERSION_ID >= 70100
|
||||
#include <ext/standard/php_mt_rand.h>
|
||||
#else
|
||||
static zend_long php_mt_rand_range(zend_long min, zend_long max) {
|
||||
zend_long number = php_rand();
|
||||
RAND_RANGE(number, min, max, PHP_RAND_MAX);
|
||||
return number;
|
||||
}
|
||||
#endif
|
||||
|
||||
#include "backoff.h"
|
||||
|
||||
static zend_ulong random_range(zend_ulong min, zend_ulong max) {
|
||||
if (max < min) {
|
||||
return php_mt_rand_range(max, min);
|
||||
}
|
||||
|
||||
return php_mt_rand_range(min, max);
|
||||
}
|
||||
|
||||
static zend_ulong redis_default_backoff(struct RedisBackoff *self, unsigned int retry_index) {
|
||||
zend_ulong backoff = retry_index ? self->base : random_range(0, self->base);
|
||||
return MIN(self->cap, backoff);
|
||||
}
|
||||
|
||||
static zend_ulong redis_constant_backoff(struct RedisBackoff *self, unsigned int retry_index) {
|
||||
zend_ulong backoff = self->base;
|
||||
return MIN(self->cap, backoff);
|
||||
}
|
||||
|
||||
static zend_ulong redis_uniform_backoff(struct RedisBackoff *self, unsigned int retry_index) {
|
||||
zend_ulong backoff = random_range(0, self->base);
|
||||
return MIN(self->cap, backoff);
|
||||
}
|
||||
|
||||
static zend_ulong redis_exponential_backoff(struct RedisBackoff *self, unsigned int retry_index) {
|
||||
zend_ulong pow = MIN(retry_index, 10);
|
||||
zend_ulong backoff = self->base * (1 << pow);
|
||||
return MIN(self->cap, backoff);
|
||||
}
|
||||
|
||||
static zend_ulong redis_full_jitter_backoff(struct RedisBackoff *self, unsigned int retry_index) {
|
||||
zend_ulong pow = MIN(retry_index, 10);
|
||||
zend_ulong backoff = self->base * (1 << pow);
|
||||
zend_ulong cap = MIN(self->cap, backoff);
|
||||
return random_range(0, self->cap);
|
||||
}
|
||||
|
||||
static zend_ulong redis_equal_jitter_backoff(struct RedisBackoff *self, unsigned int retry_index) {
|
||||
zend_ulong pow = MIN(retry_index, 10);
|
||||
zend_ulong backoff = self->base * (1 << pow);
|
||||
zend_ulong temp = MIN(self->cap, backoff);
|
||||
return temp / 2 + random_range(0, temp) / 2;
|
||||
}
|
||||
|
||||
static zend_ulong redis_decorrelated_jitter_backoff(struct RedisBackoff *self, unsigned int retry_index) {
|
||||
self->previous_backoff = random_range(self->base, self->previous_backoff * 3);
|
||||
return MIN(self->cap, self->previous_backoff);
|
||||
}
|
||||
|
||||
typedef zend_ulong (*redis_backoff_algorithm)(struct RedisBackoff *self, unsigned int retry_index);
|
||||
|
||||
static redis_backoff_algorithm redis_backoff_algorithms[REDIS_BACKOFF_ALGORITHMS] = {
|
||||
redis_default_backoff,
|
||||
redis_decorrelated_jitter_backoff,
|
||||
redis_full_jitter_backoff,
|
||||
redis_equal_jitter_backoff,
|
||||
redis_exponential_backoff,
|
||||
redis_uniform_backoff,
|
||||
redis_constant_backoff,
|
||||
};
|
||||
|
||||
void redis_initialize_backoff(struct RedisBackoff *self, unsigned long retry_interval) {
|
||||
self->algorithm = 0; // default backoff
|
||||
self->base = retry_interval;
|
||||
self->cap = retry_interval;
|
||||
self->previous_backoff = 0;
|
||||
}
|
||||
|
||||
void redis_backoff_reset(struct RedisBackoff *self) {
|
||||
self->previous_backoff = 0;
|
||||
}
|
||||
|
||||
zend_ulong redis_backoff_compute(struct RedisBackoff *self, unsigned int retry_index) {
|
||||
return redis_backoff_algorithms[self->algorithm](self, retry_index);
|
||||
}
|
||||
17
backoff.h
Normal file
17
backoff.h
Normal file
@@ -0,0 +1,17 @@
|
||||
#ifndef REDIS_BACKOFF_H
|
||||
#define REDIS_BACKOFF_H
|
||||
|
||||
/* {{{ struct RedisBackoff */
|
||||
struct RedisBackoff {
|
||||
unsigned int algorithm; /* index of algorithm function, returns backoff in microseconds*/
|
||||
zend_ulong base; /* base backoff in microseconds */
|
||||
zend_ulong cap; /* max backoff in microseconds */
|
||||
zend_ulong previous_backoff; /* previous backoff in microseconds */
|
||||
};
|
||||
/* }}} */
|
||||
|
||||
void redis_initialize_backoff(struct RedisBackoff *self, unsigned long retry_interval);
|
||||
void redis_backoff_reset(struct RedisBackoff *self);
|
||||
zend_ulong redis_backoff_compute(struct RedisBackoff *self, unsigned int retry_index);
|
||||
|
||||
#endif
|
||||
74
common.h
74
common.h
@@ -21,6 +21,8 @@
|
||||
#define NULL ((void *) 0)
|
||||
#endif
|
||||
|
||||
#include "backoff.h"
|
||||
|
||||
typedef enum {
|
||||
REDIS_SOCK_STATUS_FAILED = -1,
|
||||
REDIS_SOCK_STATUS_DISCONNECTED,
|
||||
@@ -83,6 +85,10 @@ typedef enum _PUBSUB_TYPE {
|
||||
#define REDIS_OPT_REPLY_LITERAL 8
|
||||
#define REDIS_OPT_COMPRESSION_LEVEL 9
|
||||
#define REDIS_OPT_NULL_MBULK_AS_NULL 10
|
||||
#define REDIS_OPT_MAX_RETRIES 11
|
||||
#define REDIS_OPT_BACKOFF_ALGORITHM 12
|
||||
#define REDIS_OPT_BACKOFF_BASE 13
|
||||
#define REDIS_OPT_BACKOFF_CAP 14
|
||||
|
||||
/* cluster options */
|
||||
#define REDIS_FAILOVER_NONE 0
|
||||
@@ -109,6 +115,16 @@ typedef enum {
|
||||
#define REDIS_SCAN_PREFIX 2
|
||||
#define REDIS_SCAN_NOPREFIX 3
|
||||
|
||||
/* BACKOFF_ALGORITHM options */
|
||||
#define REDIS_BACKOFF_ALGORITHMS 7
|
||||
#define REDIS_BACKOFF_ALGORITHM_DEFAULT 0
|
||||
#define REDIS_BACKOFF_ALGORITHM_DECORRELATED_JITTER 1
|
||||
#define REDIS_BACKOFF_ALGORITHM_FULL_JITTER 2
|
||||
#define REDIS_BACKOFF_ALGORITHM_EQUAL_JITTER 3
|
||||
#define REDIS_BACKOFF_ALGORITHM_EXPONENTIAL 4
|
||||
#define REDIS_BACKOFF_ALGORITHM_UNIFORM 5
|
||||
#define REDIS_BACKOFF_ALGORITHM_CONSTANT 6
|
||||
|
||||
/* GETBIT/SETBIT offset range limits */
|
||||
#define BITOP_MIN_OFFSET 0
|
||||
#define BITOP_MAX_OFFSET 4294967295U
|
||||
@@ -258,41 +274,43 @@ typedef enum {
|
||||
|
||||
/* {{{ struct RedisSock */
|
||||
typedef struct {
|
||||
php_stream *stream;
|
||||
php_stream_context *stream_ctx;
|
||||
zend_string *host;
|
||||
int port;
|
||||
zend_string *user;
|
||||
zend_string *pass;
|
||||
double timeout;
|
||||
double read_timeout;
|
||||
long retry_interval;
|
||||
redis_sock_status status;
|
||||
int persistent;
|
||||
int watching;
|
||||
zend_string *persistent_id;
|
||||
php_stream *stream;
|
||||
php_stream_context *stream_ctx;
|
||||
zend_string *host;
|
||||
int port;
|
||||
zend_string *user;
|
||||
zend_string *pass;
|
||||
double timeout;
|
||||
double read_timeout;
|
||||
long retry_interval;
|
||||
int max_retries;
|
||||
struct RedisBackoff backoff;
|
||||
redis_sock_status status;
|
||||
int persistent;
|
||||
int watching;
|
||||
zend_string *persistent_id;
|
||||
|
||||
redis_serializer serializer;
|
||||
int compression;
|
||||
int compression_level;
|
||||
long dbNumber;
|
||||
redis_serializer serializer;
|
||||
int compression;
|
||||
int compression_level;
|
||||
long dbNumber;
|
||||
|
||||
zend_string *prefix;
|
||||
zend_string *prefix;
|
||||
|
||||
short mode;
|
||||
struct fold_item *head;
|
||||
struct fold_item *current;
|
||||
short mode;
|
||||
struct fold_item *head;
|
||||
struct fold_item *current;
|
||||
|
||||
zend_string *pipeline_cmd;
|
||||
zend_string *pipeline_cmd;
|
||||
|
||||
zend_string *err;
|
||||
zend_string *err;
|
||||
|
||||
int scan;
|
||||
int scan;
|
||||
|
||||
int readonly;
|
||||
int reply_literal;
|
||||
int null_mbulk_as_null;
|
||||
int tcp_keepalive;
|
||||
int readonly;
|
||||
int reply_literal;
|
||||
int null_mbulk_as_null;
|
||||
int tcp_keepalive;
|
||||
} RedisSock;
|
||||
/* }}} */
|
||||
|
||||
|
||||
@@ -323,5 +323,5 @@ if test "$PHP_REDIS" != "no"; then
|
||||
fi
|
||||
|
||||
PHP_SUBST(REDIS_SHARED_LIBADD)
|
||||
PHP_NEW_EXTENSION(redis, redis.c redis_commands.c library.c redis_session.c redis_array.c redis_array_impl.c redis_cluster.c cluster_library.c redis_sentinel.c sentinel_library.c $lzf_sources, $ext_shared)
|
||||
PHP_NEW_EXTENSION(redis, redis.c redis_commands.c library.c redis_session.c redis_array.c redis_array_impl.c redis_cluster.c cluster_library.c redis_sentinel.c sentinel_library.c backoff.c $lzf_sources, $ext_shared)
|
||||
fi
|
||||
|
||||
@@ -5,7 +5,7 @@ ARG_ENABLE("redis-session", "whether to enable sessions", "yes");
|
||||
ARG_ENABLE("redis-igbinary", "whether to enable igbinary serializer support", "no");
|
||||
|
||||
if (PHP_REDIS != "no") {
|
||||
var sources = "redis.c redis_commands.c library.c redis_session.c redis_array.c redis_array_impl.c redis_cluster.c cluster_library.c redis_sentinel.c sentinel_library.c";
|
||||
var sources = "redis.c redis_commands.c library.c redis_session.c redis_array.c redis_array_impl.c redis_cluster.c cluster_library.c redis_sentinel.c sentinel_library.c backoff.c";
|
||||
if (PHP_REDIS_SESSION != "no") {
|
||||
ADD_EXTENSION_DEP("redis", "session");
|
||||
ADD_FLAG("CFLAGS_REDIS", ' /D PHP_SESSION=1 ');
|
||||
|
||||
19
library.c
19
library.c
@@ -301,7 +301,7 @@ redis_error_throw(RedisSock *redis_sock)
|
||||
PHP_REDIS_API int
|
||||
redis_check_eof(RedisSock *redis_sock, int no_throw)
|
||||
{
|
||||
int count;
|
||||
unsigned int retry_index;
|
||||
char *errmsg;
|
||||
|
||||
if (!redis_sock || !redis_sock->stream || redis_sock->status == REDIS_SOCK_STATUS_FAILED) {
|
||||
@@ -333,18 +333,17 @@ redis_check_eof(RedisSock *redis_sock, int no_throw)
|
||||
errmsg = "Connection lost and socket is in MULTI/watching mode";
|
||||
} else {
|
||||
errmsg = "Connection lost";
|
||||
/* TODO: configurable max retry count */
|
||||
for (count = 0; count < 10; ++count) {
|
||||
redis_backoff_reset(&redis_sock->backoff);
|
||||
for (retry_index = 0; retry_index < redis_sock->max_retries; ++retry_index) {
|
||||
/* close existing stream before reconnecting */
|
||||
if (redis_sock->stream) {
|
||||
redis_sock_disconnect(redis_sock, 1);
|
||||
}
|
||||
// Wait for a while before trying to reconnect
|
||||
if (redis_sock->retry_interval) {
|
||||
// Random factor to avoid having several (or many) concurrent connections trying to reconnect at the same time
|
||||
long retry_interval = (count ? redis_sock->retry_interval : (php_rand() % redis_sock->retry_interval));
|
||||
usleep(retry_interval);
|
||||
}
|
||||
/* Sleep based on our backoff algorithm */
|
||||
zend_ulong delay = redis_backoff_compute(&redis_sock->backoff, retry_index);
|
||||
if (delay != 0)
|
||||
usleep(delay);
|
||||
|
||||
/* reconnect */
|
||||
if (redis_sock_connect(redis_sock) == 0) {
|
||||
/* check for EOF again. */
|
||||
@@ -2150,6 +2149,8 @@ redis_sock_create(char *host, int host_len, int port,
|
||||
redis_sock->host = zend_string_init(host, host_len, 0);
|
||||
redis_sock->status = REDIS_SOCK_STATUS_DISCONNECTED;
|
||||
redis_sock->retry_interval = retry_interval * 1000;
|
||||
redis_sock->max_retries = 10;
|
||||
redis_initialize_backoff(&redis_sock->backoff, retry_interval);
|
||||
redis_sock->persistent = persistent;
|
||||
|
||||
if (persistent && persistent_id != NULL) {
|
||||
|
||||
@@ -88,6 +88,8 @@ http://pear.php.net/dtd/package-2.0.xsd">
|
||||
<file role='doc' name='arrays.markdown'/>
|
||||
<file role='doc' name='cluster.markdown'/>
|
||||
<file role='doc' name='sentinel.markdown'/>
|
||||
<file role='src' name='backoff.c'/>
|
||||
<file role='src' name='backoff.h'/>
|
||||
<file role='src' name='cluster_library.c'/>
|
||||
<file role='src' name='cluster_library.h'/>
|
||||
<file role='src' name='common.h'/>
|
||||
|
||||
16
redis.c
16
redis.c
@@ -775,6 +775,22 @@ static void add_class_constants(zend_class_entry *ce, int is_cluster) {
|
||||
zend_declare_class_constant_stringl(ce, "LEFT", 4, "left", 4);
|
||||
zend_declare_class_constant_stringl(ce, "RIGHT", 5, "right", 5);
|
||||
}
|
||||
|
||||
/* retry/backoff options*/
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("OPT_MAX_RETRIES"), REDIS_OPT_MAX_RETRIES);
|
||||
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("OPT_BACKOFF_ALGORITHM"), REDIS_OPT_BACKOFF_ALGORITHM);
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("BACKOFF_ALGORITHM_DEFAULT"), REDIS_BACKOFF_ALGORITHM_DEFAULT);
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("BACKOFF_ALGORITHM_CONSTANT"), REDIS_BACKOFF_ALGORITHM_CONSTANT);
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("BACKOFF_ALGORITHM_UNIFORM"), REDIS_BACKOFF_ALGORITHM_UNIFORM);
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("BACKOFF_ALGORITHM_EXPONENTIAL"), REDIS_BACKOFF_ALGORITHM_EXPONENTIAL);
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("BACKOFF_ALGORITHM_FULL_JITTER"), REDIS_BACKOFF_ALGORITHM_FULL_JITTER);
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("BACKOFF_ALGORITHM_EQUAL_JITTER"), REDIS_BACKOFF_ALGORITHM_EQUAL_JITTER);
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("BACKOFF_ALGORITHM_DECORRELATED_JITTER"), REDIS_BACKOFF_ALGORITHM_DECORRELATED_JITTER);
|
||||
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("OPT_BACKOFF_BASE"), REDIS_OPT_BACKOFF_BASE);
|
||||
|
||||
zend_declare_class_constant_long(ce, ZEND_STRL("OPT_BACKOFF_CAP"), REDIS_OPT_BACKOFF_CAP);
|
||||
}
|
||||
|
||||
static ZEND_RSRC_DTOR_FUNC(redis_connections_pool_dtor)
|
||||
|
||||
@@ -4308,6 +4308,14 @@ void redis_getoption_handler(INTERNAL_FUNCTION_PARAMETERS,
|
||||
RETURN_LONG(redis_sock->null_mbulk_as_null);
|
||||
case REDIS_OPT_FAILOVER:
|
||||
RETURN_LONG(c->failover);
|
||||
case REDIS_OPT_MAX_RETRIES:
|
||||
RETURN_LONG(redis_sock->max_retries);
|
||||
case REDIS_OPT_BACKOFF_ALGORITHM:
|
||||
RETURN_LONG(redis_sock->backoff.algorithm);
|
||||
case REDIS_OPT_BACKOFF_BASE:
|
||||
RETURN_LONG(redis_sock->backoff.base / 1000);
|
||||
case REDIS_OPT_BACKOFF_CAP:
|
||||
RETURN_LONG(redis_sock->backoff.cap / 1000);
|
||||
default:
|
||||
RETURN_FALSE;
|
||||
}
|
||||
@@ -4441,6 +4449,35 @@ void redis_setoption_handler(INTERNAL_FUNCTION_PARAMETERS,
|
||||
RETURN_TRUE;
|
||||
}
|
||||
break;
|
||||
case REDIS_OPT_MAX_RETRIES:
|
||||
val_long = zval_get_long(val);
|
||||
if(val_long >= 0) {
|
||||
redis_sock->max_retries = val_long;
|
||||
RETURN_TRUE;
|
||||
}
|
||||
break;
|
||||
case REDIS_OPT_BACKOFF_ALGORITHM:
|
||||
val_long = zval_get_long(val);
|
||||
if(val_long >= 0 &&
|
||||
val_long < REDIS_BACKOFF_ALGORITHMS) {
|
||||
redis_sock->backoff.algorithm = val_long;
|
||||
RETURN_TRUE;
|
||||
}
|
||||
break;
|
||||
case REDIS_OPT_BACKOFF_BASE:
|
||||
val_long = zval_get_long(val);
|
||||
if(val_long >= 0) {
|
||||
redis_sock->backoff.base = val_long * 1000;
|
||||
RETURN_TRUE;
|
||||
}
|
||||
break;
|
||||
case REDIS_OPT_BACKOFF_CAP:
|
||||
val_long = zval_get_long(val);
|
||||
if(val_long >= 0) {
|
||||
redis_sock->backoff.cap = val_long * 1000;
|
||||
RETURN_TRUE;
|
||||
}
|
||||
break;
|
||||
EMPTY_SWITCH_DEFAULT_CASE()
|
||||
}
|
||||
RETURN_FALSE;
|
||||
|
||||
@@ -5398,6 +5398,50 @@ class Redis_Test extends TestSuite
|
||||
$this->assertTrue(count($arr_all_keys) == 0);
|
||||
}
|
||||
|
||||
public function testMaxRetriesOption() {
|
||||
$maxRetriesExpected = 5;
|
||||
$this->redis->setOption(Redis::OPT_MAX_RETRIES, $maxRetriesExpected);
|
||||
$maxRetriesActual=$this->redis->getOption(Redis::OPT_MAX_RETRIES);
|
||||
$this->assertEquals($maxRetriesActual, $maxRetriesExpected);
|
||||
}
|
||||
|
||||
public function testBackoffOptions() {
|
||||
$this->redis->setOption(Redis::OPT_MAX_RETRIES, 5);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_MAX_RETRIES), 5);
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_ALGORITHM, Redis::BACKOFF_ALGORITHM_DEFAULT);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_ALGORITHM), Redis::BACKOFF_ALGORITHM_DEFAULT);
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_ALGORITHM, Redis::BACKOFF_ALGORITHM_CONSTANT);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_ALGORITHM), Redis::BACKOFF_ALGORITHM_CONSTANT);
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_ALGORITHM, Redis::BACKOFF_ALGORITHM_UNIFORM);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_ALGORITHM), Redis::BACKOFF_ALGORITHM_UNIFORM);
|
||||
|
||||
$this->redis -> setOption(Redis::OPT_BACKOFF_ALGORITHM, Redis::BACKOFF_ALGORITHM_EXPONENTIAL);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_ALGORITHM), Redis::BACKOFF_ALGORITHM_EXPONENTIAL);
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_ALGORITHM, Redis::BACKOFF_ALGORITHM_FULL_JITTER);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_ALGORITHM), Redis::BACKOFF_ALGORITHM_FULL_JITTER);
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_ALGORITHM, Redis::BACKOFF_ALGORITHM_DECORRELATED_JITTER);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_ALGORITHM), Redis::BACKOFF_ALGORITHM_DECORRELATED_JITTER);
|
||||
|
||||
$this->assertFalse($this->redis->setOption(Redis::OPT_BACKOFF_ALGORITHM, 55555));
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_BASE, 500);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_BASE), 500);
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_BASE, 750);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_BASE), 750);
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_CAP, 500);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_CAP), 500);
|
||||
|
||||
$this->redis->setOption(Redis::OPT_BACKOFF_CAP, 750);
|
||||
$this->assertEquals($this->redis->getOption(Redis::OPT_BACKOFF_CAP), 750);
|
||||
}
|
||||
|
||||
public function testHScan() {
|
||||
if (version_compare($this->version, "2.8.0") < 0) {
|
||||
$this->markTestSkipped();
|
||||
|
||||
Reference in New Issue
Block a user