Last active
June 6, 2018 15:07
-
-
Save bj0/0313ee67766de6a04881 to your computer and use it in GitHub Desktop.
A Python 3.5 decorator for creating asynchronous iterators from coroutines (since we don't have async generators yet).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inspect | |
import types | |
@types.coroutine | |
def _yield_(value): | |
return (yield value) | |
async def yield_(value):
    """Emit *value* from inside an ``async def`` body.

    Awaits the low-level ``_yield_`` helper and returns whatever value the
    driver sends back in.
    """
    sent_back = await _yield_(value)
    return sent_back
async def yield_from(aiter):
    """Emit every item of *aiter*, one at a time.

    *aiter* may be an asynchronous iterable (anything exposing ``__aiter__``)
    or an ordinary iterable.
    """
    if not hasattr(aiter, '__aiter__'):
        # Ordinary iterable: walk it synchronously.
        for element in aiter:
            await _yield_(element)
        return

    async for element in aiter:
        await yield_(element)
class AsyncIterable:
    """Wrap a coroutine (or generator) object so it can be used with ``async for``.

    The wrapped object is stored as-is; iteration is performed by an
    ``AsyncIterator`` built around it, or by plain delegation via ``__iter__``.
    """

    def __init__(self, coro):
        """Store the coroutine/generator object that produces the values."""
        self._coro = coro

    def __aiter__(self):
        """Return the asynchronous iterator driving the wrapped object.

        This must be a plain method: since Python 3.7 ``async for`` requires
        ``__aiter__`` to return the asynchronous iterator itself, and an
        ``async def`` here would hand back an awaitable instead.
        """
        return AsyncIterator(self._coro)

    @types.coroutine
    def __iter__(self):
        """Delegate iteration to the wrapped coroutine (supports ``yield from``)."""
        yield from self._coro
class AsyncIterator:
    """Asynchronous iterator that drives a coroutine or a plain iterator.

    The instance is its own ``__anext__`` awaitable: awaiting it steps the
    wrapped object until that object produces a non-awaitable value.
    """

    def __init__(self, coro):
        self._coro = coro
        # Objects without .send() (e.g. the result of iter()) can only be
        # advanced with next().
        self._has_send = hasattr(coro, 'send')

    def __await__(self):
        return self

    def __anext__(self):
        return self

    def __next__(self):
        return self.send(None)

    def send(self, value):
        """Advance the wrapped object by one step.

        Awaitable results are returned so they propagate to the outer driver.
        Any other result is delivered by raising ``StopIteration(result)``.
        Exhaustion of the wrapped object raises ``StopAsyncIteration``.
        """
        try:
            if self._has_send:
                step = self._coro.send(value)
            else:
                step = next(self._coro)
        except StopIteration:
            raise StopAsyncIteration

        if inspect.isawaitable(step):
            return step
        # A non-awaitable marks the bottom of the async chain: it is an item.
        raise StopIteration(step)
def async_iterable(f):
    """Decorate *f* so that calling it returns an ``AsyncIterable``.

    The returned wrapper forwards all positional and keyword arguments to *f*
    and wraps the resulting coroutine/generator object in ``AsyncIterable``.
    The wrapper keeps the metadata of *f* (``__name__``, ``__doc__``, ...).
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(f)
    def make_iterable(*args, **kwargs):
        return AsyncIterable(f(*args, **kwargs))

    return make_iterable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from stars import async_iterable, yield_, yield_from | |
import asyncio as aio | |
import pytest | |
@async_iterable
def gen_range(count):
    """Plain generator yielding 0 .. count-1."""
    for number in range(count):
        yield number
@async_iterable
async def async_range(count):
    """Coroutine-based generator yielding 0 .. count-1 through ``yield_``."""
    for number in range(count):
        await yield_(number)
@async_iterable
def gen_range_delegate(count):
    """Plain generator that delegates to ``range(count)`` with ``yield from``."""
    numbers = range(count)
    yield from numbers
@async_iterable
async def async_range_delegate(count):
    """Coroutine-based generator that delegates to ``range(count)`` via ``yield_from``."""
    numbers = range(count)
    await yield_from(numbers)
# from github | |
@async_iterable
async def double(ait):
    """Yield each item of *ait* multiplied by two, sleeping briefly after each one."""
    async for item in ait:
        doubled = item * 2
        await yield_(doubled)
        await aio.sleep(0.001)
@async_iterable
async def async_range_twice(count):
    """Yield ``async_range(count)``, then ``None``, then ``async_range(count)`` again."""
    first_pass = async_range(count)
    await yield_from(first_pass)
    await yield_(None)
    second_pass = async_range(count)
    await yield_from(second_pass)
@async_iterable
def range_twice(count):
    """Generator variant: delegate to ``async_range(count)``, yield ``None``, delegate again."""
    first_pass = async_range(count)
    yield from first_pass
    yield None
    second_pass = async_range(count)
    yield from second_pass
class HasAsyncGenMethod:
    """Fixture checking that ``async_iterable`` works on instance methods."""

    def __init__(self, factor):
        self._factor = factor

    @async_iterable
    async def async_multiplied(self, ait):
        """Yield every item of *ait* multiplied by the stored factor."""
        async for item in ait:
            scaled = item * self._factor
            await yield_(scaled)
async def collect(ait):
    """Drain the asynchronous iterable *ait* and return its items as a list."""
    gathered = []
    async for element in ait:
        gathered.append(element)
    return gathered
@pytest.mark.asyncio
async def test_async_generator():
    """Coroutine-based generators work alone, chained, and as methods."""
    plain = await collect(async_range(10))
    assert plain == list(range(10))

    doubled = await collect(double(async_range(5)))
    assert doubled == [0, 2, 4, 6, 8]

    tripler = HasAsyncGenMethod(3)
    tripled = await collect(tripler.async_multiplied(async_range(5)))
    assert tripled == [0, 3, 6, 9, 12]
@pytest.mark.asyncio
async def test_ranges():
    """Every range fixture produces 0..9 when collected."""
    expected = list(range(10))
    factories = (async_range, async_range_delegate, gen_range, gen_range_delegate)
    for make_range in factories:
        assert await collect(make_range(10)) == expected
@pytest.mark.asyncio
async def test_range_delegation():
    """Delegating twice with a ``None`` in between yields the expected sequence."""
    expected = [0, 1, 2, None, 0, 1, 2]
    assert await collect(async_range_twice(3)) == expected
    assert await collect(range_twice(3)) == expected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio as aio
import types

import pytest

from stars import async_iterable, yield_, yield_from
@async_iterable
def q_gen_delegate(q):
    """Generator that delegates to the generator-based queue reader ``q_gen``."""
    source = q_gen(q)
    yield from source
@async_iterable
def q_gen_delegate2(q):
    """Generator that delegates to the coroutine-based queue reader ``async_q_gen``."""
    source = async_q_gen(q)
    yield from source
@async_iterable
async def async_q_gen_delegate(q):
    """Coroutine that delegates to the generator-based queue reader ``q_gen``."""
    source = q_gen(q)
    await yield_from(source)
@async_iterable
async def async_q_gen_delegate2(q):
    """Coroutine that delegates to the coroutine-based queue reader ``async_q_gen``."""
    source = async_q_gen(q)
    await yield_from(source)
@async_iterable
@types.coroutine
def q_gen(q):
    """Generator that yields items fetched from the queue *q*.

    Each item is obtained with ``aio.wait_for(q.get(), 0.02)``; the generator
    finishes once a fetch times out.
    """
    # types.coroutine marks this generator as an iterable coroutine, which is
    # required to ``yield from`` a native coroutine object; a plain generator
    # raises TypeError when wait_for() is implemented with ``async def``.
    try:
        while True:
            item = yield from aio.wait_for(q.get(), 0.02)
            yield item
    except aio.TimeoutError:
        return
@async_iterable
async def async_q_gen(q):
    """Coroutine-based generator that yields items fetched from the queue *q*.

    Stops once ``aio.wait_for(q.get(), 0.02)`` times out.
    """
    try:
        while True:
            item = await aio.wait_for(q.get(), 0.02)
            await yield_(item)
    except aio.TimeoutError:
        pass
async def filler(q):
    """Put the numbers 0..4 into *q*, sleeping 0.01s before each put."""
    for number in range(5):
        await aio.sleep(0.01)
        await q.put(number)
async def collect(ait):
    """Drain the asynchronous iterable *ait* and return its items as a list."""
    gathered = []
    async for element in ait:
        gathered.append(element)
    return gathered
@pytest.mark.asyncio
async def test_q_gen():
    """The generator-based queue reader receives everything the filler puts."""
    queue = aio.Queue()
    aio.ensure_future(filler(queue))
    received = await collect(q_gen(queue))
    assert received == list(range(5))
@pytest.mark.asyncio
async def test_async_q_gen():
    """The coroutine-based queue reader receives everything the filler puts."""
    queue = aio.Queue()
    aio.ensure_future(filler(queue))
    received = await collect(async_q_gen(queue))
    assert received == list(range(5))
@pytest.mark.asyncio
async def test_q_gen_delegate():
    """Generator delegation to ``q_gen`` passes all queued items through."""
    queue = aio.Queue()
    aio.ensure_future(filler(queue))
    received = await collect(q_gen_delegate(queue))
    assert received == list(range(5))
@pytest.mark.asyncio
async def test_q_gen_delegate2():
    """Generator delegation to ``async_q_gen`` passes all queued items through."""
    queue = aio.Queue()
    aio.ensure_future(filler(queue))
    received = await collect(q_gen_delegate2(queue))
    assert received == list(range(5))
@pytest.mark.asyncio
async def test_async_q_gen_delegate():
    """Coroutine delegation to ``q_gen`` passes all queued items through."""
    q = aio.Queue()
    aio.ensure_future(filler(q))
    # Exercise the async delegate named by this test; the previous body
    # re-tested q_gen_delegate, leaving async_q_gen_delegate uncovered.
    assert await collect(async_q_gen_delegate(q)) == list(range(5))
@pytest.mark.asyncio
async def test_async_q_gen_delegate2():
    """Coroutine delegation to ``async_q_gen`` passes all queued items through."""
    q = aio.Queue()
    aio.ensure_future(filler(q))
    # Exercise the async delegate named by this test; the previous body
    # re-tested q_gen_delegate2, leaving async_q_gen_delegate2 uncovered.
    assert await collect(async_q_gen_delegate2(q)) == list(range(5))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment