Last active
November 8, 2019 04:24
-
-
Save sonnenhaft/db68d51ae94a11b513fc3693c782142d to your computer and use it in GitHub Desktop.
RxJS getUniqueValuesFromPromise
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// To run this gist locally, run "npm i --save rxjs" and then "npm i --save node-fetch" in your current folder.
// @example "jsfiddle": https://jsfiddle.net/gs186d4d/ // rxjs v5.5.2
// @example "codepen": https://codepen.io/sonnenhaft/pen/xxxzxEN?editors=0010 // rxjs v6.5.3
const Rx = require('rxjs') // v5.5.2 | |
const fetch = require('node-fetch') | |
/**
 * Collects distinct values by calling a promise-returning function over and
 * over from several concurrent loops ("threads") until enough unique values
 * have been seen.
 *
 * @param { function(): Promise<string> } getPromise - function that returns a promise
 * @param { Object } [options] - optional settings; every field has a default
 * @param { number } [options.values=3] - number of unique values to collect
 * @param { number } [options.threads=2] - number of concurrent request loops
 * @param { number } [options.maxErrors=0] - count handed to `retry`: how many
 *   failures are tolerated before the returned promise rejects
 * @param { number } [options.timeout=20000] - timeout in millis for the whole algorithm
 *
 * @return { Promise< Array<string> > } promise with the array of collected values;
 *   it rejects when the retries are used up or the timeout fires
 */
const getUniqueValuesFromPromise = (getPromise, {
  values = 3,
  threads = 2,
  maxErrors = 0,
  timeout = 20 * 1000
} = {}) => {
  // each call starts a fresh request and wraps its promise into an observable
  const ajaxObservable = () => Rx.Observable.fromPromise(getPromise())

  // range emits 1..threads; the numbers themselves are unused, they only
  // decide how many loops are started
  return Rx.Observable.range(1, threads)
    // every loop is one request followed by `expand`, which starts the next
    // request as soon as the previous one has delivered its value
    .map(() => ajaxObservable().expand(ajaxObservable))
    // run all loops side by side as a single stream of values
    .mergeAll()
    // resubscribe after a failure, at most "maxErrors" times; one more
    // failure is passed on as an error
    .retry(maxErrors)
    // drop every value that has been seen before
    .distinct()
    // complete (and unsubscribe from the loops) once "values" items are in
    .take(values)
    // gather the items into one array, emitted on completion
    .toArray()
    // toArray emits only once, at the very end, so this limits the whole run
    .timeout(timeout)
    // hand a promise to callers, so non-rxjs code can consume the result
    .toPromise()
}
/**
 * Test function to adjust "getUniqueValuesFromPromise":
 * pass it as the first argument to check the flow without a real API.
 * The generated promise settles after 10 to 20 ms and is resolved or
 * rejected at random, with equal chance.
 *
 * @return { Promise<string> } settles (resolve or reject) with a whole number
 *   from 0 to 10, as a string
 */
const getPromiseAndRandomlyResolveOrRejectWithNumber = () => new Promise((resolve, reject) => {
  const rand = Math.random()
  const random = Math.round(rand * 10)
  // "rand" is truthy for every draw except exactly 0, so checking it directly
  // would (almost) never reject; compare against an explicit threshold instead
  const settle = rand < 0.5 ? resolve : reject
  setTimeout(() => settle(String(random)), random + 10)
})
/**
 * Task function (won't work in jsfiddle because of cors).
 * Requests https://api.github.com/zen and yields the response body as text.
 *
 * @return { Promise<string> } resolves with the body text when the response is
 *   "ok"; otherwise rejects with the string "<statusText> <status>"
 */
const getData = () => {
  return fetch('https://api.github.com/zen').then(r => {
    // the rejection is a plain string on purpose: the caller concatenates it into a message
    return r.ok ? r.text() : Promise.reject(`${r.statusText} ${r.status}`)
  })
}
/**
 * Shows the outcome of the demo run.
 * In a page the text goes into the element with id "data"; when there is no
 * DOM (plain node run) or no such element, it is printed to the console
 * instead of throwing on the missing "document".
 *
 * @param { string } text - message to show
 */
const showResult = text => {
  const target = typeof document === 'undefined' ? null : document.getElementById('data')
  if (target) {
    target.innerText = text
  } else {
    console.log(text)
  }
}

// demo run: collect 5 unique values with 2 loops, tolerate 5 errors, give up after 20s
getUniqueValuesFromPromise(getData, {
  values: 5,
  threads: 2,
  maxErrors: 5,
  timeout: 20 * 1000
}).then(
  array => showResult(array.join('\n')),
  error => showResult('error happened: ' + error)
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment