// Last active April 14, 2016 21:19
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds
import scala.util.{Failure, Success}
/**
 * Helpers for applying an asynchronous function to every element of a collection
 * while limiting how many of the resulting Futures run at the same time.
 */
object AsyncUtils {

  /**
   * A subtle variant on `Future.traverse` that forces the Futures to be executed one at a time
   * instead of allowing parallelism.
   *
   * `fn` is applied to an element only after the Future produced for the previous element
   * has completed successfully. Once any Future fails, `fn` is not applied to the remaining
   * elements and the returned Future carries that failure.
   *
   * @param in       the input collection, traversed once in iteration order
   * @param fn       the asynchronous function applied to each element
   * @param cbf      builds the result collection `M[B]`
   * @param executor execution context on which the chaining callbacks run
   * @return a Future of the collected results, in input order
   */
  def sequencedTraverse
    [A, B, M[X] <: TraversableOnce[X]]
    (in: M[A])
    (fn: A => Future[B])
    (implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext)
  : Future[M[B]] = {
    // `fn(a)` sits inside the for-comprehension, after `acc <- facc`, so it is not even
    // invoked until the accumulated Future (i.e. every previous element) has completed.
    val collected = in.foldLeft(Future.successful(cbf(in))) { (facc, a) =>
      for (acc <- facc; b <- fn(a)) yield (acc += b)
    }
    // The fold accumulates into a builder; turn it into the final collection.
    collected.map(_.result())
  }

  /**
   * A variant on `Future.traverse` that delegates to `ThrottledRunner.run` with
   * `maxConcurrents` as the concurrency limit, then gathers the individual Futures
   * into a single Future with `Future.sequence`.
   *
   * @param in             the input collection
   * @param maxConcurrents concurrency limit handed to `ThrottledRunner.run`
   * @param fn             the asynchronous function applied to each element
   * @param cbf1           builds the intermediate collection of per-element Futures
   * @param cbf2           builds the final result collection `M[B]`
   * @param executor       execution context used for scheduling and sequencing
   * @return a Future of the collected results; it fails if any element's Future fails
   */
  def throttledTraverse
    [A, B, M[X] <: TraversableOnce[X]]
    (in: M[A], maxConcurrents: Int)
    (fn: A => Future[B])
    (implicit
      cbf1: CanBuildFrom[M[A], Future[B], M[Future[B]]],
      cbf2: CanBuildFrom[M[Future[B]], B, M[B]],
      executor: ExecutionContext
    )
  : Future[M[B]] = {
    Future.sequence(ThrottledRunner.run(maxConcurrents, in, fn))
  }
}
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
/**
 * A lock-free counter of "slots" that caps how many tasks may be in flight at once.
 * Callers seize a slot before starting work and release it when the work finishes.
 *
 * Being a case class, equality and `copy` consider only `maxConcurrents`; the usage
 * counter is private per-instance state and is not part of either.
 *
 * @param maxConcurrents maximum number of slots that may be held simultaneously; must be > 0
 */
final case class SharedThrottle(maxConcurrents: Int) {
  require(maxConcurrents > 0, s"SharedThrottle.maxConcurrents must be greater than 0 but was $maxConcurrents")

  // Number of slots currently held.
  private[this] val counter = new AtomicInteger(0)

  /**
   * Try to grab an available slot. Uses a CAS operation internally to avoid locking and
   * will self-recurse until the CAS succeeds or no slot is available.
   *
   * @return true if a slot was available and we acquired it, false if no slots are free
   */
  @tailrec def seizeSlot(): Boolean = {
    val inUse = counter.get
    if (inUse >= maxConcurrents) false
    else if (counter.compareAndSet(inUse, inUse + 1)) true
    else seizeSlot() // another thread changed the counter between get and CAS: retry
  }

  /** Hand back a previously seized slot by decrementing the usage counter. */
  def releaseSlot(): Unit = { val _ = counter.decrementAndGet() }

  /** @return the number of slots currently held */
  def slotsUsed: Int = counter.get
}
import scala.language.higherKinds
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
/**
 * Applies an asynchronous function to every element of a collection while using a
 * `SharedThrottle` to cap how many of the resulting Futures are in flight at once.
 */
object ThrottledRunner {

  /**
   * Convenience overload that creates a fresh `SharedThrottle(maxConcurrents)` for this
   * call and delegates to the throttle-taking overload.
   *
   * @param maxConcurrents maximum number of `fn` invocations in flight at once
   * @param in             the input collection
   * @param fn             the asynchronous function applied to each element
   * @return one Future per input element, in input order
   */
  def run[A, B, M[X] <: TraversableOnce[X]](
    maxConcurrents: Int,
    in: M[A],
    fn: A => Future[B]
  )(implicit cbf: CanBuildFrom[M[A], Future[B], M[Future[B]]], ec: ExecutionContext)
  : M[Future[B]] = run(SharedThrottle(maxConcurrents), in, fn)

  /**
   * Queues every element of `in`, starts `fn` on queued elements whenever `throttle`
   * has a free slot, and returns immediately with one Future per element. Each returned
   * Future completes with the outcome of `fn` for its element.
   *
   * NOTE(review): new work is only started when an element is submitted or when one of
   * this call's own tasks completes. If other users of the same `throttle` hold every
   * slot for the whole submission phase, nothing here triggers a later retry — confirm
   * whether sharing a throttle across concurrent calls is an intended use.
   *
   * @param throttle slot counter limiting concurrency; may be shared with other callers
   * @param in       the input collection, traversed once
   * @param fn       the asynchronous function applied to each element
   * @param cbf      builds the returned collection of Futures
   * @param ec       execution context on which completion callbacks run
   * @return one Future per input element, in input order
   */
  def run[A, B, M[X] <: TraversableOnce[X]](
    throttle: SharedThrottle,
    in: M[A],
    fn: A => Future[B]
  )(implicit cbf: CanBuildFrom[M[A], Future[B], M[Future[B]]], ec: ExecutionContext): M[Future[B]] = {

    // Pending work: each input is paired with the promise that will carry its outcome.
    object queue extends ConcurrentLinkedQueue[(A, Promise[B])] {

      // Futures of every submitted promise, in submission order; returned to the caller.
      val results = new ConcurrentLinkedQueue[Future[B]]

      def addItem(item: A): Boolean = add(item -> Promise[B]())

      override def add(pair: (A, Promise[B])): Boolean = {
        val added = super.add(pair)
        results.add(pair._2.future)
        attach() // start it (or an earlier queued item) right away if a slot is free
        added
      }

      /** Frees the finished task's slot, starts the next queued item, then publishes the outcome. */
      final def completedTaskHandler(comp: Try[B], promise: Promise[B]): Unit = {
        throttle.releaseSlot()
        attach()
        promise.complete(comp)
        ()
      }

      /** Starts one queued item if there is one and a slot can be seized. */
      final def attach(): Unit =
        if (!isEmpty && throttle.seizeSlot()) {
          poll() match {
            // Another thread drained the queue between `isEmpty` and `poll`:
            // give the slot back instead of leaking it.
            case null => throttle.releaseSlot()
            case (x, promise) =>
              // A synchronous throw from `fn` is turned into a failed Future so the
              // promise is still completed and the slot is still released.
              val started = Try(fn(x)) match {
                case Success(future) => future
                case Failure(error)  => Future.failed[B](error)
              }
              started onComplete { comp => completedTaskHandler(comp, promise) }
          }
        }
    }

    in foreach queue.addItem
    (cbf(in) ++= queue.results.asScala).result()
  }
}
