Last active
May 13, 2023 18:59
-
-
Save njsmith/3494ebd27f32c2ac168c71de116ccc5d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Rough draft of a Queue object that can be used simultaneously simultaneously from | |
# sync threads + *multiple* trio threads, all at once. | |
# | |
# If you don't have multiple threads each doing their own separate calls to trio.run, | |
# then don't use this; there are simpler solutions. This was mostly an exercise to | |
# figure out if and how this could be done. | |
# | |
# Also note: completely untested, probably has bugs | |
from collections import OrderedDict | |
from functools import partial | |
import outcome | |
import threading | |
import trio | |
class CrossThreadUnbufferedFIFOQueue: | |
def __init__(self): | |
self._lock = attr.ib(factory=threading.Lock) | |
# Used as FIFO queues; value is always None | |
self._putters = OrderedDict() | |
self._getters = OrderedDict() | |
def sync_put(self, value): | |
with self._lock: | |
if self._getters: | |
getter, _ = self._getters.popitem(last=False) | |
getter(value) | |
return | |
# Blocking path | |
waker = threading.Lock() | |
waker.acquire() | |
def putter(): | |
waker.release() | |
return value | |
self._putters[putter] = None | |
waker.acquire() | |
def sync_get(self): | |
with self._lock: | |
if self._putters: | |
putter, _ = self._putters.popitem(last=False) | |
return putter() | |
# Blocking path | |
waker = threading.Lock() | |
waker.acquire() | |
value_shared = None | |
def getter(value): | |
nonlocal value_shared | |
value_shared = value | |
waker.release() | |
self._getters[getter] = None | |
waker.acquire() | |
return value_shared | |
async def async_put(self, value): | |
with self._lock: | |
if self._getters: | |
getter, _ = self._getters.popitem(last=False) | |
getter(value) | |
return | |
# Blocking path | |
token = trio.lowlevel.current_token() | |
task = trio.lowlevel.current_task() | |
def putter(): | |
if trio.lowlevel.current_token() is token: | |
trio.lowlevel.reschedule(task) | |
return value | |
else: | |
trio.from_thread.run_sync(partial( | |
trio.lowlevel.reschedule, task, trio_token=token | |
)) | |
return value | |
self._putters[putter] = None | |
def abort_fn(_): | |
with self._lock: | |
if putter in self._putters: | |
del self._putters | |
return trio.lowlevel.Abort.SUCCEEDED | |
else: | |
return trio.lowlevel.Abort.FAILED | |
await trio.wait_task_rescheduled(abort_fn) | |
async def async_get(self): | |
with self._lock: | |
if self._putters: | |
putter, _ = self._putters.popitem(last=False) | |
return putter() | |
# Blocking path | |
token = trio.lowlevel.current_token() | |
task = trio.lowlevel.current_task() | |
def getter(value): | |
if trio.lowlevel.current_token() is token: | |
trio.lowlevel.reschedule(task, outcome.Value(value)) | |
else: | |
trio.from_thread.run_sync(partial( | |
trio.lowlevel.reschedule, | |
task, | |
outcome.Value(value), | |
trio_token=token | |
)) | |
self._getters[getter] = None | |
def abort_fn(_): | |
with self._lock: | |
if getter in self._getters: | |
del self._getters | |
return trio.lowlevel.Abort.SUCCEEDED | |
else: | |
return trio.lowlevel.Abort.FAILED | |
return await trio.wait_task_rescheduled(abort_fn) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Oh haha whoops, those first two classes are vestigial leftovers from an earlier draft, theyr'e not actually used at all :-) I'll delete.