Created
November 18, 2024 18:57
-
-
Save cowboyd/d2754af19e840ca5f38d6e175da1e87a to your computer and use it in GitHub Desktop.
A buffered Effection channel that exerts backpressure on the sender so that a maximum number of subscribers can be working on it at a time.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Operation, Channel, resource, Subscription, createChannel } from "./mod.ts"; | |
/**
 * Create a channel whose `send()` exerts backpressure on the sender so that at
 * most `max` subscribers are "working" at any one time.
 *
 * A subscriber counts as working from the moment its `next()` hands back a
 * non-final item until it either calls `next()` again or its subscription is
 * torn down.
 *
 * @param max - the number of concurrently working subscribers at which `send()`
 *   starts to block. Must be greater than 0.
 * @returns an operation that, when evaluated, provides the channel as a resource.
 * @throws RangeError (when the operation is evaluated) if `max` is not greater
 *   than 0; such a value would either block every `send()` forever (0 or
 *   negative) or never apply any backpressure (NaN).
 */
export function createBufferedChannel<T, TDone>(max: number): Operation<Channel<T, TDone>> {
  return resource(function* (provide) {
    // `!(max > 0)` also rejects NaN, which a plain `max <= 0` would let through.
    if (!(max > 0)) {
      throw new RangeError(`createBufferedChannel: max must be greater than 0, got ${max}`);
    }

    // the unbuffered channel that actually carries the values.
    let input = createChannel<T, TDone>();

    // a message is sent on this channel each time a subscriber reports that it
    // is no longer working, i.e. a working slot may have become available.
    let available = createChannel<void, never>();

    // a set of markers indicating which subscribers are currently working,
    // where working is defined as having gotten the next item from `next()`,
    // but not yet asked for the one beyond that.
    let working = new Set<object>();

    let subscribe: Operation<Subscription<T, TDone>> = resource(function* (provide) {
      let unbuffered = yield* input;

      // unique identity of this subscription inside `working`.
      let marker = {};

      // Unregister this subscriber and notify the sending side. The
      // notification goes out even when the marker was not in the set; both
      // `send()` and `close()` re-check `working.size` after every
      // notification, so an extra one does not let a value through early.
      function* removeWorker() {
        working.delete(marker);
        yield* available.send();
      }

      try {
        yield* provide({
          // asking for the next item means the previous one is finished: free
          // the slot first, then wait for the next value and, unless the
          // channel is done, register as working again.
          *next() {
            yield* removeWorker();
            let next = yield* unbuffered.next();
            if (!next.done) {
              working.add(marker);
            }
            return next;
          },
        });
      } finally {
        // a subscription that goes away must not keep occupying a slot.
        yield* removeWorker();
      }
    });

    // subscribed before the channel is provided, so it exists before any
    // subscriber created through this channel can send a notification.
    let availability = yield* available;

    yield* provide({
      // copies the own enumerable properties of the `subscribe` operation onto
      // the channel; presumably this is what lets `yield* channel` create a
      // subscription -- confirm against the object `resource()` returns.
      ...subscribe,

      // blocks until there are fewer than `max` subscribers currently working,
      // then forwards the value to the unbuffered channel.
      *send(value) {
        while (working.size >= max) {
          yield* availability.next();
        }
        yield* input.send(value);
      },

      // closes the unbuffered channel, then blocks until all work has been
      // completed (no subscriber is marked as working any more).
      *close(value) {
        yield* input.close(value);
        while (working.size > 0) {
          yield* availability.next();
        }
      },
    });
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment