Skip to content

Instantly share code, notes, and snippets.

@bj0
Last active June 6, 2018 15:07
Show Gist options
  • Save bj0/0313ee67766de6a04881 to your computer and use it in GitHub Desktop.
Save bj0/0313ee67766de6a04881 to your computer and use it in GitHub Desktop.
A Python 3.5 decorator for creating asynchronous iterators from coroutines (since we don't have async generators yet).
import inspect
import types
@types.coroutine
def _yield_(value):
return (yield value)
async def yield_(value):
return await _yield_(value)
async def yield_from(aiter):
if hasattr(aiter, '__aiter__'):
async for item in aiter:
await yield_(item)
else:
for item in aiter:
await _yield_(item)
class AsyncIterable:
def __init__(self, coro):
self._coro = coro
async def __aiter__(self):
return AsyncIterator(self._coro)
@types.coroutine
def __iter__(self):
"""delegate iteration to coroutine"""
yield from self._coro
class AsyncIterator:
def __init__(self, coro):
self._coro = coro
# if we get passed an iterator that doesn't have a .send() (like from iter()), we must use next()
self._has_send = hasattr(coro, 'send')
def __await__(self):
return self
def __anext__(self):
return self
def __next__(self):
return self.send(None)
def send(self, value):
try:
result = self._coro.send(value) if self._has_send else next(self._coro)
except StopIteration:
raise StopAsyncIteration
if not inspect.isawaitable(result):
# we have reached the bottom of the async chain
raise StopIteration(result)
else:
return result
def async_iterable(f):
return lambda *args, **kwargs: AsyncIterable(f(*args, **kwargs))
from stars import async_iterable, yield_, yield_from
import asyncio as aio
import pytest
@async_iterable
def gen_range(count):
"""simple generator"""
for i in range(count):
yield i
@async_iterable
async def async_range(count):
"""async generator"""
for i in range(count):
await yield_(i)
@async_iterable
def gen_range_delegate(count):
"""simple generator delegation"""
yield from range(count)
@async_iterable
async def async_range_delegate(count):
"""async generator delegation"""
await yield_from(range(count))
# from github
@async_iterable
async def double(ait):
async for value in ait:
await yield_(value * 2)
await aio.sleep(0.001)
@async_iterable
async def async_range_twice(count):
await yield_from(async_range(count))
await yield_(None)
await yield_from(async_range(count))
@async_iterable
def range_twice(count):
yield from async_range(count)
yield None
yield from async_range(count)
class HasAsyncGenMethod:
def __init__(self, factor):
self._factor = factor
@async_iterable
async def async_multiplied(self, ait):
async for value in ait:
await yield_(value * self._factor)
async def collect(ait):
items = []
async for value in ait:
items.append(value)
return items
@pytest.mark.asyncio
async def test_async_generator():
assert await collect(async_range(10)) == list(range(10))
assert (await collect(double(async_range(5)))
== [0, 2, 4, 6, 8])
tripler = HasAsyncGenMethod(3)
assert (await collect(tripler.async_multiplied(async_range(5)))
== [0, 3, 6, 9, 12])
@pytest.mark.asyncio
async def test_ranges():
assert await collect(async_range(10)) == list(range(10))
assert await collect(async_range_delegate(10)) == list(range(10))
assert await collect(gen_range(10)) == list(range(10))
assert await collect(gen_range_delegate(10)) == list(range(10))
@pytest.mark.asyncio
async def test_range_delegation():
assert await collect(async_range_twice(3)) == [
0, 1, 2, None, 0, 1, 2
]
assert await collect(range_twice(3)) == [
0, 1, 2, None, 0, 1, 2
]
from stars import async_iterable, yield_, yield_from
import asyncio as aio
import pytest
@async_iterable
def q_gen_delegate(q):
"""delegate from our queue-generator"""
yield from q_gen(q)
@async_iterable
def q_gen_delegate2(q):
"""delegate from our async queue-generator"""
yield from async_q_gen(q)
@async_iterable
async def async_q_gen_delegate(q):
await yield_from(q_gen(q))
@async_iterable
async def async_q_gen_delegate2(q):
await yield_from(async_q_gen(q))
@async_iterable
def q_gen(q):
"""make a generator that yields gets of the queue"""
try:
while True:
yield (yield from aio.wait_for(q.get(), 0.02))
except aio.TimeoutError:
return
@async_iterable
async def async_q_gen(q):
"""make an async generator that yields gets of the queue"""
try:
while True:
await yield_(await aio.wait_for(q.get(), 0.02))
except aio.TimeoutError:
return
async def filler(q):
"""put a number in a queue every 0.01s five times"""
i = 0
while i < 5:
await aio.sleep(0.01)
await q.put(i)
i += 1
async def collect(ait):
items = []
async for value in ait:
# print('val:{}'.format(value))
items.append(value)
return items
@pytest.mark.asyncio
async def test_q_gen():
q = aio.Queue()
aio.ensure_future(filler(q))
assert await collect(q_gen(q)) == list(range(5))
@pytest.mark.asyncio
async def test_async_q_gen():
q = aio.Queue()
aio.ensure_future(filler(q))
assert await collect(async_q_gen(q)) == list(range(5))
@pytest.mark.asyncio
async def test_q_gen_delegate():
q = aio.Queue()
aio.ensure_future(filler(q))
assert await collect(q_gen_delegate(q)) == list(range(5))
@pytest.mark.asyncio
async def test_q_gen_delegate2():
q = aio.Queue()
aio.ensure_future(filler(q))
assert await collect(q_gen_delegate2(q)) == list(range(5))
@pytest.mark.asyncio
async def test_async_q_gen_delegate():
q = aio.Queue()
aio.ensure_future(filler(q))
assert await collect(q_gen_delegate(q)) == list(range(5))
@pytest.mark.asyncio
async def test_async_q_gen_delegate2():
q = aio.Queue()
aio.ensure_future(filler(q))
assert await collect(q_gen_delegate2(q)) == list(range(5))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment