Created
January 18, 2014 21:58
-
-
Save benjchristensen/8497234 to your computer and use it in GitHub Desktop.
Observable example with just the `bind` operator and unit tests asserting behavior while exploring the best signature. This variant uses: Func1<Operator<R>, Operator<T>>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2014 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package rx; | |
import static org.junit.Assert.*; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.junit.Test; | |
import rx.subscriptions.CompositeSubscription; | |
import rx.subscriptions.Subscriptions; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Action1; | |
import rx.util.functions.Func1; | |
public class ObsurvableBind2<T> { | |
private final Action1<Operator<T>> f; | |
ObsurvableBind2(Action1<Operator<T>> f) { | |
this.f = f; | |
} | |
public static <T> ObsurvableBind2<T> create(final Action1<Operator<T>> f) { | |
return new ObsurvableBind2<T>(f); | |
} | |
public <R> ObsurvableBind2<R> bind(final Func1<Operator<R>, Operator<T>> bind) { | |
return new ObsurvableBind2<R>(new Action1<Operator<R>>() { | |
@Override | |
public void call(Operator<R> o) { | |
observe(bind.call(o)); | |
} | |
}); | |
} | |
public void observe(Operator<T> o) { | |
f.call(o); | |
} | |
public Subscription subscribe(final Observer<T> o) { | |
final OperatorSubscription os = new OperatorSubscription(); | |
observe(createOperator(o, os)); | |
return os; | |
} | |
public static interface Operator<T> extends Observer<T> { | |
/** | |
* Get the Subscription intended to pass up to the source being bound to. | |
* <p> | |
* In other words, the subscription from Operator -> Source | |
*/ | |
public OperatorSubscription getSubscription(); | |
} | |
private Operator<T> createOperator(final Observer<T> o, final OperatorSubscription os) { | |
return new Operator<T>() { | |
@Override | |
public void onCompleted() { | |
o.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
o.onError(e); | |
} | |
@Override | |
public void onNext(T v) { | |
o.onNext(v); | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return os; | |
} | |
}; | |
} | |
public static <T> ObsurvableBind2<T> from(final T t) { | |
return ObsurvableBind2.create(new Action1<Operator<T>>() { | |
@Override | |
public void call(Operator<T> o) { | |
o.onNext(t); | |
o.onCompleted(); | |
} | |
}); | |
} | |
public static <T> ObsurvableBind2<T> from(final Iterable<T> is) { | |
return ObsurvableBind2.create(new Action1<Operator<T>>() { | |
@Override | |
public void call(Operator<T> o) { | |
for (T i : is) { | |
if (o.getSubscription().isUnsubscribed()) { | |
break; | |
} | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
} | |
/**************************************************************************************************************/ | |
private static class OperatorSubscription implements Subscription { | |
private final CompositeSubscription cs = new CompositeSubscription(); | |
@Override | |
public void unsubscribe() { | |
cs.unsubscribe(); | |
} | |
public boolean isUnsubscribed() { | |
return cs.isUnsubscribed(); | |
} | |
/** | |
* Used to register an unsubscribe callback. | |
*/ | |
public void add(Subscription s) { | |
cs.add(s); | |
} | |
} | |
/**************************************************************************************************************/ | |
/**************************************************************************************************************/ | |
public static class UnitTest { | |
@Test | |
public void testBindUnsubscribe() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter).bind(new Func1<Operator<Integer>, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Operator<Integer> childObserver) { | |
final OperatorSubscription parentSub = new OperatorSubscription(); | |
// we return an Operator to the "parent" | |
return new Operator<Integer>() { | |
@Override | |
public void onCompleted() { | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(Integer t) { | |
childObserver.onNext(t); | |
parentSub.unsubscribe(); | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentSub; | |
} | |
}; | |
} | |
}).subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(1, counter.get()); | |
assertEquals(1, received.get()); | |
} | |
@Test | |
public void testBindUnsubscribeLayered() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter) | |
.bind(new DebugTakeFunction<Integer>(2, "A")).bind(new DebugTakeFunction<Integer>(1, "B")).subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(1, counter.get()); | |
assertEquals(1, received.get()); | |
} | |
private static class DebugTakeFunction<T> implements Func1<Operator<T>, Operator<T>> { | |
final int num; | |
final String id; | |
DebugTakeFunction(final int num, final String id) { | |
this.num = num; | |
this.id = id; | |
} | |
int count = 0; | |
@Override | |
public Operator<T> call(final Operator<T> childObserver) { | |
final OperatorSubscription parentSub = new OperatorSubscription(); | |
// when the child unsubscribes we want it to trigger the parent unsubscribe so we link them | |
childObserver.getSubscription().add(parentSub); | |
// we return an Operator to the "parent" | |
return new Operator<T>() { | |
@Override | |
public void onCompleted() { | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
count++; | |
System.out.println("simpleTake[" + id + "] => onNext " + count + " parentSub: " + parentSub.isUnsubscribed() + " childSub: " + childObserver.getSubscription().isUnsubscribed()); | |
if (!parentSub.isUnsubscribed()) { | |
childObserver.onNext(t); | |
if (count >= num) { | |
System.out.println("simpleTake[" + id + "] => unsubscribe"); | |
parentSub.unsubscribe(); | |
} | |
} | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentSub; | |
} | |
}; | |
} | |
} | |
@Test | |
public void testBindUnsubscribeNested() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
final AtomicInteger childCounter = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter) | |
.bind(new DebugTakeFunction<Integer>(4, "A")) | |
.bind(new DebugTakeFunction<Integer>(3, "B")) | |
.bind(new Func1<Operator<ObsurvableBind2<String>>, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Operator<ObsurvableBind2<String>> childObserver) { | |
// when a child unsubscribes to this outer parent we will just filter out anything further while the nested Obsurvables continue | |
final OperatorSubscription outerSubscription = new OperatorSubscription(); | |
childObserver.getSubscription().add(outerSubscription); | |
return new Operator<Integer>() { | |
@Override | |
public void onCompleted() { | |
// if the parent completes we complete and will send nothing further | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(Integer i) { | |
if (outerSubscription.isUnsubscribed()) { | |
System.out.println("***** group onNext => outerSubscription unsubscribed so skipping onNext of group"); | |
} else { | |
childObserver.onNext(OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(childCounter, String.valueOf(i))); | |
} | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return outerSubscription; | |
} | |
}; | |
} | |
}) | |
.bind(new DebugTakeFunction<ObsurvableBind2<String>>(2, "C")) | |
// flatten | |
.bind(new Func1<Operator<String>, Operator<ObsurvableBind2<String>>>() { | |
@Override | |
public Operator<ObsurvableBind2<String>> call(final Operator<String> childObserver) { | |
final OperatorSubscription parentGivingUsGroups = new OperatorSubscription(); | |
final CompositeSubscription childGroups = new CompositeSubscription(); | |
// if child unsubscribes as we are giving it flattened results then we shut everything down | |
childObserver.getSubscription().add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("***** flattening got unsubscribed so shut down all groups"); | |
childGroups.unsubscribe(); | |
} | |
})); | |
return new Operator<ObsurvableBind2<String>>() { | |
@Override | |
public void onCompleted() { | |
// if the parent completes we don't immediately as sub-groups can still be emitting | |
} | |
@Override | |
public void onError(Throwable e) { | |
// if an error occurs we immediately shut everything down | |
childObserver.onError(e); | |
// unsubscribe all child groups | |
childGroups.unsubscribe(); | |
} | |
@Override | |
public void onNext(ObsurvableBind2<String> o) { | |
// subscribe to this so we can flatten it | |
final OperatorSubscription childGroupSubscription = new OperatorSubscription(); | |
childGroups.add(childGroupSubscription); | |
// if all child groups are unsubscribed we want to unsubscribe the parent | |
childGroupSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("***** single group got unsubscribed"); | |
} | |
})); | |
o.observe(new Operator<String>() { | |
@Override | |
public void onCompleted() { | |
// remove the subscription if we finish | |
childGroups.remove(childGroupSubscription); | |
// TODO we are ignoring when all child groups complete and parent is complete | |
} | |
@Override | |
public void onError(Throwable e) { | |
// remove the subscription if we finish | |
childGroups.remove(childGroupSubscription); | |
} | |
@Override | |
public void onNext(String args) { | |
// emit flattened results | |
childObserver.onNext(args); | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return childGroupSubscription; | |
} | |
}); | |
} | |
@Override | |
public OperatorSubscription getSubscription() { | |
return parentGivingUsGroups; | |
} | |
}; | |
} | |
}) | |
.bind(new DebugTakeFunction<String>(7, "D")) | |
.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(2, counter.get()); | |
assertEquals(7, received.get()); | |
assertEquals(7, childCounter.get()); | |
} | |
ObsurvableBind2<Integer> OBSERVABLE_OF_INFINITE_INTEGERS = ObsurvableBind2.create(new Action1<Operator<Integer>>() { | |
@Override | |
public void call(Operator<Integer> o) { | |
int i = 1; | |
// System.out.println("source subscription: " + s); | |
while (!o.getSubscription().isUnsubscribed()) { | |
o.onNext(i++); | |
} | |
o.onCompleted(); | |
} | |
}); | |
ObsurvableBind2<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) { | |
return ObsurvableBind2.create(new Action1<Operator<Integer>>() { | |
@Override | |
public void call(Operator<Integer> o) { | |
// System.out.println("$$ OBSERVABLE_OF_5_INTEGERS source subscription: " + s); | |
for (int i = 1; i <= 5; i++) { | |
if (o.getSubscription().isUnsubscribed()) { | |
break; | |
} | |
// System.out.println(i); | |
numEmitted.incrementAndGet(); | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
}; | |
ObsurvableBind2<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger()); | |
ObsurvableBind2<String> OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(final AtomicInteger numEmitted, final String v) { | |
return ObsurvableBind2.create(new Action1<Operator<String>>() { | |
@Override | |
public void call(Operator<String> o) { | |
// System.out.println("$$ OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING source subscription: " + s); | |
for (int i = 1; i <= 5; i++) { | |
if (o.getSubscription().isUnsubscribed()) { | |
break; | |
} | |
// System.out.println(i); | |
numEmitted.incrementAndGet(); | |
o.onNext(v + "-" + i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
}; | |
ObsurvableBind2<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(final CountDownLatch latch) { | |
return ObsurvableBind2.create(new Action1<Operator<Integer>>() { | |
@Override | |
public void call(final Operator<Integer> o) { | |
Thread t = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
System.out.println("-------> subscribe to infinite sequence"); | |
System.out.println("Starting thread: " + Thread.currentThread()); | |
int i = 1; | |
while (!o.getSubscription().isUnsubscribed()) { | |
o.onNext(i++); | |
Thread.yield(); | |
} | |
o.onCompleted(); | |
latch.countDown(); | |
System.out.println("Ending thread: " + Thread.currentThread()); | |
} | |
}); | |
t.start(); | |
} | |
}); | |
} | |
} | |
public Subscription subscribe(final Action1<T> onNext) { | |
return subscribe(new Observer<T>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(T t) { | |
onNext.call(t); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment