Rollback the automatic resharding distributor

I accidentally pulled this when getting some of the pull requests
integrated (git flow style) for this release.  I like the idea
for sure, but I think it needs more detailed documentation and
further testing.

At the very least, I need to understand it :)
This commit is contained in:
michael-grunder
2013-08-31 19:20:34 -07:00
parent f5fc23cbe2
commit eb0bbbafa0
2 changed files with 26 additions and 134 deletions

View File

@@ -88,46 +88,6 @@ In order to control the distribution of keys by hand, you can provide a custom f
For instance, instantiate a RedisArray object with `new RedisArray(array("us-host", "uk-host", "de-host"), array("distributor" => "dist"));` and write a function called "dist" that will return `2` for all the keys that should end up on the "de-host" server.
You may also provide an array of 2 values that will be used as follows:
- The first value is the initial amount of shards in use before the resharding (the first x shards specified in the constructor)
- The second value is the resharding level, or number of resharding iterations.
For instance, suppose you started with 4 shards as follows:
<pre>
0 => 0 1 2 3
</pre>
After 1 iteration of resharding, keys will be assigned to the following servers:
<pre>
1 => 0 4 1 5 2 6 3 7
</pre>
After 2 iterations, keys will be assigned to the following servers:
<pre>
2 => 0 8 4 12 1 9 5 13 2 10 6 14 3 11 7 15
</pre>
After 3 iterations, keys will be assigned to the following servers:
<pre>
3 => 0 16 8 24 4 20 12 28 1 17 9 25 5 21 13 29 2 18 10 26 6 22 14 30 3 19 11 27 7 23 15 31
</pre>
And so on...
The idea here is to be able to reshard the keys easily, without moving keys from 1 server to another.
The procedure to adopt is simple:
For each initial shard, setup a slave. For instance, for shard 1 we setup slave 5.
Keys will now be assigned to either shard 1 or shard 5. Once the application sees the new settings, just setup shard 5 as a master. Then, in order to reclaim memory, just cleanup keys from shard 1 that belong to shard 5 and vice-versa.
On the next iteration, setup a new slave 9 for shard 1 and a new slave 13 for shard 5.
Update the application settings, disconnect the new slaves and clean up the shards from keys that don't belong there anymore.
Apply the same procedure for each resharding iteration.
### Example
<pre>
$ra = new RedisArray(array("host1", "host2", "host3", "host4", "host5", "host6", "host7", "host8"), array("distributor" => array(2, 2)));

View File

@@ -426,105 +426,37 @@ ra_call_distributor(RedisArray *ra, const char *key, int key_len, int *pos TSRML
return 1;
}
/* Validate ra->z_dist as an array(num_original, num_reshards) pair.
 * On success fills both out-parameters and returns 1; returns 0 when
 * z_dist is not a 2-element array or either entry is not a non-zero
 * long / numeric string. */
zend_bool
ra_check_distributor(RedisArray *ra, int *num_original, int *num_reshards TSRMLS_DC) {
	HashTable *shards;
	zval **z_original_pp;
	zval **z_reshards_pp;

	if(Z_TYPE_P(ra->z_dist) != IS_ARRAY) {
		return 0;
	}
	shards = Z_ARRVAL_P(ra->z_dist);

	/* expect exactly two entries, at numeric indices 0 and 1 */
	if (zend_hash_num_elements(shards) != 2) {
		return 0;
	}
	if (zend_hash_index_find(shards, 0, (void **)&z_original_pp) != SUCCESS ||
		zend_hash_index_find(shards, 1, (void **)&z_reshards_pp) != SUCCESS) {
		return 0;
	}

	/* entry 0: original shard count (long or numeric string, non-zero) */
	switch (Z_TYPE_PP(z_original_pp)) {
		case IS_LONG:
			*num_original = Z_LVAL_PP(z_original_pp);
			break;
		case IS_STRING:
			*num_original = atol(Z_STRVAL_PP(z_original_pp));
			break;
		default:
			return 0;
	}
	if (*num_original == 0) {
		return 0;
	}

	/* entry 1: number of resharding iterations, same validation */
	switch (Z_TYPE_PP(z_reshards_pp)) {
		case IS_LONG:
			*num_reshards = Z_LVAL_PP(z_reshards_pp);
			break;
		case IS_STRING:
			*num_reshards = atol(Z_STRVAL_PP(z_reshards_pp));
			break;
		default:
			return 0;
	}
	if (*num_reshards == 0) {
		return 0;
	}

	return 1;
}
/* Map a key to a RedisArray node: either via the user-supplied distributor
 * (ra->z_dist) or via a CRC32 hash scaled onto the ring of ra->count shards.
 * Writes the chosen index to *out_pos (when non-NULL) and returns the node.
 *
 * NOTE(review): this block is diff residue from a commit page, not clean
 * source — the removed (resharding) and retained variants of the function
 * appear fused together. Symptoms: duplicated local declarations below,
 * the key-extraction sequence repeated twice, and a second unreachable
 * copy of the dispatch logic after the first `return ra->redis[pos];`.
 * Do not compile as-is; reconcile against the repository before editing. */
zval *
ra_find_node(RedisArray *ra, const char *key, int key_len, int *out_pos TSRMLS_DC) {
/* NOTE(review): the next six lines declare hash/out/pos twice — a diff
 * artifact (old initialized `pos = 0` vs. new uninitialized `pos`). */
uint32_t hash;
char *out;
int pos = 0, out_len;
uint32_t hash;
char *out;
int pos, out_len;
/* extract relevant part of the key */
out = ra_extract_key(ra, key, key_len, &out_len TSRMLS_CC);
if(!out)
return NULL;
/* extract relevant part of the key */
out = ra_extract_key(ra, key, key_len, &out_len TSRMLS_CC);
if(!out)
return NULL;
/* Removed (resharding) variant: when z_dist is the array(original,
 * reshards) form, compute the position arithmetically; otherwise fall
 * back to calling the userland distributor callback. */
if(ra->z_dist) {
char *error = NULL;
int num_original, num_reshards;
if (ra_check_distributor(ra, &num_original, &num_reshards TSRMLS_CC)) {
/* shard count must equal num_original * 2^num_reshards */
if (num_reshards < 1 || ra->count != (num_original * (1 << num_reshards))) {
return NULL;
}
/* Calculate original hash */
hash = rcrc32(out, out_len);
efree(out);
/* scale the 32-bit hash onto the original shard range */
uint64_t h64 = hash;
h64 *= num_original;
h64 /= 0xffffffff;
pos = (int)h64;
int i;
/* Infer the new position */
for(i = 0; i < num_reshards; i++) {
int total = num_original * 2;
h64 = hash;
h64 *= total;
h64 /= 0xffffffff;
h64 %= 2;
/* odd bucket at this level -> jump forward by the pre-split count */
pos = pos + h64 * num_original;
num_original = total;
}
}
else if (!ra_call_distributor(ra, key, key_len, &pos TSRMLS_CC)) {
return NULL;
}
}
else {
/* hash */
hash = rcrc32(out, out_len);
efree(out);
/* get position on ring */
uint64_t h64 = hash;
h64 *= ra->count;
h64 /= 0xffffffff;
pos = (int)h64;
}
if(out_pos) *out_pos = pos;
return ra->redis[pos];
/* NOTE(review): everything below is the retained (post-rollback) variant
 * of the same dispatch — unreachable here because of the return above. */
if(ra->z_dist) {
if (!ra_call_distributor(ra, key, key_len, &pos TSRMLS_CC)) {
return NULL;
}
}
else {
/* hash */
hash = rcrc32(out, out_len);
efree(out);
/* get position on ring */
uint64_t h64 = hash;
h64 *= ra->count;
h64 /= 0xffffffff;
pos = (int)h64;
}
if(out_pos) *out_pos = pos;
return ra->redis[pos];
}
zval *