Created
January 19, 2014 15:33
-
-
Save benjchristensen/8506432 to your computer and use it in GitHub Desktop.
Observable example with just the `bind` operator and unit tests asserting behavior while exploring the best signature. This variant uses: Func1<Operator<R>, Operator<T>> but has Operator implement Subscription directly and compose a CompositeSubscription within it.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2014 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package rx; | |
import static org.junit.Assert.*; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.junit.Test; | |
import rx.subscriptions.CompositeSubscription; | |
import rx.subscriptions.Subscriptions; | |
import rx.util.functions.Action0; | |
import rx.util.functions.Action1; | |
import rx.util.functions.Func1; | |
public class ObsurvableBind3<T> { | |
/** Callback run against the receiving {@link Operator} every time {@code observe} is invoked. */
private final Action1<Operator<T>> f;

/**
 * Builds an instance around the supplied callback.
 *
 * @param onObserve callback stored unchanged and later invoked by {@code observe}
 */
ObsurvableBind3(Action1<Operator<T>> onObserve) {
    this.f = onObserve;
}
public static <T> ObsurvableBind3<T> create(final Action1<Operator<T>> f) { | |
return new ObsurvableBind3<T>(f); | |
} | |
public <R> ObsurvableBind3<R> bind(final Func1<Operator<R>, Operator<T>> bind) { | |
return new ObsurvableBind3<R>(new Action1<Operator<R>>() { | |
@Override | |
public void call(Operator<R> o) { | |
observe(bind.call(o)); | |
} | |
}); | |
} | |
public void observe(Operator<T> o) { | |
f.call(o); | |
} | |
public Subscription subscribe(final Observer<T> o) { | |
Operator<T> op = Operator.create(o); | |
observe(op); | |
return op; | |
} | |
public static abstract class Operator<T> implements Observer<T>, Subscription { | |
private final CompositeSubscription cs; | |
Operator(CompositeSubscription cs) { | |
this.cs = cs; | |
} | |
Operator() { | |
this.cs = new CompositeSubscription(); | |
} | |
public static <T> Operator<T> create(final Observer<T> o) { | |
return create(o, null); | |
} | |
public static <T> Operator<T> create(final Observer<T> o, Subscription s) { | |
Operator<T> op = new Operator<T>() { | |
@Override | |
public void onCompleted() { | |
o.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
o.onError(e); | |
} | |
@Override | |
public void onNext(T v) { | |
o.onNext(v); | |
} | |
}; | |
if (s != null) { | |
op.add(s); | |
} | |
return op; | |
} | |
/** | |
* Used to register an unsubscribe callback. | |
*/ | |
public final void add(Subscription s) { | |
cs.add(s); | |
} | |
@Override | |
public final void unsubscribe() { | |
cs.unsubscribe(); | |
} | |
public final boolean isUnsubscribed() { | |
return cs.isUnsubscribed(); | |
} | |
} | |
public static <T> ObsurvableBind3<T> from(final T t) { | |
return ObsurvableBind3.create(new Action1<Operator<T>>() { | |
@Override | |
public void call(Operator<T> o) { | |
o.onNext(t); | |
o.onCompleted(); | |
} | |
}); | |
} | |
public static <T> ObsurvableBind3<T> from(final Iterable<T> is) { | |
return ObsurvableBind3.create(new Action1<Operator<T>>() { | |
@Override | |
public void call(Operator<T> o) { | |
for (T i : is) { | |
if (o.isUnsubscribed()) { | |
break; | |
} | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
} | |
/**************************************************************************************************************/ | |
/**************************************************************************************************************/ | |
public static class UnitTest { | |
@Test | |
public void testBindUnsubscribe() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter).bind(new Func1<Operator<Integer>, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Operator<Integer> childObserver) { | |
// we return an Operator to the "parent" | |
return new Operator<Integer>() { | |
@Override | |
public void onCompleted() { | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(Integer t) { | |
childObserver.onNext(t); | |
unsubscribe(); | |
} | |
}; | |
} | |
}).subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(1, counter.get()); | |
assertEquals(1, received.get()); | |
} | |
@Test | |
public void testBindUnsubscribeLayered() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter) | |
.bind(new DebugTakeFunction<Integer>(2, "A")).bind(new DebugTakeFunction<Integer>(1, "B")).subscribe(new Action1<Integer>() { | |
@Override | |
public void call(Integer i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(1, counter.get()); | |
assertEquals(1, received.get()); | |
} | |
private static class DebugTakeFunction<T> implements Func1<Operator<T>, Operator<T>> { | |
final int num; | |
final String id; | |
DebugTakeFunction(final int num, final String id) { | |
this.num = num; | |
this.id = id; | |
} | |
int count = 0; | |
@Override | |
public Operator<T> call(final Operator<T> childObserver) { | |
final CompositeSubscription parentSub = new CompositeSubscription(); | |
// when the child unsubscribes we want it to trigger the parent unsubscribe so we link them | |
childObserver.add(parentSub); | |
// we return an Operator to the "parent" | |
return new Operator<T>(parentSub) { | |
@Override | |
public void onCompleted() { | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
count++; | |
System.out.println("simpleTake[" + id + "] => onNext " + count + " parentSub: " + parentSub.isUnsubscribed() + " childSub: " + childObserver.isUnsubscribed()); | |
if (!parentSub.isUnsubscribed()) { | |
childObserver.onNext(t); | |
if (count >= num) { | |
System.out.println("simpleTake[" + id + "] => unsubscribe"); | |
parentSub.unsubscribe(); | |
} | |
} | |
} | |
}; | |
} | |
} | |
@Test | |
public void testBindUnsubscribeNested() { | |
final AtomicInteger counter = new AtomicInteger(); | |
final AtomicInteger received = new AtomicInteger(); | |
final AtomicInteger childCounter = new AtomicInteger(); | |
OBSERVABLE_OF_5_INTEGERS(counter) | |
.bind(new DebugTakeFunction<Integer>(4, "A")) | |
.bind(new DebugTakeFunction<Integer>(3, "B")) | |
.bind(new Func1<Operator<ObsurvableBind3<String>>, Operator<Integer>>() { | |
@Override | |
public Operator<Integer> call(final Operator<ObsurvableBind3<String>> childObserver) { | |
// when a child unsubscribes to this outer parent we will just filter out anything further while the nested Obsurvables continue | |
final CompositeSubscription outerSubscription = new CompositeSubscription(); | |
childObserver.add(outerSubscription); | |
return new Operator<Integer>(outerSubscription) { | |
@Override | |
public void onCompleted() { | |
// if the parent completes we complete and will send nothing further | |
childObserver.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
childObserver.onError(e); | |
} | |
@Override | |
public void onNext(Integer i) { | |
if (outerSubscription.isUnsubscribed()) { | |
System.out.println("***** group onNext => outerSubscription unsubscribed so skipping onNext of group"); | |
} else { | |
childObserver.onNext(OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(childCounter, String.valueOf(i))); | |
} | |
} | |
}; | |
} | |
}) | |
.bind(new DebugTakeFunction<ObsurvableBind3<String>>(2, "C")) | |
// flatten | |
.bind(new Func1<Operator<String>, Operator<ObsurvableBind3<String>>>() { | |
@Override | |
public Operator<ObsurvableBind3<String>> call(final Operator<String> childObserver) { | |
final CompositeSubscription parentGivingUsGroups = new CompositeSubscription(); | |
final CompositeSubscription childGroups = new CompositeSubscription(); | |
// if child unsubscribes as we are giving it flattened results then we shut everything down | |
childObserver.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("***** flattening got unsubscribed so shut down all groups"); | |
childGroups.unsubscribe(); | |
} | |
})); | |
return new Operator<ObsurvableBind3<String>>(parentGivingUsGroups) { | |
@Override | |
public void onCompleted() { | |
// if the parent completes we don't immediately as sub-groups can still be emitting | |
} | |
@Override | |
public void onError(Throwable e) { | |
// if an error occurs we immediately shut everything down | |
childObserver.onError(e); | |
// unsubscribe all child groups | |
childGroups.unsubscribe(); | |
} | |
@Override | |
public void onNext(ObsurvableBind3<String> o) { | |
// subscribe to this so we can flatten it | |
final CompositeSubscription childGroupSubscription = new CompositeSubscription(); | |
childGroups.add(childGroupSubscription); | |
// if all child groups are unsubscribed we want to unsubscribe the parent | |
childGroupSubscription.add(Subscriptions.create(new Action0() { | |
@Override | |
public void call() { | |
System.out.println("***** single group got unsubscribed"); | |
} | |
})); | |
o.observe(new Operator<String>(childGroupSubscription) { | |
@Override | |
public void onCompleted() { | |
// remove the subscription if we finish | |
childGroups.remove(childGroupSubscription); | |
// TODO we are ignoring when all child groups complete and parent is complete | |
} | |
@Override | |
public void onError(Throwable e) { | |
// remove the subscription if we finish | |
childGroups.remove(childGroupSubscription); | |
} | |
@Override | |
public void onNext(String args) { | |
// emit flattened results | |
childObserver.onNext(args); | |
} | |
}); | |
} | |
}; | |
} | |
}) | |
.bind(new DebugTakeFunction<String>(7, "D")) | |
.subscribe(new Action1<String>() { | |
@Override | |
public void call(String i) { | |
received.incrementAndGet(); | |
System.out.println("Received: " + i); | |
} | |
}); | |
assertEquals(2, counter.get()); | |
assertEquals(7, received.get()); | |
assertEquals(7, childCounter.get()); | |
} | |
ObsurvableBind3<Integer> OBSERVABLE_OF_INFINITE_INTEGERS = ObsurvableBind3.create(new Action1<Operator<Integer>>() { | |
@Override | |
public void call(Operator<Integer> o) { | |
int i = 1; | |
// System.out.println("source subscription: " + s); | |
while (!o.isUnsubscribed()) { | |
o.onNext(i++); | |
} | |
o.onCompleted(); | |
} | |
}); | |
ObsurvableBind3<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) { | |
return ObsurvableBind3.create(new Action1<Operator<Integer>>() { | |
@Override | |
public void call(Operator<Integer> o) { | |
// System.out.println("$$ OBSERVABLE_OF_5_INTEGERS source subscription: " + s); | |
for (int i = 1; i <= 5; i++) { | |
if (o.isUnsubscribed()) { | |
break; | |
} | |
// System.out.println(i); | |
numEmitted.incrementAndGet(); | |
o.onNext(i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
}; | |
ObsurvableBind3<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger()); | |
ObsurvableBind3<String> OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING(final AtomicInteger numEmitted, final String v) { | |
return ObsurvableBind3.create(new Action1<Operator<String>>() { | |
@Override | |
public void call(Operator<String> o) { | |
// System.out.println("$$ OBSERVABLE_OF_5_INTEGER_PLUS_INPUT_TO_STRING source subscription: " + s); | |
for (int i = 1; i <= 5; i++) { | |
if (o.isUnsubscribed()) { | |
break; | |
} | |
// System.out.println(i); | |
numEmitted.incrementAndGet(); | |
o.onNext(v + "-" + i); | |
} | |
o.onCompleted(); | |
} | |
}); | |
}; | |
ObsurvableBind3<Integer> ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(final CountDownLatch latch) { | |
return ObsurvableBind3.create(new Action1<Operator<Integer>>() { | |
@Override | |
public void call(final Operator<Integer> o) { | |
Thread t = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
System.out.println("-------> subscribe to infinite sequence"); | |
System.out.println("Starting thread: " + Thread.currentThread()); | |
int i = 1; | |
while (!o.isUnsubscribed()) { | |
o.onNext(i++); | |
Thread.yield(); | |
} | |
o.onCompleted(); | |
latch.countDown(); | |
System.out.println("Ending thread: " + Thread.currentThread()); | |
} | |
}); | |
t.start(); | |
} | |
}); | |
} | |
} | |
public Subscription subscribe(final Action1<T> onNext) { | |
return subscribe(new Observer<T>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(T t) { | |
onNext.call(t); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment