diff --git a/common.h b/common.h index 4026682..100b45e 100644 --- a/common.h +++ b/common.h @@ -134,6 +134,17 @@ typedef enum { #define MULTI 1 #define PIPELINE 2 +#define PHPREDIS_DEBUG_LOGGING 0 + +#if PHPREDIS_DEBUG_LOGGING == 1 +#define redisDbgFmt(fmt, ...) \ + php_printf("%s:%d:%s(): " fmt "\n", __FILE__, __LINE__, __func__, __VA_ARGS__) +#define redisDbgStr(str) phpredisDebugFmt("%s", str) +#else +#define redisDbgFmt(fmt, ...) ((void)0) +#define redisDbgStr(str) ((void)0) +#endif + #define IS_ATOMIC(redis_sock) (redis_sock->mode == ATOMIC) #define IS_MULTI(redis_sock) (redis_sock->mode & MULTI) #define IS_PIPELINE(redis_sock) (redis_sock->mode & PIPELINE) diff --git a/library.c b/library.c index 25e53a7..bc29fd6 100644 --- a/library.c +++ b/library.c @@ -2182,6 +2182,82 @@ static int redis_stream_liveness_check(php_stream *stream) { SUCCESS : FAILURE; } +/* Try to get the underlying socket FD for use with poll/select. + * Returns -1 on failure. */ +static int redis_stream_fd_for_select(php_stream *stream) { + php_socket_t fd; + int flags; + + flags = PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL; + if (php_stream_cast(stream, flags, (void*)&fd, 1) == FAILURE) + return -1; + + return fd; +} + +static int redis_detect_dirty_config(void) { + int val = INI_INT("redis.pconnect.pool_detect_dirty"); + + if (val >= 0 && val <= 2) + return val; + else if (val > 2) + return 2; + else + return 0; +} + +static int redis_pool_poll_timeout(void) { + int val = INI_INT("redis.pconnect.pool_poll_timeout"); + if (val >= 0) + return val; + + return 0; +} + +#define REDIS_POLL_FD_SET(_pfd, _fd, _events) \ + (_pfd).fd = _fd; (_pfd).events = _events; (_pfd).revents = 0 + +/* Try to determine if the socket is out of sync (has unconsumed replies) */ +static int redis_stream_detect_dirty(php_stream *stream) { + php_socket_t fd; + php_pollfd pfd; + int rv, action; + + /* Short circuit if this is disabled */ + if ((action = redis_detect_dirty_config()) == 0) + return SUCCESS; + + /* Seek past unconsumed bytes if we detect them */ + if (stream->readpos < stream->writepos) { + redisDbgFmt("%s on unconsumed buffer (%ld < %ld)", + action > 1 ? "Aborting" : "Seeking", + (long)stream->readpos, (long)stream->writepos); + + /* Abort if we are configured to immediately fail */ + if (action == 1) + return FAILURE; + + /* Seek to the end of buffered data */ + zend_off_t offset = stream->writepos - stream->readpos; + if (php_stream_seek(stream, offset, SEEK_CUR) == FAILURE) + return FAILURE; + } + + /* Get the underlying FD */ + if ((fd = redis_stream_fd_for_select(stream)) == -1) + return FAILURE; + + /* We want to detect a readable socket (it shouln't be) */ + REDIS_POLL_FD_SET(pfd, fd, PHP_POLLREADABLE); + rv = php_poll2(&pfd, 1, redis_pool_poll_timeout()); + + /* If we detect the socket is readable, it's dirty which is + * a failure. Otherwise as best we can tell it's good. + * TODO: We could attempt to consume up to N bytes */ + redisDbgFmt("Detected %s socket", rv > 0 ? "readable" : "unreadable"); + return rv == 0 ? SUCCESS : FAILURE; +} + static int redis_sock_check_liveness(RedisSock *redis_sock) { @@ -2310,9 +2386,12 @@ PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock) redis_sock->stream = *(php_stream **)zend_llist_get_last(&p->list); zend_llist_remove_tail(&p->list); - if (redis_sock_check_liveness(redis_sock) == SUCCESS) { + if (redis_stream_detect_dirty(redis_sock->stream) == SUCCESS && + redis_sock_check_liveness(redis_sock) == SUCCESS) + { return SUCCESS; } + p->nb_active--; } diff --git a/redis.c b/redis.c index de1b6cb..c96092d 100644 --- a/redis.c +++ b/redis.c @@ -104,6 +104,8 @@ PHP_INI_BEGIN() PHP_INI_ENTRY("redis.pconnect.pooling_enabled", "1", PHP_INI_ALL, NULL) PHP_INI_ENTRY("redis.pconnect.connection_limit", "0", PHP_INI_ALL, NULL) PHP_INI_ENTRY("redis.pconnect.echo_check_liveness", "1", PHP_INI_ALL, NULL) + PHP_INI_ENTRY("redis.pconnect.pool_detect_dirty", "0", PHP_INI_ALL, NULL) + PHP_INI_ENTRY("redis.pconnect.pool_poll_timeout", "0", PHP_INI_ALL, NULL) PHP_INI_ENTRY("redis.pconnect.pool_pattern", "", PHP_INI_ALL, NULL) /* redis session */