Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created March 14, 2014 10:07
Show Gist options
  • Save akarnokd/9545150 to your computer and use it in GitHub Desktop.
Save akarnokd/9545150 to your computer and use it in GitHub Desktop.
package rx.observers;
import rx.Observer;
/**
 * Enforces single-threaded, serialized, ordered delivery of {@code onNext},
 * {@code onCompleted} and {@code onError} to a wrapped {@link Observer}.
 * <p>
 * When multiple threads are notifying concurrently they are serialized by:
 * <ul>
 * <li>Allowing only one thread at a time to emit</li>
 * <li>Adding notifications to a queue if another thread is already emitting</li>
 * <li>Not holding any locks or blocking any threads while emitting</li>
 * </ul>
 * <p>
 * The thread that wins the right to emit keeps draining the queue until it
 * observes it empty under the lock, so a notification queued by another thread
 * (or by a re-entrant call from the wrapped observer) is delivered without
 * having to wait for some later event to arrive.
 *
 * @param <T> the type of the values passed to {@code onNext}
 */
public class SerializedObserver<T> implements Observer<T> {
    /** Queue stand-in for a {@code null} value, so a {@code null} slot can mean "unused". */
    private static final Object NULL_SENTINEL = new Object();
    /** Queue marker for an {@code onCompleted} that arrived while another thread was emitting. */
    private static final Object COMPLETE_SENTINEL = new Object();

    /** The wrapped observer; it is only ever called outside of {@code synchronized} blocks. */
    private final Observer<? super T> actual;
    /** True while some thread owns the right to call {@link #actual}; accessed under the lock on {@code this}. */
    private boolean emitting = false;
    /** True once a terminal event has been accepted; accessed under the lock on {@code this}. */
    private boolean terminated = false;
    /** Notifications waiting for the emitting thread, or null if none; accessed under the lock on {@code this}. */
    private FastList queue;

    /**
     * Minimal append-only list backed by a growable array. It performs no
     * synchronization of its own.
     */
    static final class FastList {
        Object[] array;
        int size;

        /**
         * Appends an element, allocating 16 slots on first use and growing the
         * backing array by a quarter of its length whenever it is full.
         *
         * @param o the element to append
         */
        public void add(Object o) {
            int s = size;
            Object[] a = array;
            if (a == null) {
                a = new Object[16];
                array = a;
            } else if (s == a.length) {
                Object[] grown = new Object[s + (s >> 2)];
                System.arraycopy(a, 0, grown, 0, s);
                a = grown;
                array = a;
            }
            a[s] = o;
            size = s + 1;
        }
    }

    /** Queue entry that carries the {@code Throwable} of a deferred {@code onError}. */
    private static final class ErrorSentinel {
        final Throwable e;

        ErrorSentinel(Throwable e) {
            this.e = e;
        }
    }

    /**
     * Wraps the given observer.
     *
     * @param s the observer that receives the serialized notifications
     */
    public SerializedObserver(Observer<? super T> s) {
        this.actual = s;
    }

    /**
     * Signals completion. Ignored if a terminal event was already accepted. If
     * another thread is emitting, the completion is queued for that thread;
     * otherwise any pending notifications are delivered first and then
     * {@code onCompleted} is forwarded on the calling thread.
     */
    @Override
    public void onCompleted() {
        FastList list;
        synchronized (this) {
            if (terminated) {
                return;
            }
            terminated = true;
            if (emitting) {
                enqueue(COMPLETE_SENTINEL);
                return;
            }
            // Terminal state: emitting is never released again because every
            // later call returns at the terminated check above.
            emitting = true;
            list = queue;
            queue = null;
        }
        drainQueue(list);
        actual.onCompleted();
    }

    /**
     * Signals an error. Ignored if a terminal event was already accepted. If
     * another thread is emitting, the error is queued for that thread;
     * otherwise any pending notifications are delivered first and then
     * {@code onError} is forwarded on the calling thread.
     *
     * @param e the error to forward
     */
    @Override
    public void onError(final Throwable e) {
        FastList list;
        synchronized (this) {
            if (terminated) {
                return;
            }
            terminated = true;
            if (emitting) {
                enqueue(new ErrorSentinel(e));
                return;
            }
            emitting = true;
            list = queue;
            queue = null;
        }
        drainQueue(list);
        actual.onError(e);
    }

    /**
     * Delivers a value. Ignored after a terminal event was accepted. If another
     * thread is emitting, the value is queued for that thread and this call
     * returns immediately. Otherwise the calling thread becomes the emitter: it
     * delivers anything already pending, then {@code t}, and then keeps
     * draining whatever was queued in the meantime until the queue is empty.
     *
     * @param t the value to deliver; may be null
     */
    @Override
    public void onNext(T t) {
        FastList list;
        synchronized (this) {
            if (terminated) {
                return;
            }
            if (emitting) {
                enqueue(t != null ? t : NULL_SENTINEL);
                return;
            }
            emitting = true;
            list = queue;
            queue = null;
        }

        // Becomes true only when the emitter role was given up on the normal path.
        boolean released = false;
        try {
            // Older notifications go first to preserve ordering.
            drainQueue(list);
            actual.onNext(t);
            for (;;) {
                synchronized (this) {
                    list = queue;
                    queue = null;
                    if (list == null) {
                        // Release the role in the same critical section that saw
                        // the queue empty; otherwise an item queued in between
                        // would be left behind with nobody to deliver it.
                        emitting = false;
                        released = true;
                        break;
                    }
                }
                drainQueue(list);
            }
        } finally {
            if (!released) {
                // The wrapped observer threw. Deliver a queued terminal event if
                // there is one; otherwise just give up the emitter role so a
                // later call (typically onError) can take over. The exception
                // keeps propagating to the caller.
                synchronized (this) {
                    if (terminated) {
                        list = queue;
                        queue = null;
                    } else {
                        emitting = false;
                        list = null;
                    }
                }
                drainQueue(list);
            }
        }
    }

    /**
     * Appends an item to the pending queue, creating the queue on demand.
     * Must be called while holding the lock on {@code this}.
     */
    private void enqueue(Object item) {
        FastList q = queue;
        if (q == null) {
            q = new FastList();
            queue = q;
        }
        q.add(item);
    }

    /**
     * Replays the queued notifications, in insertion order, to the wrapped
     * observer. Does nothing for a null or empty list. Stops at the first
     * terminal event, since nothing may be delivered after it.
     *
     * @param list the notifications to deliver; may be null
     */
    void drainQueue(FastList list) {
        if (list == null || list.size == 0) {
            return;
        }
        Object[] items = list.array;
        for (int i = 0, n = list.size; i < n; i++) {
            Object v = items[i];
            if (v == null) {
                // Null values are stored as NULL_SENTINEL, so a null slot is not a notification.
                break;
            }
            if (v == NULL_SENTINEL) {
                actual.onNext(null);
            } else if (v == COMPLETE_SENTINEL) {
                actual.onCompleted();
                return;
            } else if (v.getClass() == ErrorSentinel.class) {
                actual.onError(((ErrorSentinel) v).e);
                return;
            } else {
                // Everything that is not a sentinel was enqueued by onNext(T).
                @SuppressWarnings("unchecked")
                T value = (T) v;
                actual.onNext(value);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment