Created
October 9, 2013 17:24
-
-
Save sunilmallya/6904943 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
A non-blocking task scheduler and executor implemented using coroutines and Tornado.
Tornado acts as the non-blocking listener; the two coroutines are the task scheduler and the task executor.
'''
import tornado.ioloop | |
import tornado.web | |
import tornado.httpserver | |
import tornado.gen | |
import time | |
from socket import * | |
from heapq import * | |
import random | |
import itertools | |
############################################################ | |
## Priority Q Impl | |
############################################################ | |
# Heap-ordered list of [priority, count, task] entries.
pq = []
# Maps each live task to its entry in the heap.
entry_finder = {}
# Placeholder stored in an entry's task slot once the task has been removed.
REMOVED = '<removed-task>'
# Monotonic sequence number; breaks ties between entries of equal priority.
counter = itertools.count()
def add_task(task, priority=0):
    """Queue `task` with the given priority, replacing any existing entry for it.

    Entries are stored as ``[priority, count, task]`` lists; the count comes
    from the shared sequence counter so entries with equal priority are
    ordered by insertion.
    """
    # A queued task cannot be re-prioritised in place inside the heap, so the
    # old entry is marked as removed and a fresh one is pushed instead.
    if task in entry_finder:
        remove_task(task)
    new_entry = [priority, next(counter), task]
    entry_finder[task] = new_entry
    heappush(pq, new_entry)
def remove_task(task):
    """Mark the queued entry for `task` as REMOVED.

    The entry stays in the heap as a placeholder; only its task slot is
    overwritten. Raises KeyError if `task` is not currently queued.
    """
    entry_finder.pop(task)[-1] = REMOVED
def pop_task():
    """Pop and return the live task with the smallest priority value.

    Entries whose task slot holds the REMOVED placeholder are discarded along
    the way. Raises KeyError if no live task is left in the queue.
    """
    while pq:
        _priority, _count, candidate = heappop(pq)
        if candidate is REMOVED:
            # Stale placeholder left behind by remove_task(); skip it.
            continue
        del entry_finder[candidate]
        return candidate
    raise KeyError('pop from an empty priority queue')
def peek_task():
    """Return the next live entry as a ``(priority, count, task)`` tuple without popping it.

    Placeholders left at the head of the heap by remove_task() are discarded
    first, so the returned entry always describes the task that pop_task()
    would return next. Without this, a caller could compare against the
    priority of a removed entry and then pop a different task.

    Returns ``(None, None, None)`` when no live task is queued.
    """
    # Removed entries are already gone from entry_finder, so dropping them
    # from the heap here loses nothing.
    while pq and pq[0][-1] is REMOVED:
        heappop(pq)
    if pq:
        # Hand out a copy so callers cannot mutate the heap's internal entry.
        return tuple(pq[0])
    return (None, None, None)
################################################################ | |
# CO ROUTINES | |
################################################################ | |
# The Process logic, to schedule a task | |
def task_scheduler(delay=2):
    """Coroutine that receives tasks through send() and queues them for later execution.

    Each received task is added to the priority queue with a priority equal
    to the current time plus `delay` seconds, i.e. the priority is the time
    at which the task becomes due.

    Args:
        delay: seconds between receiving a task and the task becoming due.
            Defaults to 2, the previously hard-coded value.

    The generator must be advanced to its first ``yield`` before the first
    send(). Closing it prints an exit message.
    """
    try:
        while True:
            task = (yield)
            # The priority is the wall-clock time at which the task is due.
            add_task(task, time.time() + delay)
    except GeneratorExit:
        print("Exit Task Scheduler")
# Check scheduler | |
def check_scheduler():
    """Send every task whose scheduled time has passed to the task executor.

    Priorities are timestamps, so a task is due once its priority is less
    than or equal to the current time. All due tasks are dispatched in one
    call rather than one per invocation, so a burst of tasks does not back up
    behind the polling interval.
    """
    print("[check scheduler]")
    now = time.time()
    while True:
        priority, _count, _task = peek_task()
        # Compare with None explicitly: a priority of 0 is falsy but is still
        # a valid (and due) priority, since it is add_task()'s default.
        if priority is None or priority > now:
            break
        try:
            task = pop_task()
        except KeyError:
            # pop_task() raises KeyError when only removed placeholders
            # remained in the queue; there is nothing left to run.
            break
        texecutor.send(task)
# Task Executor | |
def task_executor():
    """Coroutine that receives tasks through send() and executes them.

    Execution currently consists of printing the task together with the
    current time. The generator must be advanced to its first ``yield``
    before the first send(). Closing it prints an exit message.
    """
    try:
        while True:
            task = (yield)
            # Build one string so the output is the same whether print is a
            # statement (Python 2) or a function (Python 3).
            print("%s %s %s" % ("[Task exec] --> ", task, time.time()))
    except GeneratorExit:
        print("Exit Task Executor")
# Create both coroutines and advance each one to its first `yield` so that it
# can accept values through send(). The builtin next() is used instead of the
# generator's .next() method, which only exists on Python 2.
tscheduler = task_scheduler()
next(tscheduler)
texecutor = task_executor()
next(texecutor)
########################################### | |
# Create Tornado server application | |
########################################### | |
from tornado.options import define, options | |
# Command-line option --port: the port main() passes to server.listen().
define(
    "port",
    type=int,
    default=8888,
    help="run on the given port",
)
class GetData(tornado.web.RequestHandler):
    """Request handler that queues the body of each POST request as a task."""

    # No @tornado.web.asynchronous here: the handler finishes the request
    # before returning, so the decorator is unnecessary, and it no longer
    # exists in Tornado 6.
    def post(self, *args, **kwargs):
        """Send the raw request body to the task scheduler, then finish the response."""
        recv_data = self.request.body
        tscheduler.send(recv_data)
        self.finish()
# URL routing table: POST requests to "/" are handled by GetData.
application = tornado.web.Application(handlers=[(r"/", GetData)])
def main():
    """Parse command-line options, start the HTTP server and run the IOLoop."""
    tornado.options.parse_command_line()

    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)

    # Poll the task queue with a callback_time of 1000 (milliseconds).
    poller = tornado.ioloop.PeriodicCallback(check_scheduler, 1000)
    poller.start()

    tornado.ioloop.IOLoop.instance().start()


# ============= MAIN ======================== #
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment