Binary protocol support

This commit is contained in:
Mikael Johansson
2007-11-03 09:19:07 +00:00
parent f816771d27
commit 6a79abc8f3
21 changed files with 1503 additions and 577 deletions

View File

@@ -72,6 +72,12 @@ if test "$PHP_MEMCACHE" != "no"; then
session_inc_path="$abs_srcdir"
elif test -f "$phpincludedir/ext/session/php_session.h"; then
session_inc_path="$phpincludedir"
else
for i in php php4 php5 php6; do
if test -f "$prefix/include/$i/ext/session/php_session.h"; then
session_inc_path="$prefix/include/$i"
fi
done
fi
if test "$session_inc_path" = ""; then
@@ -86,7 +92,7 @@ if test "$PHP_MEMCACHE" != "no"; then
AC_MSG_RESULT([enabled])
AC_DEFINE(HAVE_MEMCACHE_SESSION,1,[Whether memcache session handler is enabled])
AC_DEFINE(HAVE_MEMCACHE,1,[Whether you want memcache support])
PHP_NEW_EXTENSION(memcache, memcache.c memcache_pool.c memcache_queue.c memcache_session.c memcache_standard_hash.c memcache_consistent_hash.c, $ext_shared,,-I$session_inc_path)
PHP_NEW_EXTENSION(memcache, memcache.c memcache_pool.c memcache_queue.c memcache_ascii_protocol.c memcache_binary_protocol.c memcache_standard_hash.c memcache_consistent_hash.c memcache_session.c, $ext_shared,,-I$session_inc_path)
ifdef([PHP_ADD_EXTENSION_DEP],
[
PHP_ADD_EXTENSION_DEP(memcache, session)
@@ -94,7 +100,7 @@ if test "$PHP_MEMCACHE" != "no"; then
else
AC_MSG_RESULT([disabled])
AC_DEFINE(HAVE_MEMCACHE,1,[Whether you want memcache support])
PHP_NEW_EXTENSION(memcache, memcache.c memcache_pool.c memcache_queue.c memcache_standard_hash.c memcache_consistent_hash.c, $ext_shared)
PHP_NEW_EXTENSION(memcache, memcache.c memcache_pool.c memcache_queue.c memcache_ascii_protocol.c memcache_binary_protocol.c memcache_standard_hash.c memcache_consistent_hash.c, $ext_shared)
fi
dnl this is needed to build the extension with phpize and -Wall

View File

@@ -155,6 +155,23 @@ static PHP_INI_MH(OnUpdateFailoverAttempts) /* {{{ */
}
/* }}} */
static PHP_INI_MH(OnUpdateProtocol) /* {{{ */
{
if (!strcasecmp(new_value, "ascii")) {
MEMCACHE_G(protocol) = MMC_ASCII_PROTOCOL;
}
else if (!strcasecmp(new_value, "binary")) {
MEMCACHE_G(protocol) = MMC_BINARY_PROTOCOL;
}
else {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "memcache.protocol must be in set {ascii, binary} ('%s' given)", new_value);
return FAILURE;
}
return SUCCESS;
}
/* }}} */
static PHP_INI_MH(OnUpdateHashStrategy) /* {{{ */
{
if (!strcasecmp(new_value, "standard")) {
@@ -209,6 +226,7 @@ PHP_INI_BEGIN()
STD_PHP_INI_ENTRY("memcache.max_failover_attempts", "20", PHP_INI_ALL, OnUpdateFailoverAttempts, max_failover_attempts, zend_memcache_globals, memcache_globals)
STD_PHP_INI_ENTRY("memcache.default_port", "11211", PHP_INI_ALL, OnUpdateLong, default_port, zend_memcache_globals, memcache_globals)
STD_PHP_INI_ENTRY("memcache.chunk_size", "32768", PHP_INI_ALL, OnUpdateChunkSize, chunk_size, zend_memcache_globals, memcache_globals)
STD_PHP_INI_ENTRY("memcache.protocol", "ascii", PHP_INI_ALL, OnUpdateProtocol, protocol, zend_memcache_globals, memcache_globals)
STD_PHP_INI_ENTRY("memcache.hash_strategy", "consistent", PHP_INI_ALL, OnUpdateHashStrategy, hash_strategy, zend_memcache_globals, memcache_globals)
STD_PHP_INI_ENTRY("memcache.hash_function", "crc32", PHP_INI_ALL, OnUpdateHashFunction, hash_function, zend_memcache_globals, memcache_globals)
STD_PHP_INI_ENTRY("memcache.redundancy", "1", PHP_INI_ALL, OnUpdateRedundancy, redundancy, zend_memcache_globals, memcache_globals)
@@ -330,10 +348,10 @@ static int mmc_get_pool(zval *id, mmc_pool_t **pool TSRMLS_DC) /* {{{ */
}
/* }}} */
int mmc_stored_handler(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
parses a SET/ADD/REPLACE response line, param is a zval pointer to store result into {{{ */
int mmc_stored_handler(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC) /*
handles SET/ADD/REPLACE response, param is a zval pointer to store result into {{{ */
{
if (mmc_str_left((char *)value, "STORED", value_len, sizeof("STORED")-1)) {
if (response == MMC_OK) {
if (param != NULL && Z_TYPE_P((zval *)param) == IS_NULL) {
ZVAL_TRUE((zval *)param);
}
@@ -341,20 +359,20 @@ int mmc_stored_handler(mmc_t *mmc, mmc_request_t *request, void *value, unsigned
}
/* return FALSE or catch memory errors without failover */
if (mmc_str_left((char *)value, "NOT_STORED", value_len, sizeof("NOT_STORED")-1) ||
mmc_str_left((char *)value, "SERVER_ERROR out of memory", value_len, sizeof("SERVER_ERROR out of memory")-1) ||
mmc_str_left((char *)value, "SERVER_ERROR object too large", value_len, sizeof("SERVER_ERROR object too large")-1)) {
if (response == MMC_RESPONSE_EXISTS ||
mmc_str_left(message, "SERVER_ERROR out of memory", message_len, sizeof("SERVER_ERROR out of memory")-1) ||
mmc_str_left(message, "SERVER_ERROR object too large", message_len, sizeof("SERVER_ERROR object too large")-1)) {
if (param != NULL) {
ZVAL_FALSE((zval *)param);
}
return MMC_REQUEST_DONE;
}
return mmc_request_failure(mmc, request->io, (char *)value, value_len, 0 TSRMLS_CC);
return mmc_request_failure(mmc, request->io, message, message_len, 0 TSRMLS_CC);
}
/* }}} */
static void php_mmc_store(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len) /* {{{ */
static void php_mmc_store(INTERNAL_FUNCTION_PARAMETERS, int op) /* {{{ */
{
mmc_pool_t *pool;
mmc_request_t *request;
@@ -410,16 +428,22 @@ static void php_mmc_store(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len)
/* allocate request */
if (return_value_used) {
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_request_parse_line,
request = mmc_pool_request(pool, MMC_PROTO_TCP,
mmc_stored_handler, return_value, mmc_pool_failover_handler, NULL TSRMLS_CC);
}
else {
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_request_parse_line,
request = mmc_pool_request(pool, MMC_PROTO_TCP,
mmc_stored_handler, NULL, mmc_pool_failover_handler, NULL TSRMLS_CC);
}
if (mmc_prepare_key_ex(key, key_len, request->key, &(request->key_len)) != MMC_OK) {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Invalid key");
mmc_pool_release(pool, request);
continue;
}
/* assemble command */
if (mmc_prepare_store(pool, request, cmd, cmd_len, ZSTR_VAL(key), key_len, flags, exptime, *arrval TSRMLS_CC) != MMC_OK) {
if (pool->protocol->store(pool, request, op, ZSTR_VAL(key), key_len, flags, exptime, *arrval TSRMLS_CC) != MMC_OK) {
mmc_pool_release(pool, request);
continue;
}
@@ -435,11 +459,16 @@ static void php_mmc_store(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len)
}
else {
/* allocate request */
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_request_parse_line,
mmc_stored_handler, return_value, mmc_pool_failover_handler, NULL TSRMLS_CC);
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_stored_handler, return_value, mmc_pool_failover_handler, NULL TSRMLS_CC);
if (mmc_prepare_key_ex(Z_STRVAL_P(keys), Z_STRLEN_P(keys), request->key, &(request->key_len)) != MMC_OK) {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Invalid key");
mmc_pool_release(pool, request);
RETURN_FALSE;
}
/* assemble command */
if (mmc_prepare_store(pool, request, cmd, cmd_len, Z_STRVAL_P(keys), Z_STRLEN_P(keys), flags, exptime, value TSRMLS_CC) != MMC_OK) {
if (pool->protocol->store(pool, request, op, request->key, request->key_len, flags, exptime, value TSRMLS_CC) != MMC_OK) {
mmc_pool_release(pool, request);
RETURN_FALSE;
}
@@ -459,33 +488,33 @@ static void php_mmc_store(INTERNAL_FUNCTION_PARAMETERS, char *cmd, int cmd_len)
}
/* }}} */
static int mmc_deleted_handler(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
static int mmc_deleted_handler(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC) /*
parses a DELETED response line, param is a zval pointer to store result into {{{ */
{
if (mmc_str_left((char *)value, "DELETED", value_len, sizeof("DELETED")-1)) {
if (response == MMC_OK) {
if (param != NULL && Z_TYPE_P((zval *)param) == IS_NULL) {
ZVAL_TRUE((zval *)param);
}
return MMC_REQUEST_DONE;
}
if (mmc_str_left((char *)value, "NOT_FOUND", value_len, sizeof("NOT_FOUND")-1)) {
if (response == MMC_RESPONSE_NOT_FOUND) {
if (param != NULL) {
ZVAL_FALSE((zval *)param);
}
return MMC_REQUEST_DONE;
}
return mmc_request_failure(mmc, request->io, (char *)value, value_len, 0 TSRMLS_CC);
return mmc_request_failure(mmc, request->io, message, message_len, 0 TSRMLS_CC);
}
/* }}} */
static int mmc_numeric_handler(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
static int mmc_numeric_handler(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC) /*
parses a numeric response line, param is a zval pointer to store result into {{{ */
{
/* must contain digit(s) + \r\n */
if (value_len < 3) {
return mmc_request_failure(mmc, request->io, (char *)value, value_len, 0 TSRMLS_CC);
/* must contain digit(s) */
if (message_len < 1) {
return mmc_request_failure(mmc, request->io, message, message_len, 0 TSRMLS_CC);
}
/* append return value to result array */
@@ -497,11 +526,11 @@ static int mmc_numeric_handler(mmc_t *mmc, mmc_request_t *request, void *value,
if (Z_TYPE_P((zval *)param) == IS_ARRAY) {
zval *result;
MAKE_STD_ZVAL(result);
ZVAL_LONG(result, atol((char *)value));
ZVAL_LONG(result, atol(message));
add_assoc_zval_ex((zval *)param, request->key, request->key_len + 1, result);
}
else {
ZVAL_LONG((zval *)param, atol((char *)value));
ZVAL_LONG((zval *)param, atol(message));
}
}
@@ -509,24 +538,24 @@ static int mmc_numeric_handler(mmc_t *mmc, mmc_request_t *request, void *value,
}
/* }}} */
static void php_mmc_numeric(INTERNAL_FUNCTION_PARAMETERS, const char *cmd, unsigned int cmd_len, int deleted) /*
static void php_mmc_numeric(INTERNAL_FUNCTION_PARAMETERS, int deleted, int invert) /*
sends one or several commands which have a single optional numeric parameter (incr, decr, delete) {{{ */
{
mmc_pool_t *pool;
zval *mmc_object = getThis();
zval *keys;
long value = 1;
long value = 1, defval = 0, exptime = 0;
mmc_request_t *request;
mmc_request_value_handler value_handler;
mmc_request_response_handler response_handler;
if (mmc_object == NULL) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Oz|l", &mmc_object, memcache_pool_ce, &keys, &value) == FAILURE) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Oz|lll", &mmc_object, memcache_pool_ce, &keys, &value, &defval, &exptime) == FAILURE) {
return;
}
}
else {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|l", &keys, &value) == FAILURE) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|lll", &keys, &value, &defval, &exptime) == FAILURE) {
return;
}
}
@@ -536,10 +565,10 @@ static void php_mmc_numeric(INTERNAL_FUNCTION_PARAMETERS, const char *cmd, unsig
}
if (deleted) {
value_handler = mmc_deleted_handler;
response_handler = mmc_deleted_handler;
}
else {
value_handler = mmc_numeric_handler;
response_handler = mmc_numeric_handler;
}
if (Z_TYPE_P(keys) == IS_ARRAY) {
@@ -554,12 +583,14 @@ static void php_mmc_numeric(INTERNAL_FUNCTION_PARAMETERS, const char *cmd, unsig
/* allocate request */
if (return_value_used) {
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_request_parse_line,
value_handler, return_value, mmc_pool_failover_handler, NULL TSRMLS_CC);
request = mmc_pool_request(
pool, MMC_PROTO_TCP, response_handler, return_value,
mmc_pool_failover_handler, NULL TSRMLS_CC);
}
else {
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_request_parse_line,
value_handler, NULL, mmc_pool_failover_handler, NULL TSRMLS_CC);
request = mmc_pool_request(
pool, MMC_PROTO_TCP, response_handler, NULL,
mmc_pool_failover_handler, NULL TSRMLS_CC);
}
if (mmc_prepare_key(*key, request->key, &(request->key_len)) != MMC_OK) {
@@ -568,16 +599,12 @@ static void php_mmc_numeric(INTERNAL_FUNCTION_PARAMETERS, const char *cmd, unsig
continue;
}
smart_str_appendl(&(request->sendbuf.value), cmd, cmd_len);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_appendl(&(request->sendbuf.value), request->key, request->key_len);
if (value > 0) {
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), value);
if (deleted) {
pool->protocol->delete(request, request->key, request->key_len, value);
}
else {
pool->protocol->mutate(request, request->key, request->key_len, invert ? -value : value, defval, exptime);
}
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
/* schedule request */
if (mmc_pool_schedule_key(pool, request->key, request->key_len, request, MEMCACHE_G(redundancy) TSRMLS_CC) != MMC_OK) {
@@ -597,8 +624,8 @@ static void php_mmc_numeric(INTERNAL_FUNCTION_PARAMETERS, const char *cmd, unsig
}
/* allocate request */
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_request_parse_line,
value_handler, return_value, mmc_pool_failover_handler, NULL TSRMLS_CC);
request = mmc_pool_request(pool, MMC_PROTO_TCP,
response_handler, return_value, mmc_pool_failover_handler, NULL TSRMLS_CC);
if (mmc_prepare_key(keys, request->key, &(request->key_len)) != MMC_OK) {
mmc_pool_release(pool, request);
@@ -606,17 +633,13 @@ static void php_mmc_numeric(INTERNAL_FUNCTION_PARAMETERS, const char *cmd, unsig
RETURN_FALSE;
}
smart_str_appendl(&(request->sendbuf.value), cmd, cmd_len);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_appendl(&(request->sendbuf.value), request->key, request->key_len);
if (value > 0) {
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), value);
if (deleted) {
pool->protocol->delete(request, request->key, request->key_len, value);
}
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
else {
pool->protocol->mutate(request, request->key, request->key_len, invert ? -value : value, defval, exptime);
}
/* schedule request */
if (mmc_pool_schedule_key(pool, request->key, request->key_len, request, MEMCACHE_G(redundancy) TSRMLS_CC) != MMC_OK) {
RETURN_FALSE;
@@ -883,12 +906,7 @@ static int mmc_stats_parse_generic(char *start, char *end, zval *result TSRMLS_D
array_init(result);
}
/* "stats maps" returns "\n" delimited lines, other commands uses "\r\n" */
if (*end == '\r') {
end--;
}
if (start <= end) {
if (start < end) {
if ((space = php_memnstr(start, " ", 1, end)) != NULL) {
key = estrndup(start, space - start);
add_assoc_stringl_ex(result, key, space - start + 1, space + 1, end - space, 1);
@@ -898,6 +916,9 @@ static int mmc_stats_parse_generic(char *start, char *end, zval *result TSRMLS_D
add_next_index_stringl(result, start, end - start, 1);
}
}
else {
return 0;
}
return 1;
}
@@ -1217,19 +1238,24 @@ PHP_FUNCTION(memcache_get_server_status)
}
/* }}} */
static int mmc_version_handler(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
static int mmc_version_handler(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC) /*
parses the VERSION response line, param is a zval pointer to store version into {{{ */
{
unsigned int version_len = value_len - (sizeof("VERSION ")-1) - (sizeof("\r\n")-1);
char *version = emalloc(version_len + 1);
if (sscanf((char *)value, "VERSION %s", version) != 1) {
efree(version);
return mmc_request_failure(mmc, request->io, (char *)value, value_len, 0 TSRMLS_CC);
if (response != MMC_RESPONSE_ERROR) {
char *version = emalloc(message_len + 1);
if (sscanf(message, "VERSION %s", version) == 1) {
ZVAL_STRING((zval *)param, version, 0);
}
else {
efree(version);
ZVAL_STRINGL((zval *)param, (char *)message, message_len, 1);
}
return MMC_REQUEST_DONE;
}
ZVAL_STRINGL((zval *)param, version, version_len, 0);
return MMC_REQUEST_DONE;
return mmc_request_failure(mmc, request->io, message, message_len, 0 TSRMLS_CC);
}
/* }}} */
@@ -1240,6 +1266,7 @@ PHP_FUNCTION(memcache_get_version)
mmc_pool_t *pool;
zval *mmc_object = getThis();
int i;
mmc_request_t *request;
if (mmc_object == NULL) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O", &mmc_object, memcache_pool_ce) == FAILURE) {
@@ -1254,9 +1281,12 @@ PHP_FUNCTION(memcache_get_version)
RETVAL_FALSE;
for (i=0; i<pool->num_servers; i++) {
/* run command and check for valid return value */
if (mmc_pool_schedule_command(pool, pool->servers[i], "version\r\n", sizeof("version\r\n")-1,
mmc_request_parse_line, mmc_version_handler, return_value TSRMLS_CC) == MMC_OK) {
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_version_handler, return_value, NULL, NULL TSRMLS_CC);
pool->protocol->version(request);
if (mmc_pool_schedule(pool, pool->servers[i], request TSRMLS_CC) == MMC_OK) {
mmc_pool_run(pool TSRMLS_CC);
if (Z_TYPE_P(return_value) == IS_STRING) {
break;
}
@@ -1269,7 +1299,7 @@ PHP_FUNCTION(memcache_get_version)
Adds new item. Item with such key should not exist. */
PHP_FUNCTION(memcache_add)
{
php_mmc_store(INTERNAL_FUNCTION_PARAM_PASSTHRU, "add", sizeof("add")-1);
php_mmc_store(INTERNAL_FUNCTION_PARAM_PASSTHRU, MMC_OP_ADD);
}
/* }}} */
@@ -1277,7 +1307,7 @@ PHP_FUNCTION(memcache_add)
Sets the value of an item. Item may exist or not */
PHP_FUNCTION(memcache_set)
{
php_mmc_store(INTERNAL_FUNCTION_PARAM_PASSTHRU, "set", sizeof("set")-1);
php_mmc_store(INTERNAL_FUNCTION_PARAM_PASSTHRU, MMC_OP_SET);
}
/* }}} */
@@ -1285,11 +1315,11 @@ PHP_FUNCTION(memcache_set)
Replaces existing item. Returns false if item doesn't exist */
PHP_FUNCTION(memcache_replace)
{
php_mmc_store(INTERNAL_FUNCTION_PARAM_PASSTHRU, "replace", sizeof("replace")-1);
php_mmc_store(INTERNAL_FUNCTION_PARAM_PASSTHRU, MMC_OP_REPLACE);
}
/* }}} */
static int mmc_value_handler_multi(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
static int mmc_value_handler_multi(mmc_t *mmc, mmc_request_t *request, const char *key, unsigned int key_len, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
receives a multiple values, param is a zval pointer array to store value in {{{ */
{
zval *arrval;
@@ -1299,14 +1329,14 @@ static int mmc_value_handler_multi(mmc_t *mmc, mmc_request_t *request, void *val
if (Z_TYPE_P((zval *)param) != IS_ARRAY) {
array_init((zval *)param);
}
add_assoc_zval_ex((zval *)param, request->value.key, request->value.key_len + 1, arrval);
add_assoc_zval_ex((zval *)param, (char *)key, key_len + 1, arrval);
/* request more data (more values or END line) */
return MMC_REQUEST_AGAIN;
}
/* }}} */
int mmc_value_handler_single(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
int mmc_value_handler_single(mmc_t *mmc, mmc_request_t *request, const char *key, unsigned int key_len, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
receives a single value, param is a zval pointer to store value to {{{ */
{
*((zval *)param) = *((zval *)value);
@@ -1319,9 +1349,6 @@ int mmc_value_handler_single(mmc_t *mmc, mmc_request_t *request, void *value, un
static int mmc_value_failover_handler(mmc_pool_t *pool, mmc_t *mmc, mmc_request_t *request, void *param TSRMLS_DC) /*
uses keys and return value to reschedule requests to other servers, param is a zval ** pointer {{{ */
{
char keytmp[MMC_MAX_KEY_LEN + 1];
unsigned int keytmp_len;
zval **key, *keys = ((zval **)param)[0], *return_value = ((zval **)param)[0];
HashPosition pos;
@@ -1335,13 +1362,9 @@ static int mmc_value_failover_handler(mmc_pool_t *pool, mmc_t *mmc, mmc_request_
while (zend_hash_get_current_data_ex(Z_ARRVAL_P(keys), (void **)&key, &pos) == SUCCESS) {
zend_hash_move_forward_ex(Z_ARRVAL_P(keys), &pos);
if (mmc_prepare_key(*key, keytmp, &keytmp_len) != MMC_OK) {
continue;
}
/* re-schedule key if it does not exist in return value array */
if (!zend_hash_exists(Z_ARRVAL_P(return_value), keytmp, keytmp_len)) {
mmc_pool_schedule_get(pool, MMC_PROTO_UDP, keytmp, keytmp_len,
if (!zend_hash_exists(Z_ARRVAL_P(return_value), Z_STRVAL_PP(key), Z_STRLEN_PP(key) + 1)) {
mmc_pool_schedule_get(pool, MMC_PROTO_UDP, *key,
mmc_value_handler_multi, return_value,
mmc_value_failover_handler, param, request TSRMLS_CC);
}
@@ -1378,9 +1401,6 @@ PHP_FUNCTION(memcache_get)
ZVAL_FALSE(return_value);
if (Z_TYPE_P(keys) == IS_ARRAY) {
char keytmp[MMC_MAX_KEY_LEN + 1];
unsigned int keytmp_len;
zval **key;
HashPosition pos;
@@ -1392,13 +1412,8 @@ PHP_FUNCTION(memcache_get)
while (zend_hash_get_current_data_ex(Z_ARRVAL_P(keys), (void **)&key, &pos) == SUCCESS) {
zend_hash_move_forward_ex(Z_ARRVAL_P(keys), &pos);
if (mmc_prepare_key(*key, keytmp, &keytmp_len) != MMC_OK) {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Invalid key");
continue;
}
/* schedule request */
mmc_pool_schedule_get(pool, MMC_PROTO_UDP, keytmp, keytmp_len,
mmc_pool_schedule_get(pool, MMC_PROTO_UDP, *key,
mmc_value_handler_multi, return_value,
mmc_value_failover_handler, failover_handler_param, NULL TSRMLS_CC);
}
@@ -1407,8 +1422,10 @@ PHP_FUNCTION(memcache_get)
mmc_request_t *request;
/* allocate request */
request = mmc_pool_request(pool, MMC_PROTO_UDP, mmc_request_parse_value,
mmc_value_handler_single, return_value, mmc_pool_failover_handler, NULL TSRMLS_CC);
request = mmc_pool_request_get(
pool, MMC_PROTO_UDP,
mmc_value_handler_single, return_value,
mmc_pool_failover_handler, NULL TSRMLS_CC);
if (mmc_prepare_key(keys, request->key, &(request->key_len)) != MMC_OK) {
mmc_pool_release(pool, request);
@@ -1416,10 +1433,8 @@ PHP_FUNCTION(memcache_get)
return;
}
smart_str_appendl(&(request->sendbuf.value), "get ", sizeof("get ")-1);
smart_str_appendl(&(request->sendbuf.value), request->key, request->key_len);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
pool->protocol->get(request, keys, request->key, request->key_len);
/* schedule request */
if (mmc_pool_schedule_key(pool, request->key, request->key_len, request, 1 TSRMLS_CC) != MMC_OK) {
return;
@@ -1431,37 +1446,40 @@ PHP_FUNCTION(memcache_get)
}
/* }}} */
static int mmc_stats_handler(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
static int mmc_stats_handler(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC) /*
parses the stats response line, param is a zval pointer to store stats into {{{ */
{
if (!mmc_str_left((char *)value, "ERROR", value_len, sizeof("ERROR")-1) &&
!mmc_str_left((char *)value, "CLIENT_ERROR", value_len, sizeof("CLIENT_ERROR")-1) &&
!mmc_str_left((char *)value, "SERVER_ERROR", value_len, sizeof("SERVER_ERROR")-1))
if (response != MMC_RESPONSE_ERROR)
{
if (mmc_str_left((char *)value, "RESET", value_len, sizeof("RESET")-1)) {
char *line = (char *)message;
if (mmc_str_left(line, "RESET", message_len, sizeof("RESET")-1)) {
ZVAL_TRUE((zval *)param);
return MMC_REQUEST_DONE;
}
else if (mmc_str_left((char *)value, "STAT ", value_len, sizeof("STAT ")-1)) {
if (mmc_stats_parse_stat((char *)value + sizeof("STAT ")-1, (char *)value + value_len - sizeof("\r\n"), (zval *)param TSRMLS_CC)) {
else if (mmc_str_left(line, "STAT ", message_len, sizeof("STAT ")-1)) {
if (mmc_stats_parse_stat(line + sizeof("STAT ")-1, line + message_len - 1, (zval *)param TSRMLS_CC)) {
return MMC_REQUEST_AGAIN;
}
}
else if (mmc_str_left((char *)value, "ITEM ", value_len, sizeof("ITEM ")-1)) {
if (mmc_stats_parse_item((char *)value + sizeof("ITEM ")-1, (char *)value + value_len - sizeof("\r\n"), (zval *)param TSRMLS_CC)) {
else if (mmc_str_left(line, "ITEM ", message_len, sizeof("ITEM ")-1)) {
if (mmc_stats_parse_item(line + sizeof("ITEM ")-1, line + message_len - 1, (zval *)param TSRMLS_CC)) {
return MMC_REQUEST_AGAIN;
}
}
else if (mmc_str_left((char *)value, "END", value_len, sizeof("END")-1)) {
else if (mmc_str_left(line, "END", message_len, sizeof("END")-1)) {
return MMC_REQUEST_DONE;
}
else if (mmc_stats_parse_generic((char *)value, (char *)value + value_len - sizeof("\n"), (zval *)param TSRMLS_CC)) {
else if (mmc_stats_parse_generic(line, line + message_len, (zval *)param TSRMLS_CC)) {
return MMC_REQUEST_AGAIN;
}
zval_dtor((zval *)param);
ZVAL_FALSE((zval *)param);
return MMC_REQUEST_FAILURE;
}
ZVAL_FALSE((zval *)param);
return MMC_REQUEST_FAILURE;
return mmc_request_failure(mmc, request->io, message, message_len, 0 TSRMLS_CC);
}
/* }}} */
@@ -1483,9 +1501,10 @@ PHP_FUNCTION(memcache_get_stats)
mmc_pool_t *pool;
zval *mmc_object = getThis();
char *cmd, *type = NULL;
int i, cmd_len, type_len = 0;
char *type = NULL;
int i, type_len = 0;
long slabid = 0, limit = MMC_DEFAULT_CACHEDUMP_LIMIT;
mmc_request_t *request;
if (mmc_object == NULL) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O|sll", &mmc_object, memcache_pool_ce, &type, &type_len, &slabid, &limit) == FAILURE) {
@@ -1507,24 +1526,16 @@ PHP_FUNCTION(memcache_get_stats)
RETURN_FALSE;
}
if (slabid) {
cmd_len = spprintf(&cmd, 0, "stats %s %ld %ld\r\n", type, slabid, limit);
}
else if (type) {
cmd_len = spprintf(&cmd, 0, "stats %s\r\n", type);
}
else {
cmd_len = spprintf(&cmd, 0, "stats\r\n");
}
ZVAL_FALSE(return_value);
for (i=0; i<pool->num_servers; i++) {
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_stats_handler, return_value, NULL, NULL TSRMLS_CC);
pool->protocol->stats(request, type, slabid, limit);
/* run command and check for valid return value */
if (mmc_pool_schedule_command(pool, pool->servers[i], cmd, cmd_len,
mmc_request_parse_line, mmc_stats_handler, return_value TSRMLS_CC) == MMC_OK)
{
if (mmc_pool_schedule(pool, pool->servers[i], request TSRMLS_CC) == MMC_OK) {
mmc_pool_run(pool TSRMLS_CC);
if (Z_TYPE_P(return_value) != IS_BOOL || Z_BVAL_P(return_value)) {
break;
}
@@ -1533,7 +1544,6 @@ PHP_FUNCTION(memcache_get_stats)
/* execute all requests */
mmc_pool_run(pool TSRMLS_CC);
efree(cmd);
}
/* }}} */
@@ -1544,9 +1554,10 @@ PHP_FUNCTION(memcache_get_extended_stats)
mmc_pool_t *pool;
zval *mmc_object = getThis(), *stats;
char *host, *cmd, *type = NULL;
int i, host_len, cmd_len, type_len = 0;
char *host, *type = NULL;
int i, host_len, type_len = 0;
long slabid = 0, limit = MMC_DEFAULT_CACHEDUMP_LIMIT;
mmc_request_t *request;
if (mmc_object == NULL) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O|sll", &mmc_object, memcache_pool_ce, &type, &type_len, &slabid, &limit) == FAILURE) {
@@ -1567,16 +1578,6 @@ PHP_FUNCTION(memcache_get_extended_stats)
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Invalid stats type");
RETURN_FALSE;
}
if (slabid) {
cmd_len = spprintf(&cmd, 0, "stats %s %ld %ld\r\n", type, slabid, limit);
}
else if (type) {
cmd_len = spprintf(&cmd, 0, "stats %s\r\n", type);
}
else {
cmd_len = spprintf(&cmd, 0, "stats\r\n");
}
array_init(return_value);
@@ -1588,17 +1589,14 @@ PHP_FUNCTION(memcache_get_extended_stats)
add_assoc_zval_ex(return_value, host, host_len + 1, stats);
efree(host);
/* schedule command */
mmc_pool_schedule_command(pool, pool->servers[i], cmd, cmd_len,
mmc_request_parse_line, mmc_stats_handler, stats TSRMLS_CC);
/* begin sending requests immediatly */
mmc_pool_select(pool, 0 TSRMLS_CC);
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_stats_handler, stats, NULL, NULL TSRMLS_CC);
pool->protocol->stats(request, type, slabid, limit);
mmc_pool_schedule(pool, pool->servers[i], request TSRMLS_CC);
}
/* execute all requests */
mmc_pool_run(pool TSRMLS_CC);
efree(cmd);
}
/* }}} */
@@ -1651,23 +1649,23 @@ PHP_FUNCTION(memcache_set_compress_threshold)
Deletes existing item */
PHP_FUNCTION(memcache_delete)
{
php_mmc_numeric(INTERNAL_FUNCTION_PARAM_PASSTHRU, "delete", sizeof("delete")-1, 1);
php_mmc_numeric(INTERNAL_FUNCTION_PARAM_PASSTHRU, 1, 0);
}
/* }}} */
/* {{{ proto mixed memcache_increment(object memcache, mixed key [, int value ])
/* {{{ proto mixed memcache_increment(object memcache, mixed key [, int value [, int defval [, int exptime ] ] ])
Increments existing variable */
PHP_FUNCTION(memcache_increment)
{
php_mmc_numeric(INTERNAL_FUNCTION_PARAM_PASSTHRU, "incr", sizeof("incr")-1, 0);
php_mmc_numeric(INTERNAL_FUNCTION_PARAM_PASSTHRU, 0, 0);
}
/* }}} */
/* {{{ proto mixed memcache_decrement(object memcache, mixed key [, int value ])
/* {{{ proto mixed memcache_decrement(object memcache, mixed key [, int value [, int defval [, int exptime ] ] ])
Decrements existing variable */
PHP_FUNCTION(memcache_decrement)
{
php_mmc_numeric(INTERNAL_FUNCTION_PARAM_PASSTHRU, "decr", sizeof("decr")-1, 0);
php_mmc_numeric(INTERNAL_FUNCTION_PARAM_PASSTHRU, 0, 1);
}
/* }}} */
@@ -1700,35 +1698,36 @@ PHP_FUNCTION(memcache_close)
}
/* }}} */
static int mmc_flush_handler(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
static int mmc_flush_handler(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC) /*
parses the OK response line, param is an int pointer to increment on success {{{ */
{
if (!mmc_str_left((char *)value, "OK", value_len, sizeof("OK")-1)) {
return mmc_request_failure(mmc, request->io, (char *)value, value_len, 0 TSRMLS_CC);
if (response == MMC_OK) {
(*((int *)param))++;
return MMC_REQUEST_DONE;
}
(*((int *)param))++;
return MMC_REQUEST_DONE;
return mmc_request_failure(mmc, request->io, message, message_len, 0 TSRMLS_CC);
}
/* }}} */
/* {{{ proto bool memcache_flush( object memcache [, int timestamp ] )
Flushes cache, optionally at the specified time */
/* {{{ proto bool memcache_flush( object memcache [, int delay ] )
Flushes cache, optionally at after the specified delay */
PHP_FUNCTION(memcache_flush)
{
mmc_pool_t *pool;
zval *mmc_object = getThis();
char cmd[sizeof("flush_all")-1 + 1 + MAX_LENGTH_OF_LONG + sizeof("\r\n")-1 + 1];
unsigned int cmd_len, i, responses = 0;
long timestamp = 0;
mmc_request_t *request;
unsigned int i, responses = 0;
long delay = 0;
if (mmc_object == NULL) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O|l", &mmc_object, memcache_pool_ce, &timestamp) == FAILURE) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O|l", &mmc_object, memcache_pool_ce, &delay) == FAILURE) {
return;
}
}
else {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &timestamp) == FAILURE) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &delay) == FAILURE) {
return;
}
}
@@ -1737,19 +1736,14 @@ PHP_FUNCTION(memcache_flush)
RETURN_FALSE;
}
if (timestamp > 0) {
cmd_len = sprintf(cmd, "flush_all %ld\r\n", timestamp);
}
else {
cmd_len = sprintf(cmd, "flush_all\r\n");
}
for (i=0; i<pool->num_servers; i++) {
mmc_pool_schedule_command(pool, pool->servers[i], cmd, cmd_len,
mmc_request_parse_line, mmc_flush_handler, &responses TSRMLS_CC);
/* begin sending requests immediatly */
mmc_pool_select(pool, 0 TSRMLS_CC);
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_flush_handler, &responses, NULL, NULL TSRMLS_CC);
pool->protocol->flush(request, delay);
if (mmc_pool_schedule(pool, pool->servers[i], request TSRMLS_CC) == MMC_OK) {
/* begin sending requests immediatly */
mmc_pool_select(pool, 0 TSRMLS_CC);
}
}
/* execute all requests */

333
memcache_ascii_protocol.c Normal file
View File

@@ -0,0 +1,333 @@
/*
+----------------------------------------------------------------------+
| PHP Version 5 |
+----------------------------------------------------------------------+
| Copyright (c) 1997-2007 The PHP Group |
+----------------------------------------------------------------------+
| This source file is subject to version 3.0 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_0.txt. |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Authors: Antony Dovgal <tony2001@phpclub.net> |
| Mikael Johansson <mikael AT synd DOT info> |
+----------------------------------------------------------------------+
*/
/* $Id$ */
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "memcache_pool.h"
#include "ext/standard/php_smart_str.h"
typedef struct mmc_ascii_request {
mmc_request_t base; /* enable cast to mmc_request_t */
struct { /* stores value info while the body is being read */
char key[MMC_MAX_KEY_LEN + 1]; /* key buffer to use on failover of single-key requests */
unsigned int flags;
unsigned long length;
uint64_t cas; /* CAS counter */
} value;
} mmc_ascii_request_t;
static int mmc_server_read_value(mmc_t *, mmc_request_t * TSRMLS_DC);
static int mmc_stream_get_line(mmc_stream_t *io, char **line TSRMLS_DC) /*
attempts to read a line from server, returns the line size or 0 if no complete line was available {{{ */
{
size_t returned_len = 0;
io->readline(io, io->input.value + io->input.idx, MMC_BUFFER_SIZE - io->input.idx, &returned_len TSRMLS_CC);
io->input.idx += returned_len;
if (io->input.idx && io->input.value[io->input.idx - 1] == '\n') {
int result = io->input.idx;
*line = io->input.value;
io->input.idx = 0;
return result;
}
return 0;
}
/* }}} */
static int mmc_request_parse_response(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
reads a generic response header and delegates it to response_handler {{{ */
{
char *line;
int line_len = mmc_stream_get_line(request->io, &line TSRMLS_CC);
if (line_len > 0) {
int response;
if (mmc_str_left(line, "OK", line_len, sizeof("OK")-1) ||
mmc_str_left(line, "STORED", line_len, sizeof("STORED")-1) ||
mmc_str_left(line, "DELETED", line_len, sizeof("DELETED")-1))
{
response = MMC_OK;
}
else if (mmc_str_left(line, "NOT_FOUND", line_len, sizeof("NOT_FOUND")-1)) {
response = MMC_RESPONSE_NOT_FOUND;
}
else if (mmc_str_left(line, "NOT_STORED", line_len, sizeof("NOT_STORED")-1)) {
response = MMC_RESPONSE_EXISTS;
}
else if (
mmc_str_left(line, "ERROR", line_len, sizeof("ERROR")-1) ||
mmc_str_left(line, "SERVER_ERROR", line_len, sizeof("SERVER_ERROR")-1) ||
mmc_str_left(line, "CLIENT_ERROR", line_len, sizeof("CLIENT_ERROR")-1))
{
response = MMC_RESPONSE_ERROR;
}
else {
response = MMC_RESPONSE_UNKNOWN;
}
return request->response_handler(mmc, request, response, line, line_len - (sizeof("\r\n")-1), request->response_handler_param TSRMLS_CC);
}
return MMC_REQUEST_MORE;
}
/* }}}*/
static int mmc_request_parse_value(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
reads and parses the VALUE <key> <flags> <size> header and then reads the value body {{{ */
{
char *line;
int line_len;
mmc_ascii_request_t *req = (mmc_ascii_request_t *)request;
line_len = mmc_stream_get_line(request->io, &line TSRMLS_CC);
if (line_len > 0) {
if (mmc_str_left(line, "END", line_len, sizeof("END")-1)) {
return MMC_REQUEST_DONE;
}
if (sscanf(line, MMC_VALUE_HEADER, req->value.key, &(req->value.flags), &(req->value.length)) != 3) {
return mmc_server_failure(mmc, request->io, "Malformed VALUE header", 0 TSRMLS_CC);
}
/* memory for data + \r\n */
mmc_buffer_alloc(&(request->readbuf), req->value.length + 2);
/* allow read_value handler to read the value body */
request->parse = mmc_server_read_value;
/* read more, php streams buffer input which must be read if available */
return MMC_REQUEST_AGAIN;
}
return MMC_REQUEST_MORE;
}
/* }}}*/
static int mmc_server_read_value(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
read the value body into the buffer {{{ */
{
mmc_ascii_request_t *req = (mmc_ascii_request_t *)request;
request->readbuf.idx +=
request->io->read(request->io, request->readbuf.value.c + request->readbuf.idx, req->value.length + 2 - request->readbuf.idx TSRMLS_CC);
/* done reading? */
if (request->readbuf.idx >= req->value.length + 2) {
int result = mmc_unpack_value(
mmc, request, &(request->readbuf), req->value.key, strlen(req->value.key),
req->value.flags, req->value.length TSRMLS_CC);
/* allow parse_value to read next VALUE or END line */
request->parse = mmc_request_parse_value;
mmc_buffer_reset(&(request->readbuf));
return result;
}
return MMC_REQUEST_MORE;
}
/* }}}*/
static mmc_request_t *mmc_ascii_create_request() /* {{{ */
{
mmc_ascii_request_t *request = emalloc(sizeof(mmc_ascii_request_t));
memset(request, 0, sizeof(*request));
return (mmc_request_t *)request;
}
/* }}} */
static void mmc_ascii_get(mmc_request_t *request, zval *zkey, const char *key, unsigned int key_len) /* {{{ */
{
request->parse = mmc_request_parse_value;
smart_str_appendl(&(request->sendbuf.value), "get ", sizeof("get ")-1);
smart_str_appendl(&(request->sendbuf.value), key, key_len);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
}
/* }}} */
static void mmc_ascii_begin_get(mmc_request_t *request) /* {{{ */
{
request->parse = mmc_request_parse_value;
smart_str_appendl(&(request->sendbuf.value), "get", sizeof("get")-1);
}
/* }}} */
static void mmc_ascii_append_get(mmc_request_t *request, zval *zkey, const char *key, unsigned int key_len) /* {{{ */
{
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_appendl(&(request->sendbuf.value), key, key_len);
}
/* }}} */
static void mmc_ascii_end_get(mmc_request_t *request) /* {{{ */
{
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
}
/* }}} */
static int mmc_ascii_store(
mmc_pool_t *pool, mmc_request_t *request, int op, const char *key, unsigned int key_len, unsigned int flags, unsigned int exptime, zval *value TSRMLS_DC) /* {{{ */
{
int status;
mmc_buffer_t buffer;
request->parse = mmc_request_parse_response;
memset(&buffer, 0, sizeof(buffer));
status = mmc_pack_value(pool, &buffer, value, &flags TSRMLS_CC);
if (status != MMC_OK) {
return status;
}
switch (op) {
case MMC_OP_SET:
smart_str_appendl(&(request->sendbuf.value), "set", sizeof("set")-1);
break;
case MMC_OP_ADD:
smart_str_appendl(&(request->sendbuf.value), "add", sizeof("add")-1);
break;
case MMC_OP_REPLACE:
smart_str_appendl(&(request->sendbuf.value), "replace", sizeof("replace")-1);
break;
}
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_appendl(&(request->sendbuf.value), key, key_len);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), flags);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), exptime);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), buffer.value.len);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
smart_str_appendl(&(request->sendbuf.value), buffer.value.c, buffer.value.len);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
mmc_buffer_free(&buffer);
return MMC_OK;
}
/* }}} */
static void mmc_ascii_delete(mmc_request_t *request, const char *key, unsigned int key_len, unsigned int exptime) /* {{{ */
{
request->parse = mmc_request_parse_response;
smart_str_appendl(&(request->sendbuf.value), "delete", sizeof("delete")-1);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_appendl(&(request->sendbuf.value), key, key_len);
if (exptime > 0) {
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), exptime);
}
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
}
/* }}} */
static void mmc_ascii_mutate(mmc_request_t *request, const char *key, unsigned int key_len, long value, long defval, unsigned int exptime) /* {{{ */
{
request->parse = mmc_request_parse_response;
if (value >= 0) {
smart_str_appendl(&(request->sendbuf.value), "incr", sizeof("incr")-1);
}
else {
smart_str_appendl(&(request->sendbuf.value), "decr", sizeof("decr")-1);
}
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_appendl(&(request->sendbuf.value), key, key_len);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), value >= 0 ? value : -value);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
}
/* }}} */
static void mmc_ascii_flush(mmc_request_t *request, unsigned int exptime) /* {{{ */
{
request->parse = mmc_request_parse_response;
smart_str_appendl(&(request->sendbuf.value), "flush_all", sizeof("flush_all")-1);
if (exptime > 0) {
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), exptime);
}
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
}
/* }}} */
static void mmc_ascii_version(mmc_request_t *request) /* {{{ */
{
request->parse = mmc_request_parse_response;
smart_str_appendl(&(request->sendbuf.value), "version\r\n", sizeof("version\r\n")-1);
}
/* }}} */
static void mmc_ascii_stats(mmc_request_t *request, const char *type, long slabid, long limit) /* {{{ */
{
char *cmd;
unsigned int cmd_len;
request->parse = mmc_request_parse_response;
if (slabid) {
cmd_len = spprintf(&cmd, 0, "stats %s %ld %ld\r\n", type, slabid, limit);
}
else if (type) {
cmd_len = spprintf(&cmd, 0, "stats %s\r\n", type);
}
else {
cmd_len = spprintf(&cmd, 0, "stats\r\n");
}
smart_str_appendl(&(request->sendbuf.value), cmd, cmd_len);
efree(cmd);
}
/* }}} */
mmc_protocol_t mmc_ascii_protocol = {
mmc_ascii_create_request,
mmc_request_reset,
mmc_request_free,
mmc_ascii_get,
mmc_ascii_begin_get,
mmc_ascii_append_get,
mmc_ascii_end_get,
mmc_ascii_store,
mmc_ascii_delete,
mmc_ascii_mutate,
mmc_ascii_flush,
mmc_ascii_version,
mmc_ascii_stats
};
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: noet sw=4 ts=4 fdm=marker
* vim<600: noet sw=4 ts=4
*/

496
memcache_binary_protocol.c Normal file
View File

@@ -0,0 +1,496 @@
/*
+----------------------------------------------------------------------+
| PHP Version 5 |
+----------------------------------------------------------------------+
| Copyright (c) 1997-2007 The PHP Group |
+----------------------------------------------------------------------+
| This source file is subject to version 3.0 of the PHP license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.php.net/license/3_0.txt. |
| If you did not receive a copy of the PHP license and are unable to |
| obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Authors: Antony Dovgal <tony2001@phpclub.net> |
| Mikael Johansson <mikael AT synd DOT info> |
+----------------------------------------------------------------------+
*/
/* $Id$ */
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdint.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "memcache_pool.h"
#include "ext/standard/php_smart_str.h"
#if __BYTE_ORDER == __BIG_ENDIAN
# define ntohll(x) (x)
# define htonll(x) (x)
#elif __BYTE_ORDER == __LITTLE_ENDIAN
# include <byteswap.h>
# define ntohll(x) bswap_64(x)
# define htonll(x) bswap_64(x)
#else
# error "Could not determine byte order: __BYTE_ORDER uncorrectly defined"
#endif
#define MMC_REQUEST_MAGIC 0x0f
#define MMC_RESPONSE_MAGIC 0xf0
#define MMC_OP_GET 0x00
#define MMC_OP_DELETE 0x04
#define MMC_OP_MUTATE 0x05
#define MMC_OP_FLUSH 0x07
#define MMC_OP_GETQ 0x08
#define MMC_OP_NOOP 0x09
#define MMC_OP_VERSION 0x0a
typedef struct mmc_binary_request {
mmc_request_t base; /* enable cast to mmc_request_t */
mmc_queue_t keys; /* mmc_queue_t<zval *>, reqid -> key mappings */
struct {
uint8_t error; /* error received in current request */
uint32_t reqid; /* current reqid being processed */
uint32_t reqid_end; /* last reqid in a stream of requests */
} command;
struct { /* stores value info while the body is being read */
unsigned int flags;
unsigned long length;
uint64_t cas; /* CAS counter */
} value;
} mmc_binary_request_t;
typedef struct mmc_request_header {
uint8_t magic;
uint8_t command;
uint8_t key_len;
uint8_t _reserved;
uint32_t reqid; /* opaque request id */
uint32_t length; /* trailing body length (not including this header) */
} mmc_request_header_t;
typedef struct mmc_store_request_header {
mmc_request_header_t base;
uint32_t flags;
uint32_t exptime;
} mmc_store_request_header_t;
typedef struct mmc_delete_request_header {
mmc_request_header_t base;
uint32_t exptime;
} mmc_delete_request_header_t;
typedef struct mmc_mutate_request_header {
mmc_request_header_t base;
int64_t value;
int64_t defval;
uint32_t exptime;
} mmc_mutate_request_header_t;
typedef struct mmc_flush_request_header {
mmc_request_header_t base;
uint32_t exptime;
} mmc_flush_request_header_t;
typedef struct mmc_response_header {
uint8_t magic;
uint8_t command;
uint8_t error;
uint8_t _reserved;
uint32_t reqid; /* echo'ed from request */
uint32_t length; /* trailing body length (not including this header) */
} mmc_response_header_t;
typedef struct mmc_get_response_header {
uint32_t flags;
} mmc_get_response_header_t;
static int mmc_request_read_response(mmc_t *, mmc_request_t * TSRMLS_DC);
static int mmc_request_parse_value(mmc_t *, mmc_request_t * TSRMLS_DC);
static int mmc_request_read_value(mmc_t *, mmc_request_t * TSRMLS_DC);
static inline char *mmc_stream_get(mmc_stream_t *io, size_t bytes TSRMLS_DC) /*
attempts to read a number of bytes from server, returns the a pointer to the buffer on success, NULL if the complete number of bytes could not be read {{{ */
{
io->input.idx += io->read(io, io->input.value + io->input.idx, bytes - io->input.idx TSRMLS_CC);
if (io->input.idx >= bytes) {
io->input.idx = 0;
return io->input.value;
}
return NULL;
}
/* }}} */
static inline void mmc_decode_response_header(mmc_response_header_t *header) /* {{{ */
{
header->reqid = ntohl(header->reqid);
header->length = ntohl(header->length);
}
/* }}} */
static int mmc_request_parse_response(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
reads a generic response header and reads the response body {{{ */
{
mmc_response_header_t *header;
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
header = (mmc_response_header_t *)mmc_stream_get(request->io, sizeof(*header) TSRMLS_CC);
if (header != NULL) {
mmc_decode_response_header(header);
if (header->magic != MMC_RESPONSE_MAGIC) {
return mmc_server_failure(mmc, request->io, "Malformed server response (invalid magic byte)", 0 TSRMLS_CC);
}
if (header->length == 0) {
return request->response_handler(mmc, request, header->error, "", 0, request->response_handler_param TSRMLS_CC);
}
req->command.error = header->error;
req->value.length = header->length;
/* memory for data + \0 */
mmc_buffer_alloc(&(request->readbuf), req->value.length + 1);
/* allow read_response handler to read the response body */
request->parse = mmc_request_read_response;
/* read more, php streams buffer input which must be read if available */
return MMC_REQUEST_AGAIN;
}
return MMC_REQUEST_MORE;
}
/* }}}*/
static int mmc_request_read_response(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
read the response body into the buffer {{{ */
{
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
request->readbuf.idx +=
request->io->read(request->io, request->readbuf.value.c + request->readbuf.idx, req->value.length - request->readbuf.idx TSRMLS_CC);
/* done reading? */
if (request->readbuf.idx >= req->value.length) {
return request->response_handler(mmc, request, req->command.error, request->readbuf.value.c, req->value.length, request->response_handler_param TSRMLS_CC);
}
return MMC_REQUEST_MORE;
}
/* }}}*/
static int mmc_request_parse_value_header(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
reads and parses the header and then reads the value header {{{ */
{
mmc_response_header_t *header;
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
header = (mmc_response_header_t *)mmc_stream_get(request->io, sizeof(*header) TSRMLS_CC);
if (header != NULL) {
mmc_decode_response_header(header);
if (header->magic != MMC_RESPONSE_MAGIC) {
return mmc_server_failure(mmc, request->io, "Malformed server response (invalid magic byte)", 0 TSRMLS_CC);
}
/* read response body in case of error */
if (header->error != 0) {
if (header->length == 0) {
return request->response_handler(mmc, request, header->error, "", 0, request->response_handler_param TSRMLS_CC);
}
req->command.error = header->error;
req->value.length = header->length;
/* memory for data + \0 */
mmc_buffer_alloc(&(request->readbuf), req->value.length + 1);
/* allow read_response handler to read the response body */
request->parse = mmc_request_read_response;
}
else {
/* last command in multi-get */
if (header->command == MMC_OP_NOOP) {
return MMC_REQUEST_DONE;
}
req->command.reqid = header->reqid;
req->value.length = header->length - sizeof(mmc_get_response_header_t);
/* allow parse_value_ handler to read the value specific header */
request->parse = mmc_request_parse_value;
}
/* read more, php streams buffer input which must be read if available */
return MMC_REQUEST_AGAIN;
}
return MMC_REQUEST_MORE;
}
static int mmc_request_parse_value(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
reads and parses the value header and then reads the value body {{{ */
{
mmc_get_response_header_t *header;
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
header = (mmc_get_response_header_t *)mmc_stream_get(request->io, sizeof(*header) TSRMLS_CC);
if (header != NULL) {
req->value.flags = ntohl(header->flags);
/* memory for data + \0 */
mmc_buffer_alloc(&(request->readbuf), req->value.length + 1);
/* allow read_value handler to read the value body */
request->parse = mmc_request_read_value;
/* read more, php streams buffer input which must be read if available */
return MMC_REQUEST_AGAIN;
}
return MMC_REQUEST_MORE;
}
/* }}}*/
static int mmc_request_read_value(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
read the value body into the buffer {{{ */
{
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
request->readbuf.idx +=
request->io->read(request->io, request->readbuf.value.c + request->readbuf.idx, req->value.length - request->readbuf.idx TSRMLS_CC);
/* done reading? */
if (request->readbuf.idx >= req->value.length) {
zval *key;
int result;
key = (zval *)mmc_queue_item(&(req->keys), req->command.reqid);
result = mmc_unpack_value(
mmc, request, &(request->readbuf), Z_STRVAL_P(key), Z_STRLEN_P(key),
req->value.flags, req->value.length TSRMLS_CC);
if (result == MMC_REQUEST_AGAIN) {
if (req->command.reqid >= req->command.reqid_end) {
return MMC_REQUEST_DONE;
}
/* allow parse_value to read next VALUE */
request->parse = mmc_request_parse_value_header;
mmc_buffer_reset(&(request->readbuf));
}
return result;
}
return MMC_REQUEST_MORE;
}
/* }}}*/
static inline void mmc_pack_header(mmc_request_header_t *header, uint8_t command, unsigned int key_len, unsigned int reqid, unsigned int length) /* {{{ */
{
header->magic = MMC_REQUEST_MAGIC;
header->command = command;
header->key_len = key_len & 0xff;
header->_reserved = 0;
header->reqid = htonl(reqid);
header->length = htonl(length);
}
/* }}} */
static mmc_request_t *mmc_binary_create_request() /* {{{ */
{
mmc_binary_request_t *request = emalloc(sizeof(mmc_binary_request_t));
memset(request, 0, sizeof(*request));
return (mmc_request_t *)request;
}
/* }}} */
static void mmc_binary_reset_request(mmc_request_t *request) /* {{{ */
{
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
mmc_queue_reset(&(req->keys));
mmc_request_reset(request);
}
/* }}} */
static void mmc_binary_free_request(mmc_request_t *request) /* {{{ */
{
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
mmc_queue_free(&(req->keys));
mmc_request_free(request);
}
/* }}} */
static void mmc_binary_get(mmc_request_t *request, zval *zkey, const char *key, unsigned int key_len) /* {{{ */
{
mmc_request_header_t header;
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
request->parse = mmc_request_parse_value_header;
/* reqid/opaque is the index into the collection of key pointers */
req->command.reqid_end = req->keys.len;
mmc_pack_header(&header, MMC_OP_GET, key_len, req->command.reqid_end, key_len);
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
smart_str_appendl(&(request->sendbuf.value), key, key_len);
/* store key to be used by the response handler */
mmc_queue_push(&(req->keys), zkey);
}
/* }}} */
static void mmc_binary_begin_get(mmc_request_t *request) /* {{{ */
{
request->parse = mmc_request_parse_value_header;
}
/* }}} */
static void mmc_binary_append_get(mmc_request_t *request, zval *zkey, const char *key, unsigned int key_len) /* {{{ */
{
mmc_request_header_t header;
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
/* reqid/opaque is the index into the collection of key pointers */
mmc_pack_header(&header, MMC_OP_GETQ, key_len, req->keys.len, key_len);
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
smart_str_appendl(&(request->sendbuf.value), key, key_len);
/* store key to be used by the response handler */
mmc_queue_push(&(req->keys), zkey);
}
/* }}} */
static void mmc_binary_end_get(mmc_request_t *request) /* {{{ */
{
mmc_request_header_t header;
mmc_binary_request_t *req = (mmc_binary_request_t *)request;
req->command.reqid_end = req->keys.len;
mmc_pack_header(&header, MMC_OP_NOOP, 0, req->command.reqid_end, 0);
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
}
/* }}} */
static int mmc_binary_store(
mmc_pool_t *pool, mmc_request_t *request, int op, const char *key, unsigned int key_len, unsigned int flags, unsigned int exptime, zval *value TSRMLS_DC) /* {{{ */
{
int status, prevlen;
mmc_store_request_header_t *header;
request->parse = mmc_request_parse_response;
prevlen = request->sendbuf.value.len;
/* allocate space for header */
mmc_buffer_alloc(&(request->sendbuf), sizeof(*header));
request->sendbuf.value.len += sizeof(*header);
/* append key and data */
smart_str_appendl(&(request->sendbuf.value), key, key_len);
status = mmc_pack_value(pool, &(request->sendbuf), value, &flags TSRMLS_CC);
if (status != MMC_OK) {
return status;
}
/* initialize header */
header = (mmc_store_request_header_t *)(request->sendbuf.value.c + prevlen);
mmc_pack_header(&(header->base), op, key_len, 0, request->sendbuf.value.len - prevlen - sizeof(header->base));
header->flags = htonl(flags);
header->exptime = htonl(exptime);
return MMC_OK;
}
/* }}} */
static void mmc_binary_delete(mmc_request_t *request, const char *key, unsigned int key_len, unsigned int exptime) /* {{{ */
{
mmc_delete_request_header_t header;
request->parse = mmc_request_parse_response;
mmc_pack_header(&(header.base), MMC_OP_DELETE, key_len, 0, sizeof(header) - sizeof(header.base) + key_len);
header.exptime = htonl(exptime);
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
smart_str_appendl(&(request->sendbuf.value), key, key_len);
}
/* }}} */
static void mmc_binary_mutate(mmc_request_t *request, const char *key, unsigned int key_len, long value, long defval, unsigned int exptime) /* {{{ */
{
mmc_mutate_request_header_t header;
request->parse = mmc_request_parse_response;
mmc_pack_header(&(header.base), MMC_OP_MUTATE, key_len, 0, sizeof(header) - sizeof(header.base) + key_len);
header.value = htonll(value);
header.defval = htonll(defval);
header.exptime = htonl(exptime);
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
smart_str_appendl(&(request->sendbuf.value), key, key_len);
}
/* }}} */
static void mmc_binary_flush(mmc_request_t *request, unsigned int exptime) /* {{{ */
{
mmc_flush_request_header_t header;
request->parse = mmc_request_parse_response;
mmc_pack_header(&(header.base), MMC_OP_FLUSH, 0, 0, sizeof(header) - sizeof(header.base));
header.exptime = htonl(exptime);
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
}
/* }}} */
static void mmc_binary_version(mmc_request_t *request) /* {{{ */
{
mmc_request_header_t header;
request->parse = mmc_request_parse_response;
mmc_pack_header(&header, MMC_OP_VERSION, 0, 0, 0);
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
}
/* }}} */
static void mmc_binary_stats(mmc_request_t *request, const char *type, long slabid, long limit) /* {{{ */
{
/* stats not supported */
mmc_request_header_t header;
request->parse = mmc_request_parse_response;
mmc_pack_header(&header, MMC_OP_NOOP, 0, 0, 0);
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
}
/* }}} */
mmc_protocol_t mmc_binary_protocol = {
mmc_binary_create_request,
mmc_binary_reset_request,
mmc_binary_free_request,
mmc_binary_get,
mmc_binary_begin_get,
mmc_binary_append_get,
mmc_binary_end_get,
mmc_binary_store,
mmc_binary_delete,
mmc_binary_mutate,
mmc_binary_flush,
mmc_binary_version,
mmc_binary_stats
};
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: noet sw=4 ts=4 fdm=marker
* vim<600: noet sw=4 ts=4
*/

View File

@@ -119,7 +119,7 @@ static void mmc_consistent_pupulate_buckets(mmc_consistent_state_t *state) /* {{
}
/* }}} */
mmc_t *mmc_consistent_find_server(void *s, const char *key, int key_len TSRMLS_DC) /* {{{ */
mmc_t *mmc_consistent_find_server(void *s, const char *key, unsigned int key_len TSRMLS_DC) /* {{{ */
{
mmc_consistent_state_t *state = s;

View File

@@ -38,14 +38,15 @@ ZEND_DECLARE_MODULE_GLOBALS(memcache)
static void mmc_pool_switch(mmc_pool_t *);
static inline void mmc_buffer_alloc(mmc_buffer_t *buffer, unsigned int size) /* {{{ */
inline void mmc_buffer_alloc(mmc_buffer_t *buffer, unsigned int size) /*
ensures space for an additional size bytes {{{ */
{
register size_t newlen;
smart_str_alloc((&(buffer->value)), size, 0);
}
/* }}} */
static inline void mmc_buffer_free(mmc_buffer_t *buffer) /* {{{ */
inline void mmc_buffer_free(mmc_buffer_t *buffer) /* {{{ */
{
if (buffer->value.c != NULL) {
smart_str_free(&(buffer->value));
@@ -54,11 +55,10 @@ static inline void mmc_buffer_free(mmc_buffer_t *buffer) /* {{{ */
}
/* }}} */
static unsigned int mmc_hash_crc32(const char *key, int key_len) /*
static unsigned int mmc_hash_crc32(const char *key, unsigned int key_len) /*
CRC32 hash {{{ */
{
unsigned int crc = ~0;
int i;
unsigned int i, crc = ~0;
for (i=0; i<key_len; i++) {
CRC32(crc, key[i]);
@@ -68,11 +68,10 @@ static unsigned int mmc_hash_crc32(const char *key, int key_len) /*
}
/* }}} */
static unsigned int mmc_hash_fnv1a(const char *key, int key_len) /*
static unsigned int mmc_hash_fnv1a(const char *key, unsigned int key_len) /*
FNV-1a hash {{{ */
{
unsigned int hval = FNV_32_INIT;
int i;
unsigned int i, hval = FNV_32_INIT;
for (i=0; i<key_len; i++) {
hval ^= (unsigned int)key[i];
@@ -131,33 +130,16 @@ static char *mmc_stream_readline_wrapper(mmc_stream_t *io, char *buf, size_t max
}
/* }}} */
static int mmc_stream_get_line(mmc_stream_t *io, char **line TSRMLS_DC) /*
attempts to read a line from server, returns the line size or 0 if no complete line was available {{{ */
void mmc_request_reset(mmc_request_t *request) /* {{{ */
{
size_t returned_len = 0;
io->readline(io, io->input.value + io->input.idx, MMC_BUFFER_SIZE - io->input.idx, &returned_len TSRMLS_CC);
io->input.idx += returned_len;
if (io->input.idx && io->input.value[io->input.idx - 1] == '\n') {
int result = io->input.idx;
*line = io->input.value;
io->input.idx = 0;
return result;
}
return 0;
}
/* }}} */
static inline mmc_request_t *mmc_request_new() /* {{{ */
{
mmc_request_t *request = emalloc(sizeof(mmc_request_t));
memset(request, 0, sizeof(*request));
return request;
request->key_len = 0;
mmc_buffer_reset(&(request->sendbuf));
mmc_queue_reset(&(request->failed_servers));
request->failed_index = 0;
}
/* }}} */
static inline void mmc_request_free(mmc_request_t *request) /* {{{ */
void mmc_request_free(mmc_request_t *request) /* {{{ */
{
mmc_buffer_free(&(request->sendbuf));
mmc_buffer_free(&(request->readbuf));
@@ -178,6 +160,10 @@ static inline int mmc_request_send(mmc_t *mmc, mmc_request_t *request TSRMLS_DC)
return MMC_REQUEST_DONE;
}
if (php_stream_eof(request->io->stream)) {
return mmc_server_failure(mmc, request->io, "Write failed (socket was unexpectedly closed)", 0 TSRMLS_CC);
}
return MMC_REQUEST_MORE;
}
/* }}} */
@@ -246,25 +232,56 @@ static int mmc_request_read_udp(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
}
/* }}} */
static int mmc_compress(const char *data, unsigned long data_len, char **result, unsigned long *result_len) /*
compresses value into a buffer, returns MMC_OK on success {{{ */
static void mmc_compress(mmc_pool_t *pool, mmc_buffer_t *buffer, const char *value, int value_len, unsigned int *flags, int copy TSRMLS_DC) /* {{{ */
{
int status;
*result_len = data_len + (data_len / 1000) + 25 + 1; /* some magic from zlib.c */
*result = emalloc(*result_len);
if (MMC_COMPRESSION_LEVEL >= 0) {
status = compress2((unsigned char *)*result, result_len, (unsigned const char *)data, data_len, MMC_COMPRESSION_LEVEL);
} else {
status = compress((unsigned char *)*result, result_len, (unsigned const char *)data, data_len);
/* autocompress large values */
if (pool->compress_threshold && value_len >= pool->compress_threshold) {
*flags |= MMC_COMPRESSED;
}
if (status == Z_OK) {
return MMC_OK;
}
if (*flags & MMC_COMPRESSED) {
int status;
mmc_buffer_t prev;
unsigned long result_len = value_len * (1 - pool->min_compress_savings);
if (copy) {
/* value is already in output buffer */
prev = *buffer;
/* allocate space for prev header + result */
memset(buffer, 0, sizeof(*buffer));
mmc_buffer_alloc(buffer, prev.value.len + result_len);
efree(*result);
return MMC_REQUEST_FAILURE;
/* append prev header */
smart_str_appendl(&(buffer->value), prev.value.c, prev.value.len - value_len);
buffer->idx = prev.idx;
}
else {
/* allocate space directly in buffer */
mmc_buffer_alloc(buffer, result_len);
}
if (MMC_COMPRESSION_LEVEL >= 0) {
status = compress2((unsigned char *)buffer->value.c + buffer->value.len, &result_len, (unsigned const char *)value, value_len, MMC_COMPRESSION_LEVEL);
} else {
status = compress((unsigned char *)buffer->value.c + buffer->value.len, &result_len, (unsigned const char *)value, value_len);
}
if (status == Z_OK) {
buffer->value.len += result_len;
}
else {
smart_str_appendl(&(buffer->value), value, value_len);
*flags &= ~MMC_COMPRESSED;
}
if (copy) {
mmc_buffer_free(&prev);
}
}
else if (!copy) {
smart_str_appendl(&(buffer->value), value, value_len);
}
}
/* }}}*/
@@ -287,121 +304,59 @@ static int mmc_uncompress(const char *data, unsigned long data_len, char **resul
}
/* }}}*/
int mmc_prepare_store(
mmc_pool_t *pool, mmc_request_t *request, const char *cmd, unsigned int cmd_len,
const char *key, unsigned int key_len, unsigned int flags, unsigned int exptime, zval *value TSRMLS_DC) /*
assembles a SET/ADD/REPLACE request does into the buffer, returns MMC_OK on success {{{ */
int mmc_pack_value(mmc_pool_t *pool, mmc_buffer_t *buffer, zval *value, unsigned int *flags TSRMLS_DC) /*
does serialization and compression to pack a zval into the buffer {{{ */
{
char *data = NULL;
unsigned int data_len, data_free = 0;
if (mmc_prepare_key_ex(key, key_len, request->key, &(request->key_len)) != MMC_OK) {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Invalid key");
return MMC_REQUEST_FAILURE;
}
switch (Z_TYPE_P(value)) {
case IS_STRING:
data = Z_STRVAL_P(value);
data_len = Z_STRLEN_P(value);
flags &= ~MMC_SERIALIZED;
*flags &= ~MMC_SERIALIZED;
mmc_compress(pool, buffer, Z_STRVAL_P(value), Z_STRLEN_P(value), flags, 0 TSRMLS_CC);
break;
case IS_LONG:
case IS_DOUBLE:
case IS_BOOL:
*flags &= ~MMC_SERIALIZED;
convert_to_string(value);
data = Z_STRVAL_P(value);
data_len = Z_STRLEN_P(value);
flags &= ~MMC_SERIALIZED;
mmc_compress(pool, buffer, Z_STRVAL_P(value), Z_STRLEN_P(value), flags, 0 TSRMLS_CC);
break;
default: {
php_serialize_data_t value_hash;
zval value_copy, *value_copy_ptr;
smart_str buf = {0};
size_t prev_len = buffer->value.len;
/* FIXME: we should be using 'Z' instead of this, but unfortunately it's PHP5-only */
value_copy = *value;
zval_copy_ctor(&value_copy);
value_copy_ptr = &value_copy;
/* TODO: serialize straight into buffer (voids auto-compression) */
PHP_VAR_SERIALIZE_INIT(value_hash);
php_var_serialize(&buf, &value_copy_ptr, &value_hash TSRMLS_CC);
php_var_serialize(&(buffer->value), &value_copy_ptr, &value_hash TSRMLS_CC);
PHP_VAR_SERIALIZE_DESTROY(value_hash);
/* you're trying to save null or something went really wrong */
if (buf.c == NULL) {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Failed to serialize value");
/* trying to save null or something went really wrong */
if (buffer->value.c == NULL || buffer->value.len == prev_len) {
zval_dtor(&value_copy);
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Failed to serialize value");
return MMC_REQUEST_FAILURE;
}
data = buf.c;
data_len = buf.len;
data_free = 1;
flags |= MMC_SERIALIZED;
*flags |= MMC_SERIALIZED;
zval_dtor(&value_copy);
mmc_compress(pool, buffer, buffer->value.c + prev_len, buffer->value.len - prev_len, flags, 1 TSRMLS_CC);
}
}
/* autocompress large values */
if (pool->compress_threshold && data_len >= pool->compress_threshold) {
flags |= MMC_COMPRESSED;
}
if (flags & MMC_COMPRESSED) {
char *result;
unsigned long result_len;
/* TODO: compress straight into buffer */
if (mmc_compress(data, data_len, &result, &result_len) == MMC_OK) {
if (result_len < data_len * (1 - pool->min_compress_savings)) {
if (data_free) {
efree(data);
}
data = result;
data_len = result_len;
data_free = 1;
}
else {
efree(result);
flags &= ~MMC_COMPRESSED;
}
}
else {
flags &= ~MMC_COMPRESSED;
}
}
/* assemble command */
smart_str_appendl(&(request->sendbuf.value), cmd, cmd_len);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_appendl(&(request->sendbuf.value), request->key, request->key_len);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), flags);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), exptime);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_append_unsigned(&(request->sendbuf.value), data_len);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
smart_str_appendl(&(request->sendbuf.value), data, data_len);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
if (data_free) {
efree(data);
}
return MMC_OK;
}
/* }}} */
static int mmc_unpack_value(mmc_t *mmc, mmc_request_t *request, mmc_buffer_t *buffer TSRMLS_DC) /*
does uncompression and unserializing to construct a zval {{{ */
int mmc_unpack_value(
mmc_t *mmc, mmc_request_t *request, mmc_buffer_t *buffer,
const char *key, unsigned int key_len, unsigned int flags, unsigned int bytes TSRMLS_DC) /*
does uncompression and unserializing to reconstruct a zval {{{ */
{
char *data = NULL;
unsigned long data_len;
@@ -409,17 +364,17 @@ static int mmc_unpack_value(mmc_t *mmc, mmc_request_t *request, mmc_buffer_t *bu
zval value;
INIT_ZVAL(value);
if (request->value.flags & MMC_COMPRESSED) {
if (mmc_uncompress(buffer->value.c, request->value.bytes, &data, &data_len) != MMC_OK) {
if (flags & MMC_COMPRESSED) {
if (mmc_uncompress(buffer->value.c, bytes, &data, &data_len) != MMC_OK) {
return mmc_server_failure(mmc, request->io, "Failed to uncompress data", 0 TSRMLS_CC);
}
}
else {
data = buffer->value.c;
data_len = request->value.bytes;
data_len = bytes;
}
if (request->value.flags & MMC_SERIALIZED) {
if (flags & MMC_SERIALIZED) {
php_unserialize_data_t var_hash;
const unsigned char *p = (unsigned char *)data;
zval *object = &value;
@@ -427,99 +382,35 @@ static int mmc_unpack_value(mmc_t *mmc, mmc_request_t *request, mmc_buffer_t *bu
PHP_VAR_UNSERIALIZE_INIT(var_hash);
if (!php_var_unserialize(&object, &p, p + data_len, &var_hash TSRMLS_CC)) {
PHP_VAR_UNSERIALIZE_DESTROY(var_hash);
if (request->value.flags & MMC_COMPRESSED) {
if (flags & MMC_COMPRESSED) {
efree(data);
}
return mmc_server_failure(mmc, request->io, "Failed to unserialize data", 0 TSRMLS_CC);
}
PHP_VAR_UNSERIALIZE_DESTROY(var_hash);
if (request->value.flags & MMC_COMPRESSED) {
if (flags & MMC_COMPRESSED) {
efree(data);
}
/* delegate to value handler */
return request->value_handler(mmc, request, object, 0, request->value_handler_param TSRMLS_CC);
return request->value_handler(mmc, request, key, key_len, object, 0, request->value_handler_param TSRMLS_CC);
}
else {
/* room for \0 since buffer contains trailing \r\n and uncompress allocates + 1 */
data[data_len] = '\0';
ZVAL_STRINGL(&value, data, data_len, 0);
if (!(request->value.flags & MMC_COMPRESSED)) {
if (!(flags & MMC_COMPRESSED)) {
mmc_buffer_release(buffer);
}
/* delegate to value handler */
return request->value_handler(mmc, request, &value, 0, request->value_handler_param TSRMLS_CC);
return request->value_handler(mmc, request, key, key_len, &value, 0, request->value_handler_param TSRMLS_CC);
}
}
/* }}}*/
static int mmc_server_read_value(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
read the value body into the buffer {{{ */
{
request->readbuf.idx +=
request->io->read(request->io, request->readbuf.value.c + request->readbuf.idx, request->value.bytes + 2 - request->readbuf.idx TSRMLS_CC);
/* done reading? */
if (request->readbuf.idx >= request->value.bytes + 2) {
int result = mmc_unpack_value(mmc, request, &(request->readbuf) TSRMLS_CC);
/* allow parse_value to read next VALUE or END line */
request->parse = mmc_request_parse_value;
mmc_buffer_reset(&(request->readbuf));
return result;
}
return MMC_REQUEST_MORE;
}
/* }}}*/
int mmc_request_parse_line(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
reads a single line and delegates it to value_handler {{{ */
{
char *line;
int line_len = mmc_stream_get_line(request->io, &line TSRMLS_CC);
if (line_len > 0) {
return request->value_handler(mmc, request, line, line_len, request->value_handler_param TSRMLS_CC);
}
return MMC_REQUEST_MORE;
}
/* }}}*/
int mmc_request_parse_value(mmc_t *mmc, mmc_request_t *request TSRMLS_DC) /*
reads and parses the VALUE <key> <flags> <size> header and then reads the value body {{{ */
{
char *line;
int line_len = mmc_stream_get_line(request->io, &line TSRMLS_CC);
if (line_len > 0) {
if (mmc_str_left(line, "END", line_len, sizeof("END")-1)) {
return MMC_REQUEST_DONE;
}
if (sscanf(line, MMC_VALUE_HEADER, request->value.key, &(request->value.flags), &(request->value.bytes)) != 3) {
return mmc_server_failure(mmc, request->io, "Malformed VALUE header", 0 TSRMLS_CC);
}
request->value.key_len = strlen(request->value.key);
/* memory for data + \r\n */
mmc_buffer_alloc(&(request->readbuf), request->value.bytes + 2);
/* allow read_value handler to read the value body */
request->parse = mmc_server_read_value;
/* read more, php streams buffer input which must be read if available */
return MMC_REQUEST_AGAIN;
}
return MMC_REQUEST_MORE;
}
/* }}}*/
mmc_t *mmc_server_new(
const char *host, int host_len, unsigned short tcp_port, unsigned short udp_port,
@@ -647,16 +538,14 @@ int mmc_server_failure(mmc_t *mmc, mmc_stream_t *io, const char *error, int errn
}
/* }}} */
int mmc_request_failure(mmc_t *mmc, mmc_stream_t *io, char *value, unsigned int value_len, int errnum TSRMLS_DC) /*
int mmc_request_failure(mmc_t *mmc, mmc_stream_t *io, const char *message, unsigned int message_len, int errnum TSRMLS_DC) /*
checks for a valid server generated error message and calls mmc_server_failure() {{{ */
{
if (mmc_str_left(value, "ERROR", value_len, sizeof("ERROR")-1) ||
mmc_str_left(value, "CLIENT_ERROR", value_len, sizeof("CLIENT_ERROR")-1) ||
mmc_str_left(value, "SERVER_ERROR", value_len, sizeof("SERVER_ERROR")-1))
if (mmc_str_left(message, "ERROR", message_len, sizeof("ERROR")-1) ||
mmc_str_left(message, "CLIENT_ERROR", message_len, sizeof("CLIENT_ERROR")-1) ||
mmc_str_left(message, "SERVER_ERROR", message_len, sizeof("SERVER_ERROR")-1))
{
// Trim off trailing \r\n
value[value_len - 2] = '\0';
return mmc_server_failure(mmc, io, value, errnum TSRMLS_CC);
return mmc_server_failure(mmc, io, message, errnum TSRMLS_CC);
}
return mmc_server_failure(mmc, io, "Malformed server response", errnum TSRMLS_CC);
@@ -831,6 +720,14 @@ mmc_pool_t *mmc_pool_new(TSRMLS_D) /* {{{ */
mmc_pool_t *pool = emalloc(sizeof(mmc_pool_t));
memset(pool, 0, sizeof(*pool));
switch (MEMCACHE_G(protocol)) {
case MMC_BINARY_PROTOCOL:
pool->protocol = &mmc_binary_protocol;
break;
default:
pool->protocol = &mmc_ascii_protocol;
}
switch (MEMCACHE_G(hash_strategy)) {
case MMC_CONSISTENT_HASH:
pool->hash = &mmc_consistent_hash;
@@ -887,7 +784,7 @@ void mmc_pool_free(mmc_pool_t *pool TSRMLS_DC) /* {{{ */
/* requests are owned by us so free them */
while ((request = mmc_queue_pop(&(pool->free_requests))) != NULL) {
mmc_request_free(request);
pool->protocol->free_request(request);
}
mmc_queue_free(&(pool->free_requests));
@@ -981,20 +878,22 @@ static int mmc_pool_failover_handler_null(mmc_pool_t *pool, mmc_t *mmc, mmc_requ
}
/* }}}*/
mmc_request_t *mmc_pool_request(mmc_pool_t *pool, int protocol, mmc_request_parser parse,
mmc_request_value_handler value_handler, void *value_handler_param,
mmc_request_failover_handler failover_handler, void *failover_handler_param TSRMLS_DC) /*
allocates a request, must be added to pool using mmc_pool_schedule or released with mmc_pool_release {{{ */
static int mmc_pool_response_handler_null(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC) /*
always returns failure {{{ */
{
return MMC_REQUEST_DONE;
}
/* }}}*/
static inline mmc_request_t *mmc_pool_request_alloc(mmc_pool_t *pool, int protocol,
mmc_request_failover_handler failover_handler, void *failover_handler_param TSRMLS_DC) /* {{{ */
{
mmc_request_t *request = mmc_queue_pop(&(pool->free_requests));
if (request == NULL) {
request = mmc_request_new();
request = pool->protocol->create_request();
}
else {
request->key_len = 0;
mmc_buffer_reset(&(request->sendbuf));
mmc_queue_reset(&(request->failed_servers));
request->failed_index = 0;
pool->protocol->reset_request(request);
}
request->protocol = protocol;
@@ -1004,10 +903,6 @@ mmc_request_t *mmc_pool_request(mmc_pool_t *pool, int protocol, mmc_request_pars
smart_str_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
}
request->parse = parse;
request->value_handler = value_handler;
request->value_handler_param = value_handler_param;
request->failover_handler = failover_handler != NULL ? failover_handler : mmc_pool_failover_handler_null;
request->failover_handler_param = failover_handler_param;
@@ -1015,11 +910,47 @@ mmc_request_t *mmc_pool_request(mmc_pool_t *pool, int protocol, mmc_request_pars
}
/* }}} */
mmc_request_t *mmc_pool_request(mmc_pool_t *pool, int protocol,
mmc_request_response_handler response_handler, void *response_handler_param,
mmc_request_failover_handler failover_handler, void *failover_handler_param TSRMLS_DC) /*
allocates a request, must be added to pool using mmc_pool_schedule or released with mmc_pool_release {{{ */
{
mmc_request_t *request = mmc_pool_request_alloc(pool, protocol, failover_handler, failover_handler_param TSRMLS_CC);
request->response_handler = response_handler;
request->response_handler_param = response_handler_param;
return request;
}
/* }}} */
mmc_request_t *mmc_pool_request_get(mmc_pool_t *pool, int protocol,
mmc_request_value_handler value_handler, void *value_handler_param,
mmc_request_failover_handler failover_handler, void *failover_handler_param TSRMLS_DC) /*
allocates a request, must be added to pool using mmc_pool_schedule or released with mmc_pool_release {{{ */
{
mmc_request_t *request = mmc_pool_request(
pool, protocol,
mmc_pool_response_handler_null, NULL,
failover_handler, failover_handler_param TSRMLS_CC);
request->value_handler = value_handler;
request->value_handler_param = value_handler_param;
return request;
}
/* }}} */
mmc_request_t *mmc_pool_clone_request(mmc_pool_t *pool, mmc_request_t *request TSRMLS_DC) /*
clones a request, must be added to pool using mmc_pool_schedule or released with mmc_pool_release {{{ */
{
mmc_request_t *clone = mmc_pool_request(pool, request->protocol, request->parse,
request->value_handler, request->value_handler_param, NULL, NULL TSRMLS_CC);
mmc_request_t *clone = mmc_pool_request_alloc(pool, request->protocol, NULL, NULL TSRMLS_CC);
clone->response_handler = request->response_handler;
clone->response_handler_param = request->response_handler_param;
clone->value_handler = request->value_handler;
clone->value_handler_param = request->value_handler_param;
/* copy payload parser */
clone->parse = request->parse;
/* copy key */
memcpy(clone->key, request->key, request->key_len);
@@ -1121,8 +1052,7 @@ int mmc_pool_schedule_key(mmc_pool_t *pool, const char *key, unsigned int key_le
mmc_t *mmc;
unsigned int last_index = 0;
mmc_queue_t skip_servers;
memset(&skip_servers, 0, sizeof(skip_servers));
mmc_queue_t skip_servers = {0};
/* schedule the first request */
mmc = mmc_pool_find(pool, key, key_len TSRMLS_CC);
@@ -1147,13 +1077,22 @@ int mmc_pool_schedule_key(mmc_pool_t *pool, const char *key, unsigned int key_le
/* }}} */
int mmc_pool_schedule_get(
mmc_pool_t *pool, int protocol, const char *key, unsigned int key_len,
mmc_pool_t *pool, int protocol, zval *zkey,
mmc_request_value_handler value_handler, void *value_handler_param,
mmc_request_failover_handler failover_handler, void *failover_handler_param,
mmc_request_t *failed_request TSRMLS_DC) /*
schedules a get command against a server {{{ */
{
mmc_t *mmc = mmc_pool_find(pool, key, key_len TSRMLS_CC);
mmc_t *mmc;
char key[MMC_MAX_KEY_LEN + 1];
unsigned int key_len;
if (mmc_prepare_key(zkey, key, &key_len) != MMC_OK) {
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Invalid key");
return MMC_REQUEST_FAILURE;
}
mmc = mmc_pool_find(pool, key, key_len TSRMLS_CC);
if (!mmc_server_valid(mmc TSRMLS_CC)) {
return MMC_REQUEST_FAILURE;
}
@@ -1161,54 +1100,38 @@ int mmc_pool_schedule_get(
if (mmc->buildreq == NULL) {
mmc_queue_push(&(pool->pending), mmc);
mmc->buildreq = mmc_pool_request(pool, protocol, mmc_request_parse_value,
value_handler, value_handler_param, failover_handler, failover_handler_param TSRMLS_CC);
mmc->buildreq = mmc_pool_request_get(
pool, protocol, value_handler, value_handler_param,
failover_handler, failover_handler_param TSRMLS_CC);
if (failed_request != NULL) {
mmc_queue_copy(&(failed_request->failed_servers), &(mmc->buildreq->failed_servers));
mmc->buildreq->failed_index = failed_request->failed_index;
}
smart_str_appendl(&(mmc->buildreq->sendbuf.value), "get", sizeof("get")-1);
pool->protocol->begin_get(mmc->buildreq);
}
else if (protocol == MMC_PROTO_UDP && mmc->buildreq->sendbuf.value.len + key_len + 3 > MMC_MAX_UDP_LEN) {
/* datagram if full, schedule for delivery */
smart_str_appendl(&(mmc->buildreq->sendbuf.value), "\r\n", sizeof("\r\n")-1);
pool->protocol->end_get(mmc->buildreq);
mmc_pool_schedule(pool, mmc, mmc->buildreq TSRMLS_CC);
/* begin sending requests immediatly */
mmc_pool_select(pool, 0 TSRMLS_CC);
mmc->buildreq = mmc_pool_request(pool, protocol, mmc_request_parse_value,
value_handler, value_handler_param, failover_handler, failover_handler_param TSRMLS_CC);
mmc->buildreq = mmc_pool_request_get(
pool, protocol, value_handler, value_handler_param,
failover_handler, failover_handler_param TSRMLS_CC);
if (failed_request != NULL) {
mmc_queue_copy(&(failed_request->failed_servers), &(mmc->buildreq->failed_servers));
mmc->buildreq->failed_index = failed_request->failed_index;
}
smart_str_appendl(&(mmc->buildreq->sendbuf.value), "get", sizeof("get")-1);
pool->protocol->begin_get(mmc->buildreq);
}
smart_str_appendl(&(mmc->buildreq->sendbuf.value), " ", 1);
smart_str_appendl(&(mmc->buildreq->sendbuf.value), key, key_len);
return MMC_OK;
}
/* }}} */
int mmc_pool_schedule_command(
mmc_pool_t *pool, mmc_t *mmc, char *cmd, unsigned int cmd_len, mmc_request_parser parse,
mmc_request_value_handler value_handler, void *value_handler_param TSRMLS_DC) /*
schedules a single command against a server {{{ */
{
mmc_request_t *request = mmc_pool_request(pool, MMC_PROTO_TCP, parse, value_handler, value_handler_param, NULL, NULL TSRMLS_CC);
smart_str_appendl(&(request->sendbuf.value), cmd, cmd_len);
if (mmc_pool_schedule(pool, mmc, request TSRMLS_CC) != MMC_OK) {
return MMC_REQUEST_FAILURE;
}
pool->protocol->append_get(mmc->buildreq, zkey, key, key_len);
return MMC_OK;
}
/* }}} */
@@ -1300,10 +1223,19 @@ void mmc_pool_select(mmc_pool_t *pool, long timeout TSRMLS_DC) /*
if (result <= 0) {
for (i=0; i < sending->len; i++) {
mmc_select_failure(pool, mmc_queue_item(sending, i), ((mmc_t *)mmc_queue_item(sending, i))->sendreq, result TSRMLS_CC);
mmc = (mmc_t *)mmc_queue_item(sending, i);
if (!FD_ISSET(mmc->sendreq->io->fd, &wfds)) {
mmc_queue_remove(sending, mmc);
mmc_select_failure(pool, mmc, mmc->sendreq, result TSRMLS_CC);
}
}
for (i=0; i < reading->len; i++) {
mmc_select_failure(pool, mmc_queue_item(reading, i), ((mmc_t *)mmc_queue_item(reading, i))->readreq, result TSRMLS_CC);
mmc = (mmc_t *)mmc_queue_item(reading, i);
if (!FD_ISSET(mmc->readreq->io->fd, &rfds)) {
mmc_queue_remove(reading, mmc);
mmc_select_failure(pool, mmc, mmc->readreq, result TSRMLS_CC);
}
}
}
@@ -1336,6 +1268,7 @@ void mmc_pool_select(mmc_pool_t *pool, long timeout TSRMLS_DC) /*
break;
case MMC_REQUEST_MORE:
/* send more data to socket */
break;
default:
@@ -1412,6 +1345,14 @@ void mmc_pool_select(mmc_pool_t *pool, long timeout TSRMLS_DC) /*
break;
case MMC_REQUEST_MORE:
/* read more data from socket */
if (php_stream_eof(mmc->readreq->io->stream)) {
result = mmc_server_failure(mmc, mmc->readreq->io, "Read failed (socket was unexpectedly closed)", 0 TSRMLS_CC);
if (result == MMC_REQUEST_FAILURE) {
/* take server offline and failover requests */
mmc_server_deactivate(pool, mmc TSRMLS_CC);
}
}
break;
case MMC_REQUEST_AGAIN:
@@ -1441,7 +1382,7 @@ void mmc_pool_run(mmc_pool_t *pool TSRMLS_DC) /*
{
mmc_t *mmc;
while ((mmc = mmc_queue_pop(&(pool->pending))) != NULL) {
smart_str_appendl(&(mmc->buildreq->sendbuf.value), "\r\n", sizeof("\r\n")-1);
pool->protocol->end_get(mmc->buildreq);
mmc_pool_schedule(pool, mmc, mmc->buildreq TSRMLS_CC);
mmc->buildreq = NULL;
}
@@ -1494,3 +1435,13 @@ inline int mmc_prepare_key(zval *key, char *result, unsigned int *result_len) /
}
}
/* }}} */
/*
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: noet sw=4 ts=4 fdm=marker
* vim<600: noet sw=4 ts=4
*/

View File

@@ -44,37 +44,42 @@
#define MMC_SERIALIZED 1
#define MMC_COMPRESSED 2
#define MMC_BUFFER_SIZE 4096
#define MMC_MAX_UDP_LEN 1400
#define MMC_MAX_KEY_LEN 250
#define MMC_VALUE_HEADER "VALUE %250s %u %lu" /* keep in sync with MMC_MAX_KEY_LEN */
#define MMC_BUFFER_SIZE 4096
#define MMC_MAX_UDP_LEN 1400
#define MMC_MAX_KEY_LEN 250
#define MMC_VALUE_HEADER "VALUE %250s %u %lu" /* keep in sync with MMC_MAX_KEY_LEN */
#define MMC_COMPRESSION_LEVEL Z_DEFAULT_COMPRESSION
#define MMC_DEFAULT_SAVINGS 0.2 /* minimum 20% savings for compression to be used */
#define MMC_COMPRESSION_LEVEL Z_DEFAULT_COMPRESSION
#define MMC_DEFAULT_SAVINGS 0.2 /* minimum 20% savings for compression to be used */
#define MMC_PROTO_TCP 0
#define MMC_PROTO_UDP 1
#define MMC_STATUS_FAILED -1 /* server/stream is down */
#define MMC_STATUS_DISCONNECTED 0 /* stream is disconnected, ie. new connection */
#define MMC_STATUS_UNKNOWN 1 /* stream is in unknown state, ie. non-validated persistent connection */
#define MMC_STATUS_CONNECTED 2 /* stream is connected */
#define MMC_STATUS_FAILED -1 /* server/stream is down */
#define MMC_STATUS_DISCONNECTED 0 /* stream is disconnected, ie. new connection */
#define MMC_STATUS_UNKNOWN 1 /* stream is in unknown state, ie. non-validated persistent connection */
#define MMC_STATUS_CONNECTED 2 /* stream is connected */
#define MMC_OK 0
#define MMC_OK 0
#define MMC_REQUEST_FAILURE -1 /* operation failed, failover to other server */
#define MMC_REQUEST_DONE 0 /* ok result, or reading/writing is done */
#define MMC_REQUEST_MORE 1 /* more data follows in another packet, try select() again */
#define MMC_REQUEST_AGAIN 2 /* more data follows in this packet, try read/write again */
#define MMC_REQUEST_RETRY 3 /* retry/reschedule request */
#define MMC_REQUEST_FAILURE -1 /* operation failed, failover to other server */
#define MMC_REQUEST_DONE 0 /* ok result, or reading/writing is done */
#define MMC_REQUEST_MORE 1 /* more data follows in another packet, try select() again */
#define MMC_REQUEST_AGAIN 2 /* more data follows in this packet, try read/write again */
#define MMC_REQUEST_RETRY 3 /* retry/reschedule request */
#define MMC_STANDARD_HASH 1
#define MMC_CONSISTENT_HASH 2
#define MMC_HASH_CRC32 1 /* CRC32 hash function */
#define MMC_HASH_FNV1A 2 /* FNV-1a hash function */
#define MMC_RESPONSE_UNKNOWN -2
#define MMC_RESPONSE_ERROR -1
#define MMC_RESPONSE_NOT_FOUND 1 /* same as binary protocol */
#define MMC_RESPONSE_EXISTS 2 /* same as binary protocol */
#define MMC_CONSISTENT_POINTS 160 /* points per server */
#define MMC_CONSISTENT_BUCKETS 1024 /* number of precomputed buckets, should be power of 2 */
#define MMC_STANDARD_HASH 1
#define MMC_CONSISTENT_HASH 2
#define MMC_HASH_CRC32 1 /* CRC32 hash function */
#define MMC_HASH_FNV1A 2 /* FNV-1a hash function */
#define MMC_CONSISTENT_POINTS 160 /* points per server */
#define MMC_CONSISTENT_BUCKETS 1024 /* number of precomputed buckets, should be power of 2 */
/* io buffer */
typedef struct mmc_buffer {
@@ -85,6 +90,9 @@ typedef struct mmc_buffer {
#define mmc_buffer_release(b) memset((b), 0, sizeof(*(b)))
#define mmc_buffer_reset(b) (b)->value.len = (b)->idx = 0
inline void mmc_buffer_alloc(mmc_buffer_t *, unsigned int);
inline void mmc_buffer_free(mmc_buffer_t *);
/* stream handlers */
typedef struct mmc_stream mmc_stream_t;
@@ -116,35 +124,35 @@ typedef struct mmc_request mmc_request_t;
typedef int (*mmc_request_reader)(mmc_t *mmc, mmc_request_t *request TSRMLS_DC);
typedef int (*mmc_request_parser)(mmc_t *mmc, mmc_request_t *request TSRMLS_DC);
typedef int (*mmc_request_value_handler)(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC);
typedef int (*mmc_request_value_handler)(mmc_t *mmc, mmc_request_t *request, const char *key, unsigned int key_len, void *value, unsigned int value_len, void *param TSRMLS_DC);
typedef int (*mmc_request_response_handler)(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC);
typedef int (*mmc_request_failover_handler)(mmc_pool_t *pool, mmc_t *mmc, mmc_request_t *request, void *param TSRMLS_DC);
void mmc_request_reset(mmc_request_t *);
void mmc_request_free(mmc_request_t *);
/* server request */
struct mmc_request {
mmc_stream_t *io; /* stream this request is sending on */
mmc_buffer_t sendbuf; /* the command to send to server */
mmc_buffer_t readbuf; /* used when reading values */
char key[MMC_MAX_KEY_LEN + 1]; /* key buffer to use on failover of single-key requests */
unsigned int key_len;
unsigned int protocol; /* protocol encoding of request */
mmc_queue_t failed_servers; /* servers this request has failed at */
unsigned int failed_index; /* last index probed on failure */
mmc_request_reader read; /* handles reading (and validating datagrams) */
mmc_request_parser parse; /* parses read values */
mmc_request_value_handler value_handler; /* called when values has been parsed */
void *value_handler_param; /* parameter to value handler */
mmc_stream_t *io; /* stream this request is sending on */
mmc_buffer_t sendbuf; /* request buffer */
mmc_buffer_t readbuf; /* response buffer */
char key[MMC_MAX_KEY_LEN + 1]; /* key buffer to use on failover of single-key requests */
unsigned int key_len;
unsigned int protocol; /* protocol encoding of request {tcp, udp} */
mmc_queue_t failed_servers; /* mmc_queue_t<mmc_t *>, servers this request has failed at */
unsigned int failed_index; /* last index probed on failure */
mmc_request_reader read; /* handles reading (and validating datagrams) */
mmc_request_parser parse; /* called to parse response payload */
mmc_request_value_handler value_handler; /* called when result value have been parsed */
void *value_handler_param;
mmc_request_response_handler response_handler; /* called when a non-value response has been received */
void *response_handler_param;
mmc_request_failover_handler failover_handler; /* called to perform failover of failed request */
void *failover_handler_param; /* parameter to failover handler */
struct { /* stores value info while it's body is being read */
char key[MMC_MAX_KEY_LEN + 1];
unsigned int key_len;
unsigned int flags;
unsigned long bytes;
} value;
void *failover_handler_param;
struct {
uint16_t reqid; /* id of the request, increasing value set by client */
uint16_t seqid; /* next expected seqid */
uint16_t total; /* number of datagrams in response */
uint16_t reqid; /* id of the request, increasing value set by client */
uint16_t seqid; /* next expected seqid */
uint16_t total; /* number of datagrams in response */
} udp;
};
@@ -173,12 +181,60 @@ struct mmc {
int errnum; /* last error code */
};
/* wire protocol */
#define MMC_ASCII_PROTOCOL 1
#define MMC_BINARY_PROTOCOL 2
/* same as in binary protocol */
#define MMC_OP_SET 1
#define MMC_OP_ADD 2
#define MMC_OP_REPLACE 3
typedef mmc_request_t * (*mmc_protocol_create_request)();
typedef void (*mmc_protocol_reset_request)(mmc_request_t *request);
typedef void (*mmc_protocol_free_request)(mmc_request_t *request);
typedef void (*mmc_protocol_get)(mmc_request_t *request, zval *zkey, const char *key, unsigned int key_len);
typedef void (*mmc_protocol_begin_get)(mmc_request_t *request);
typedef void (*mmc_protocol_append_get)(mmc_request_t *request, zval *zkey, const char *key, unsigned int key_len);
typedef void (*mmc_protocol_end_get)(mmc_request_t *request);
typedef int (*mmc_protocol_store)(mmc_pool_t *pool, mmc_request_t *request, int op, const char *key, unsigned int key_len, unsigned int flags, unsigned int exptime, zval *value TSRMLS_DC);
typedef void (*mmc_protocol_delete)(mmc_request_t *request, const char *key, unsigned int key_len, unsigned int exptime);
typedef void (*mmc_protocol_mutate)(mmc_request_t *request, const char *key, unsigned int key_len, long value, long defval, unsigned int exptime);
typedef void (*mmc_protocol_flush)(mmc_request_t *request, unsigned int exptime);
typedef void (*mmc_protocol_stats)(mmc_request_t *request, const char *type, long slabid, long limit);
typedef void (*mmc_protocol_version)(mmc_request_t *request);
typedef struct mmc_protocol {
mmc_protocol_create_request create_request;
mmc_protocol_reset_request reset_request;
mmc_protocol_free_request free_request;
mmc_protocol_get get;
mmc_protocol_begin_get begin_get;
mmc_protocol_append_get append_get;
mmc_protocol_end_get end_get;
mmc_protocol_store store;
mmc_protocol_delete delete;
mmc_protocol_mutate mutate;
mmc_protocol_flush flush;
mmc_protocol_version version;
mmc_protocol_stats stats;
} mmc_protocol_t;
extern mmc_protocol_t mmc_ascii_protocol;
extern mmc_protocol_t mmc_binary_protocol;
/* hashing strategy */
typedef unsigned int (*mmc_hash_function)(const char *, int);
typedef unsigned int (*mmc_hash_function)(const char *key, unsigned int key_len);
typedef void * (*mmc_hash_create_state)(mmc_hash_function);
typedef void (*mmc_hash_free_state)(void *);
typedef mmc_t * (*mmc_hash_find_server)(void *, const char *, int TSRMLS_DC);
typedef void (*mmc_hash_add_server)(void *, mmc_t *, unsigned int);
typedef void (*mmc_hash_free_state)(void *state);
typedef mmc_t * (*mmc_hash_find_server)(void *state, const char *key, unsigned int key_len TSRMLS_DC);
typedef void (*mmc_hash_add_server)(void *state, mmc_t *mmc, unsigned int weight);
typedef struct mmc_hash {
mmc_hash_create_state create_state;
@@ -191,8 +247,8 @@ extern mmc_hash_t mmc_standard_hash;
extern mmc_hash_t mmc_consistent_hash;
/* 32 bit magic FNV-1a prime and init */
#define FNV_32_PRIME 0x01000193
#define FNV_32_INIT 0x811c9dc5
#define FNV_32_PRIME 0x01000193
#define FNV_32_INIT 0x811c9dc5
/* failure callback prototype */
typedef void (*mmc_failure_callback)(mmc_pool_t *pool, mmc_t *mmc, void *param TSRMLS_DC);
@@ -201,7 +257,8 @@ typedef void (*mmc_failure_callback)(mmc_pool_t *pool, mmc_t *mmc, void *param T
struct mmc_pool {
mmc_t **servers;
int num_servers;
mmc_hash_t *hash; /* hash strategy methods */
mmc_protocol_t *protocol; /* wire protocol */
mmc_hash_t *hash; /* hash strategy */
void *hash_state; /* strategy specific state */
mmc_queue_t *sending; /* mmc_queue_t<mmc_t *>, connections that want to send */
mmc_queue_t *reading; /* mmc_queue_t<mmc_t *>, connections that want to read */
@@ -221,11 +278,7 @@ void mmc_server_free(mmc_t * TSRMLS_DC);
void mmc_server_disconnect(mmc_t *mmc, mmc_stream_t *io TSRMLS_DC);
int mmc_server_valid(mmc_t * TSRMLS_DC);
int mmc_server_failure(mmc_t *, mmc_stream_t *, const char *, int TSRMLS_DC);
int mmc_request_failure(mmc_t *, mmc_stream_t *, char *, unsigned int, int TSRMLS_DC);
/* request functions */
int mmc_request_parse_line(mmc_t *, mmc_request_t * TSRMLS_DC);
int mmc_request_parse_value(mmc_t *, mmc_request_t * TSRMLS_DC);
int mmc_request_failure(mmc_t *, mmc_stream_t *, const char *, unsigned int, int TSRMLS_DC);
/* pool functions */
mmc_pool_t *mmc_pool_new(TSRMLS_D);
@@ -239,8 +292,11 @@ mmc_t *mmc_pool_find(mmc_pool_t *, const char *, unsigned int TSRMLS_DC);
int mmc_pool_schedule(mmc_pool_t *, mmc_t *, mmc_request_t * TSRMLS_DC);
int mmc_pool_failover_handler(mmc_pool_t *, mmc_t *, mmc_request_t *, void * TSRMLS_DC);
mmc_request_t *mmc_pool_request(mmc_pool_t *, int, mmc_request_parser,
mmc_request_t *mmc_pool_request(mmc_pool_t *, int,
mmc_request_response_handler, void *, mmc_request_failover_handler, void * TSRMLS_DC);
mmc_request_t *mmc_pool_request_get(mmc_pool_t *, int,
mmc_request_value_handler, void *, mmc_request_failover_handler, void * TSRMLS_DC);
#define mmc_pool_release(p, r) mmc_queue_push(&((p)->free_requests), (r))
int mmc_prepare_store(
@@ -248,12 +304,13 @@ int mmc_prepare_store(
const char *, unsigned int, unsigned int, unsigned int, zval * TSRMLS_DC);
int mmc_pool_schedule_key(mmc_pool_t *, const char *, unsigned int, mmc_request_t *, unsigned int TSRMLS_DC);
int mmc_pool_schedule_get(mmc_pool_t *, int, const char *, unsigned int,
int mmc_pool_schedule_get(mmc_pool_t *, int, zval *,
mmc_request_value_handler, void *, mmc_request_failover_handler, void *, mmc_request_t * TSRMLS_DC);
int mmc_pool_schedule_command(mmc_pool_t *, mmc_t *, char *, unsigned int,
mmc_request_parser, mmc_request_value_handler, void * TSRMLS_DC);
/* utility functions */
int mmc_pack_value(mmc_pool_t *, mmc_buffer_t *, zval *, unsigned int * TSRMLS_DC);
int mmc_unpack_value(mmc_t *, mmc_request_t *, mmc_buffer_t *, const char *, unsigned int, unsigned int, unsigned int TSRMLS_DC);
inline int mmc_prepare_key_ex(const char *, unsigned int, char *, unsigned int *);
inline int mmc_prepare_key(zval *, char *, unsigned int *);
@@ -263,6 +320,7 @@ inline int mmc_prepare_key(zval *, char *, unsigned int *);
ZEND_BEGIN_MODULE_GLOBALS(memcache)
long default_port;
long chunk_size;
long protocol;
long hash_strategy;
long hash_function;
long allow_failover;

View File

@@ -148,20 +148,19 @@ PS_CLOSE_FUNC(memcache)
}
/* }}} */
static mmc_request_t *php_mmc_session_read_request(mmc_pool_t *pool, const char *key, zval *result TSRMLS_DC) /* {{{ */
static mmc_request_t *php_mmc_session_read_request(mmc_pool_t *pool, zval *zkey, zval *result TSRMLS_DC) /* {{{ */
{
mmc_request_t *request = mmc_pool_request(pool, MMC_PROTO_UDP, mmc_request_parse_value,
mmc_value_handler_single, result, mmc_pool_failover_handler, NULL TSRMLS_CC);
mmc_request_t *request = mmc_pool_request_get(
pool, MMC_PROTO_UDP,
mmc_value_handler_single, result,
mmc_pool_failover_handler, NULL TSRMLS_CC);
if (mmc_prepare_key_ex(key, strlen(key), request->key, &(request->key_len)) != MMC_OK) {
if (mmc_prepare_key_ex(Z_STRVAL_P(zkey), Z_STRLEN_P(zkey), request->key, &(request->key_len)) != MMC_OK) {
mmc_pool_release(pool, request);
return NULL;
}
smart_str_appendl(&(request->sendbuf.value), "get ", sizeof("get ")-1);
smart_str_appendl(&(request->sendbuf.value), request->key, request->key_len);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
pool->protocol->get(request, zkey, request->key, request->key_len);
return request;
}
/* }}} */
@@ -173,17 +172,18 @@ PS_READ_FUNC(memcache)
mmc_pool_t *pool = PS_GET_MOD_DATA();
if (pool != NULL) {
zval result;
zval result, zkey;
mmc_t *mmc;
mmc_request_t *request;
mmc_queue_t skip_servers;
unsigned int last_index = 0;
ZVAL_FALSE(&result);
ZVAL_STRING(&zkey, (char *)key, 0);
memset(&skip_servers, 0, sizeof(skip_servers));
/* create request */
request = php_mmc_session_read_request(pool, key, &result TSRMLS_CC);
request = php_mmc_session_read_request(pool, &zkey, &result TSRMLS_CC);
if (request == NULL) {
return FAILURE;
}
@@ -199,7 +199,7 @@ PS_READ_FUNC(memcache)
/* retry missing value (possibly due to server restart) */
while (Z_TYPE(result) != IS_STRING && skip_servers.len < MEMCACHE_G(session_redundancy)-1 && skip_servers.len < pool->num_servers) {
request = php_mmc_session_read_request(pool, key, &result TSRMLS_CC);
request = php_mmc_session_read_request(pool, &zkey, &result TSRMLS_CC);
if (request == NULL) {
break;
}
@@ -240,14 +240,19 @@ PS_WRITE_FUNC(memcache)
mmc_request_t *request;
/* allocate request */
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_request_parse_line,
request = mmc_pool_request(pool, MMC_PROTO_TCP,
mmc_stored_handler, &result, mmc_pool_failover_handler, NULL TSRMLS_CC);
if (mmc_prepare_key_ex(key, strlen(key), request->key, &(request->key_len)) != MMC_OK) {
mmc_pool_release(pool, request);
return FAILURE;
}
ZVAL_NULL(&result);
ZVAL_STRINGL(&value, (char *)val, vallen, 0);
/* assemble command */
if (mmc_prepare_store(pool, request, "set", sizeof("set")-1, key, strlen(key), 0, INI_INT("session.gc_maxlifetime"), &value TSRMLS_CC) != MMC_OK) {
if (pool->protocol->store(pool, request, MMC_OP_SET, request->key, request->key_len, 0, INI_INT("session.gc_maxlifetime"), &value TSRMLS_CC) != MMC_OK) {
mmc_pool_release(pool, request);
return FAILURE;
}
@@ -269,17 +274,16 @@ PS_WRITE_FUNC(memcache)
}
/* }}} */
static int mmc_deleted_handler(mmc_t *mmc, mmc_request_t *request, void *value, unsigned int value_len, void *param TSRMLS_DC) /*
static int mmc_deleted_handler(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param TSRMLS_DC) /*
parses a DELETED response line, param is a zval pointer to store result into {{{ */
{
if (mmc_str_left((char *)value, "DELETED", value_len, sizeof("DELETED")-1) ||
mmc_str_left((char *)value, "NOT_FOUND", value_len, sizeof("NOT_FOUND")-1))
if (response == MMC_OK || response == MMC_RESPONSE_NOT_FOUND)
{
ZVAL_TRUE((zval *)param);
return MMC_REQUEST_DONE;
}
return mmc_request_failure(mmc, request->io, (char *)value, value_len, 0 TSRMLS_CC);
return mmc_request_failure(mmc, request->io, message, message_len, 0 TSRMLS_CC);
}
/* }}} */
@@ -294,7 +298,7 @@ PS_DESTROY_FUNC(memcache)
mmc_request_t *request;
/* allocate request */
request = mmc_pool_request(pool, MMC_PROTO_TCP, mmc_request_parse_line,
request = mmc_pool_request(pool, MMC_PROTO_TCP,
mmc_deleted_handler, &result, mmc_pool_failover_handler, NULL TSRMLS_CC);
ZVAL_FALSE(&result);
@@ -303,10 +307,7 @@ PS_DESTROY_FUNC(memcache)
return FAILURE;
}
smart_str_appendl(&(request->sendbuf.value), "delete", sizeof("delete")-1);
smart_str_appendl(&(request->sendbuf.value), " ", 1);
smart_str_appendl(&(request->sendbuf.value), request->key, request->key_len);
smart_str_appendl(&(request->sendbuf.value), "\r\n", sizeof("\r\n")-1);
pool->protocol->delete(request, request->key, request->key_len, 0);
/* schedule request */
if (mmc_pool_schedule_key(pool, request->key, request->key_len, request, MEMCACHE_G(session_redundancy) TSRMLS_CC) != MMC_OK) {

View File

@@ -56,7 +56,7 @@ void mmc_standard_free_state(void *s) /* {{{ */
}
/* }}} */
mmc_t *mmc_standard_find_server(void *s, const char *key, int key_len TSRMLS_DC) /* {{{ */
mmc_t *mmc_standard_find_server(void *s, const char *key, unsigned int key_len TSRMLS_DC) /* {{{ */
{
mmc_standard_state_t *state = s;

View File

@@ -31,12 +31,13 @@ http://pear.php.net/dtd/package-2.0.xsd">
<api>3.0.0</api>
</version>
<stability>
<release>beta</release>
<api>beta</api>
<release>alpha</release>
<api>alpha</api>
</stability>
<license uri="http://www.php.net/license">PHP License</license>
<notes>
- UDP support
- Binary protocol support
- Non-blocking IO using select()
- Pipelined multi-set/delete/increment/decrement
- Key and session redundancy (values are written to N mirrors)
@@ -60,6 +61,8 @@ http://pear.php.net/dtd/package-2.0.xsd">
<file name="memcache_pool.c" role="src" />
<file name="memcache_queue.c" role="src" />
<file name="memcache_session.c" role="src" />
<file name="memcache_ascii_protocol.c" role="src" />
<file name="memcache_binary_protocol.c" role="src" />
<file name="memcache_standard_hash.c" role="src" />
<file name="memcache_consistent_hash.c" role="src" />
<file name="memcache.dsp" role="src" />

View File

@@ -66,8 +66,8 @@ PHP_FUNCTION(memcache_flush);
/* internal functions */
mmc_t *mmc_find_persistent(const char *, int, unsigned short, unsigned short, int, int TSRMLS_DC);
int mmc_value_handler_single(mmc_t *, mmc_request_t *, void *, unsigned int, void * TSRMLS_DC);
int mmc_stored_handler(mmc_t *, mmc_request_t *, void *, unsigned int, void * TSRMLS_DC);
int mmc_value_handler_single(mmc_t *, mmc_request_t *, const char *, unsigned int, void *, unsigned int, void * TSRMLS_DC);
int mmc_stored_handler(mmc_t *, mmc_request_t *, int, const char *, unsigned int, void * TSRMLS_DC);
/* session handler struct */
#if HAVE_MEMCACHE_SESSION

View File

@@ -14,13 +14,16 @@ memcache_set($memcache, $key, $var, false, 10);
$result = memcache_get($memcache, $key);
var_dump($result);
$result = memcache_get($memcache, Array($key, $key."1"));
var_dump($result);
$result = memcache_get($memcache, array($key, $key."1"));
var_dump(count($result));
if (is_array($result)) {
$keys = array_keys($result);
var_dump(strtr($keys[0], "\r\n\0", "___"));
}
?>
--EXPECT--
--EXPECTF--
string(4) "test"
array(1) {
["test____-_really_strange_key"]=>
string(4) "test"
}
int(1)
string(28) "test%skey"

View File

@@ -8,18 +8,20 @@ memcache_get_version() & memcache_get_stats()
include 'connect.inc';
$version = memcache_get_version($memcache);
var_dump($version);
$stats = memcache_get_stats($memcache);
if ($version) {
echo "version ok\n";
if (ini_get('memcache.protocol') == 'binary') {
var_dump($stats === false);
var_dump(1);
}
if ($stats) {
echo "stats ok\n";
else {
var_dump(count($stats) > 10);
var_dump(count($stats));
}
?>
--EXPECT--
version ok
stats ok
--EXPECTF--
string(%d) "%s"
bool(true)
int(%d)

View File

@@ -8,18 +8,20 @@ memcache->getVersion() & memcache->getStats()
include 'connect.inc';
$version = $memcache->getVersion();
var_dump($version);
$stats = $memcache->getStats();
if ($version) {
echo "version ok\n";
if (ini_get('memcache.protocol') == 'binary') {
var_dump($stats === false);
var_dump(1);
}
if ($stats) {
echo "stats ok\n";
else {
var_dump(count($stats) > 10);
var_dump(count($stats));
}
?>
--EXPECT--
version ok
stats ok
--EXPECTF--
string(%d) "%s"
bool(true)
int(%d)

View File

@@ -1,7 +1,7 @@
--TEST--
memcache->getExtendedStats()
--SKIPIF--
<?php include 'connect.inc'; if (!isset($host2)) die('skip $host2 not set'); ?>
<?php include 'connect.inc'; if (!isset($host2)) die('skip $host2 not set'); if (ini_get('memcache.protocol') == 'binary') die('skip binary protocol does not support stats'); ?>
--FILE--
<?php

View File

@@ -1,7 +1,7 @@
--TEST--
memcache_get_extended_stats()
--SKIPIF--
<?php include 'connect.inc'; if (!isset($host2)) die('skip $host2 not set'); ?>
<?php include 'connect.inc'; if (!isset($host2)) die('skip $host2 not set'); if (ini_get('memcache.protocol') == 'binary') die('skip binary protocol does not support stats'); ?>
--FILE--
<?php

View File

@@ -1,7 +1,7 @@
--TEST--
memcache->getStats() with arguments
--SKIPIF--
<?php include 'connect.inc'; ?>
<?php include 'connect.inc'; if (ini_get('memcache.protocol') == 'binary') die('skip binary protocol does not support stats'); ?>
--FILE--
<?php
@@ -34,10 +34,8 @@ var_dump($result[key($result)][1]);
$result = $memcache->getStats('items');
var_dump($result['items'][$slab]['number']);
$result = $memcache->getStats('sizes');
var_dump($result['64']);
print "\n";
//$result = $memcache->getStats('sizes');
//var_dump($result['64']);
$result = $memcache->getExtendedStats('abc');
var_dump($result["$host:$port"]);
@@ -59,8 +57,6 @@ string(%d) "%d"
string(%d) "%d"
string(%d) "%d"
string(%d) "%d"
string(%d) "%d"
Warning: %s: Invalid stats type %s
NULL

View File

@@ -1,7 +1,7 @@
--TEST--
memcache::connect() with unix domain socket
--SKIPIF--
<?php include 'connect.inc'; if (!isset($domainsocket)) die('skip $domainsocket not set'); ?>
<?php include 'connect.inc'; if (!isset($domainsocket)) die('skip $domainsocket not set'); if (ini_get('memcache.protocol') == 'binary') die('skip binary protocol does not support unix domain sockets'); ?>
--FILE--
<?php

View File

@@ -7,6 +7,11 @@ ini_set('memcache.session_redundancy')
include 'connect.inc';
if (!isset($udpPort))
$udpPort = 0;
if (!isset($udpPort2))
$udpPort2 = 0;
ini_set('memcache.session_redundancy', 10);
ini_set('session.save_handler', 'memcache');
ini_set('session.save_path', "tcp://$host:$port?udp_port=$udpPort, tcp://$host2:$port2?udp_port=$udpPort2");

73
tests/046.phpt Normal file
View File

@@ -0,0 +1,73 @@
--TEST--
hash strategies and functions
--SKIPIF--
<?php include 'connect.inc'; if (!isset($host2)) die('skip $host2 not set'); ?>
--FILE--
<?php
include 'connect.inc';
$var1 = 'test1';
$var2 = 'test2';
$memcache1 = memcache_connect($host, $port);
$memcache2 = memcache_pconnect($host2, $port2);
foreach ($balanceKeys as $strategy => $functions) {
foreach ($functions as $function => $keys) {
ini_set('memcache.hash_strategy', $strategy);
ini_set('memcache.hash_function', $function);
list ($balanceKey1, $balanceKey2) = $keys;
print "\n$strategy:$function\n";
$memcache = new Memcache();
$memcache->addServer($host, $port);
$memcache->addServer($host2, $port2);
$memcache1->set($balanceKey1, '', false, 2);
$memcache1->set($balanceKey2, '', false, 2);
$memcache2->set($balanceKey1, '', false, 2);
$memcache2->set($balanceKey2, '', false, 2);
$memcache->set($balanceKey1, $var1, false, 2); // hashes to $host2
$memcache->set($balanceKey2, $var2, false, 2); // hashes to $host1
$result4 = $memcache1->get($balanceKey1); // return false; key1 is at $host2
$result5 = $memcache1->get($balanceKey2);
$result6 = $memcache2->get($balanceKey1);
$result7 = $memcache2->get($balanceKey2); // return false; key2 is at $host1
var_dump($result4);
var_dump($result5);
var_dump($result6);
var_dump($result7);
}
}
?>
--EXPECT--
consistent:crc32
string(0) ""
string(5) "test2"
string(5) "test1"
string(0) ""
consistent:fnv
string(0) ""
string(5) "test2"
string(5) "test1"
string(0) ""
standard:crc32
string(0) ""
string(5) "test2"
string(5) "test1"
string(0) ""
standard:fnv
string(0) ""
string(5) "test2"
string(5) "test1"
string(0) ""

View File

@@ -16,6 +16,10 @@ $host2 = "localhost";
$port2 = 11212;
$udpPort2 = 11212;
//ini_set('memcache.hash_strategy', 'standard');
//ini_set('memcache.hash_function', 'fnv');
//ini_set('memcache.protocol', 'binary');
/* Start a server listening to a unix domain socket
*
* mkdir /var/run/memcached
@@ -30,21 +34,20 @@ $nonExistingPort = 11213;
$udpPacketSize = 1400;
switch (strtolower(ini_get('memcache.hash_strategy'))) {
case 'consistent':
// with preloaded buckets
$balanceKey1 = 'key1_abc';
$balanceKey2 = 'key2_abcde';
// without preloaded buckets
//$balanceKey1 = 'key1_abcde';
//$balanceKey2 = 'key2_abcdefg';
break;
$balanceKeys = array(
'consistent' => array(
'crc32' => array('key1_abc', 'key2_abcde'),
'fnv' => array('key1_a', 'key2_2534534'),
),
'standard' => array(
'crc32' => array('load_test_key1', 'load_test_key2'),
'fnv' => array('key1_ab', 'key2_a'),
),
);
default:
$balanceKey1 = 'load_test_key1';
$balanceKey2 = 'load_test_key2';
}
$strat = strtolower(ini_get('memcache.hash_strategy'));
$func = strtolower(ini_get('memcache.hash_function'));
list ($balanceKey1, $balanceKey2) = $balanceKeys[$strat][$func];
$memcache = memcache_connect($host, $port);