Last active
April 14, 2016 21:19
-
-
Save kevinwright/93036260501236ec629105f81fa78da0 to your computer and use it in GitHub Desktop.
ThrottledRunner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.generic.CanBuildFrom | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.language.higherKinds | |
import scala.util.{Failure, Success} | |
object AsyncUtils { | |
/** | |
* A subtle variant on `Future.traverse` that forces the Futures to be executed one at a time | |
* instead of allowing parallelism | |
*/ | |
def sequencedTraverse | |
[A, B, M[X] <: TraversableOnce[X]] | |
(in: M[A]) | |
(fn: A => Future[B]) | |
(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext) | |
: Future[M[B]] = { | |
in.foldLeft(Future.successful(cbf(in))){ (facc, a) => | |
for (acc <- facc; b <- fn(a)) yield (acc += b) | |
}.map(_.result()) | |
} | |
def throttledTraverse | |
[A, B, M[X] <: TraversableOnce[X]] | |
(in: M[A], maxConcurrents: Int) | |
(fn: A => Future[B]) | |
(implicit | |
cbf1: CanBuildFrom[M[A], Future[B], M[Future[B]]], | |
cbf2: CanBuildFrom[M[Future[B]], B, M[B]], | |
executor: ExecutionContext | |
) | |
: Future[M[B]] = { | |
Future.sequence(ThrottledRunner.run(maxConcurrents, in, fn)) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.language.higherKinds | |
import java.util.concurrent.ConcurrentLinkedQueue | |
import scala.concurrent.{ExecutionContext, Future, Promise} | |
import scala.util.{Failure, Success, Try} | |
import scala.collection.JavaConverters._ | |
import scala.collection.generic.CanBuildFrom | |
object ThrottledRunner { | |
def run[A, B, M[X] <: TraversableOnce[X]]( | |
maxConcurrents: Int, | |
in: M[A], | |
fn: A => Future[B] | |
)(implicit cbf: CanBuildFrom[M[A], Future[B], M[Future[B]]], ec: ExecutionContext) | |
: M[Future[B]] = run(SharedThrottle(maxConcurrents), in, fn) | |
def run[A, B, M[X] <: TraversableOnce[X]]( | |
throttle: SharedThrottle, | |
in: M[A], | |
fn: A => Future[B] | |
)(implicit cbf: CanBuildFrom[M[A], Future[B], M[Future[B]]], ec: ExecutionContext): M[Future[B]] = { | |
import throttle._ | |
object queue extends ConcurrentLinkedQueue[(A, Promise[B])] { | |
val results = new ConcurrentLinkedQueue[Future[B]] | |
def addItem(item: A): Boolean = add(item -> Promise[B]) | |
override def add(pair: (A, Promise[B])): Boolean = { | |
val r = super.add(pair) | |
results add pair._2.future | |
attach() | |
r | |
} | |
final def completedTaskHandler(comp: Try[B], promise: Promise[B]): Unit = { | |
releaseSlot() | |
comp match { | |
case Success(result) => | |
promise success result | |
attach() | |
case Failure(x) => val _ = promise failure x | |
} | |
} | |
final def attach(): Unit = | |
if(!isEmpty && seizeSlot()) { | |
poll() match { | |
case null => () | |
case (x, promise) => fn(x) onComplete { comp => completedTaskHandler(comp, promise) } | |
} | |
} | |
} | |
in foreach queue.addItem | |
queue.results.iterator().asScala.to[M] | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment