Skip to content

Instantly share code, notes, and snippets.

@robcowie
Last active May 18, 2022 11:42
Show Gist options
  • Save robcowie/a6e8e73a22dc7da009ec3dc67ac08b28 to your computer and use it in GitHub Desktop.
Save robcowie/a6e8e73a22dc7da009ec3dc67ac08b28 to your computer and use it in GitHub Desktop.
Stub event sources for consumption by stream processing code
# loop = asyncio.get_event_loop()
# loop.run_until_complete(event_source_async(your_handler))
import asyncio
import random
import time
def randint():
return random.randint(-100, 100)
def randfloat():
return random.uniform(0, 70.5)
# If using a synchronous approach, consume this generator
def event_source_sync():
"""Emit events of the form {"type": str, "val": int|float}"""
MSG_TYPES = [("a", randint), ("b", randfloat)]
while True:
for MSG_TYPE, genval in MSG_TYPES:
yield {"type": MSG_TYPE, "val": genval()}
time.sleep(0.2)
# If using an asynchronous approach, provide a coroutine handler
# then create and start an event loop to run event_source_async
async def event_source_async(handler):
"""Call handler with each event of the form {"type": str, "val": int|float}"""
MSG_TYPES = [("a", randint), ("b", randfloat)]
while True:
for MSG_TYPE, genval in MSG_TYPES:
await handler({"type": MSG_TYPE, "val": genval()})
await asyncio.sleep(0.2)
async def event_handler(evt):
print(evt)
# YOUR CODE HERE
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment