Skip to content

Instantly share code, notes, and snippets.

@ryantanner
Last active March 19, 2021 18:27
Show Gist options
  • Save ryantanner/7124583 to your computer and use it in GitHub Desktop.
Save ryantanner/7124583 to your computer and use it in GitHub Desktop.
Conspire's implementation of work pulling
class AnalyticsLeader(supervisor: ActorRef) extends Leader[ProcessUser, AnalyticsNode, AnalyticsMessage](supervisor)
class AnalyticsNode extends Node[AnalyticsProcessor, AnalyticsMessage]("analytics")
package com.goconspire.commons
import scala.concurrent.duration._
import scala.collection.mutable.{Map, Queue}
import scala.reflect.ClassTag
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.Terminated
import akka.event.LoggingReceive
import akka.routing.FromConfig
import akka.util.Timeout
import com.goconspire.commons.messages._
class Leader[U <: WorkUnit, N <: Node[_, J], J <: JobMessage[J]](supervisor: ActorRef) extends Actor with ActorLogging {
// Create the cluster-aware router for managing remote routees
context.actorOf(Props[N].withRouter(FromConfig()),
"nodeRouter")
val workers = Map.empty[ActorRef, Option[(ActorRef, U)]]
val workQueue = Queue.empty[(ActorRef, U)]
// Notifies workers that there's work available, provided they're
// not already working on something
def notifyWorkers(): Unit = {
if (!workQueue.isEmpty) {
workers.foreach {
case (worker, m) if (m.isEmpty) => worker ! WorkIsReady
case _ =>
}
}
}
override def preStart = {
log.info("Starting leader at {}", self.path.toStringWithAddress(self.path.address))
}
def receive = LoggingReceive {
case WorkerCreated(worker) =>
log.info("Worker created: {}", worker)
context.watch(worker)
workers += (worker -> None)
notifyWorkers()
case WorkerRequestsWork(worker) =>
log.info("Worker requests work: {}", worker)
if (workers.contains(worker)) {
if (workQueue.isEmpty)
worker ! NoWorkToBeDone
else if (workers(worker) == None) {
val (workSender, work) = workQueue.dequeue()
workers += (worker -> Some(workSender -> work))
// Use the special form of 'tell' that lets us supply
// the sender
log.info("Sending work {} to {}", worker, work)
worker.tell(WorkToBeDone(work), workSender)
}
}
// Worker has completed its work and we can clear it out
case WorkIsDone(msg, worker) =>
workers.get(worker) match {
case Some((requester, _)) =>
requester ! msg
workers += (worker -> none)
case None =>
log.error("Blurgh! {} said it's done work but we didn't know about him", worker)
}
case Terminated(worker) =>
if (workers.contains(worker) && workers(worker) != None) {
log.error("Blurgh! {} died while processing {}", worker, workers(worker))
// Send the work that it was doing back to ourselves for processing
val (workSender, work) = workers(worker).get
self.tell(work, workSender)
}
workers -= worker
case trigger: JobTrigger[J, JobAcknowledged[J], JobFailed[J], U] with CollectionJobMessage[_, J] =>
log.info("Queueing {} items", trigger.items.size)
trigger.toWorkUnits.foreach { work =>
workQueue.enqueue(sender -> work)
}
notifyWorkers()
case trigger: JobTrigger[J, JobAcknowledged[J], JobFailed[J], U] with ItemJobMessage[_, U] =>
log.info("Queuing single item")
trigger.toWorkUnits.foreach { work =>
workQueue.enqueue(sender -> work)
}
notifyWorkers()
case work: U =>
workQueue enqueue (sender -> work)
notifyWorkers()
}
}
// Work Request Messages
// Messages from Workers
case class WorkerCreated(worker: ActorRef)
case class WorkerRequestsWork(worker: ActorRef)
case class WorkIsDone[M <: JobMessage[M]](msg: JobResponse[M], worker: ActorRef)
// Messages to Workers
case class WorkToBeDone(work: WorkUnit)
case object WorkIsReady
case object NoWorkToBeDone
trait WorkUnit {
def failed(reason: String): JobFailed[_]
}
// Job messages
trait JobMessage[T <: JobMessage[T]] {
val info: JobInfo
}
trait JobInfo {
val role: String
val command: String
val jobId: Long
val uuid: UUID
}
trait JobTrigger[M <: JobMessage[M], A <: JobAcknowledged[M], F <: JobFailed[M], U <: WorkUnit] extends JobMessage[M] {
def failed(reason: String): F
def acknowledged: A
}
trait JobResponse[M <: JobMessage[M]] extends JobMessage[M]
trait JobFailed[M <: JobMessage[M]] extends JobResponse[M]
trait JobAcknowledged[M <: JobMessage[M]] extends JobResponse[M]
trait JobCompleted[M <: JobMessage[M]] extends JobResponse[M]
package com.goconspire.commons
import scala.reflect.ClassTag
import scala.reflect.classTag
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import com.goconspire.commons.messages._
abstract class Node[Processor <: Actor : ClassTag, M <: JobMessage[M]](serviceName: String) extends Actor with ActorLogging {
val facade = context.actorSelection("/user/facade")
def leaderMsg(msg: Any) = NotifyLeader(serviceName, msg)
def props: Props = Props(classTag[Processor].runtimeClass, self)
override def preStart = {
facade ! leaderMsg(WorkerCreated(self))
}
def working(work: Any): Receive = {
case WorkIsReady => // do nothing
case NoWorkToBeDone => // do nothing
case WorkToBeDone =>
log.error("Node busy")
case sa: JobAcknowledged[M] =>
facade ! leaderMsg(sa)
case ca: JobCompleted[M] =>
facade ! leaderMsg(WorkIsDone(ca, self))
facade ! leaderMsg(WorkerRequestsWork(self))
context.stop(sender)
context.become(idle)
case fa: JobFailed[M] =>
facade ! leaderMsg(WorkIsDone(fa, self))
facade ! leaderMsg(WorkerRequestsWork(self))
context.stop(sender)
context.become(idle)
}
def idle: Receive = {
case WorkIsReady =>
log.info("Requesting work")
facade ! leaderMsg(WorkerRequestsWork(self))
case WorkToBeDone(work) =>
log.info("Got work {}", work)
val processor = context.actorOf(props)
processor ! work
context.become(working(work))
case NoWorkToBeDone => // do nothing
}
def receive = idle
}
@oxlade39
Copy link

What's the purpose of the supervisor in the Leader? Unless I've missed something it doesn't appear to be used.

@smilingleo
Copy link

in Leader.scala, line 66:
should be

workers.get(worker) match {
  case Some(Some((requester, _))) => 

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment