Skip to content

Instantly share code, notes, and snippets.

@cowboyd
Created November 18, 2024 18:57
Show Gist options
  • Save cowboyd/d2754af19e840ca5f38d6e175da1e87a to your computer and use it in GitHub Desktop.
Save cowboyd/d2754af19e840ca5f38d6e175da1e87a to your computer and use it in GitHub Desktop.
A buffered Effection channel that exerts backpressure on the sender so that a maximum number of subscribers can be working on it at a time.
import { Operation, Channel, resource, Subscription, createChannel } from "./mod.ts";
export function createBufferedChannel<T, TDone>(max: number): Operation<Channel<T,TDone>> {
return resource(function* (provide) {
// the unbuffered channel.
let input = createChannel<T, TDone>();
// this channel is used to send whenever a working slot becomes available
let available = createChannel<void, never>();
// a set of markers indicating which subscribers are currently working.
// where working is defined as haveing gotten the next item from `next()`, but
// not yet asked for the one beyond that.
let working = new Set<object>();
let subscribe: Operation<Subscription<T,TDone>> = resource(function*(provide) {
let unbuffered = yield* input;
let marker = {};
function addWorker() {
working.add(marker);
}
function* removeWorker() {
working.delete(marker);
yield* available.send()
}
try {
yield* provide({
// this subscription registers itself with each turn of `next()` as "working"
// and notifies the main channel to check if there is availability to send another
// value.
*next() {
yield* removeWorker();
let next = yield* unbuffered.next();
if (next.done) {
return next;
} else {
addWorker();
return next;
}
}
})
} finally {
yield* removeWorker();
}
});
let availability = yield* available;
yield* provide({
...subscribe,
// blocks until there are less than `max` subscribers currently working
*send(value) {
while (working.size >= max) {
yield* availability.next();
}
yield* input.send(value);
},
// blocks until all work has been completed.
*close(value) {
yield* input.close(value);
while (working.size > 0) {
yield* availability.next();
}
}
})
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment