Skip to content

Instantly share code, notes, and snippets.

@ericacm
Last active December 29, 2015 09:38
Show Gist options
  • Save ericacm/7651168 to your computer and use it in GitHub Desktop.
Save ericacm/7651168 to your computer and use it in GitHub Desktop.
ThreadPoolExecutor using an unbounded queue where minThreads can be less than maxThreads
import scala.concurrent.ExecutionContext
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, ThreadFactory, TimeUnit}
import java.util.concurrent.{Executors, RejectedExecutionHandler, RejectedExecutionException}
/**
 * Factories and collaborator types for the `ScalingThreadPoolExecutor` class.
 *
 * The executor relies on two cooperating pieces defined here:
 *  - [[ScalingThreadPoolExecutor.BlockingQueue]] refuses `offer` while every pool thread is busy,
 *    which makes the underlying `ThreadPoolExecutor` try to start another thread;
 *  - [[ScalingThreadPoolExecutor.RejectionHandler]] queues the task when no further thread
 *    could be started.
 */
object ScalingThreadPoolExecutor {

  /** Keep-alive time, in seconds, used by the `apply` factories. */
  val defaultSecondsBeforeEviction: Int = 60

  /**
   * Creates an executor that grows from `minThreads` up to `maxThreads` before queueing work.
   *
   * @param minThreads    core pool size handed to the underlying `ThreadPoolExecutor`
   * @param maxThreads    maximum pool size handed to the underlying `ThreadPoolExecutor`
   * @param threadFactory factory used to create the pool threads
   * @return a fully wired executor
   */
  def apply(minThreads: Int, maxThreads: Int, threadFactory: ThreadFactory): ScalingThreadPoolExecutor = {
    val queue = new BlockingQueue
    val rejectionHandler = new RejectionHandler
    val tpe = new ScalingThreadPoolExecutor(
      minThreads, maxThreads, defaultSecondsBeforeEviction, threadFactory, queue, rejectionHandler)
    // The queue consults the executor inside `offer`, so the back-reference must be
    // in place before the executor is handed to any caller.
    queue.executor = tpe
    tpe
  }

  /**
   * Same as the three-argument `apply`, using `Executors.defaultThreadFactory`.
   */
  def apply(minThreads: Int, maxThreads: Int): ScalingThreadPoolExecutor =
    apply(minThreads, maxThreads, Executors.defaultThreadFactory)

  /**
   * Unbounded work queue whose `offer` only succeeds while the pool has spare threads.
   */
  class BlockingQueue extends LinkedBlockingQueue[Runnable] {

    /**
     * Executor this queue belongs to. It is assigned once by the factory after construction
     * and then read by whichever thread submits work, hence `@volatile`.
     */
    @volatile var executor: ScalingThreadPoolExecutor = _

    /**
     * Inserts the specified element at the tail of this queue if there is at
     * least one available thread to run the current task. If all pool threads
     * are actively busy, it rejects the offer. If the offer is rejected the
     * ThreadPoolExecutor will try to allocate a new thread (up to the maximum).
     * If a thread cannot be allocated the RejectionHandler will be called.
     */
    override def offer(r: Runnable): Boolean = {
      val pool = executor
      // Busy threads plus tasks already waiting: every queued task will occupy a thread.
      val allWorkingThreads = pool.getActiveCount + super.size
      allWorkingThreads < pool.getPoolSize && super.offer(r)
    }
  }

  /**
   * Handler invoked when the pool could not start another thread for a task.
   */
  class RejectionHandler extends RejectedExecutionHandler {

    /**
     * No threads are available to run this task. Queue it.
     *
     * `ThreadPoolExecutor` also calls this handler for tasks submitted after shutdown;
     * those are genuinely rejected instead of being queued where they might never run.
     *
     * @throws RejectedExecutionException if the executor has been shut down, or if the
     *                                    calling thread is interrupted while queueing
     */
    def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
      if (executor.isShutdown)
        throw new RejectedExecutionException(s"Task $r rejected from $executor: executor has been shut down")

      try executor.getQueue.put(r) catch {
        case iex: InterruptedException =>
          // Preserve the interrupt status for the caller before translating the exception.
          Thread.currentThread().interrupt()
          throw new RejectedExecutionException(iex)
      }

      // Shutdown may have raced with the put above; withdraw the task if it is still queued.
      if (executor.isShutdown && executor.remove(r))
        throw new RejectedExecutionException(s"Task $r rejected from $executor: executor has been shut down")
    }
  }
}
import ScalingThreadPoolExecutor._
/**
 * ThreadPoolExecutor using an unbounded queue where minThreads can be less than maxThreads.
 *
 * The ThreadPoolExecutor javadoc says:
 * If there are more than corePoolSize but less than maximumPoolSize threads running,
 * a new thread will be created only if the queue is full.
 *
 * If you use an unbounded queue then ThreadPoolExecutor will never allocate more than corePoolSize
 * threads. This executor fixes that problem.
 *
 * Adapted from https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile
 *
 * As an alternative see https://github.com/boundary/overlock (ThreadPool.instrumentedElastic)
 *
 * @param minThreads        core pool size passed to `ThreadPoolExecutor`
 * @param maxThreads        maximum pool size passed to `ThreadPoolExecutor`
 * @param secBeforeEviction keep-alive time in seconds passed to `ThreadPoolExecutor`
 * @param threadFactory     factory used to create pool threads
 * @param blockingQueue     work queue passed to `ThreadPoolExecutor`
 * @param rejectionHandler  handler passed to `ThreadPoolExecutor` for rejected tasks
 */
class ScalingThreadPoolExecutor(minThreads: Int, maxThreads: Int, secBeforeEviction: Int, threadFactory: ThreadFactory,
                                blockingQueue: BlockingQueue, rejectionHandler: RejectionHandler)
  extends ThreadPoolExecutor(minThreads, maxThreads, secBeforeEviction, TimeUnit.SECONDS,
    blockingQueue, threadFactory, rejectionHandler) {

  // Use AtomicInteger to avoid locking to get activeCount
  val activeCount = new AtomicInteger

  /** Number of tasks currently between `beforeExecute` and `afterExecute`. */
  override def getActiveCount: Int = activeCount.get()

  /** Counts the task as active just before it starts running. */
  override protected def beforeExecute(t: Thread, r: Runnable): Unit = {
    activeCount.incrementAndGet()
    // Javadoc recommends chaining to super at the end of beforeExecute.
    super.beforeExecute(t, r)
  }

  /** Removes the task from the active count once it has finished (normally or not). */
  override protected def afterExecute(r: Runnable, t: Throwable): Unit = {
    // Javadoc recommends chaining to super at the beginning of afterExecute.
    super.afterExecute(r, t)
    activeCount.decrementAndGet()
  }
}
/**
 * An ExecutionContext factory for blocking threads.
 *
 * Each call to `apply` builds a new `ScalingThreadPoolExecutor` with zero core threads
 * and wraps it in an `ExecutionContext`.
 */
object BlockingTaskExecutionContext extends Logging {

  /** Counter used to number thread names; shared by every pool created through this object. */
  val ids = new AtomicInteger

  /**
   * Creates an `ExecutionContext` backed by a pool of daemon threads.
   *
   * @param poolName   prefix for thread names (`poolName-N`) and for logged error messages
   * @param maxThreads maximum number of threads in the pool
   * @return an execution context whose uncaught exceptions and reported failures are logged
   */
  def apply(poolName: String, maxThreads: Int = 10): ExecutionContext = {
    val threadGroup = Thread.currentThread.getThreadGroup

    // Single sink for both uncaught thread exceptions and failures reported through the
    // ExecutionContext, so neither falls back to printing a stack trace on stderr.
    def logFailure(e: Throwable): Unit =
      log.error(s"Exception in $poolName thread: ${e.getMessage}", e)

    val uncaughtHandler = new UncaughtExceptionHandler {
      def uncaughtException(t: Thread, e: Throwable): Unit = logFailure(e)
    }

    val threadFactory = new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val name = s"$poolName-${ids.incrementAndGet()}"
        val t = new Thread(threadGroup, r, name)
        t.setDaemon(true) // Prevent live threads from stopping System.exit(0)
        t.setUncaughtExceptionHandler(uncaughtHandler)
        t
      }
    }

    val executor = ScalingThreadPoolExecutor(0, maxThreads, threadFactory)
    ExecutionContext.fromExecutor(executor, (e: Throwable) => logFailure(e))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment