Created
August 18, 2024 18:18
-
-
Save robcowie/07ee2759e5b8dd052ea0a820823dfb7d to your computer and use it in GitHub Desktop.
Asyncio loop with example long-running tasks, signal handlers and graceful clean up on termination
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Asyncio loop example. | |
* Long running tasks | |
* Graceful shutdown with task cancellation | |
* Signal handling (SIGTERM, SIGINT) | |
* Use uvloop for better performance | |
""" | |
import asyncio | |
import signal | |
import uvloop | |
import logging | |
# Module-level logger that writes DEBUG-and-above records to a stream handler
# (stderr by default) using a "LEVEL [timestamp] name: message" layout.
formatter = logging.Formatter("%(levelname)-8s [%(asctime)s] %(name)s: %(message)s")

ch = logging.StreamHandler()
ch.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(ch)
async def task1(interval: float = 0.5):
    """Log a heartbeat message forever, pausing between iterations.

    Args:
        interval: Seconds to sleep after each log message. Defaults to 0.5,
            i.e. the task logs twice per second.

    The coroutine never returns on its own; it ends only when the task is
    cancelled, in which case ``asyncio.CancelledError`` propagates out of
    the ``asyncio.sleep`` call.
    """
    while True:
        logger.info("Task 1: Running")
        await asyncio.sleep(interval)
async def task2(interval: float = 1):
    """Log a message periodically, with explicit cancellation handling.

    Args:
        interval: Seconds to sleep after each log message. Defaults to 1.

    Raises:
        asyncio.CancelledError: Re-raised after the cancellation has been
            logged, so the task ends up in the *cancelled* state.
    """
    try:
        while True:
            # Report the real interval instead of a hard-coded figure that
            # can drift away from the sleep below.
            logger.info(f"Task 2: Running every {interval} second(s)")
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        logger.info("Handling task 2 cancellation")
        # Swallowing CancelledError would make the task look like it finished
        # normally; propagate it once the clean-up work is done.
        raise
async def task3():
    """Repeatedly run a blocking call in the loop's default executor.

    Each iteration hands a function that sleeps for three seconds to
    ``run_in_executor`` (executor ``None``), awaits its result and logs it.
    """
    import time

    def _sleep_then_greet():
        # Stand-in for blocking work that must not run on the event loop.
        time.sleep(3)
        return "Hello from blocking function"

    logger.info("Task 3: Running blocking function in thread pool")
    running_loop = asyncio.get_running_loop()
    while True:
        result = await running_loop.run_in_executor(None, _sleep_then_greet)
        logger.info(f"Blocking function returned: {result}")
async def task4():
    """Run ``ls`` in a subprocess every two seconds, with a 5 second timeout.

    The subprocess's stdout is captured through a pipe and logged. The
    timeout covers the whole run of the process, not just its creation. A
    process that is still alive after a timeout or after this task is
    cancelled is killed and reaped before control leaves the iteration.
    """
    while True:
        logger.info("Task 4: Running subprocess")
        # stdout must be piped explicitly; otherwise ``process.stdout`` is
        # None and there is nothing to report.
        process = await asyncio.create_subprocess_exec(
            "ls", stdout=asyncio.subprocess.PIPE
        )
        try:
            stdout, _ = await asyncio.wait_for(process.communicate(), timeout=5)
        except asyncio.TimeoutError:
            logger.warning("Task 4: Subprocess timed out")
        else:
            output = stdout.decode(errors="replace")
            logger.info(f"Process returned: {output}")
        finally:
            # Reached on timeout and on cancellation as well: never leave a
            # running child process behind.
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass  # The process exited between the check and kill().
                await process.wait()
        await asyncio.sleep(2)
# Registry of named tasks; main() fills it in when it schedules the tasks.
tasks = {}


async def cancel_tasks():
    """Cancel every task in the ``tasks`` registry and wait for them to end.

    Exceptions raised by the tasks (including cancellation) are collected by
    ``gather`` rather than propagated.
    """
    for name in tasks:
        logger.info(f"Cancelling {name}")
        tasks[name].cancel()
    await asyncio.gather(*tasks.values(), return_exceptions=True)
async def cancel_all_tasks():
    """Cancel all unfinished tasks in the running loop except the caller.

    After requesting cancellation, waits until every one of those tasks has
    finished; their exceptions are collected rather than raised.
    """
    me = asyncio.current_task()
    pending = []
    for candidate in asyncio.all_tasks():
        if candidate.done() or candidate is me:
            continue
        pending.append(candidate)

    for victim in pending:
        victim.cancel()

    # Block until every cancelled task has actually wound down.
    await asyncio.gather(*pending, return_exceptions=True)
def shutdown(sig, loop):
    """Signal handler: cancel all tasks, then stop ``loop``.

    Args:
        sig: Label of the received signal, used only in the log message.
        loop: Event loop on which the cancellation task is scheduled and
            which is stopped once that task is done.
    """
    logger.info(f"Shutting down on signal {sig}")

    def _stop_loop(_finished_task):
        loop.stop()

    cancellation = loop.create_task(cancel_all_tasks())
    cancellation.add_done_callback(_stop_loop)
def main():
    """Set up a uvloop event loop, schedule the example tasks and run.

    SIGTERM and SIGINT are routed to ``shutdown``, which cancels all tasks
    and stops the loop. Once the loop has stopped, async generators and the
    default executor are shut down before the loop is closed.
    """
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    # Create the loop explicitly: calling get_event_loop() when no loop is
    # running is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Add signal handlers
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown, sig.name, loop)

    # Create tasks and add them to the loop
    tasks["task1"] = loop.create_task(task1())
    tasks["task2"] = loop.create_task(task2())
    tasks["task3"] = loop.create_task(task3())
    tasks["task4"] = loop.create_task(task4())

    logger.info("Starting event loop...")
    try:
        loop.run_forever()
    finally:
        try:
            loop.run_until_complete(loop.shutdown_asyncgens())
            # task3 submits blocking work to the default executor; wait for
            # its worker threads to finish instead of abandoning them.
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            loop.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment