Skip to content

Instantly share code, notes, and snippets.

@robcowie
Created August 18, 2024 18:18
Show Gist options
  • Save robcowie/07ee2759e5b8dd052ea0a820823dfb7d to your computer and use it in GitHub Desktop.
Save robcowie/07ee2759e5b8dd052ea0a820823dfb7d to your computer and use it in GitHub Desktop.
Asyncio loop with example long-running tasks, signal handlers and graceful cleanup on termination
"""Asyncio loop example.
* Long running tasks
* Graceful shutdown with task cancellation
* Signal handling (SIGTERM, SIGINT)
* Use uvloop for better performance
"""
import asyncio
import signal
import uvloop
import logging
# Module logger: emits DEBUG and above through a stream handler with a
# "LEVEL [timestamp] name: message" layout.
formatter = logging.Formatter("%(levelname)-8s [%(asctime)s] %(name)s: %(message)s")
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(ch)
logger.setLevel(logging.DEBUG)
async def task1():
    """Log a heartbeat message every half second until cancelled.

    The loop never exits on its own; it ends only when the task is
    cancelled, at which point ``asyncio.CancelledError`` propagates out of
    the pending ``asyncio.sleep`` call.
    """
    while True:
        logger.info("Task 1: Running")
        await asyncio.sleep(0.5)
async def task2():
    """Log a message once per second, with explicit cancellation handling.

    Raises:
        asyncio.CancelledError: re-raised after the cancellation has been
            logged, so the task ends in the *cancelled* state.
    """
    try:
        while True:
            logger.info("Task 2: Running every second")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        logger.info("Handling task 2 cancellation")
        # Re-raise after cleanup: swallowing CancelledError makes the task
        # look like it finished normally and hides the cancellation from
        # whoever is awaiting it.
        raise
async def task3():
    """Repeatedly run a blocking function in the loop's default executor.

    Each iteration hands ``blocking_function`` to the default executor and
    awaits its result, so the event loop stays responsive while the worker
    sleeps. Cancelling this task stops the waiting, but a call that is
    already running in the worker thread is not interrupted and runs to
    completion.
    """

    def blocking_function():
        # Local import: ``time`` is only needed by this helper.
        import time

        time.sleep(3)
        return "Hello from blocking function"

    logger.info("Task 3: Running blocking function in thread pool")
    # The running loop cannot change while this coroutine executes, so look
    # it up once instead of on every iteration.
    loop = asyncio.get_running_loop()
    while True:
        result = await loop.run_in_executor(None, blocking_function)
        logger.info(f"Blocking function returned: {result}")
async def task4():
    """Run ``ls`` in a subprocess every two seconds, with a 5 second timeout.

    The child's stdout is captured through a pipe and logged. The timeout
    covers the whole execution of the child (not just spawning it). If the
    timeout expires, or this task is cancelled while the child is still
    running, the child is killed and reaped before control leaves the
    iteration.
    """
    while True:
        logger.info("Task 4: Running subprocess")
        # stdout must be piped explicitly; otherwise ``process.stdout`` is
        # None and there is nothing to log.
        process = await asyncio.create_subprocess_exec(
            "ls", stdout=asyncio.subprocess.PIPE
        )
        try:
            # communicate() reads the output and waits for the child to
            # exit, so the timeout bounds the actual run time.
            stdout, _ = await asyncio.wait_for(process.communicate(), timeout=5)
        except asyncio.TimeoutError:
            logger.warning("Task 4: Subprocess timed out after 5 seconds")
        else:
            output = stdout.decode(errors="replace")
            logger.info(
                f"Process returned: {output!r} (exit code {process.returncode})"
            )
        finally:
            # Runs on timeout and on cancellation as well: never leave a
            # running or un-reaped child behind.
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The child exited between the check and the kill.
                    pass
                await process.wait()
        await asyncio.sleep(2)
# Registry of the tasks created in main(), keyed by a short task name;
# cancel_tasks() iterates over it.
tasks = {}
async def cancel_tasks():
    """Cancel every task in the module-level ``tasks`` registry.

    Logs each task name as it is cancelled, then waits until all registered
    tasks have finished. Exceptions (including cancellation) are collected
    by ``gather`` rather than raised.
    """
    for label in tasks:
        logger.info(f"Cancelling {label}")
        tasks[label].cancel()
    registered = list(tasks.values())
    await asyncio.gather(*registered, return_exceptions=True)
async def cancel_all_tasks():
    """Cancel every unfinished task on the running loop except the caller.

    After requesting cancellation, waits for all of those tasks to finish;
    their exceptions (including cancellation) are collected, not raised.
    """
    me = asyncio.current_task()
    others = []
    for candidate in asyncio.all_tasks():
        # Skip tasks that are already finished and the task running this
        # coroutine (it must survive to await the others).
        if candidate.done() or candidate is me:
            continue
        others.append(candidate)

    for other in others:
        other.cancel()

    # Block until every cancelled task has actually wound down.
    await asyncio.gather(*others, return_exceptions=True)
# The in-flight shutdown task, if any. Holding it here keeps a strong
# reference to the task and lets repeated signals be detected.
_shutdown_task = None


def shutdown(sig, loop):
    """Signal handler: cancel all tasks, then stop the event loop.

    Args:
        sig: Name of the received signal; used only for logging.
        loop: The event loop whose tasks are cancelled and which is stopped
            once the cancellation has completed.

    A signal that arrives while a previous shutdown is still running is
    logged and otherwise ignored. Starting a second ``cancel_all_tasks``
    would cancel the first one and stop the loop before the remaining tasks
    have finished cleaning up.
    """
    global _shutdown_task
    logger.info(f"Shutting down on signal {sig}")
    if _shutdown_task is not None and not _shutdown_task.done():
        logger.info("Shutdown already in progress")
        return
    _shutdown_task = loop.create_task(cancel_all_tasks())
    # Stop the loop only after every task has been cancelled and awaited.
    _shutdown_task.add_done_callback(lambda _: loop.stop())
def main():
    """Set up a uvloop event loop, start the example tasks and run forever.

    Installs SIGTERM and SIGINT handlers that trigger ``shutdown``, creates
    the four example tasks (recorded in the module-level ``tasks`` registry)
    and runs the loop until ``shutdown`` stops it. The loop is always
    closed on the way out.
    """
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on newer Python versions. The
    # policy set above makes this a uvloop loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Add signal handlers
    loop.add_signal_handler(signal.SIGTERM, shutdown, "SIGTERM", loop)
    loop.add_signal_handler(signal.SIGINT, shutdown, "SIGINT", loop)

    # Create tasks and add them to the loop
    tasks["task1"] = loop.create_task(task1())
    tasks["task2"] = loop.create_task(task2())
    tasks["task3"] = loop.create_task(task3())
    tasks["task4"] = loop.create_task(task4())

    logger.info("Starting event loop...")
    try:
        loop.run_forever()
    finally:
        loop.close()
# Start the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment