Created
October 12, 2014 21:20
-
-
Save josiahcarlson/5cefbb7d54ea3b1818a3 to your computer and use it in GitHub Desktop.
Reliably multiplex receiving results via Redis lists
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
result_multiplex.py | |
Copyright 2014 Josiah Carlson | |
Released as LGPL 2 or 3, your choice. | |
Problem: | |
You have a task processor that writes the results of tasks to a specified key | |
in Redis by performing an RPUSH operation. | |
You don't want to run dozens of BLPOP threads to gather the results to pass on | |
to waiting clients. | |
Solution: | |
Use one thread to BLPOP on one 'router' key, along with all of the known | |
destinations for result keys. | |
When a caller wants to be notified via callback of the receipt of results at a | |
specific LIST key, they only need to call rmux.register_callback(key, callback). | |
The listening thread receives notifications of keys to watch on the 'router' | |
key, then adds that key to the keys to BLPOP from. | |
Upon receiving a real result from one of the keys, the listening thread will | |
directly call the callback with the data as its only argument. | |
''' | |
import random | |
import threading | |
import time | |
import traceback | |
import uuid | |
import redis | |
def NOP(data):
    '''
    Do-nothing callback: accepts one result payload, ignores it, and returns
    None.
    '''
    return None
class ResultMultiplexer(object):
    '''
    Delivers results arriving on many Redis LIST keys to per-key callbacks,
    using a single listener loop that BLPOPs on a 'router' key plus every
    result key registered so far.
    '''

    def __init__(self, conn, runid=None, timeout=60):
        '''
        conn    -- Redis client used for every command issued by this object
        runid   -- name of the router key; a random UUID is used when omitted.
                   For identification purposes in Redis, runid could be a
                   hostname:pid.
        timeout -- expiration in seconds applied to the router key when a
                   stop request is sent
        '''
        self.conn = conn
        self.router_key = str(runid or uuid.uuid4())
        self.timeout = timeout
        # result key -> callable taking the result data as its only argument
        self.callbacks = {}
        # the listener loop runs only while this flag is False
        self.quit = True

    @staticmethod
    def _normalize_key(key):
        '''
        Returns key as a native str so that keys read back from Redis compare
        equal to the str keys used for the router key and callback registry.
        Only bytes that are not already str (the Python 3 case) are decoded;
        undecodable bytes are returned unchanged.
        '''
        if isinstance(key, bytes) and not isinstance(key, str):
            try:
                return key.decode('utf-8')
            except UnicodeDecodeError:
                return key
        return key

    def _process_results(self):
        '''
        Listens on the router key plus all known result keys for one of four
        message types:
        1. no message, the BLPOP timed out after 1 second
        2. an empty message on the router key, sent by a request to stop
        3. a non-empty message on the router key: the name of a key where a
           result will be sent
        4. the result of a computation, received on a result key

        In cases 1 and 2, just loop again (which re-checks the quit flag).
        In case 3, add the key to the set of keys to listen on.
        In case 4, stop listening on that key, pop its callback from the
        registry and call it with the result. There may be no callback when:
        1. the callback was cancel()ed
        2. using the hostname:pid variant for runid, this process picked up
           router entries or results left behind by a previous process
        In that situation the result is dropped.
        '''
        keys = set([self.router_key])
        while not self.quit:
            info = self.conn.blpop(list(keys), 1)
            if not info:
                # message type 1
                continue
            key, result = info
            key = self._normalize_key(key)
            if key == self.router_key:
                if result:
                    # message type 3
                    keys.add(self._normalize_key(result))
                # an empty payload is message type 2
                continue
            # message type 4
            keys.discard(key)
            callback = self.callbacks.pop(key, None)
            if callback is None:
                continue
            try:
                callback(result)
            except Exception:
                # Report the failure but keep the listener alive for the
                # remaining keys; replace with better error handling.
                traceback.print_exc()

    def start_process_results_thread(self, daemon=True):
        '''
        Starts and returns the thread that listens for and processes result
        callbacks. The thread's daemon flag is set from the daemon argument.
        '''
        self.quit = False
        t = threading.Thread(target=self._process_results)
        t.daemon = daemon
        t.start()
        return t

    def stop_process_results_thread(self, now=False):
        '''
        Asks the result processor to shut down: sets the quit flag, then
        pushes an empty message onto the router key so a listener blocked in
        BLPOP on it wakes up, and sets the router key to expire after
        self.timeout seconds.

        If now is True the empty message is pushed at the head of the router
        list (LPUSH); otherwise it is appended at the tail (RPUSH). The
        listener checks the quit flag before every BLPOP, so in both cases it
        exits the next time it returns from BLPOP.
        '''
        # Set the flag first so the listener sees it when the message wakes it.
        self.quit = True
        pipe = self.conn.pipeline(True)
        if now:
            pipe.lpush(self.router_key, '')
        else:
            pipe.rpush(self.router_key, '')
        pipe.expire(self.router_key, self.timeout)
        pipe.execute()

    @staticmethod
    def send_result(conn, key, result, timeout=60):
        '''
        When the task has finished computing its result, it sends the result
        to the key provided and sets the expiration time of the key to
        timeout seconds. Both commands are sent in one pipeline.
        '''
        pipe = conn.pipeline(True)
        pipe.lpush(key, result)
        pipe.expire(key, timeout)
        pipe.execute()

    def register_callback(self, key, callback):
        '''
        Registers the passed callback for the key, then sends the key to
        listen on to the router key. Returns the key that was passed in.
        '''
        self.callbacks[self._normalize_key(key)] = callback
        self.conn.rpush(self.router_key, key)
        return key

    def cancel(self, key):
        '''
        Attempts to pop the key from the callback registry. If None is
        returned, there was no callback registered for the key (for example
        the result was already processed). If a function is returned, then
        you have successfully prevented the callback from running.
        '''
        return self.callbacks.pop(self._normalize_key(key), None)
def test():
    '''
    Manual smoke test. Requires a reachable Redis server; connects with
    redis.Redis(db=15) and FLUSHES that database before running.

    Registers ten printing callbacks, cancels one, sends a result to every
    key in random order, then sleeps one second so the listener thread has
    time to print what it received. The cancelled key should print nothing.
    '''
    conn = redis.Redis(db=15)
    conn.flushdb()
    mplex = ResultMultiplexer(conn)
    mplex.start_process_results_thread()

    def printer(x):
        # Bind x now so each callback reports the key it was registered for.
        def show(y):
            print("%s == %s" % (x, y))
        return show

    for i in range(10):
        mplex.register_callback(str(i), printer(i))

    # random.shuffle needs a mutable sequence; range() is not one on Python 3.
    order = list(range(10))
    random.shuffle(order)
    mplex.cancel(str(order[-1]))
    print("canceled: %s" % order[-1])
    for v in order:
        mplex.send_result(conn, str(v), str(v))
    time.sleep(1)


if __name__ == '__main__':
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment