Skip to content

Instantly share code, notes, and snippets.

@JoshRosen
Last active August 29, 2015 14:07
Show Gist options
  • Save JoshRosen/287630864ac9803fe59f to your computer and use it in GitHub Desktop.
Save JoshRosen/287630864ac9803fe59f to your computer and use it in GitHub Desktop.
name := "Hadoop Configuration thread-safety test"
version := "1.0"
scalaVersion := "2.10.4"
// libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.4"
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.4.1"
import java.util.concurrent.{TimeUnit, ExecutorService, Executors}
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConversions._
import scala.util.Random
object HadoopThreadSafetyTest {
val operations: Seq[(Random, Configuration) => Any] = Seq(
{ case (_, conf) => conf.iterator.size },
{ case (rand, conf) => conf.set(rand.nextString(10), rand.nextString(10)) },
{ case (rand, conf) => conf.get(rand.nextString(10)) }
)
def getRunnable(id: Int, conf: Configuration): Runnable = {
new Runnable {
override def run(): Unit = {
val rand = new Random()
for (j <- 1 to NUM_OPS_PER_THREAD) {
val op = rand.shuffle(operations).head
op.apply(rand, conf)
}
println(s"Thread $id finished")
}
}
}
val NUM_OPS_PER_THREAD = 100
val NUM_THREADS = 100
val SHOULD_CLONE = false
val TIMEOUT_MS = 100000
def main(args: Array[String]) {
val sharedConfiguration: Configuration = new Configuration()
val pool: ExecutorService = Executors.newFixedThreadPool(NUM_THREADS)
for (i <- 1 to NUM_THREADS) {
val conf = if (SHOULD_CLONE) {
new Configuration(sharedConfiguration)
} else {
sharedConfiguration
}
pool.execute(getRunnable(i, conf))
}
pool.shutdown()
if (!pool.awaitTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
println("TEST FAILED DUE TO TIMEOUT")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment