Created
July 19, 2018 01:37
-
-
Save amzyang/444ab6f0173289b18b15f1b5b0a278ac to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding:utf8 | |
import redis | |
import logging | |
import click | |
from itertools import izip_longest | |
# hosts = ('172.16.14.2', '172.16.14.4', '172.16.14.7', '172.16.14.16') | |
""" | |
Setup: pip install click redis | |
Usage: python migrate.py --host=your_host_here --pattern=your_key_pattern | |
""" | |
# Connection parameters for the two Redis instances involved in the
# migration.  Fill in real credentials before running.
dest = dict(
    host='your host here',
    port=6379,  # change it
    db=1,
    password='your password here',
)

src = dict(
    port=6380,  # change it; 'host' comes from the --host CLI option
    db=1,
    password='your password here',
)
def dump_and_restore(dest_conn, package):
    """Replay dumped keys onto the destination Redis via RESTORE.

    :type dest_conn: redis.Redis
    :param package: iterable of ``(key, dumped_value, ttl_ms)`` triples;
        a ttl of 0 means "no expiry" for RESTORE.
    """
    pipe = dest_conn.pipeline()
    for key, value, ttl in package:
        # DUMP returns None when the key was deleted between SCAN and DUMP;
        # RESTORE rejects a None payload, so skip those keys instead of failing
        # the whole pipeline.
        if value is None:
            logging.warning("key %s vanished before dump, skipping", key)
            continue
        pipe.restore(key, ttl=ttl, value=value, replace=True)
        logging.info("key: %s, value: %s, ttl: %d", key, value, ttl)
    result = pipe.execute()
    logging.info(result)
def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks or blocks.

    grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    The final chunk is padded with *fillvalue* up to length *n*.
    """
    # izip_longest was renamed to zip_longest in Python 3; resolve the right
    # name locally so this helper works on either interpreter.
    try:
        from itertools import izip_longest as zip_longest  # Python 2
    except ImportError:
        from itertools import zip_longest  # Python 3
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)
@click.command()
@click.option('--host', help="redis host")
@click.option('--pattern', help="key pattern")
def migrate(host, pattern):
    """Copy keys matching *pattern* from the source Redis at *host* to the
    destination configured in ``dest``, using DUMP/RESTORE in batches of 100.
    """
    # Log to a per-host file so concurrent migrations don't clobber each other.
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%a, %d %b %Y %H:%M:%S',
                        filename='/tmp/migrate-%s.log' % host,
                        filemode='w')
    src_conn = redis.Redis(host=host, **src)
    # SCAN incrementally (~100 keys per round trip) instead of blocking KEYS.
    _iter = src_conn.scan_iter(match=pattern, count=100)
    dest_conn = redis.Redis(**dest)
    # Data-set size is unknown; assume ~10,000,000 keys for the progress bar
    # (`length` is only used for the percentage display).
    with click.progressbar(_iter, label="Restore", length=10000000, show_percent=True, show_pos=True) as keys:
        # grouper pads the final chunk with None, hence the `key is None` guards.
        chunks = grouper(keys, 100)
        for chunk in chunks:
            # First pipeline: DUMP every real key in the chunk.
            pipe = src_conn.pipeline(transaction=False)
            for key in chunk:
                if key is None:
                    continue
                pipe.dump(key)
            result = pipe.execute()
            # Second pipeline: fetch each key's TTL in milliseconds.
            pipe = src_conn.pipeline(transaction=False)
            for key in chunk:
                if key is None:
                    continue
                pipe.pttl(key)
            # NOTE(review): a falsy PTTL reply (e.g. None from older redis-py)
            # is mapped to 0, i.e. "no expiry" for RESTORE.  Negative sentinels
            # (-1/-2) would pass through unchanged -- confirm against the
            # redis-py version in use.
            ttl_result = map(lambda v: v or 0, pipe.execute())
            # zip stops at the shortest input, so the trailing None fillers in
            # `chunk` are dropped and DUMP/PTTL results align with real keys.
            dump_and_restore(dest_conn, zip(chunk, result, ttl_result))
if __name__ == '__main__':
    # Click parses --host/--pattern from sys.argv and invokes migrate().
    migrate()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment