@sonnenhaft
Last active November 8, 2019 04:24
RxJS getUniqueValuesFromPromise
// to run this gist in Node, run "npm i --save rxjs@5.5.2" and then "npm i --save node-fetch@2" in your current folder
// (the code below uses the rxjs v5 API, and node-fetch v3 can no longer be loaded with require)
// @example "jsfiddle": https://jsfiddle.net/gs186d4d/ // rxjs v5.5.2
// @example "codepen": https://codepen.io/sonnenhaft/pen/xxxzxEN?editors=0010 // rxjs v6.5.3
const Rx = require('rxjs') // v5.5.2
const fetch = require('node-fetch')
/**
 * @param { function(): Promise<string> } getPromise - function that returns a promise resolving to a string (or rejecting with an error string)
 * @param { object } options
 * @param { number } options.values - number of unique values to collect from the API
 * @param { number } options.threads - number of request loops to run in parallel
 * @param { number } options.maxErrors - number of errors to tolerate; the next error rejects the returned promise
 * @param { number } options.timeout - time limit in milliseconds for the whole algorithm
 *
 * @return { Promise<Array<string>> } promise that resolves with an array of unique strings, or rejects with an error
 */
const getUniqueValuesFromPromise = (getPromise, {
  values = 3,
  threads = 2,
  maxErrors = 0,
  timeout = 20 * 1000
} = {}) => {
  const ajaxObservable = () => Rx.Observable.fromPromise(getPromise())
  // emits the numbers 1..threads, one per "thread"
  return Rx.Observable.range(1, threads)
    // turns each number into a "thread": an endless loop of ajax calls
    .map(() => {
      // expand calls ajaxObservable again each time a value is emitted,
      // so a new request starts as soon as the previous one resolves
      return ajaxObservable().expand(ajaxObservable)
    })
    // subscribes to all "threads" at once and merges their values into one stream
    .mergeAll()
    // on error, resubscribes to the whole merged source, at most "maxErrors" times;
    // the next error is passed downstream and rejects the resulting promise
    .retry(maxErrors)
    // drops values that have already been seen
    .distinct()
    // completes (and unsubscribes from the source) once "values" unique values have arrived
    .take(values)
    // collects the values into a single array, emitted on completion
    .toArray()
    // errors if that array has not been emitted within "timeout" millis
    .timeout(timeout)
    // converts the observable into a promise, so code that does not use rxjs can consume it
    .toPromise()
}
/**
 * Test function for trying out "getUniqueValuesFromPromise":
 * pass it as the first argument to check the flow.
 * The generated promise settles after 10 to 20 ms.
 *
 * @return { Promise<string> } resolves with a number from 1 to 10 as a string, rejects with '0'
 */
const getPromiseAndRandomlyResolveOrRejectWithNumber = () => new Promise((resolve, reject) => {
  const rand = Math.random()
  const random = Math.round(rand * 10)
  // "rand" itself is almost never falsy, so testing it would resolve on nearly every call;
  // testing the rounded value rejects whenever it is 0 (about 5% of calls)
  setTimeout(() => (random ? resolve : reject)(random + ''), random + 10)
})
/** Task function (won't work in jsfiddle because of CORS) */
const getData = () => {
  return fetch('https://api.github.com/zen').then(r => {
    return r.ok ? r.text() : Promise.reject(`${r.statusText} ${r.status}`)
  })
}
getUniqueValuesFromPromise(getData, {
  values: 5,
  threads: 2,
  maxErrors: 5,
  timeout: 20 * 1000
}).then(
  // this gist runs in Node, where "document" is not defined, so print to the console
  array => console.log(array.join('\n')),
  error => console.error('error happened: ' + error)
)
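
For readers who do not know RxJS, the same flow can be sketched with plain Promises. This is not the gist's implementation, only a rough, dependency-free illustration under a few assumptions: the name `collectUnique` and the stand-in `fakeSource` are hypothetical, the rejection value `'timeout'` is made up, and a failed call restarts only its own loop, whereas `retry` above resubscribes to every loop.

```javascript
// Hypothetical helper, not part of the gist: same options, plain Promises instead of RxJS.
const collectUnique = (getPromise, {
  values = 3,
  threads = 2,
  maxErrors = 0,
  timeout = 20 * 1000
} = {}) => new Promise((resolve, reject) => {
  const seen = new Set() // plays the role of distinct()
  let errors = 0
  let done = false

  // Settles the outer promise once and cancels the pending timeout.
  const finish = (settle, payload) => {
    if (done) return
    done = true
    clearTimeout(timer)
    settle(payload)
  }

  // Plays the role of timeout(): one limit for the whole run ('timeout' is an assumed error value).
  const timer = setTimeout(() => finish(reject, 'timeout'), timeout)

  // One "thread": call getPromise again as soon as the previous call settles (like expand).
  const loop = () => {
    if (done) return
    getPromise().then(
      value => {
        seen.add(value)
        if (seen.size >= values) finish(resolve, Array.from(seen).slice(0, values))
        else loop()
      },
      error => {
        errors += 1
        // Tolerate "maxErrors" errors; the next one rejects (like retry(maxErrors)).
        if (errors > maxErrors) finish(reject, error)
        else loop()
      }
    )
  }

  for (let i = 0; i < threads; i++) loop()
})

// Usage with a deterministic stand-in source (hypothetical, for illustration only).
let counter = 0
const fakeSource = () => Promise.resolve(String(counter++ % 4))
collectUnique(fakeSource, { values: 3, threads: 2 }).then(
  array => console.log(array.join('\n')),
  error => console.error('error happened: ' + error)
)
```

The sketch keeps its state in a `Set`, a counter and a flag. The RxJS version expresses the same bookkeeping through `distinct`, `retry` and `take`.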