Skip to content

Instantly share code, notes, and snippets.

Last active August 29, 2015 13:56
Show Gist options
  • Save akarnokd/8860735 to your computer and use it in GitHub Desktop.
Save akarnokd/8860735 to your computer and use it in GitHub Desktop.
* Copyright 2014 Netflix, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package rx.operators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TestScheduler;
import rx.schedulers.TrampolineScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
* Delivers events on the specified Scheduler.
* <p>
* This provides backpressure by blocking the incoming onNext when there is already one in the queue.
* <p>
* This means that at any given time the max number of "onNext" in flight is 3:
* -> 1 being delivered on the Scheduler
* -> 1 in the queue waiting for the Scheduler
* -> 1 blocking on the queue waiting to deliver it
* I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler
* can loop and have something to do each time around to optimize for avoiding rescheduling when it
* can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop
* thus if the queue is empty it exits and next time something is added it will reschedule.
* <img width="640" src="">
public class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
public OperatorObserveOn(Scheduler scheduler) {
this.scheduler = scheduler;
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TestScheduler) {
// this one will deadlock as it is single-threaded and won't run the scheduled
// work until it manually advances, which it won't be able to do as it will block
return child;
} else {
return new ObserveOnSubscriber(child);
private static Object NULL_SENTINEL = new Object();
private static Object COMPLETE_SENTINEL = new Object();
private static class ErrorSentinel {
final Throwable e;
ErrorSentinel(Throwable e) {
this.e = e;
/** Observe through individual queue per observer. */
private class ObserveOnSubscriber extends Subscriber<T> {
final Subscriber<? super T> observer;
private volatile Scheduler.Inner recursiveScheduler;
private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue();
final AtomicLong counter = new AtomicLong(0);
public ObserveOnSubscriber(Subscriber<? super T> observer) {
super(observer); = observer;
public void onNext(final T t) {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
if (t == null) {
} else {
} catch (InterruptedException e) {
if (!isUnsubscribed()) {
public void onCompleted() {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
} catch (InterruptedException e) {
public void onError(final Throwable e) {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
queue.put(new ErrorSentinel(e));
} catch (InterruptedException e2) {
// call directly if we can't schedule
protected void schedule() {
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
// first time through, register a Subscription
// that can interrupt this thread
add(Subscriptions.create(new Action0() {
public void call() {
// we have to interrupt the parent thread because
// it can be blocked on queue.put
add(scheduler.schedule(new Action1<Inner>() {
public void call(Inner inner) {
recursiveScheduler = inner;
} else {
recursiveScheduler.schedule(new Action1<Inner>() {
public void call(Inner inner) {
private void pollQueue() {
do {
Object v = queue.poll();
if (v != null) {
if (v == NULL_SENTINEL) {
} else if (v == COMPLETE_SENTINEL) {
} else if (v instanceof ErrorSentinel) {
observer.onError(((ErrorSentinel) v).e);
} else {
observer.onNext((T) v);
} while (counter.decrementAndGet() > 0);
private static final int MAX_ELEMENTS = 4;
* Same behavior as ArrayBlockingQueue<Object>(1) except that we can interrupt/unsubscribe it.
private class InterruptibleBlockingQueue {
private final Semaphore semaphore = new Semaphore(MAX_ELEMENTS);
private final BlockingQueue<Object> bq = new LinkedBlockingQueue<Object>();
private volatile boolean interrupted = false;
public Object poll() {
if (interrupted) {
return null;
Object o = bq.peek();
if (o == null) {
return null;
try {
return o;
} finally {
* Add an Object, blocking if an item is already in the queue.
* @param o
* @throws InterruptedException
public void put(Object o) throws InterruptedException {
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
if (o == null) {
throw new IllegalArgumentException("Can not put null");
* Used to unsubscribe and interrupt the producer if blocked in put()
public void interrupt() {
interrupted = true;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment