Last active
December 10, 2015 22:48
-
-
Save som-snytt/4504281 to your computer and use it in GitHub Desktop.
Future select
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package object futurism { | |
import scala.language.higherKinds | |
import scala.collection.generic.CanBuildFrom | |
import scala.collection.{TraversableOnce => Once} | |
import scala.concurrent._ | |
import scala.util.{Try, Success, Failure} | |
import scala.util.control.NonFatal | |
// A TraversableOnce may be a single-pass iterator, so the input is copied into a
// Vector once and everything below works on that copy.
/** Adds a simple `select` to the `Future` companion object. */
implicit class NaiveSelectable(val co: Future.type) extends AnyVal {
  /** Races the given futures and reports the first one to complete.
   *
   *  @param fs the futures to race; traversed exactly once
   *  @param xc execution context on which the completion callbacks run
   *  @return a future holding the first result together with the other futures,
   *          in their original order; it never completes when `fs` is empty,
   *          because no callback is ever registered
   */
  def select[A](fs: Once[Future[A]])(implicit xc: ExecutionContext): Future[(Try[A], Once[Future[A]])] = {
    val all = fs.toVector
    val p = Promise[(Try[A], Once[Future[A]])]()
    all.zipWithIndex foreach { case (f, i) =>
      f onComplete { v =>
        // Cheap pre-check so late finishers do not build a remainder for nothing;
        // trySuccess still decides the actual winner.
        if (!p.isCompleted) {
          // Remove by position: only the entry that fired is dropped, even if the
          // same future instance appears more than once.
          val others = all.patch(i, Nil, 1)
          p trySuccess ((v, others))
        }
      }
    }
    println("Selectably...")
    p.future
  }
}
/** Adds a collection-preserving `select` to the `Future` companion object. */
implicit class Selectable(val co: Future.type) extends AnyVal {
  /** Races the given futures and reports the first one to complete.
   *
   *  The remainder is rebuilt through `cbf`, so it has the same collection
   *  shape `M` as the input.
   *
   *  @tparam A   result type of the futures
   *  @tparam B   element type used only to pick the `CanBuildFrom`; elements are
   *              cast to it when they are added to the builder
   *  @tparam M   collection type of the input and of the returned remainder
   *  @param  fs  the futures to race; traversed exactly once
   *  @param  cbf builder factory for the remainder
   *  @param  xc  execution context on which the completion callbacks run
   *  @return a future holding the first result and all other entries of `fs`;
   *          it never completes when `fs` is empty
   */
  def select[A, B <: Future[A], M[+B] <: TraversableOnce[B]](fs: M[Future[A]])
      (implicit cbf: CanBuildFrom[M[B],B,M[B]], xc: ExecutionContext):
      Future[(Try[A], M[Future[A]])] = {
    // Snapshot with positions: the input may be single-pass, and positions let us
    // drop exactly the entry that fired rather than every reference-equal entry.
    val indexed = fs.toVector.zipWithIndex
    val p = Promise[(Try[A], M[B])]()
    indexed foreach { case (f, i) =>
      f onComplete { v =>
        // Cheap pre-check; trySuccess still decides the actual winner.
        if (!p.isCompleted) {
          val b = cbf()
          for ((x, j) <- indexed; if j != i) b += x.asInstanceOf[B]
          p trySuccess ((v, b.result()))
        }
      }
    }
    println("Selectably...")
    p.future
  }
}
/** Adds a positional `select` for sequences to the `Future` companion object.
 *
 *  The first future to be satisfied is "selected" and returned as the result,
 *  together with the remaining futures as a sequence.
 */
implicit class Klangable(val co: Future.type) /*extends AnyVal*/ {
  import scala.annotation._

  /** Races the futures in `fs`.
   *
   *  @param fs the futures to race, as a `scala.collection.Seq`
   *  @param ec execution context on which the completion callbacks run
   *  @return a future of the first result paired with every other element of
   *          `fs` in order; a failed future carrying an
   *          `IllegalArgumentException` when `fs` is empty
   */
  def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
    type Outcome = (Try[A], Seq[Future[A]])

    println("Klang, klang, klang, went the trolley.")
    if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!"))
    else {
      val winner = Promise[Outcome]()

      // Walks the sequence once, registering a callback per element. Each callback
      // remembers the elements before and after its own position, so the remainder
      // is computed by position rather than by identity.
      @tailrec def arm(before: Seq[Future[A]],
                       current: Future[A],
                       after: Seq[Future[A]]): Future[Outcome] = {
        current onComplete { outcome =>
          if (!winner.isCompleted) winner.trySuccess((outcome, before ++ after))
        }
        if (after.nonEmpty) arm(before :+ current, after.head, after.tail)
        else winner.future
      }

      // Start with an empty prefix built by the input's own generic builder.
      arm(fs.genericBuilder[Future[A]].result, fs.head, fs.tail)
    }
  }
}
/** Adds `packaged` to any `Future`, turning its failure into a value. */
implicit class EitherFuture[A](val f: Future[A]) extends AnyVal {
  /** Converts the outcome of the wrapped future into an `Either`.
   *
   *  @param xc execution context on which the transformation runs
   *  @return a future that succeeds with `Right(value)` when the wrapped future
   *          succeeds and with `Left(throwable)` when it fails
   */
  def packaged(implicit xc: ExecutionContext): Future[Either[Throwable, A]] =
    // Type arguments are spelled out so both branches widen to Either rather than
    // to Right[...] / Left[...].
    f.map[Either[Throwable, A]](Right(_))
      .recover[Either[Throwable, A]] { case t => Left(t) }
}
} | |
package futuring { | |
import futurism.Selectable | |
//import futurism.Klangable | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import ExecutionContext.Implicits.global | |
/** Smoke test: selects the first of three futures, then waits for the rest. */
object Test extends App {
  // Three independent futures, started in this order.
  val fs = List(5, 17, 11) map (n => Future(n))

  // Block until one of them wins; `rest` holds the others.
  val (res, rest) = Await.result(Future.select(fs), Duration.Inf)
  println(s"result $res, rest $rest")

  // Gather whatever the remaining futures produce and print it comma-separated.
  val all = Await.result(Future.sequence(rest), Duration.Inf)
  println(all.mkString(","))
}
/** Demo: starts several sleeping futures and prints each result as it arrives,
 *  using `Future select` repeatedly on whatever is still outstanding.
 */
object Crosson extends App {
  import scala.util.{Try, Success, Failure}
  import java.util.concurrent.{CountDownLatch => Latch}

  /** Starts a future that sleeps for `inS` seconds and then yields `inS`.
   *
   *  An interrupt during the sleep is reported on stdout and the value is still
   *  returned.
   */
  def sleepily(inS: Int): Future[Int] = Future {
    // Mark the sleep as blocking so the execution context can compensate instead
    // of having its worker threads tied up by sleeping tasks.
    blocking {
      try Thread.sleep(inS * 1000)
      catch {
        case _: InterruptedException => println(s"Interrupted conjuring a $inS")
      }
    }
    inS
  }

  val workToDo = List(4, 10, 2, 8, 1)
  val workToDoFutures = workToDo map sleepily
  // One count per work item; the main thread waits on this at the bottom.
  val done = new Latch(workToDo.size)

  //def getResults[T](futures:TraversableOnce[Future[T]]) {
  /** Prints the next result to arrive from `futures`, then recurses on the rest.
   *
   *  Each call accounts for `futures.size` latch counts in total: one for the
   *  result it prints and the remainder through the recursive call.
   */
  def getResults[T](futures: Seq[Future[T]]): Unit = {
    def finish() = done.countDown()
    val nextOne = Future select futures
    nextOne onComplete {
      case Success((result, remains)) =>
        println(s"just for visual debug purposes ${result}")
        if (remains.nonEmpty) getResults(remains)
        finish()
      case Failure(e) =>
        e.printStackTrace()
        // No further rounds will run, so release every count this call still
        // owed; otherwise the await below would never return.
        futures foreach (_ => finish())
    }
  }

  getResults(workToDoFutures)
  done.await(/*timeout*/)
}
/** Demo: the same idea as `Crosson`, taking a bare `TraversableOnce` of futures. */
object Crosson0 extends App {
  import scala.util.{Try, Success, Failure}
  import java.util.concurrent.{CountDownLatch => Latch}

  /** Sleeps for `inS` seconds on the calling thread, then returns `inS`. */
  def sleep(inS: Int): Int = { Thread.sleep(inS * 1000); inS }

  val workToDo = List(4, 10, 2, 8, 1)
  // `blocking` tells the execution context these tasks will tie up a thread.
  val workToDoFutures = workToDo.map { t => Future { blocking { sleep(t) } } }

  // Released once every result has been printed (or a round has failed), so the
  // main thread can wait instead of returning while work is still in flight.
  private val done = new Latch(1)

  /** Prints each result from `futures` in completion order.
   *
   *  @param futures the outstanding futures; copied into a `List` first because
   *                 a value typed only as `TraversableOnce` may be single-pass
   */
  def getResults[T](futures: TraversableOnce[Future[T]]): Unit = {
    val pending = futures.toList
    if (pending.isEmpty) done.countDown()
    else {
      val nextOne = Future select pending
      nextOne.onComplete {
        case Success((result, remains)) =>
          println(s"just for visual debug purposes ${result}")
          getResults(remains)
        case Failure(ex) =>
          ex.printStackTrace()
          done.countDown()
      }
    }
  }

  getResults(workToDoFutures)
  done.await()
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Selectable unfortunately subtly breaks the contract.