Created
November 4, 2021 08:58
-
-
Save samoshkin/d82bebb3b33185fcc6de0c92a8d8ea93 to your computer and use it in GitHub Desktop.
Process items from an Iterable in parallel with a given concurrency factor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Runs `action` for every item produced by `iterable`, keeping at most
 * `concurrencyFactor` invocations in flight at any time.
 *
 * As soon as one invocation fails, no further items are started; invocations
 * that are already running are allowed to finish before the result settles.
 *
 * @param {Iterable<*>|AsyncIterable<*>} iterable - Source of items (consumed with `for await`).
 * @param {function(*): (Promise<*>|*)} action - Called once per item; may be sync or async.
 * @param {number} [concurrencyFactor=Infinity] - Maximum number of simultaneously
 *   running invocations. Omitting it means "no limit".
 * @returns {Promise<void>} Resolves with `undefined` once every started invocation
 *   has completed without error.
 * @throws {RangeError} (as a rejection) when `concurrencyFactor` is zero or negative.
 * @throws {Map<*, *>} (as a rejection) a Map from the failing item to the error it
 *   produced. If reading from `iterable` itself fails, that error is stored under
 *   the `iterable` object as key.
 */
async function eachParallel(iterable, action, concurrencyFactor = Infinity) {
  // A limit of zero or less can never admit a task, so the throttling loop
  // below would wait forever. Fail loudly instead of hanging.
  if (concurrencyFactor <= 0) {
    throw new RangeError(
      `concurrencyFactor must be greater than 0, got ${concurrencyFactor}`,
    );
  }

  const errors = new Map();
  const inFlight = new Set();
  let cancelled = false;

  function run(item) {
    // The async wrapper turns a synchronous throw from `action` into a
    // rejection, so both failure modes are recorded the same way.
    const task = (async () => {
      await action(item);
    })()
      .catch((err) => {
        errors.set(item, err);
        cancelled = true;
      })
      // Runs after the catch handler, so `cancelled` is already up to date
      // by the time the slot is observed as free.
      .finally(() => {
        inFlight.delete(task);
      });
    inFlight.add(task);
  }

  try {
    for await (const item of iterable) {
      // Tasks never reject (failures are captured above), so the race only
      // signals "a slot has been freed".
      while (inFlight.size >= concurrencyFactor) {
        await Promise.race(inFlight);
      }
      if (cancelled) break;
      run(item);
    }
  } catch (err) {
    // The source failed while being iterated. Record it so the caller sees a
    // rejection instead of a promise that never settles.
    errors.set(iterable, err);
  }

  // Let everything that was already started run to completion.
  await Promise.all(inFlight);

  if (errors.size > 0) {
    // The rejection value is intentionally the Map itself (not an Error):
    // existing callers rely on receiving the item -> error mapping.
    throw errors;
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment