Skip to content

Instantly share code, notes, and snippets.

@samoshkin
Created November 4, 2021 08:58
Show Gist options
  • Save samoshkin/d82bebb3b33185fcc6de0c92a8d8ea93 to your computer and use it in GitHub Desktop.
Save samoshkin/d82bebb3b33185fcc6de0c92a8d8ea93 to your computer and use it in GitHub Desktop.
Process items from an Iterable in parallel with a given concurrency factor
/**
 * Runs `action` once for every item of `iterable`, keeping at most
 * `concurrencyFactor` invocations in flight at any moment.
 *
 * Items are pulled from the iterable lazily: the next item is requested only
 * when there is a free slot. After the first failure no further items are
 * started; invocations that are already running are allowed to finish before
 * the returned promise settles.
 *
 * @param {Iterable<*>|AsyncIterable<*>} iterable - Source of items; consumed
 *   with `for await`, so both sync and async iterables are accepted.
 * @param {function(*): (*|Promise<*>)} action - Called with each item; may be
 *   sync or async. A throw or rejection counts as a failure for that item.
 * @param {number} [concurrencyFactor=Infinity] - Maximum number of concurrent
 *   `action` invocations. Omitting it means "no limit".
 * @returns {Promise<void>} Resolves once every started invocation has
 *   finished without failure. Rejects with a `Map` of failures keyed by the
 *   item that failed; a failure of the iteration itself is stored under the
 *   `iterable` object as key. Rejects with a `RangeError` when an item needs
 *   to be scheduled but `concurrencyFactor` is not positive.
 */
async function eachParallel(iterable, action, concurrencyFactor = Infinity) {
  const errors = new Map();
  const inFlight = new Set();
  let cancelled = false;
  let invalidLimit = null;

  // Never rejects: a failure is recorded and stops further scheduling.
  const run = async (item) => {
    try {
      await action(item);
    } catch (err) {
      errors.set(item, err);
      cancelled = true;
    }
  };

  try {
    for await (const item of iterable) {
      // A non-positive limit can never admit an item; fail instead of
      // waiting forever. Checked lazily so an empty iterable still resolves.
      if (concurrencyFactor <= 0) {
        invalidLimit = new RangeError(
          `concurrencyFactor must be greater than 0, got ${concurrencyFactor}`,
        );
        break;
      }

      // Back-pressure: wait for a free slot before starting the next item.
      while (inFlight.size >= concurrencyFactor) {
        await Promise.race(inFlight);
      }
      if (cancelled) break;

      // The task removes itself before it resolves, so a settled race above
      // always observes the freed slot.
      const task = run(item).finally(() => inFlight.delete(task));
      inFlight.add(task);
    }
  } catch (err) {
    // The iterable itself failed (threw while iterating, or is not iterable).
    // Record it so the caller gets a rejection instead of a promise that
    // never settles.
    errors.set(iterable, err);
  }

  // Let already-started invocations finish before reporting the outcome.
  await Promise.all(inFlight);

  if (invalidLimit !== null) throw invalidLimit;
  // Rejecting with the Map (not an Error) is the established contract of
  // this function; callers inspect it per item.
  if (errors.size > 0) throw errors;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment