-
-
Save metamatik/22ed17a1a195b863217299e5220efef5 to your computer and use it in GitHub Desktop.
Migrate Redis data on Amazon ElastiCache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Forked and heavily adapted from:
https://gist.github.com/kitwalker12/517d99c3835975ad4d1718d28a63553e
Copies all keys from the source Redis host to the destination Redis host.
Useful to migrate Redis instances where commands like SLAVEOF and MIGRATE are
restricted (e.g. on Amazon ElastiCache).
The script scans through the keyspace of the given database number and uses
a pipeline of DUMP and RESTORE commands to migrate the keys.
Requires Redis 2.8.0 or higher.
Python requirements:
click
progressbar
redis
""" | |
import click | |
from progressbar import ProgressBar | |
from progressbar.widgets import Bar, ETA, Percentage | |
import redis | |
from redis.exceptions import ResponseError | |
# Fallback values for the --src_port/--dst_port and --src_db/--dst_db
# command-line options when they are not given explicitly.
DEFAULT_REDIS_PORT = 6379
DEFAULT_REDIS_DB = 0
def _copy_keys(keys, src_pipe, dst, replace, non_existing, already_existing):
    """Copy one batch of keys from the source pipeline to the destination.

    ``src_pipe`` must have a PTTL followed by a DUMP queued for every entry
    of ``keys``, in the same order, so that its results come back as
    alternating (ttl, payload) values.

    Args:
        keys: Key names of the batch, in the order they were queued.
        src_pipe: Source pipeline holding the queued PTTL/DUMP commands.
        dst: Destination client; a fresh pipeline is created from it.
        replace: Forwarded to RESTORE to overwrite keys already present.
        non_existing: Running count of keys whose DUMP returned nothing.
        already_existing: Running count of keys the destination refused
            because they already exist.

    Returns:
        The updated ``(non_existing, already_existing)`` counters.

    Raises:
        Exception: Any destination error other than "key already exists"
            is re-raised after the offending key has been printed.
    """
    busy_messages = (
        'BUSYKEY Target key name already exists.',
        'Target key name is busy.',
    )

    # We execute the source pipeline.
    src_result = src_pipe.execute()

    # We create the matching destination pipeline, remembering which keys
    # actually get a RESTORE queued: keys that vanished from the source are
    # skipped, so destination results do not line up with ``keys`` itself.
    dst_pipe = dst.pipeline()
    restored = []
    for key, ttl, data in zip(keys, src_result[::2], src_result[1::2]):
        if data is None:
            non_existing += 1
            continue
        # A non-positive (or missing) TTL means "no expiry", which RESTORE
        # expects to be spelled as 0.
        dst_pipe.restore(key, ttl if ttl and ttl > 0 else 0, data, replace=replace)
        restored.append((key, data))

    # We execute the destination pipeline; errors are returned in the result
    # list instead of being raised, so the whole batch can be inspected.
    dst_result = dst_pipe.execute(raise_on_error=False)

    # We analyze the results of the destination pipeline execution.
    for (key, data), result in zip(restored, dst_result):
        if not isinstance(result, Exception):
            continue
        if result.args and result.args[0] in busy_messages:
            already_existing += 1
        else:
            print("Key failed:", key, repr(data), repr(result))
            raise result

    return non_existing, already_existing
@click.command()
@click.argument('src_host')
@click.option('--src_port', default=DEFAULT_REDIS_PORT, help='Source port (default: %d)' % DEFAULT_REDIS_PORT)
@click.option('--src_db', default=DEFAULT_REDIS_DB, help='Source db number (default: %d)' % DEFAULT_REDIS_DB)
@click.option('--src_ssl', default=False, is_flag=True, help='SSL connection to source? (default: False)')
@click.option('--src_pass', default=None, help='Source password (default: None)')
@click.argument('dst_host')
@click.option('--dst_port', default=DEFAULT_REDIS_PORT, help='Destination port (default: %d)' % DEFAULT_REDIS_PORT)
@click.option('--dst_db', default=DEFAULT_REDIS_DB, help='Destination db number (default: %d)' % DEFAULT_REDIS_DB)
@click.option('--dst_ssl', default=False, is_flag=True, help='SSL connection to destination? (default: False)')
@click.option('--dst_pass', default=None, help='Destination password (default: None)')
@click.option('--flush', default=False, is_flag=True, help='Flush destination before migration? (default: False)')
@click.option('--pattern', default='*', help='Pattern of key names to migrate (default: *)')
@click.option('--replace', default=False, is_flag=True, help='Replace existing keys in destination? (default: False)')
def migrate(
    src_host, src_port, src_pass, src_ssl, src_db,
    dst_host, dst_port, dst_pass, dst_ssl, dst_db,
    pattern, flush, replace
):
    """Copy the keys matching PATTERN from SRC_HOST to DST_HOST.

    Keys are read with SCAN and transferred in batches using DUMP/RESTORE,
    keeping their remaining time to live. Only read commands are issued
    against the source, so its keys are left untouched.
    """
    print("Connecting to Redis instances...")
    src = redis.StrictRedis(host=src_host, port=src_port, ssl=src_ssl, db=src_db, password=src_pass)
    dst = redis.StrictRedis(host=dst_host, port=dst_port, ssl=dst_ssl, db=dst_db, password=dst_pass)

    # Flush destination database?
    if flush:
        dst.flushdb()

    print("Counting keys to migrate...")
    # Count with SCAN instead of KEYS: KEYS walks the whole keyspace in one
    # blocking command and returns every name at once, which can stall a
    # busy source server. SCAN may report a key more than once, so this is
    # an estimate; it only sizes the progress bar, which is clamped below.
    num_keys = sum(1 for _ in src.scan_iter(match=pattern))
    if num_keys == 0:
        print("No keys found, exiting.")
        return

    progress_widgets = ['%d keys: ' % num_keys, Percentage(), ' ', Bar(), ' ', ETA()]
    progress_bar = ProgressBar(widgets=progress_widgets, maxval=num_keys).start()

    BATCH_SIZE = 100
    cursor = 0
    non_existing = 0
    already_existing = 0
    keys = []
    src_pipe = src.pipeline()

    # We loop over the keys matching the specified pattern:
    for key in src.scan_iter(match=pattern):
        cursor += 1
        keys.append(key)
        # Queue PTTL then DUMP for each key; _copy_keys relies on that order.
        src_pipe.pttl(key)
        src_pipe.dump(key)

        # We have iterated enough for a migration batch.
        if len(keys) >= BATCH_SIZE:
            # We copy that batch of keys...
            non_existing, already_existing = _copy_keys(
                keys, src_pipe, dst, replace, non_existing, already_existing
            )
            # ... and we reset the source containers to continue iterating.
            keys = []
            src_pipe = src.pipeline()
            # The keyspace may have grown since counting; never exceed maxval.
            progress_bar.update(min(num_keys, cursor))

    # Iteration is now over, we need to restore the last partial batch
    # (skipped when the key count was an exact multiple of the batch size).
    if keys:
        non_existing, already_existing = _copy_keys(
            keys, src_pipe, dst, replace, non_existing, already_existing
        )
    progress_bar.finish()

    print("Keys disappeared on source during scan:", non_existing)
    print("Keys already existing on destination:", already_existing)
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    migrate()
This works great. I didn't see it delete anything in source in my tests.
I added a remove option here: https://gist.github.com/mohammadjolani/7d71b73afda27980e985e53d02feb9aa
Here is a Java implementation: https://github.com/BalaclavaLab/blc-redis-dump-restore
It does not use the keys() function, so Redis won't get blocked.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for this! It works! One question though: is there any way to avoid deleting the keys in the source? @metamatik