Last active
March 19, 2021 18:27
-
-
Save ryantanner/7124583 to your computer and use it in GitHub Desktop.
Conspire's implementation of work pulling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AnalyticsLeader(supervisor: ActorRef) extends Leader[ProcessUser, AnalyticsNode, AnalyticsMessage](supervisor) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AnalyticsNode extends Node[AnalyticsProcessor, AnalyticsMessage]("analytics") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.goconspire.commons | |
import scala.concurrent.duration._ | |
import scala.collection.mutable.{Map, Queue} | |
import scala.reflect.ClassTag | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import akka.actor.ActorLogging | |
import akka.actor.Props | |
import akka.actor.Terminated | |
import akka.event.LoggingReceive | |
import akka.routing.FromConfig | |
import akka.util.Timeout | |
import com.goconspire.commons.messages._ | |
class Leader[U <: WorkUnit, N <: Node[_, J], J <: JobMessage[J]](supervisor: ActorRef) extends Actor with ActorLogging { | |
// Create the cluster-aware router for managing remote routees | |
context.actorOf(Props[N].withRouter(FromConfig()), | |
"nodeRouter") | |
val workers = Map.empty[ActorRef, Option[(ActorRef, U)]] | |
val workQueue = Queue.empty[(ActorRef, U)] | |
// Notifies workers that there's work available, provided they're | |
// not already working on something | |
def notifyWorkers(): Unit = { | |
if (!workQueue.isEmpty) { | |
workers.foreach { | |
case (worker, m) if (m.isEmpty) => worker ! WorkIsReady | |
case _ => | |
} | |
} | |
} | |
override def preStart = { | |
log.info("Starting leader at {}", self.path.toStringWithAddress(self.path.address)) | |
} | |
def receive = LoggingReceive { | |
case WorkerCreated(worker) => | |
log.info("Worker created: {}", worker) | |
context.watch(worker) | |
workers += (worker -> None) | |
notifyWorkers() | |
case WorkerRequestsWork(worker) => | |
log.info("Worker requests work: {}", worker) | |
if (workers.contains(worker)) { | |
if (workQueue.isEmpty) | |
worker ! NoWorkToBeDone | |
else if (workers(worker) == None) { | |
val (workSender, work) = workQueue.dequeue() | |
workers += (worker -> Some(workSender -> work)) | |
// Use the special form of 'tell' that lets us supply | |
// the sender | |
log.info("Sending work {} to {}", worker, work) | |
worker.tell(WorkToBeDone(work), workSender) | |
} | |
} | |
// Worker has completed its work and we can clear it out | |
case WorkIsDone(msg, worker) => | |
workers.get(worker) match { | |
case Some((requester, _)) => | |
requester ! msg | |
workers += (worker -> none) | |
case None => | |
log.error("Blurgh! {} said it's done work but we didn't know about him", worker) | |
} | |
case Terminated(worker) => | |
if (workers.contains(worker) && workers(worker) != None) { | |
log.error("Blurgh! {} died while processing {}", worker, workers(worker)) | |
// Send the work that it was doing back to ourselves for processing | |
val (workSender, work) = workers(worker).get | |
self.tell(work, workSender) | |
} | |
workers -= worker | |
case trigger: JobTrigger[J, JobAcknowledged[J], JobFailed[J], U] with CollectionJobMessage[_, J] => | |
log.info("Queueing {} items", trigger.items.size) | |
trigger.toWorkUnits.foreach { work => | |
workQueue.enqueue(sender -> work) | |
} | |
notifyWorkers() | |
case trigger: JobTrigger[J, JobAcknowledged[J], JobFailed[J], U] with ItemJobMessage[_, U] => | |
log.info("Queuing single item") | |
trigger.toWorkUnits.foreach { work => | |
workQueue.enqueue(sender -> work) | |
} | |
notifyWorkers() | |
case work: U => | |
workQueue enqueue (sender -> work) | |
notifyWorkers() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Work Request Messages | |
// Messages from Workers | |
case class WorkerCreated(worker: ActorRef) | |
case class WorkerRequestsWork(worker: ActorRef) | |
case class WorkIsDone[M <: JobMessage[M]](msg: JobResponse[M], worker: ActorRef) | |
// Messages to Workers | |
case class WorkToBeDone(work: WorkUnit) | |
case object WorkIsReady | |
case object NoWorkToBeDone | |
trait WorkUnit { | |
def failed(reason: String): JobFailed[_] | |
} | |
// Job messages | |
trait JobMessage[T <: JobMessage[T]] { | |
val info: JobInfo | |
} | |
trait JobInfo { | |
val role: String | |
val command: String | |
val jobId: Long | |
val uuid: UUID | |
} | |
trait JobTrigger[M <: JobMessage[M], A <: JobAcknowledged[M], F <: JobFailed[M], U <: WorkUnit] extends JobMessage[M] { | |
def failed(reason: String): F | |
def acknowledged: A | |
} | |
trait JobResponse[M <: JobMessage[M]] extends JobMessage[M] | |
trait JobFailed[M <: JobMessage[M]] extends JobResponse[M] | |
trait JobAcknowledged[M <: JobMessage[M]] extends JobResponse[M] | |
trait JobCompleted[M <: JobMessage[M]] extends JobResponse[M] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.goconspire.commons | |
import scala.reflect.ClassTag | |
import scala.reflect.classTag | |
import akka.actor.Actor | |
import akka.actor.ActorLogging | |
import akka.actor.Props | |
import com.goconspire.commons.messages._ | |
abstract class Node[Processor <: Actor : ClassTag, M <: JobMessage[M]](serviceName: String) extends Actor with ActorLogging { | |
val facade = context.actorSelection("/user/facade") | |
def leaderMsg(msg: Any) = NotifyLeader(serviceName, msg) | |
def props: Props = Props(classTag[Processor].runtimeClass, self) | |
override def preStart = { | |
facade ! leaderMsg(WorkerCreated(self)) | |
} | |
def working(work: Any): Receive = { | |
case WorkIsReady => // do nothing | |
case NoWorkToBeDone => // do nothing | |
case WorkToBeDone => | |
log.error("Node busy") | |
case sa: JobAcknowledged[M] => | |
facade ! leaderMsg(sa) | |
case ca: JobCompleted[M] => | |
facade ! leaderMsg(WorkIsDone(ca, self)) | |
facade ! leaderMsg(WorkerRequestsWork(self)) | |
context.stop(sender) | |
context.become(idle) | |
case fa: JobFailed[M] => | |
facade ! leaderMsg(WorkIsDone(fa, self)) | |
facade ! leaderMsg(WorkerRequestsWork(self)) | |
context.stop(sender) | |
context.become(idle) | |
} | |
def idle: Receive = { | |
case WorkIsReady => | |
log.info("Requesting work") | |
facade ! leaderMsg(WorkerRequestsWork(self)) | |
case WorkToBeDone(work) => | |
log.info("Got work {}", work) | |
val processor = context.actorOf(props) | |
processor ! work | |
context.become(working(work)) | |
case NoWorkToBeDone => // do nothing | |
} | |
def receive = idle | |
} |
in Leader.scala, line 66:
should be
workers.get(worker) match {
case Some(Some((requester, _))) =>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
What's the purpose of the
supervisor
in theLeader
? Unless I've missed something it doesn't appear to be used.