Last active November 8, 2019 04:24
RxJS getUniqueValuesFromPromise
// to run this gist, in your current folder you need to "run npm i --save rxjs", then run "run npm i --save node-fetch"
// @example "jsfiddle": // rxjs v5.5.2
// @example "codepen": // rxjs v6.5.3
const Rx = require('rxjs') // v5.5.2
const fetch = require('node-fetch')
* @param { function< Promise<string, string> > } getPromise - function that returns a promise
* @param { number } values - number of unique values to fetch from API
* @param { number } threads - number of unique values to fetch from API
* @param { number } maxErrors - number of errors, after which we will throw catch case
* @param { number } timeout in millis for whole algorithm
* @return { Promise< Array<string>, string > } promise with array of strings, or error as string
const getUniqueValuesFromPromise = (getPromise, {
values = 3,
threads = 2,
maxErrors = 0,
timeout = 20 * 1000
}) => {
const ajaxObservable = () => Rx.Observable.fromPromise(getPromise())
// will create range (array) from 1 to "threads", aka [1, ... threads]
return Rx.Observable.range(1, threads)
// will convert each number of previous range into a "thread" - into our ajax loop
.map(() => {
// expand in here will loop our ajax call, it will create new observable, right after first one is over
return ajaxObservable().expand(ajaxObservable)
// if number of errors will be greater than "maxErrors" then will stop subscribers
// and because we need "values", it will just fail
// will filter all duplicates, and will allow only "good" values
// will unsubscribe (successfully stop) from observable when we will gain "values" amount
// will transform output to array, so will wait until successful stop
// if no values will come here in "timeout" millis, then we unsubscribe (error stop) from observer
// instead of observable, with this method we create a promise, so it is "eatable"
// outside of non "rxjs" code
* Test function to adjust "getUniqueValuesFromPromise"
* function use it as a first argument, to check the flow.
* Promise, that function will generate, will get resolved in around 15ms.
* @return { Promise<string> } number from 0 to 10
const getPromiseAndRandomlyResolveOrRejectWithNumber = () => new Promise((resolve, reject) => {
let rand = Math.random();
const random = Math.round(rand * 10)
setTimeout(() => (rand ? resolve : reject)(random + ''), random + 10)
/** Task function (won't work in jsfiddle because of cors) */
const getData = () => {
return fetch('').then(r => {
return r.ok ? r.text() : Promise.reject(`${r.statusText} ${r.status}`)
getUniqueValuesFromPromise(getData, {
values: 5,
threads: 2,
maxErrors: 5,
timeout: 20 * 1000
array => document.getElementById('data').innerText = array.join('\n'),
error => document.getElementById('data').innerText = 'error happened: ' + error
