Last active
May 5, 2024 20:48
-
-
Save michaelsbradleyjr/b473c6099399eec5fdcedf016fe30b5a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// runs as expected with Node v22.1.0, [email protected], [email protected] | |
// $ npm install ix rxjs | |
/* global require setTimeout */ | |
// Rx | |
const {Subject} = require('rxjs'); | |
const {tap: rxTap} = require('rxjs/operators'); | |
// Ix | |
const {AsyncIterableX: {from: ixFrom}} = require('ix/asynciterable'); | |
const {tap: ixTap} = require('ix/asynciterable/operators'); | |
/**
 * Returns a random integer drawn from the inclusive range [min, max].
 *
 * Fractional bounds are tightened inward (min rounded up, max rounded down)
 * so the result is always an integer inside the requested range, provided
 * the range contains at least one integer.
 *
 * @param {number} min - lower bound (inclusive)
 * @param {number} max - upper bound (inclusive)
 * @returns {number} an integer between the two bounds
 */
const randomInt = (min, max) => {
  const low = Math.ceil(min);
  const high = Math.floor(max);
  // Math.random() is in [0, 1), so scaling by the count of candidate
  // integers and flooring picks one of them.
  return Math.floor(Math.random() * (high - low + 1)) + low;
};
/**
 * Returns a promise that fulfills (with `undefined`) once a `setTimeout` of
 * `ms` milliseconds fires.
 *
 * @param {number} ms - delay handed to `setTimeout`
 * @returns {Promise<undefined>}
 */
const timer = (ms) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
/**
 * Waits `ms` milliseconds and then invokes `effect` with no arguments.
 *
 * The return value of `effect` is discarded; the returned promise fulfills
 * with `undefined` after `effect` has been called.
 *
 * @param {number} ms - delay before the effect runs
 * @param {() => void} effect - callback to run after the delay
 * @returns {Promise<void>}
 */
const timedEffect = async (ms, effect) => {
  await timer(ms);
  effect();
};
/**
 * Returns the n-th triangular number, i.e. 1 + 2 + ... + n, using the
 * closed form n(n + 1) / 2.
 *
 * @param {number} n
 * @returns {number}
 */
const triangleNumber = (n) => (n * (n + 1)) / 2;
// How many values (1..numSends) are pushed through the subject per run.
const numSends = 10;
// Sum of 1..numSends: the running totals kept by the send and effect
// counters hit this value when the first run's values have all been counted.
const onceAround = triangleNumber(numSends);
// Running total once the values of both runs have been counted.
const twiceAround = 2 * onceAround;
// Inclusive bounds, in ms, for the random delay before each send.
const minSendWait = 0;
const maxSendWait = minSendWait + 10;
// make minEffectWait greater than combined time needed for all sends so effects
// log only after all sends have logged (easier to visually compare the order)
const minEffectWait = numSends * maxSendWait + 100;
// Inclusive upper bound, in ms, for the random delay before each effect.
const maxEffectWait = minEffectWait + 100;
// Running sum of every value logged as sent so far (never reset between runs).
let _v = 0;
/**
 * Waits a random delay in [minSendWait, maxSendWait] ms, logs the send, and
 * then pushes `v` into `sub$` via `sub$.next(v)`.
 *
 * @param {{next: (value: number) => void}} sub$ - target of the send (an rxjs
 *   `Subject` at the call sites in this file)
 * @param {number} v - value to send
 * @returns {Promise<void>} fulfills after `sub$.next(v)` has been called
 */
const asyncNext = async (sub$, v) => {
  const wait = randomInt(minSendWait, maxSendWait);
  await timer(wait);
  console.log(`sending: ${v} (after ${wait} ms)`);
  _v += v;
  // Emit a blank separator line once the running total shows that a full
  // run's worth of sends has been logged.
  if (_v === onceAround || _v === twiceAround) {
    console.log();
  }
  sub$.next(v);
};
// Resolver for `rxFinished`, captured so the effect callback can settle it.
let _resolve;
// Fulfills when the effect total reaches `onceAround`.
const rxFinished = new Promise((resolve) => {
  _resolve = resolve;
});
// Resolver for `ixFinished`, captured the same way.
let __resolve;
// Fulfills when the effect total reaches `twiceAround`.
const ixFinished = new Promise((resolve) => {
  __resolve = resolve;
});
// Running sum of every value whose effect has run so far (never reset).
let __v = 0;
/**
 * Schedules a logging side effect for `v` after a random delay in
 * [minEffectWait, maxEffectWait] ms and returns the promise for it.
 *
 * When the effect runs it adds `v` to the running total and resolves
 * `rxFinished` / `ixFinished` when the total equals `onceAround` /
 * `twiceAround` respectively.
 *
 * @param {number} v - value the effect reports
 * @returns {Promise<void>} fulfills after the effect has run
 */
const causeEffect = (v) => {
  const wait = randomInt(minEffectWait, maxEffectWait);
  const effect = () => {
    console.log(`effect with: ${v} (after ${wait} ms)`);
    __v += v;
    if (__v === onceAround) {
      _resolve();
    }
    if (__v === twiceAround) {
      __resolve();
    }
  };
  // if causeEffect doesn't return a promise ix effect ordering will vary
  return timedEffect(wait, effect);
};
/**
 * Does nothing and returns `undefined`; used where a callback is required
 * but no per-item work is wanted.
 */
function nop() {}
// Key ideas:
// * Rx consumer does *not* control rate of reaction (producer does) so
//   consumer's effect order exhibits non-determinism
// * Ix consumer *does* control rate of reaction so consumer's effect order is
//   totally determined by the consumer
// Demo driver: runs the Rx variant, waits for all of its effects, then runs
// the Ix variant and waits for all of its effects.
(async () => {
  // Fires off sends of 1..numSends without awaiting them, so each send
  // lands after its own random delay.
  const sendAll = (subject$) => {
    for (let value = 1; value <= numSends; value += 1) {
      // Intentionally not awaited: the sends are meant to race each other.
      void asyncNext(subject$, value);
    }
  };

  console.log('Rx\n----------------------------------------------------------');
  const rxSub$ = new Subject();
  rxSub$.pipe(rxTap(causeEffect)).subscribe();
  sendAll(rxSub$);
  await rxFinished;
  console.log('\n^ send and effect order may be different\n');

  console.log('Ix\n----------------------------------------------------------');
  const ixSub$ = new Subject();
  // .forEach() inits consumer's pulling; pipeline asynchronously awaits data
  // Intentionally not awaited: `ixSub$` is never completed in this script,
  // so completion is signalled through `ixFinished` instead.
  void ixFrom(ixSub$).pipe(ixTap(causeEffect)).forEach(nop);
  sendAll(ixSub$);
  await ixFinished;
  console.log('\n^ send and effect order will *always* be the same (if `causeEffect` returns a promise)\n');
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment