Skip to content

Instantly share code, notes, and snippets.

@jemshit
Last active October 1, 2017 07:35
Show Gist options
  • Save jemshit/c8fd40180519b6df06ae598d123e1cfd to your computer and use it in GitHub Desktop.
Save jemshit/c8fd40180519b6df06ae598d123e1cfd to your computer and use it in GitHub Desktop.
import com.jakewharton.rx.ReplayingShare;
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
/**
 * Demo of a "hot" reactive data stream: observers of a repository receive the
 * current profile when they subscribe and every later update, all through one
 * shared pipeline.
 */
public class HotReactiveData {

    // Problems to solve:
    // 1- UI components should be notified when data is updated.
    // 2- Offline support: the presentation layer always listens to the local data source,
    //    so whenever the DB is updated, the UI should be notified.
    // 3- Error propagation to UI when ... (unfinished in the original notes; not demonstrated below)
    // 4- Sometimes get fresh data from the API (not demonstrated below).

    /** In-memory stand-in for a local database that stores a single profile value. */
    private static class LocalDataSource {

        // volatile: main() updates this from a scheduler thread while it may be
        // read from a different thread when a subscription happens.
        private volatile String profile = "John";

        /**
         * Returns an observable that emits the stored profile once per subscription.
         *
         * <p>The field is read lazily (at subscribe time, not at call time) so that an
         * observable obtained earlier and subscribed to later never emits a stale value.
         */
        Observable<String> getProfile() {
            return Observable.defer(() -> Observable.just(profile));
        }

        /** Overwrites the stored profile. Does not notify anyone by itself. */
        void setProfile(String profile) {
            this.profile = profile;
        }
    }

    /** Exposes the profile as a single shared stream and pushes local updates into it. */
    private static class ProfileRepository {

        private final LocalDataSource localDataSource = new LocalDataSource();

        // ReplayingShare already replays the last value to late subscribers,
        // so a PublishRelay is enough here (no need for a BehaviorRelay).
        private final Relay<String> relay = PublishRelay.<String>create().toSerialized();

        // Built exactly once. Creating this chain on every getProfile() call would hand
        // each observer its own private ReplayingShare, so nothing would be shared or
        // replayed between observers.
        private final Observable<String> profileStream = localDataSource.getProfile()
                .mergeWith(relay)
                .compose(ReplayingShare.instance()) // roughly .replay(1).publish().refCount()
                // A re-subscription may see both the replayed value and the freshly read
                // local value; drop the duplicate so the same state is not delivered twice.
                .distinctUntilChanged();

        /**
         * Returns the shared profile stream: the current value first, then every update
         * passed to {@link #updateProfileLocalOnly(String)}.
         */
        Observable<String> getProfile() {
            return profileStream;
        }

        /**
         * Stores the new profile locally and notifies all current observers.
         *
         * @param profile the new profile value
         */
        void updateProfileLocalOnly(String profile) {
            localDataSource.setProfile(profile);
            relay.accept(profile);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ProfileRepository repository = new ProfileRepository();

        // First observer: listens from the very beginning.
        repository.getProfile()
                .subscribe(profile -> System.out.println("Observer1 got profile: " + profile));

        // Update the profile from somewhere else: "Agent 0..2" at t = 1s, 2s, 3s.
        Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS) // start, count, initialDelay, period, unit
                .doOnNext(integer -> System.out.println("Updating profile from some place"))
                .subscribe(integer -> repository.updateProfileLocalOnly("Agent " + integer));

        // Second observer joins late. It should get the last emitted item plus new updates.
        // The delay sits between the t=2s and t=3s updates so that the subscription does
        // not race with an update and the output below is deterministic.
        repository.getProfile()
                .delaySubscription(2500, TimeUnit.MILLISECONDS)
                .subscribe(profile -> System.out.println("Observer2 got profile: " + profile));

        // The timers above run on background scheduler threads; keep the JVM alive
        // long enough for the demo to finish.
        Thread.sleep(5000);

        // Prints:
        /* Observer1 got profile: John
        Updating profile from some place
        Observer1 got profile: Agent 0
        Updating profile from some place
        Observer1 got profile: Agent 1
        Observer2 got profile: Agent 1
        Updating profile from some place
        Observer1 got profile: Agent 2
        Observer2 got profile: Agent 2
        */
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment