Skip to content

Instantly share code, notes, and snippets.

@benjchristensen
Created January 18, 2014 21:58
Show Gist options
  • Save benjchristensen/8497234 to your computer and use it in GitHub Desktop.
Save benjchristensen/8497234 to your computer and use it in GitHub Desktop.
Observable example with just the `bind` operator and unit tests asserting behavior while exploring the best signature. This variant uses: Func1<Operator<R>, Operator<T>>
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;
import static org.junit.Assert.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
public class ObsurvableBind2<T> {
/** Function run against the receiving {@link Operator} every time this source is observed. */
private final Action1<Operator<T>> f;

/**
 * Wraps the given on-observe function. Use {@link #create(Action1)} from outside this class.
 *
 * @param f callback that is handed the {@link Operator} to emit to
 */
ObsurvableBind2(Action1<Operator<T>> f) {
    this.f = f;
}
/**
 * Factory for an {@code ObsurvableBind2} backed by the given on-observe function.
 *
 * @param f callback invoked with the receiving {@link Operator} on every observe/subscribe
 * @return a new source wrapping {@code f}
 */
public static <T> ObsurvableBind2<T> create(final Action1<Operator<T>> f) {
    final ObsurvableBind2<T> source = new ObsurvableBind2<T>(f);
    return source;
}
/**
 * Derives a new source of {@code R} from this source of {@code T}.
 * <p>
 * When the returned source is observed, {@code bind} is called with the downstream
 * {@code Operator<R>} and must return the {@code Operator<T>} that this (upstream) source
 * will emit to.
 *
 * @param bind function mapping the downstream Operator to the Operator handed to this source
 * @return the derived source
 */
public <R> ObsurvableBind2<R> bind(final Func1<Operator<R>, Operator<T>> bind) {
    final ObsurvableBind2<T> upstream = this;
    final Action1<Operator<R>> onObserve = new Action1<Operator<R>>() {
        @Override
        public void call(Operator<R> downstream) {
            // translate the downstream Operator into the one this source should emit to
            final Operator<T> bound = bind.call(downstream);
            upstream.observe(bound);
        }
    };
    return new ObsurvableBind2<R>(onObserve);
}
/**
 * Runs this source's on-observe function against the given {@link Operator}.
 *
 * @param o the Operator that receives the emissions
 */
public void observe(Operator<T> o) {
    this.f.call(o);
}
/**
 * Subscribes a plain {@link Observer} by wrapping it in an {@link Operator} that carries a
 * fresh {@code OperatorSubscription}, then observing with that Operator.
 *
 * @param o the Observer to deliver events to
 * @return the subscription carried by the wrapping Operator
 */
public Subscription subscribe(final Observer<T> o) {
    final OperatorSubscription subscription = new OperatorSubscription();
    final Operator<T> terminal = createOperator(o, subscription);
    observe(terminal);
    return subscription;
}
/**
 * An {@link Observer} that additionally exposes the subscription linking it to the source it
 * is bound to.
 */
public interface Operator<T> extends Observer<T> {

    /**
     * Returns the Subscription intended to be passed up to the source being bound to.
     * <p>
     * In other words, the subscription for the Operator -> Source link.
     */
    OperatorSubscription getSubscription();
}
/**
 * Adapts an {@link Observer} to an {@link Operator}: every event is forwarded unchanged to
 * the observer, and {@code getSubscription()} returns the supplied subscription.
 */
private Operator<T> createOperator(final Observer<T> delegate, final OperatorSubscription subscription) {
    return new Operator<T>() {

        @Override
        public OperatorSubscription getSubscription() {
            return subscription;
        }

        @Override
        public void onNext(T value) {
            delegate.onNext(value);
        }

        @Override
        public void onError(Throwable error) {
            delegate.onError(error);
        }

        @Override
        public void onCompleted() {
            delegate.onCompleted();
        }
    };
}
/**
 * Creates a source that emits the single value {@code t} and then completes.
 * The value is emitted without consulting the Operator's subscription.
 *
 * @param t the value to emit
 * @return a source of exactly that value
 */
public static <T> ObsurvableBind2<T> from(final T t) {
    final Action1<Operator<T>> emitSingle = new Action1<Operator<T>>() {
        @Override
        public void call(Operator<T> operator) {
            operator.onNext(t);
            operator.onCompleted();
        }
    };
    return ObsurvableBind2.create(emitSingle);
}
/**
 * Creates a source that emits each element of {@code is} in iteration order and then
 * completes. Before each element the Operator's subscription is checked; once it reports
 * unsubscribed, iteration stops. {@code onCompleted()} is called in either case.
 *
 * @param is the elements to emit
 * @return a source over those elements
 */
public static <T> ObsurvableBind2<T> from(final Iterable<T> is) {
    final Action1<Operator<T>> emitAll = new Action1<Operator<T>>() {
        @Override
        public void call(Operator<T> operator) {
            for (T item : is) {
                if (!operator.getSubscription().isUnsubscribed()) {
                    operator.onNext(item);
                } else {
                    // receiver is no longer interested; stop pulling from the iterable
                    break;
                }
            }
            operator.onCompleted();
        }
    };
    return ObsurvableBind2.create(emitAll);
}
/**************************************************************************************************************/
private static class OperatorSubscription implements Subscription {
private final CompositeSubscription cs = new CompositeSubscription();
@Override
public void unsubscribe() {
cs.unsubscribe();
}
public boolean isUnsubscribed() {
return cs.isUnsubscribed();
}
/**
* Used to register an unsubscribe callback.
*/
public void add(Subscription s) {
cs.add(s);
}
}
/**************************************************************************************************************/
/**************************************************************************************************************/
public static class UnitTest {
/**
 * Binds an operator that forwards each value and then unsubscribes from its parent, and
 * checks that the 5-integer source emits only one value and only one value is received.
 */
@Test
public void testBindUnsubscribe() {
    final AtomicInteger emitted = new AtomicInteger();
    final AtomicInteger delivered = new AtomicInteger();

    final ObsurvableBind2<Integer> source = OBSERVABLE_OF_5_INTEGERS(emitted);

    final Func1<Operator<Integer>, Operator<Integer>> unsubscribeAfterEach = new Func1<Operator<Integer>, Operator<Integer>>() {
        @Override
        public Operator<Integer> call(final Operator<Integer> downstream) {
            final OperatorSubscription upstreamSub = new OperatorSubscription();
            // this Operator is what the "parent" source emits to
            return new Operator<Integer>() {

                @Override
                public OperatorSubscription getSubscription() {
                    return upstreamSub;
                }

                @Override
                public void onNext(Integer value) {
                    downstream.onNext(value);
                    upstreamSub.unsubscribe();
                }

                @Override
                public void onError(Throwable error) {
                    downstream.onError(error);
                }

                @Override
                public void onCompleted() {
                    downstream.onCompleted();
                }
            };
        }
    };

    final Action1<Integer> sink = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            delivered.incrementAndGet();
            System.out.println("Received: " + value);
        }
    };

    source.bind(unsubscribeAfterEach).subscribe(sink);

    assertEquals(1, emitted.get());
    assertEquals(1, delivered.get());
}
/**
 * Chains a take(2) ("A") and a take(1) ("B") on the 5-integer source and checks that the
 * unsubscribe triggered by "B" reaches the source after a single emission.
 */
@Test
public void testBindUnsubscribeLayered() {
    final AtomicInteger emitted = new AtomicInteger();
    final AtomicInteger delivered = new AtomicInteger();

    final ObsurvableBind2<Integer> source = OBSERVABLE_OF_5_INTEGERS(emitted);
    final ObsurvableBind2<Integer> firstTwo = source.bind(new DebugTakeFunction<Integer>(2, "A"));
    final ObsurvableBind2<Integer> firstOne = firstTwo.bind(new DebugTakeFunction<Integer>(1, "B"));

    firstOne.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            delivered.incrementAndGet();
            System.out.println("Received: " + value);
        }
    });

    assertEquals(1, emitted.get());
    assertEquals(1, delivered.get());
}
/**
 * Bind function used by the tests that behaves like a logging "take(num)": it forwards
 * values to the child until {@code num} have been seen and then unsubscribes from the parent.
 * <p>
 * The running count is kept inside each Operator returned by {@link #call(Operator)} rather
 * than on this function object, so observing the bound source more than once starts every
 * subscription from zero instead of continuing a shared counter.
 */
private static class DebugTakeFunction<T> implements Func1<Operator<T>, Operator<T>> {
    /** Number of values to let through before unsubscribing from the parent. */
    final int num;
    /** Label included in the debug output. */
    final String id;

    DebugTakeFunction(final int num, final String id) {
        this.num = num;
        this.id = id;
    }

    /**
     * Builds the Operator handed to the parent source.
     *
     * @param childObserver the downstream Operator that receives the forwarded values
     * @return an Operator whose subscription is unsubscribed once {@code num} values passed
     */
    @Override
    public Operator<T> call(final Operator<T> childObserver) {
        final OperatorSubscription parentSub = new OperatorSubscription();
        // when the child unsubscribes we want it to trigger the parent unsubscribe so we link them
        childObserver.getSubscription().add(parentSub);
        // we return an Operator to the "parent"
        return new Operator<T>() {
            // per-Operator state: each call()/subscription counts independently
            private int count = 0;

            @Override
            public void onCompleted() {
                childObserver.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                childObserver.onError(e);
            }

            @Override
            public void onNext(T t) {
                count++;
                System.out.println("simpleTake[" + id + "] => onNext " + count + " parentSub: " + parentSub.isUnsubscribed() + " childSub: " + childObserver.getSubscription().isUnsubscribed());
                // values arriving after we unsubscribed are dropped
                if (!parentSub.isUnsubscribed()) {
                    childObserver.onNext(t);
                    if (count >= num) {
                        System.out.println("simpleTake[" + id + "] => unsubscribe");
                        parentSub.unsubscribe();
                    }
                }
            }

            @Override
            public OperatorSubscription getSubscription() {
                return parentSub;
            }
        };
    }
}
/**
 * Exercises unsubscribe propagation through a pipeline that contains nested sources.
 * <p>
 * Pipeline, in order: 5-integer source -> DebugTakeFunction(4, "A") -> DebugTakeFunction(3, "B")
 * -> an operator that turns each Integer into a nested ObsurvableBind2 of 5 Strings
 * -> DebugTakeFunction(2, "C") -> a flattening operator that observes each nested source and
 * forwards its Strings -> DebugTakeFunction(7, "D") -> subscriber.
 * <p>
 * The assertions expect the outer source to emit 2 values, the nested sources to emit 7
 * values in total, and the subscriber to receive 7 values.
 */
@Test
public void testBindUnsubscribeNested() {
// counter: values emitted by the outer source; received: values seen by the subscriber;
// childCounter: values emitted by all nested sources combined
final AtomicInteger counter = new AtomicInteger();
final AtomicInteger received = new AtomicInteger();
final AtomicInteger childCounter = new AtomicInteger();
OBSERVABLE_OF_5_INTEGERS(counter)
.bind(new DebugTakeFunction<Integer>(4, "A"))
.bind(new DebugTakeFunction<Integer>(3, "B"))
// map each Integer to a nested source of Strings
.bind(new Func1<Operator<ObsurvableBind2<String>>, Operator<Integer>>() {
@Override
public Operator<Integer> call(final Operator<ObsurvableBind2<String>> childObserver) {
// when a child unsubscribes to this outer parent we will just filter out anything further while the nested Obsurvables continue
final OperatorSubscription outerSubscription = new OperatorSubscription();
// link: unsubscribing the child's subscription also unsubscribes outerSubscription
childObserver.getSubscription().add(outerSubscription);
return new Operator<Integer>() {
@Override
public void onCompleted() {
// if the parent completes we complete and will send nothing further
childObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
childObserver.onError(e);
}
@Override
public void onNext(Integer i) {
if (outerSubscription.isUnsubscribed()) {
System.out.println("***** group onNext => outerSubscription unsubscribed so skipping onNext of group");
} else {
// emit a nested source whose values are prefixed with this Integer
childObserver.onNext(OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(childCounter, String.valueOf(i)));
}
}
@Override
public OperatorSubscription getSubscription() {
return outerSubscription;
}
};
}
})
.bind(new DebugTakeFunction<ObsurvableBind2<String>>(2, "C"))
// flatten
.bind(new Func1<Operator<String>, Operator<ObsurvableBind2<String>>>() {
@Override
public Operator<ObsurvableBind2<String>> call(final Operator<String> childObserver) {
// NOTE(review): parentGivingUsGroups is only created and returned from getSubscription();
// nothing in this operator unsubscribes it or links it to the child's subscription
final OperatorSubscription parentGivingUsGroups = new OperatorSubscription();
// tracks the subscription of every nested source currently being observed
final CompositeSubscription childGroups = new CompositeSubscription();
// if child unsubscribes as we are giving it flattened results then we shut everything down
childObserver.getSubscription().add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("***** flattening got unsubscribed so shut down all groups");
childGroups.unsubscribe();
}
}));
return new Operator<ObsurvableBind2<String>>() {
@Override
public void onCompleted() {
// if the parent completes we don't immediately as sub-groups can still be emitting
// NOTE(review): as written, the child's onCompleted() is never called by this operator
}
@Override
public void onError(Throwable e) {
// if an error occurs we immediately shut everything down
childObserver.onError(e);
// unsubscribe all child groups
childGroups.unsubscribe();
}
@Override
public void onNext(ObsurvableBind2<String> o) {
// subscribe to this so we can flatten it
final OperatorSubscription childGroupSubscription = new OperatorSubscription();
childGroups.add(childGroupSubscription);
// intent: if all child groups are unsubscribed we want to unsubscribe the parent
// (not implemented here: the callback below only logs)
childGroupSubscription.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("***** single group got unsubscribed");
}
}));
// observe the nested source directly with an Operator carrying childGroupSubscription
o.observe(new Operator<String>() {
@Override
public void onCompleted() {
// remove the subscription if we finish
childGroups.remove(childGroupSubscription);
// TODO we are ignoring when all child groups complete and parent is complete
}
@Override
public void onError(Throwable e) {
// remove the subscription if we finish
// NOTE(review): the error itself is not forwarded to childObserver
childGroups.remove(childGroupSubscription);
}
@Override
public void onNext(String args) {
// emit flattened results
childObserver.onNext(args);
}
@Override
public OperatorSubscription getSubscription() {
return childGroupSubscription;
}
});
}
@Override
public OperatorSubscription getSubscription() {
return parentGivingUsGroups;
}
};
}
})
.bind(new DebugTakeFunction<String>(7, "D"))
.subscribe(new Action1<String>() {
@Override
public void call(String i) {
received.incrementAndGet();
System.out.println("Received: " + i);
}
});
// outer source emitted 2 values; 7 Strings were emitted by nested sources and all 7 were received
assertEquals(2, counter.get());
assertEquals(7, received.get());
assertEquals(7, childCounter.get());
}
/**
 * Synchronous source that emits 1, 2, 3, ... for as long as the receiving Operator's
 * subscription is not unsubscribed, then calls onCompleted().
 */
ObsurvableBind2<Integer> OBSERVABLE_OF_INFINITE_INTEGERS = ObsurvableBind2.create(new Action1<Operator<Integer>>() {
    @Override
    public void call(Operator<Integer> operator) {
        for (int next = 1; !operator.getSubscription().isUnsubscribed(); next++) {
            operator.onNext(next);
        }
        operator.onCompleted();
    }
});
/**
 * Builds a synchronous source that emits the integers 1 to 5, checking the receiving
 * Operator's subscription before each emission and stopping early once it is unsubscribed.
 * onCompleted() is called in either case.
 *
 * @param numEmitted incremented once for every value actually emitted
 */
ObsurvableBind2<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) {
    final Action1<Operator<Integer>> emitUpToFive = new Action1<Operator<Integer>>() {
        @Override
        public void call(Operator<Integer> operator) {
            int value = 1;
            while (value <= 5 && !operator.getSubscription().isUnsubscribed()) {
                numEmitted.incrementAndGet();
                operator.onNext(value);
                value++;
            }
            operator.onCompleted();
        }
    };
    return ObsurvableBind2.create(emitUpToFive);
}

/** Ready-made 5-integer source whose emission counter is not exposed. */
ObsurvableBind2<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger());
/**
 * Builds a synchronous source that emits the strings {@code v + "-" + i} for i = 1 to 5,
 * checking the receiving Operator's subscription before each emission and stopping early
 * once it is unsubscribed. onCompleted() is called in either case.
 *
 * @param numEmitted incremented once for every value actually emitted
 * @param v prefix placed before the "-" in every emitted string
 */
ObsurvableBind2<String> OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(final AtomicInteger numEmitted, final String v) {
    final Action1<Operator<String>> emitUpToFive = new Action1<Operator<String>>() {
        @Override
        public void call(Operator<String> operator) {
            int index = 1;
            while (index <= 5 && !operator.getSubscription().isUnsubscribed()) {
                numEmitted.incrementAndGet();
                operator.onNext(v + "-" + index);
                index++;
            }
            operator.onCompleted();
        }
    };
    return ObsurvableBind2.create(emitUpToFive);
}
/**
 * Builds a source that, on each observe, starts a new thread emitting 1, 2, 3, ... until the
 * receiving Operator's subscription is unsubscribed. The thread then calls onCompleted() and
 * counts down {@code latch}.
 *
 * @param latch counted down once by the emitting thread after it has completed
 */
ObsurvableBind2<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(final CountDownLatch latch) {
    return ObsurvableBind2.create(new Action1<Operator<Integer>>() {
        @Override
        public void call(final Operator<Integer> o) {
            final Runnable emitter = new Runnable() {
                @Override
                public void run() {
                    System.out.println("-------> subscribe to infinite sequence");
                    System.out.println("Starting thread: " + Thread.currentThread());
                    for (int next = 1; !o.getSubscription().isUnsubscribed(); next++) {
                        o.onNext(next);
                        // give other threads a chance to run (e.g. to unsubscribe)
                        Thread.yield();
                    }
                    o.onCompleted();
                    latch.countDown();
                    System.out.println("Ending thread: " + Thread.currentThread());
                }
            };
            new Thread(emitter).start();
        }
    });
}
}
/**
 * Subscribes with only an onNext callback. Completion and errors are ignored: the Observer
 * built here has empty onCompleted() and onError() bodies.
 *
 * @param onNext invoked with every emitted value
 * @return the subscription returned by {@link #subscribe(Observer)}
 */
public Subscription subscribe(final Action1<T> onNext) {
    final Observer<T> valuesOnly = new Observer<T>() {
        @Override
        public void onNext(T value) {
            onNext.call(value);
        }

        @Override
        public void onError(Throwable error) {
            // intentionally empty: errors are not reported to the caller
        }

        @Override
        public void onCompleted() {
            // intentionally empty: completion is not reported to the caller
        }
    };
    return subscribe(valuesOnly);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment