Skip to content

Instantly share code, notes, and snippets.

@guenter
Last active September 17, 2020 11:25
Show Gist options
  • Save guenter/7471695 to your computer and use it in GitHub Desktop.
Save guenter/7471695 to your computer and use it in GitHub Desktop.
A simple Mesos "Hello World": downloads and starts a Python web server on every node in the cluster.
// sbt build definition for the ScalaMesos example framework.
name := "ScalaMesos"
version := "1.0"
scalaVersion := "2.10.3"
// Extra repository searched in addition to sbt's defaults.
// NOTE(review): this is a plain-http URL; newer sbt releases may refuse insecure resolvers — confirm.
resolvers += "Mesosphere Repo" at "http://downloads.mesosphere.io/maven"
libraryDependencies ++= Seq(
// Mesos Java bindings (the org.apache.mesos classes used by Main and ScalaScheduler).
"org.apache.mesos" % "mesos" % "0.14.2",
// Presumably the source of the mesosphere.mesos.util helpers (FrameworkInfo, ScalarResource) — verify.
"mesosphere" % "mesos-utils" % "0.0.6"
)
import mesosphere.mesos.util.FrameworkInfo
import org.apache.mesos.MesosSchedulerDriver
/**
 * Entry point: registers the "ScalaMesos" framework with a Mesos master and
 * runs a [[ScalaScheduler]] until the driver terminates.
 *
 * The master address may be given as the first command-line argument; when it
 * is omitted, "zk://localhost:2181/mesos" is used.
 *
 * @author Tobi Knaup
 */
object Main {
  /** Master address used when none is supplied on the command line. */
  private val DefaultMaster = "zk://localhost:2181/mesos"

  /**
   * Starts the scheduler driver and blocks until it terminates.
   *
   * @param args optional command-line arguments; `args(0)`, if present, is the master address
   */
  def main(args: Array[String]): Unit = {
    val master = args.headOption.getOrElse(DefaultMaster)
    val framework = FrameworkInfo("ScalaMesos")
    val scheduler = new ScalaScheduler
    val driver = new MesosSchedulerDriver(scheduler, framework.toProto, master)

    // run() returns the driver's final status; surface anything other than a
    // clean stop through the process exit code instead of discarding it.
    val status = driver.run()
    if (status != org.apache.mesos.Protos.Status.DRIVER_STOPPED) {
      System.err.println(s"scheduler driver terminated with status $status")
      sys.exit(1)
    }
  }
}
import java.util
import mesosphere.mesos.util.ScalarResource
import org.apache.mesos.Protos._
import org.apache.mesos.{SchedulerDriver, Scheduler}
import scala.collection.JavaConverters._
/**
 * Minimal Mesos scheduler: for every resource offer that contains at least
 * one CPU it launches a task that fetches `http.py` and runs `python http.py`.
 * Offers with less CPU than the task needs are declined.
 *
 * All other scheduler callbacks are no-ops, except that status updates are
 * printed to stdout and scheduler errors are printed to stderr.
 *
 * @author Tobi Knaup
 */
class ScalaScheduler extends Scheduler {

  /** CPUs requested for each launched task. */
  private val TaskCpus = 1.0

  /** Location of the Python web-server script downloaded by each task. */
  private val ScriptUri =
    "https://gist.github.com/guenter/7470373/raw/42ed566dba6a22f1b160e9774d750e46e83b61ad/http.py"

  /** Reports a scheduler/driver error on stderr instead of dropping it silently. */
  def error(driver: SchedulerDriver, message: String): Unit = {
    System.err.println(s"scheduler error: $message")
  }

  def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int): Unit = {}

  def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}

  def disconnected(driver: SchedulerDriver): Unit = {}

  def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {}

  /** Prints every task status update to stdout. */
  def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
    println(s"received status update $status")
  }

  def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {}

  /**
   * Handles a batch of offers: launches one task per offer that has enough
   * CPU and declines the rest.
   *
   * @param driver driver used to launch tasks or decline offers
   * @param offers offers received from the master
   */
  def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = {
    for (offer <- offers.asScala) {
      println(s"offer $offer")
      if (offeredCpus(offer) >= TaskCpus) {
        driver.launchTasks(offer.getId, List(buildTask(offer)).asJava)
      } else {
        // Hand the offer back rather than launching a task that cannot fit in it.
        driver.declineOffer(offer.getId)
      }
    }
  }

  def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = {}

  def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {}

  /** Sums the scalar "cpus" resources contained in `offer`. */
  private def offeredCpus(offer: Offer): Double =
    offer.getResourcesList.asScala
      .filter(resource => resource.getName == "cpus" && resource.hasScalar)
      .map(_.getScalar.getValue)
      .sum

  /** Builds the web-server task for the slave that made `offer`. */
  private def buildTask(offer: Offer): TaskInfo = {
    val cmd = CommandInfo.newBuilder
      .addUris(CommandInfo.URI.newBuilder.setValue(ScriptUri))
      .setValue("python http.py")
    val cpus = ScalarResource("cpus", TaskCpus)
    // A random UUID keeps IDs unique even when several offers are handled
    // within the same millisecond (a timestamp alone can repeat).
    val id = "task-" + util.UUID.randomUUID()
    TaskInfo.newBuilder
      .setCommand(cmd)
      .setName(id)
      .setTaskId(TaskID.newBuilder.setValue(id))
      .addResources(cpus.toProto)
      .setSlaveId(offer.getSlaveId)
      .build
  }
}
@milisarge
Copy link

Also, why does the task keep running in an endless loop? Is it a one-shot command?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment