Last active
October 1, 2017 07:35
-
-
Save jemshit/c8fd40180519b6df06ae598d123e1cfd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.jakewharton.rx.ReplayingShare; | |
import com.jakewharton.rxrelay2.PublishRelay; | |
import com.jakewharton.rxrelay2.Relay; | |
import io.reactivex.Observable; | |
import java.util.concurrent.TimeUnit; | |
public class HotReactiveData { | |
// Problems to solve: | |
// 1- UI components should be notified when data is updated | |
// 2- Offline support, presentation layer always listens to local data source. So whenever DB is updated, UI should be notified. | |
// 3- Error propagation to UI when ... | |
// 4- Sometimes get fresh data from API | |
private static class LocalDataSource { | |
private String profile = "John"; | |
Observable<String> getProfile() { | |
return Observable.just(profile); | |
} | |
void setProfile(String profile) { | |
this.profile = profile; | |
} | |
} | |
private static class ProfileRepository { | |
private LocalDataSource localDataSource = new LocalDataSource(); | |
private Relay<String> relay = PublishRelay.<String>create().toSerialized(); // replaying share has .replay(1) so no need for BehaviourRelay | |
Observable<String> getProfile() { | |
return localDataSource.getProfile() | |
.mergeWith(relay) | |
.compose(ReplayingShare.instance()) // .replay(1).publish().refCount() | |
.distinctUntilChanged(); // to not to get same item (state) twice | |
} | |
void updateProfileLocalOnly(String profile) { | |
localDataSource.setProfile(profile); | |
relay.accept(profile); | |
} | |
} | |
public static void main(String[] args) throws InterruptedException { | |
ProfileRepository repository = new ProfileRepository(); | |
// Listen to profile | |
repository.getProfile() | |
.subscribe(profile -> System.out.println("Observer1 got profile: " + profile)); | |
// Update profile from somewhere | |
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS) // start, count, initialDelay, period, unit | |
.doOnNext(integer -> System.out.println("Updating profile from some place")) | |
.subscribe(integer -> repository.updateProfileLocalOnly("Agent " + integer)); | |
// Second Observer listens to the same data after a delay. Should get last item emitted + new updates | |
Observable.defer(() -> repository.getProfile()) | |
.delaySubscription(2, TimeUnit.SECONDS) // delays subscription (for only cold observable i guess) | |
.subscribe(profile -> System.out.println("Observer2 got profile: " + profile)); | |
Thread.sleep(5000); | |
// Prints: | |
/* Observer1 got profile: John | |
Updating profile from some place | |
Observer1 got profile: Agent 0 | |
Updating profile from some place | |
Observer1 got profile: Agent 1 | |
Observer2 got profile: Agent 1 | |
Updating profile from some place | |
Observer1 got profile: Agent 2 | |
Observer2 got profile: Agent 2 | |
*/ | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment