@jcouyang
Last active November 12, 2024 21:37
Promise All with Limit of Concurrent N

The Promise All Problem

When processing a very large array, e.g. Promise.all(A_VERY_LARGE_ARRAY_OF_XHR_PROMISE),

all requests are sent at the same time, which can exhaust the browser's memory.

The solution is to limit the number of concurrent requests and to wrap each promise in a thunk:

Promise.allConcurrent(2)([()=>fetch('BLAH1'), ()=>fetch('BLAH2'), ..., ()=>fetch('BLAHN')])

With the concurrency set to 2, requests BLAH1 and BLAH2 are sent at the same time.

As soon as BLAH1 receives its response and resolves, the request for BLAH3 is sent.

This way, the number of requests in flight never exceeds the limit of 2 configured above.
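
The thunk wrapping matters because a promise represents work that has already started: calling the request function while building the array fires the request right away, so there is nothing left to limit. A minimal sketch of the difference, where `fakeRequest` is a hypothetical stand-in for `fetch` that only records when it was called:

```javascript
// Hypothetical stand-in for fetch: records the moment a "request" starts.
const started = [];
function fakeRequest(name) {
  started.push(name);
  return new Promise(resolve => setTimeout(() => resolve(name), 10));
}

// Eager: both requests are already in flight once the array is built.
const eager = [fakeRequest('A'), fakeRequest('B')];
console.log(started.length); // 2

// Lazy: thunks only describe the work; nothing starts until one is called.
const thunks = [() => fakeRequest('C'), () => fakeRequest('D')];
console.log(started.length); // still 2

// A limiter is then free to decide when each thunk gets invoked.
thunks[0]();
console.log(started.length); // 3
```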

const { expect } = require('chai')
const sinon = require('sinon')
// assumed helper (the original does not show where range comes from): range(3) -> [0, 1, 2]
const range = n => [...Array(n).keys()]

describe('promiseAllStepN', function(){
  describe('3 tasks, and concurrent is 2', function(){
    let tasks;
    beforeEach(function(){
      tasks = range(3).map(x=>sinon.stub())
    })
    describe('1 is finish', function(){
      it('will kickoff the third task', function(done){
        tasks[0].returns(Promise.resolve(0))
        let task2 = tasks[2]
        // task 1 is slow; by the time it settles, task 2 must already have been started
        tasks[1].returns(new Promise(resolve=>setTimeout(()=>{
          expect(task2.called).to.be.equal(true)
          resolve(1)
          done()
        }, 1000)))
        tasks[2].returns(Promise.resolve(2))
        // not returned: the test already signals completion through done()
        Promise.allConcurrent(2)(tasks).then(x=>console.log(x))
      })
    })
  })
  describe('10 tasks, and concurrent is 3', function(){
    let tasks;
    beforeEach(function(){
      tasks = range(10).map(x=>sinon.stub())
    })
    describe('1st is finish but 2nd stuck', function(){
      it('final task will run before 2nd', function(done){
        tasks.forEach((task,index) => task.returns(Promise.resolve(index)))
        let task10 = tasks[9]
        // the 2nd task is stuck for a second; the last task must have started by then
        tasks[1].returns(new Promise(resolve=>setTimeout(()=>{
          expect(task10.called).to.be.equal(true)
          resolve(1)
          done()
        }, 1000)))
        Promise.allConcurrent(3)(tasks).then(x=>console.log(x))
      })
    })
  })
})
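function promiseAllStepN(n, list) {
  // copy with slice instead of splice so the caller's array is not mutated
  const head = list.slice(0, n)
  const tail = list.slice(n)
  const resolved = []
  let processed = 0
  return new Promise(resolve=>{
    // nothing to run: resolve right away, otherwise runNext is never called
    if(list.length === 0) return resolve([])
    // start the first n tasks; each one pulls the next task when it finishes
    head.forEach(x=>{
      const res = x()
      resolved.push(res)
      res.then(y=>{
        runNext()
        return y
      })
    })
    function runNext(){
      if(processed === tail.length){
        // every task has been started; wait for all of them, in start order
        resolve(Promise.all(resolved))
      }else{
        resolved.push(tail[processed]().then(x=>{
          runNext()
          return x
        }))
        processed++
      }
    }
  })
}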
Promise.allConcurrent = n => list => promiseAllStepN(n, list)
@Chaoste
Chaoste commented Mar 6, 2020

I agree with @wertlex. Bottleneck is a very lightweight module (no dependencies) and is more reliable. Instead of:

    ...
    const tasks = params.map((param) => someDeferredFunction(param));
    return Promise.allConcurrent(2)(tasks).then(x=>console.log(x))
    ...

do:

    const Bottleneck = require('bottleneck');
    ...
    const limiter = new Bottleneck({ maxConcurrent: 2 });
    const tasks = params.map((param) => limiter.schedule(() => someDeferredFunction(param)));
    return Promise.all(tasks).then(x=>console.log(x))
    ...

@mzdunek93
mzdunek93 commented Apr 14, 2020

TypeScripted version:

const promiseAllLimit = async <T>(n: number, list: (() => Promise<T>)[]) => {
  const head = list.slice(0, n)
  const tail = list.slice(n)
  const result: T[] = []
  const execute = async (promise: () => Promise<T>, i: number, runNext: () => Promise<void>) => {
    result[i] = await promise()
    await runNext()
  }
  const runNext = async () => {
    const i = list.length - tail.length
    const promise = tail.shift()
    if (promise !== undefined) {
      await execute(promise, i, runNext)
    }
  }
  await Promise.all(head.map((promise, i) => execute(promise, i, runNext)))
  return result
}

@swayam18
swayam18 commented Jun 11, 2020

A more efficient TypeScript version (no splicing, only index-based operations), with order preservation and early-exit error handling semantics:

export function promiseAllN<T>(collection: Array<() => Promise<T>>, n: number = 100): Promise<T[]> {
    let i =0;
    let jobsLeft = collection.length;
    let outcome: T[] = [];
    let rejected = false;
    // create a new promise and capture reference to resolve and reject to avoid nesting of code
    let resolve!: (value: T[]) => void;
    let reject!: (reason?: unknown) => void;
    const pendingPromise: Promise<T[]> = new Promise(function(res, rej ) {
        resolve = res; reject = rej;
    });

    // Guard clause
    if(collection.length === 0) {
        resolve([]);
        return pendingPromise;
    }
    
    // execute the j'th thunk
    function runJob(j: number) {
        collection[j]().then(result => {
            if(rejected) {
                return; // no op!
            }
            jobsLeft --;
            outcome[j] = result;
            if(jobsLeft <= 0) {
                resolve(outcome);
            } else if(i < collection.length) {
                runJob(i);
                i++;
            } else {
                return; // nothing to do here.
            }
        }).catch(e => {
            if(rejected) {
                return; // no op!
            }
            rejected = true;
            reject(e);
            return;
        });
    }
   
   // bootstrap, while handling cases where the length of the given array is smaller than maxConcurrent jobs
    while(i < Math.min(collection.length, n)) {
        runJob(i); i++;
    }

    return pendingPromise;
}

Usage:

promiseAllN(items.map((i) => () => /*...*/), 50)

Though I prefer a version that avoids creating a bunch of closures, as so:

promiseAllN(items, (i) => /*...*/, 50)
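
A rough sketch of what such a mapper-based signature could look like, written in plain JavaScript. The name `mapWithLimit` and its parameter order are assumptions for illustration, not part of the code above. It keeps the same semantics (results in input order, no new work started after the first rejection) and assumes `limit` is at least 1:

```javascript
// Sketch: run `mapper` over `items` with at most `limit` calls in flight.
// `mapWithLimit` is a hypothetical name, not an existing API.
async function mapWithLimit(items, mapper, limit = 100) {
  const results = new Array(items.length);
  let next = 0;        // index of the next item nobody has claimed yet
  let failed = false;  // set on the first rejection to stop claiming new items

  // Each worker claims the next unclaimed index until none are left.
  async function worker() {
    while (!failed && next < items.length) {
      const i = next++;
      try {
        results[i] = await mapper(items[i], i);
      } catch (err) {
        failed = true;
        throw err; // makes the Promise.all below reject with this error
      }
    }
  }

  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Only `limit` worker closures are created, regardless of how many items there are, which is the saving the comment above is after.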

@marlomgirardi
marlomgirardi commented Jun 29, 2020

There are a lot of good options already, so writing your own or using a custom one does not seem to be the best choice.
I'm using @supercharge/promise-pool. It is dependency-free, small, and has a readable API.

const PromisePool = require('@supercharge/promise-pool')
 
const users = [
  { name: 'Marcus' },
  { name: 'Norman' },
  { name: 'Christian' }
]

await PromisePool
  .for(users)
  .withConcurrency(2)
  .process(async data => {
    // Create user ...
  })

@webshared
webshared commented Jan 12, 2021

The pure JS implementation without the callback hell :)

const pAll = async (queue, concurrency) => {
  let index = 0;
  const results = [];

  // Run a pseudo-thread
  const execThread = async () => {
    while (index < queue.length) {
      const curIndex = index++;
      // Use of `curIndex` is important because `index` may change after await is resolved
      results[curIndex] = await queue[curIndex]();
    }
  };

  // Start threads
  const threads = [];
  for (let thread = 0; thread < concurrency; thread++) {
    threads.push(execThread());
  }
  await Promise.all(threads);
  return results;
};

and a simple use case:

const test = async () => {
  const urls = ["url1", "url2", "url3"];
  const res = await pAll(
    urls.map(url => () => makeHTTPRequest(url)),
    5
  );
  console.log(res);
};

@mborgeaud
mborgeaud commented Sep 3, 2021

@atolkachiov thanks for this brilliant JS implementation.
Let's say I'm storing the HTTP responses of each makeHTTPRequest in a global array. I'd like to retry those that fail (i.e. HTTP status other than 200). How should I chain another pAll call so that it runs right after the first one completes? I tried this, but it didn't work.

var retryURLs= [];
function makeHTTPRequest(url) {
   ...
   if(httpResult != 200) retryURLs.push(url);
}

const test = async () => {
  const urls = ["url1", "url2", "url3"];
  await pAll(
    urls.map(url => () => makeHTTPRequest(url)),
    5
  );
  await pAll(
    retryURLs.map(url => () => makeHTTPRequest(url)),
    5
  );
};
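
The body of makeHTTPRequest is elided above, so the cause can only be guessed at. One common reason for this pattern to fail is that the task does not return a promise that settles after the status is known, in which case pAll finishes before retryURLs is filled. A sketch that sidesteps the global array by deriving the retry list from pAll's results. Here `makeHTTPRequest` is a hypothetical stand-in that fails once for `url2`, and `runWithRetry` and `maxRounds` are made-up names:

```javascript
// pAll as posted by @webshared above.
const pAll = async (queue, concurrency) => {
  let index = 0;
  const results = [];
  const execThread = async () => {
    while (index < queue.length) {
      const curIndex = index++;
      results[curIndex] = await queue[curIndex]();
    }
  };
  const threads = [];
  for (let thread = 0; thread < concurrency; thread++) {
    threads.push(execThread());
  }
  await Promise.all(threads);
  return results;
};

// Hypothetical request: returns a promise of { url, status };
// 'url2' answers 500 on its first attempt and 200 afterwards.
const attempts = new Map();
const makeHTTPRequest = async url => {
  const attempt = (attempts.get(url) || 0) + 1;
  attempts.set(url, attempt);
  await new Promise(resolve => setTimeout(resolve, 5));
  return { url, status: url === 'url2' && attempt === 1 ? 500 : 200 };
};

// Run pAll in rounds; each round only contains the URLs that failed before.
const runWithRetry = async (urls, concurrency, maxRounds = 3) => {
  let pending = urls;
  const done = [];
  for (let round = 0; round < maxRounds && pending.length > 0; round++) {
    const responses = await pAll(
      pending.map(url => () => makeHTTPRequest(url)),
      concurrency
    );
    done.push(...responses.filter(r => r.status === 200));
    pending = responses.filter(r => r.status !== 200).map(r => r.url);
  }
  return { done, failed: pending };
};
```

Calling `await runWithRetry(["url1", "url2", "url3"], 5)` starts the second round only after the first pAll has resolved, because each round is awaited before the next list is built.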

@avuenja
avuenja commented Nov 19, 2021

This is awesome man! 🎉

@jacksonv1lle

This is a great tool, thanks!

Not sure if it is intentional but I have noticed that if all the promises in a thread are rejected, the next thread is not executed. To prevent this I am catching the error and returning it.

results[curIndex] = await queue[curIndex]().catch(err => err);

@webshared

@jacksonv1lle it happens because Promise.all rejects as soon as any of the promises rejects. You can also wrap the call in try/catch:

try {
  results[curIndex] = await queue[curIndex]();
} catch(err) {
  console.log("ERROR", err);
  results[curIndex] = err;
}
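
If errors need to stay distinguishable from normal values, the same try/catch idea can record outcomes in the shape used by the standard `Promise.allSettled`. A sketch under that assumption, where `pAllSettled` is a made-up name for a variant of pAll:

```javascript
// Variant of pAll that never rejects: every slot holds either
// { status: 'fulfilled', value } or { status: 'rejected', reason }.
const pAllSettled = async (queue, concurrency) => {
  let index = 0;
  const results = [];

  const execThread = async () => {
    while (index < queue.length) {
      const curIndex = index++;
      try {
        results[curIndex] = { status: 'fulfilled', value: await queue[curIndex]() };
      } catch (reason) {
        // A failed task no longer ends this loop; it moves on to the next index.
        results[curIndex] = { status: 'rejected', reason };
      }
    }
  };

  const threads = [];
  for (let thread = 0; thread < concurrency; thread++) {
    threads.push(execThread());
  }
  await Promise.all(threads);
  return results;
};
```

The caller can then filter on `status` instead of checking whether a result happens to be an Error instance.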

@gvozdenkov

Thank you! This works! But can you explain how @webshared's pAll works? I don't understand the magic: how does the splitting into a limited number of requests happen?
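
One way to see the mechanism is to record what each loop does. Every call to execThread is an independent loop that takes the next index from the shared counter, awaits that one task, and only then takes another index. Starting `concurrency` such loops therefore means at most that many tasks are awaited at once. Because JavaScript runs on a single thread, `index++` cannot be interrupted between two loops. The sketch below is pAll with an added trace array; `pAllTraced`, `sleep` and the delays are made up for illustration:

```javascript
// pAll with tracing: each pseudo-thread reports which task it picks up.
const pAllTraced = async (queue, concurrency, trace = []) => {
  let index = 0; // shared by all loops: the next task nobody has claimed
  const results = [];

  const execThread = async threadId => {
    while (index < queue.length) {
      const curIndex = index++; // claim a task
      trace.push(`thread ${threadId} starts task ${curIndex}`);
      results[curIndex] = await queue[curIndex](); // this loop pauses here
      trace.push(`thread ${threadId} finished task ${curIndex}`);
    }
  };

  const threads = [];
  for (let thread = 0; thread < concurrency; thread++) {
    threads.push(execThread(thread)); // runs until its first await, then returns
  }
  await Promise.all(threads);
  return results;
};

const sleep = (ms, value) => new Promise(resolve => setTimeout(() => resolve(value), ms));

// Three tasks, two loops: task 0 is slow, so the other loop is expected
// to pick up task 2 as soon as task 1 is done.
const demoTrace = [];
const demo = pAllTraced(
  [() => sleep(50, 'a'), () => sleep(10, 'b'), () => sleep(10, 'c')],
  2,
  demoTrace
).then(results => {
  console.log(demoTrace.join('\n'));
  return results;
});
```

The first two trace lines are always `thread 0 starts task 0` and `thread 1 starts task 1`, since both loops run synchronously up to their first await. Which loop picks up task 2 depends on which task finishes first.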

@gionkunz
gionkunz commented Nov 12, 2024

I know this thread is four years old, but given that it's still a top Google result, I wanted to share my approach / style too. I made a couple of decisions in my implementation:

  • TypeScript
  • Readability is more important than size
  • Using Promise.race instead of Promise.all internally in order to maximize "worker" usage
  • Using const generic types to support promise tuple types in the returned Promise.all
  • Guardless approach (loops just check queues, and operations are only performed when needed without additional guards, e.g. splice works naturally with deleteCount=0 and an empty array is not processed)
  • Mix of data types (Array for remainingTasks, Map for executingTasks and Array again for finishedTasks) to have native solutions for performance optimizations

If you prefer some of those decisions for your own code, then hopefully this solution is useful to you. I have not tested the function extensively yet, so there might be bugs I have not accounted for.

export type AsyncFunction<T = unknown> = () => Promise<T>;
export type TaskResult<T = unknown> = {
  index: number;
  value: T;
  task: AsyncFunction<T>;
};

export async function promiseAllMaxConcurrency<const Tasks extends AsyncFunction<unknown>[]>(
  tasks: Tasks,
  maxConcurrency = 4
): Promise<{ [K in keyof Tasks]: Awaited<ReturnType<Tasks[K]>> }> {
  const remainingTasks = [...tasks];
  const finishedTasks: Promise<unknown>[] = [];
  const executingTasks = new Map<number, Promise<TaskResult>>();
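  // Position of the next task to start. A counter is used instead of tasks.indexOf(task),
  // which returns the wrong slot when the same function appears twice in tasks.
  let nextIndex = 0;
  // while instead of do...while: Promise.race over an empty iterable never settles,
  // so an empty task list would otherwise hang forever.
  while (remainingTasks.length > 0 || executingTasks.size > 0) {
    const tasksToAdd = maxConcurrency - executingTasks.size;
    for (const task of remainingTasks.splice(0, tasksToAdd)) {
      const index = nextIndex++;
      const taskPromise = task().then((value) => ({
        index,
        value,
        task,
      } satisfies TaskResult));
      executingTasks.set(index, taskPromise);
    }

    // Wait for whichever running task settles first, then free its slot.
    const finishedPromise = Promise.race(executingTasks.values());
    const result = await finishedPromise;
    finishedTasks[result.index] = finishedPromise.then(result => result.value);
    executingTasks.delete(result.index);
  }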

  return Promise.all(finishedTasks) as Promise<{ [K in keyof Tasks]: Awaited<ReturnType<Tasks[K]>> }>;
}

promiseAllMaxConcurrency([
  () => new Promise<number>((resolve) => resolve(1)),
  () => new Promise<string>((resolve) => setTimeout(() => resolve('two'), 1000)),
  () => new Promise<string>((resolve) => setTimeout(() => resolve('three'), 1000)),
  () => new Promise<string>((resolve) => setTimeout(() => resolve('four'), 1000)),
  () => new Promise<string>((resolve) => setTimeout(() => resolve('long running...'), 5000)),
  () => new Promise<boolean>((resolve) => setTimeout(() => resolve(true), 1000)),
]).then((results) => {
  console.log(results);
}).catch(e => console.error(e));
