Last active
December 29, 2015 09:38
-
-
Save ericacm/7651168 to your computer and use it in GitHub Desktop.
ThreadPoolExecutor using an unbounded queue where minThreads can be less than maxThreads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.ExecutionContext | |
import java.lang.Thread.UncaughtExceptionHandler | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, ThreadFactory, TimeUnit} | |
import java.util.concurrent.{Executors, RejectedExecutionHandler, RejectedExecutionException} | |
/**
 * Factory methods and collaborating types for the `ScalingThreadPoolExecutor` class.
 *
 * The executor is assembled from three cooperating pieces: the executor itself, a
 * [[ScalingThreadPoolExecutor.BlockingQueue]] that refuses offers while every pool thread is
 * busy, and a [[ScalingThreadPoolExecutor.RejectionHandler]] that queues the task once no
 * further thread can be allocated.
 */
object ScalingThreadPoolExecutor {

  /** Keep-alive time, in seconds, handed to executors built by the `apply` factories. */
  val defaultSecondsBeforeEviction: Int = 60

  /**
   * Builds a fully wired executor.
   *
   * @param minThreads    core pool size passed to the executor
   * @param maxThreads    maximum pool size passed to the executor
   * @param threadFactory factory used to create the pool's threads
   * @return an executor whose queue already refers back to it
   */
  def apply(minThreads: Int, maxThreads: Int, threadFactory: ThreadFactory): ScalingThreadPoolExecutor = {
    val queue = new BlockingQueue
    val rejectionHandler = new RejectionHandler
    val tpe = new ScalingThreadPoolExecutor(
      minThreads, maxThreads, defaultSecondsBeforeEviction, threadFactory, queue, rejectionHandler)
    // The queue consults the executor in offer(), so it must be attached before the
    // executor is handed to any caller.
    queue.executor = tpe
    tpe
  }

  /**
   * Builds a fully wired executor that uses `Executors.defaultThreadFactory`.
   *
   * @param minThreads core pool size passed to the executor
   * @param maxThreads maximum pool size passed to the executor
   */
  def apply(minThreads: Int, maxThreads: Int): ScalingThreadPoolExecutor =
    apply(minThreads, maxThreads, Executors.defaultThreadFactory)

  /**
   * Unbounded queue that only accepts an offer when the pool has a spare thread.
   *
   * `executor` must be assigned before the first task is offered; the `apply`
   * factories in this object take care of that.
   */
  class BlockingQueue extends LinkedBlockingQueue[Runnable] {
    var executor: ScalingThreadPoolExecutor = _

    /**
     * Inserts the specified element at the tail of this queue if there is at
     * least one available thread to run the current task. If all pool threads
     * are actively busy, it rejects the offer. If the offer is rejected the
     * ThreadPoolExecutor will try to allocate a new thread (up to the maximum).
     * If a thread cannot be allocated the RejectionHandler will be called.
     *
     * @param r the task being offered
     * @return `true` if the task was queued, `false` if the offer was refused
     */
    override def offer(r: Runnable): Boolean = {
      // Threads currently running a task plus tasks already waiting for a thread.
      val allWorkingThreads = executor.getActiveCount + super.size
      allWorkingThreads < executor.getPoolSize && super.offer(r)
    }
  }

  /**
   * Handler invoked when the pool cannot allocate another thread for a task.
   * It queues the task, unless the executor has been shut down.
   */
  class RejectionHandler extends RejectedExecutionHandler {

    /**
     * No threads are available to run this task. Queue it.
     *
     * @param r        the task that could not be handed to a thread
     * @param executor the executor that rejected the task
     * @throws RejectedExecutionException if the executor has been shut down, or if the
     *                                    calling thread is interrupted while queueing
     */
    def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
      // ThreadPoolExecutor also routes submissions made after shutdown() through this
      // handler. Queueing those would leave tasks that may never be executed.
      if (executor.isShutdown) {
        throw new RejectedExecutionException(s"Executor has been shut down; rejecting $r")
      }

      // put() bypasses the overridden offer(), so the task is always enqueued here.
      try executor.getQueue.put(r) catch {
        case iex: InterruptedException =>
          // Restore the interrupt status so callers further up can still observe it.
          Thread.currentThread().interrupt()
          throw new RejectedExecutionException(iex)
      }

      // shutdown() may have raced with the put above. If the task is still sitting in
      // the queue, pull it back out and reject it instead of leaving it stranded.
      if (executor.isShutdown && executor.remove(r)) {
        throw new RejectedExecutionException(s"Executor has been shut down; rejecting $r")
      }
    }
  }
}
import ScalingThreadPoolExecutor._ | |
/**
 * ThreadPoolExecutor using an unbounded queue where minThreads can be less than maxThreads.
 *
 * The ThreadPoolExecutor javadoc says:
 *   If there are more than corePoolSize but less than maximumPoolSize threads running,
 *   a new thread will be created only if the queue is full.
 *
 * If you use an unbounded queue then ThreadPoolExecutor will never allocate more than
 * corePoolSize threads. This executor fixes that problem: its queue refuses offers while
 * every pool thread is busy, and its rejection handler queues the task once the pool can
 * no longer grow.
 *
 * Adapted from https://github.com/kimchy/kimchy.github.com/blob/master/_posts/2008-11-23-juc-executorservice-gotcha.textile
 *
 * As an alternative see https://github.com/boundary/overlock (ThreadPool.instrumentedElastic)
 *
 * @param minThreads        core pool size
 * @param maxThreads        maximum pool size
 * @param secBeforeEviction keep-alive time in seconds
 * @param threadFactory     factory used to create the pool's threads
 * @param blockingQueue     work queue; its `executor` field is expected to refer to this instance
 * @param rejectionHandler  handler invoked when a task can be neither queued nor given a new thread
 */
class ScalingThreadPoolExecutor(minThreads: Int, maxThreads: Int, secBeforeEviction: Int, threadFactory: ThreadFactory,
                                blockingQueue: BlockingQueue, rejectionHandler: RejectionHandler)
  extends ThreadPoolExecutor(minThreads, maxThreads, secBeforeEviction, TimeUnit.SECONDS,
    blockingQueue, threadFactory, rejectionHandler) {

  // Use AtomicInteger to avoid locking to get activeCount
  val activeCount: AtomicInteger = new AtomicInteger

  /** Number of tasks currently between `beforeExecute` and `afterExecute`. */
  override def getActiveCount: Int = activeCount.get()

  /** Counts the task as active, then chains to the superclass hook. */
  override protected def beforeExecute(t: Thread, r: Runnable): Unit = {
    activeCount.incrementAndGet()
    super.beforeExecute(t, r)
  }

  /** Chains to the superclass hook and always releases the task's slot in the active count. */
  override protected def afterExecute(r: Runnable, t: Throwable): Unit = {
    try super.afterExecute(r, t)
    finally activeCount.decrementAndGet()
  }
}
/**
 * Builds `ExecutionContext`s meant for running blocking tasks.
 *
 * Each context is backed by a `ScalingThreadPoolExecutor` with a minimum of zero threads
 * whose workers are named daemon threads that log uncaught exceptions.
 */
object BlockingTaskExecutionContext extends Logging {

  /** Counter that supplies the numeric suffix of every thread name created here. */
  val ids = new AtomicInteger

  /**
   * Creates an execution context backed by a scaling thread pool.
   *
   * @param poolName   prefix for thread names; also included in logged error messages
   * @param maxThreads upper bound on the number of pool threads
   * @return an `ExecutionContext` wrapping the newly created executor
   */
  def apply(poolName: String, maxThreads: Int = 10): ExecutionContext = {
    // Read on the calling thread so that every worker joins the creator's thread group.
    val creatorGroup = Thread.currentThread.getThreadGroup
    val pool = ScalingThreadPoolExecutor(0, maxThreads, new NamedDaemonThreadFactory(poolName, creatorGroup))
    ExecutionContext.fromExecutor(pool)
  }

  /** Logs any exception that escapes a pool thread. */
  private class LoggingExceptionHandler(poolName: String) extends UncaughtExceptionHandler {
    override def uncaughtException(t: Thread, e: Throwable): Unit =
      log.error(s"Exception in $poolName thread: ${e.getMessage}", e)
  }

  /** Creates daemon threads named `poolName-N` inside the given thread group. */
  private class NamedDaemonThreadFactory(poolName: String, group: ThreadGroup) extends ThreadFactory {
    // One handler instance is shared by all threads of the pool.
    private val handler = new LoggingExceptionHandler(poolName)

    override def newThread(r: Runnable): Thread = {
      val worker = new Thread(group, r, s"$poolName-${ids.incrementAndGet()}")
      worker.setDaemon(true) // Prevent live threads from stopping System.exit(0)
      worker.setUncaughtExceptionHandler(handler)
      worker
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment