diff --git a/README.md b/README.md
index 9610e6b..cf35156 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,8 @@ gocd-mesos {
port: "8080"
user-name: "random-guy"
password: "random-insecure-string"
+ # Set to true when the Go server requires a user name and password
+ auth-enabled: false
}
go-agent {
diff --git a/pom.xml b/pom.xml
index 485dd80..1efa135 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,10 +35,6 @@
repo.codahale.com
http://repo.codahale.com/
-
- morphia.googlecode.com
- http://morphia.googlecode.com/svn/mavenrepo/
-
maven.obiba.org/
http://maven.obiba.org/maven2/
@@ -91,6 +87,11 @@
scala-xml
2.11.0-M4
+
+ com.typesafe.play
+ play-json_2.11
+ 2.4.6
+
org.scalatest
scalatest_2.11
@@ -101,9 +102,12 @@
mockito-all
1.9.5
+
+ com.typesafe.scala-logging
+ scala-logging_2.11
+ 3.1.0
+
-
-
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
index ff326bb..3915b3f 100644
--- a/src/main/resources/application.conf
+++ b/src/main/resources/application.conf
@@ -5,10 +5,12 @@ gocd-mesos {
# GO server config to poll and connect the agent
go-server {
- host: "localhost"
- port: "8080"
+ host: "192.168.33.10"
+ port: "8153"
user-name: "azhagu"
password: "random-insecure-string"
+ # Set to true when the Go server requires a user name and password
+ auth-enabled: false
}
go-agent {
diff --git a/src/main/scala/com/indix/mesos/FrameworkConfig.scala b/src/main/scala/com/indix/mesos/FrameworkConfig.scala
index 89d35d5..6216241 100644
--- a/src/main/scala/com/indix/mesos/FrameworkConfig.scala
+++ b/src/main/scala/com/indix/mesos/FrameworkConfig.scala
@@ -14,8 +14,14 @@ class FrameworkConfig(config: Config) {
val goServerPort = rootConfig.getString("go-server.port")
- val goUserName = rootConfig.getString("go-server.user-name")
- val goPassword = rootConfig.getString("go-server.password")
+ val goMinAgents = rootConfig.getInt("go-agent.min-agents")
+
+ val goMaxAgents = rootConfig.getInt("go-agent.max-agents")
+
+ lazy val goUserName = rootConfig.getString("go-server.user-name")
+ lazy val goPassword = rootConfig.getString("go-server.password")
+
+ val goAuthEnabled = if(rootConfig.hasPath("go-server.auth-enabled")) rootConfig.getBoolean("go-server.auth-enabled") else false
val goAgentKey = if(rootConfig.hasPath("go-agent.auto-register-key")) Some(rootConfig.getString("go-agent.auto-register-key")) else None
}
diff --git a/src/main/scala/com/indix/mesos/GOCDPoller.scala b/src/main/scala/com/indix/mesos/GOCDPoller.scala
index e56776d..d7221c2 100644
--- a/src/main/scala/com/indix/mesos/GOCDPoller.scala
+++ b/src/main/scala/com/indix/mesos/GOCDPoller.scala
@@ -1,46 +1,79 @@
package com.indix.mesos
-import java.net.{UnknownHostException, SocketTimeoutException}
-
import com.google.common.io.BaseEncoding
-
import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
import scalaj.http._
+import play.api.libs.json._
-case class GOCDPoller(conf: FrameworkConfig) {
+case class GoAgent(id: String, status: String)
+
+case class GOCDPoller(conf: FrameworkConfig) {
val authToken = BaseEncoding.base64().encode(s"${conf.goUserName}:${conf.goPassword}".getBytes("UTF-8"));
val responseHistory: scala.collection.mutable.MutableList[Int] = mutable.MutableList.empty[Int]
- def goTaskQueueSize(): Int = {
+
+  /**
+    * Requests `url` through scalaj-http and returns the response body as a string.
+    * The Basic Authorization header is added only when `conf.goAuthEnabled` is true.
+    */
+  private[mesos] def request(url: String) = {
+    val base = if(conf.goAuthEnabled) Http(url).header("Authorization", s"Basic $authToken") else Http(url)
+    // Accept must be sent on both paths; `}.header(...)` used to bind to the else block only.
+    base.header("Accept", "application/vnd.go.cd.v1+json").asString.body
+  }
+
+ private[mesos] def jsonRequest(url: String) = {
+ Json.parse(request(url))
+ }
+
+ private[mesos] def xmlRequest(url: String) = {
+ scala.xml.XML.loadString(request(url))
+ }
+
+ private def buildUrl() = {
+ s"http://${conf.goServerHost}:${conf.goServerPort}"
+ }
+
+
+ def getPendingJobsCount: Int = {
println("Polling GO Server for scheduled jobs")
- try {
- val response: HttpResponse[String] = Http(s"http://${conf.goServerHost}:${conf.goServerPort}" + "/go/api/jobs/scheduled.xml").asString //.header("Authorization", s"Basic ${authToken}").asString
- val responseXml = scala.xml.XML.loadString(response.body)
- (responseXml \\ "scheduledJobs" \\ "job").size
- } catch {
- case e: SocketTimeoutException => {
- println("GOCD Server timed out!!")
- 0
- }
- }
- }
-
- def goIdleAgentsCount() = {
- }
-
- def pollAndAddTask() = {
- val scheduled : Int = goTaskQueueSize()
- println(s"Go server has ${scheduled.toString} pending jobs to be scheduled")
- if(scheduled > 0)
- responseHistory += scheduled
-
- if(responseHistory.size > 5) {
- println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.")
- TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
- responseHistory.clear()
- }
+ withRetry(3){
+ val responseXml = xmlRequest(buildUrl() + "/go/api/jobs/scheduled.xml")
+ (responseXml \\ "scheduledJobs" \\ "job").size
+ }.getOrElse(0)
+ }
+
+ def getAllAgents: List[GoAgent] = {
+ println("Polling Go server for agents")
+ withRetry(3) {
+ val responseJson = jsonRequest(buildUrl() + "/go/api/agents")
+ (responseJson \ "_embedded" \ "agents").toOption.map(jsValue => {
+ jsValue.as[JsArray]
+ .value
+ .map(agent => GoAgent((agent \ "uuid").get.as[String], (agent \ "status").get.as[String]))
+ .toList
+ }).getOrElse(List.empty)
+ }.getOrElse(List.empty)
+ }
+
+ def getIdleAgents: List[GoAgent] = {
+ getAllAgents.filter(_.status.equalsIgnoreCase("idle"))
+ }
+
+ def getBuildingAgents = {
+ getAllAgents.filter(_.status.equalsIgnoreCase("building"))
}
+
+
+  /** Evaluates `fn` up to `n` times (at least once); returns the first Success, else the last Failure. */
+  private def withRetry[T](n: Int)(fn: => T): Try[T] = {
+    Try(fn) match {
+      case Failure(_) if n > 1 => withRetry(n - 1)(fn) // attempts remain: evaluate fn again
+      case outcome => outcome // success, or attempts exhausted: return the Try so callers can getOrElse
+    }
+  }
}
diff --git a/src/main/scala/com/indix/mesos/GOCDScalar.scala b/src/main/scala/com/indix/mesos/GOCDScalar.scala
new file mode 100644
index 0000000..1d72b8e
--- /dev/null
+++ b/src/main/scala/com/indix/mesos/GOCDScalar.scala
@@ -0,0 +1,121 @@
+package com.indix.mesos
+
+import com.indix.mesos.common.GocdMesosLogger
+import org.apache.mesos.MesosSchedulerDriver
+import org.apache.mesos.Protos.{TaskState, TaskID, TaskStatus}
+import scala.collection.JavaConverters._
+
+
+case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSchedulerDriver) extends GocdMesosLogger {
+
+ def reconcileTasks() = {
+ logger.info("Going to reconcile tasks now")
+ val runningTasks = runningTaskIds.map(id => TaskStatus
+ .newBuilder()
+ .setTaskId(TaskID
+ .newBuilder()
+ .setValue(id)
+ .build())
+ .setState(TaskState.TASK_RUNNING)
+ .build())
+ logger.info(s"There are ${runningTasks.size} tasks that need to reconciled")
+ driver.reconcileTasks(runningTasks.asJavaCollection)
+ }
+
+ def scale() = {
+ logger.info("SCALAR going to do autoscale operation")
+ val supply = getSupply
+ val demand = getDemand
+ if(demand > supply) {
+ logger.info(s"The demand: $demand is greater than supply: $supply. Now computing the agents needed according to min and max agents.")
+ val needed = computeScaleup(supply, demand, conf.goMaxAgents, conf.goMinAgents)
+ if(needed > 0) {
+ logger.info(s"Adding $needed more agents to the fleet.")
+ scaleUp(needed)
+ }
+ } else if (demand < supply) {
+ logger.info(s"The demand: $demand is less than supply: $supply. Now computing the agents not needed according to min and max agents.")
+ val notNeeded = computeScaledown(supply, demand, conf.goMaxAgents, conf.goMinAgents)
+ if(notNeeded > 0) {
+ logger.info(s"Removing $notNeeded agents from the fleet.")
+ scaleDown(notNeeded)
+ }
+ }
+ }
+
+ def reconcileAndScale() = {
+ reconcileTasks()
+ scale()
+ }
+
+  private[mesos] def runningTaskIds = runningTasks.map(_.mesosTaskId) // reconcileTasks wraps these in Mesos TaskIDs, so use the task id, not the agent uuid
+
+ private[mesos] def runningTasks = TaskQueue.getRunningTasks
+
+ private[mesos] def pendingTasks = TaskQueue.getPendingTasks
+
+
+ private[mesos] def getTotalAgents = {
+ poller.getAllAgents.size
+ }
+
+ private[mesos] def getSupply = {
+ // Supply is number of 'idle GoAgents that are launched via Mesos' + 'GoAgents that are waiting to be launched'
+ val running = runningTasks
+ .toList
+ .map(_.goAgentUuid)
+ .intersect(poller.getIdleAgents.map(_.id)).size
+ val pending = pendingTasks.size
+ running + pending
+ }
+
+ private[mesos] def getDemand = {
+ // Demand is number of 'jobs pending in Go Server' + 'agents in building state'
+ poller.getPendingJobsCount + poller.getBuildingAgents.size
+ }
+
+  /** Kills up to `agentCount` running Mesos tasks whose Go agent is currently reported idle. */
+  private[mesos] def scaleDown(agentCount: Int) = {
+    val idleIds = poller.getIdleAgents.map(_.id).toSet
+    // Match against our own running tasks before `take`, so idle agents not launched here don't use up the quota.
+    TaskQueue.getRunningTasks.filter(task => idleIds(task.goAgentUuid)).take(agentCount).foreach { task =>
+      driver.killTask(TaskID.newBuilder().setValue(task.mesosTaskId).build())
+    }
+  }
+
+  /** Queues exactly `agentCount` new pending agent tasks built from `conf.goAgentDocker`. */
+  private[mesos] def scaleUp(agentCount: Int) = {
+    // `until`, not `to`: the inclusive range queued one agent more than requested.
+    (0 until agentCount).foreach(_ => TaskQueue.add(GoTask(conf.goAgentDocker)))
+  }
+
+ private[mesos] def computeScaleup(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = {
+ assert(demand > supply)
+ if(supply > goMaxAgents) {
+ return 0
+ }
+ val needed = demand - supply
+ if(supply + needed > goMaxAgents) {
+ goMaxAgents - supply
+ } else {
+ needed
+ }
+ }
+
+
+ private[mesos] def computeScaledown(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = {
+ assert(supply > demand)
+ if(supply < goMinAgents) {
+ return 0
+ }
+ val notNeeded = supply - demand
+ if(supply - notNeeded < goMinAgents) {
+ supply - goMinAgents
+ } else if(supply - notNeeded > goMaxAgents) {
+ supply - goMaxAgents
+ } else {
+ notNeeded
+ }
+ }
+
+}
diff --git a/src/main/scala/com/indix/mesos/GoCDScheduler.scala b/src/main/scala/com/indix/mesos/GoCDScheduler.scala
index f3e5d37..b0a6b8b 100644
--- a/src/main/scala/com/indix/mesos/GoCDScheduler.scala
+++ b/src/main/scala/com/indix/mesos/GoCDScheduler.scala
@@ -2,6 +2,7 @@ package com.indix.mesos
import java.util.UUID
+import com.indix.mesos.common.GocdMesosLogger
import com.typesafe.config.ConfigFactory
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
import org.apache.mesos.Protos.Environment.Variable
@@ -9,10 +10,11 @@ import org.apache.mesos.Protos._
import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
import scala.collection.JavaConverters._
+import scala.concurrent.Future
import scala.concurrent.duration._
-class GoCDScheduler(conf : FrameworkConfig) extends Scheduler {
+class GoCDScheduler(conf : FrameworkConfig) extends Scheduler with GocdMesosLogger {
lazy val envForGoCDTask = Environment.newBuilder()
.addVariables(Variable.newBuilder().setName("GO_SERVER").setValue(conf.goServerHost).build())
.addVariables(Variable.newBuilder().setName("AGENT_KEY").setValue(conf.goAgentKey.getOrElse(UUID.randomUUID().toString)).build())
@@ -20,19 +22,22 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler {
override def error(driver: SchedulerDriver, message: String) {}
override def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {
- println(s"executor completed execution with status: $status")
+ logger.info(s"executor completed execution with status: $status")
}
override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID) {}
override def disconnected(driver: SchedulerDriver): Unit = {
- println(s"Received Disconnected message $driver")
+ logger.info(s"Received Disconnected message $driver")
}
- override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]) {}
+ override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
+
+ }
override def statusUpdate(driver: SchedulerDriver, status: TaskStatus) {
- println(s"received status update $status")
+ logger.info(s"received status update $status")
+ TaskQueue.updateState(status.getTaskId.getValue, status.getState)
}
override def offerRescinded(driver: SchedulerDriver, offerId: OfferID) {}
@@ -43,26 +48,23 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler {
*
*/
override def resourceOffers(driver: SchedulerDriver, offers: java.util.List[Offer]) {
- println(s"Received resource offer size: ${offers.size()}")
+ logger.info(s"Received resource offer size: ${offers.size()}")
//for every available offer run tasks
for (offer <- offers.asScala) {
- println(s"offer $offer")
- val nextTask = TaskQueue.dequeue
- if(nextTask != null) {
- val task = deployGoAgentTask(nextTask, offer)
- task match {
- case Some(tt) => driver.launchTasks(List(offer.getId).asJava, List(tt).asJava)
+ logger.info(s"offer $offer")
+ TaskQueue.findNext.foreach { goTask =>
+ val mesosTask = deployGoAgentTask(goTask, offer)
+ mesosTask match {
+ case Some(tt) => {
+ driver.launchTasks(List(offer.getId).asJava, List(tt).asJava)
+ TaskQueue.add(goTask.copy(state = GoTaskState.Scheduled))
+ }
case None => {
- TaskQueue.enqueue(nextTask)
- println(s"declining unused offer because offer is not enough")
+ logger.info(s"declining unused offer because offer is not enough")
driver.declineOffer(offer.getId)
}
}
}
- else {
- println(s"declining unused offer because there is no task")
- driver.declineOffer(offer.getId)
- }
}
}
@@ -94,7 +96,13 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler {
.setContainer(dockerContainerInfo(goTask).build())
.setCommand(Protos.CommandInfo.newBuilder()
.setEnvironment(Protos.Environment.newBuilder()
- .addAllVariables(envForGoCDTask.getVariablesList))
+ .addAllVariables(envForGoCDTask
+ .addVariables(Variable
+ .newBuilder()
+ .setName("GO_AGENT_UUID")
+ .setValue(goTask.goAgentUuid)
+ .build())
+ .getVariablesList))
.setShell(false))
}
@@ -104,7 +112,7 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler {
val available = Resources(offer)
if(available.canSatisfy(needed)) {
val currentTimeStamp = System.currentTimeMillis()
- val taskId = "gocd-agent-task-" + currentTimeStamp
+ val taskId = goTask.mesosTaskId
val task = TaskInfo.newBuilder
.setExecutor(executorInfo(goTask, currentTimeStamp).build())
.setName(taskId)
@@ -120,36 +128,21 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler {
}
override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo) {
- println(s"RE-registered with mesos master.")
+ logger.info(s"RE-registered with mesos master.")
}
override def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- println(s"registered with mesos master. Framework id is ${frameworkId.getValue}")
+ logger.info(s"registered with mesos master. Framework id is ${frameworkId.getValue}")
}
-
}
- object GoCDMesosFramework extends App {
- val config = new FrameworkConfig(ConfigFactory.load())
-
+ object GoCDMesosFramework extends App with GocdMesosLogger {
+ val config = new FrameworkConfig(ConfigFactory.load())
val id = "GOCD-Mesos-" + System.currentTimeMillis()
-
-
- println(s"The Framework id is $id")
-
+ logger.info(s"The Framework id is $id")
val poller = GOCDPoller(config)
- val timeInterval = 2 * 60 * 1000
- val runnable = new Runnable {
- override def run(): Unit = {
- while(true) {
- poller.pollAndAddTask
- Thread.sleep(timeInterval)
- }
- }
- }
-
val frameworkInfo = FrameworkInfo.newBuilder()
.setId(FrameworkID.newBuilder().setValue(id).build())
.setName("GOCD-Mesos")
@@ -160,21 +153,24 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler {
.setFailoverTimeout(60.seconds.toMillis)
.build()
- val thread = new Thread(runnable)
- thread.start()
val scheduler = new GoCDScheduler(config)
val driver = new MesosSchedulerDriver(scheduler, frameworkInfo, config.mesosMaster)
+ val scalar = GOCDScalar(config, poller, driver)
println("Starting the driver")
- val status = if(driver.run() == Status.DRIVER_STOPPED) 0 else 1
-
- //driver.run }
-
- // println(driver.join())
+  val timeInterval = 3 * 60 * 1000
+  val runnable = new Runnable {
+    override def run(): Unit = while(true) {
+      scalar.reconcileAndScale
+      Thread.sleep(timeInterval)
+    }
+  }
+  // The loop was defined but never launched; start it ahead of driver.run(), as the previous revision did.
+  new Thread(runnable).start()
- println(status)
+ val status = if(driver.run() == Status.DRIVER_STOPPED) 0 else 1
// Ensure that the driver process terminates.
- driver.stop();
+ driver.stop()
// For this test to pass reliably on some platforms, this sleep is
// required to ensure that the SchedulerDriver teardown is complete
// before the JVM starts running native object destructors after
@@ -183,8 +179,6 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler {
// TODO(greg): Ideally, we would inspect the status of the driver
// and its associated tasks via the Java API and wait until their
// teardown is complete to exit.
- Thread.sleep(500);
-
- System.exit(status);
- println("===================================")
-}
\ No newline at end of file
+ Thread.sleep(500)
+ System.exit(status)
+ }
\ No newline at end of file
diff --git a/src/main/scala/com/indix/mesos/TaskQueue.scala b/src/main/scala/com/indix/mesos/TaskQueue.scala
index 5fa0985..0997138 100644
--- a/src/main/scala/com/indix/mesos/TaskQueue.scala
+++ b/src/main/scala/com/indix/mesos/TaskQueue.scala
@@ -1,24 +1,133 @@
package com.indix.mesos
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.UUID
+import org.apache.mesos.Protos.TaskState
+
+
+object GoTaskState {
+ abstract class GoTaskState
+ object Pending extends GoTaskState
+ object Scheduled extends GoTaskState
+ object Running extends GoTaskState
+ object Stopped extends GoTaskState
+}
case class GoTaskResource(cpu: Double = 1, memory: Double = 256, disk: Double = 1*1024)
-case class GoTask(cmdString: String, dockerImage: String, uri: String, resource: GoTaskResource = GoTaskResource()) {}
+
+
+case class GoTask(dockerImage: String,
+ uri: String,
+ state: GoTaskState.GoTaskState,
+ goAgentUuid: String,
+ mesosTaskId: String,
+ mesosFailureMessage: Option[String] = None,
+ resource: GoTaskResource = GoTaskResource()) {
+  // Maps a Mesos status onto this task: TASK_RUNNING marks it Running, terminal states mark it Stopped.
+  def updateState(state: TaskState): GoTask = state match {
+    case TaskState.TASK_RUNNING => this.copy(state = GoTaskState.Running) // without this no task ever reaches Running
+    case TaskState.TASK_FAILED | TaskState.TASK_KILLED | TaskState.TASK_FINISHED => this.copy(state = GoTaskState.Stopped, mesosFailureMessage = Some(""))
+    case _ => this
+  }
+}
+
+object GoTask {
+  /** Builds a unique Mesos task id; TaskQueue keys its map by this value, so duplicates overwrite each other. */
+  def generateTaskId(): String =
+    s"Mesos-gocd-task-${System.currentTimeMillis()}-${UUID.randomUUID()}" // timestamp alone collided for tasks created in the same millisecond
+
+ def generateAgentUUID(): String = {
+ UUID.randomUUID().toString
+ }
+
+ def apply(dockerImage: String) = {
+ new GoTask(dockerImage, "", GoTaskState.Pending, generateAgentUUID(), generateTaskId(), None)
+ }
+}
+
+private[mesos] case class TaskQueue(queue: Map[String, GoTask] = Map.empty[String, GoTask]) {
+
+ def getRunningTasks = {
+ queue.values.filter { task: GoTask =>
+ task.state match {
+ case GoTaskState.Running => true
+ case _ => false
+ }
+ }
+ }
+
+ def getPendingTasks = {
+ queue.values.filter { task: GoTask =>
+ task.state match {
+ case GoTaskState.Pending => true
+ case GoTaskState.Scheduled => true
+ case _ => false
+ }
+ }
+ }
+
+
+ def add(task: GoTask): TaskQueue = {
+ this.copy(queue = queue + (task.mesosTaskId -> task))
+ }
+
+ def findNext: Option[GoTask] = {
+ queue.values.find(task => {
+ task.state match {
+ case GoTaskState.Pending => true
+ case _ => false
+ }
+ })
+ }
+
+ def updateState(id: String, state: TaskState): TaskQueue = {
+ this.queue.get(id).map(task => {
+ this.copy(queue = queue + (id -> task.updateState(state)))
+ }).getOrElse(this)
+ }
+
+ def prune: TaskQueue = {
+ val updatedQueue = this.queue.filterNot(pair => {
+ val (id, task) = pair
+ task.state match {
+ case GoTaskState.Stopped => true
+ case _ => false
+ }
+ })
+ this.copy(queue = updatedQueue)
+ }
+}
object TaskQueue {
- val queue : ConcurrentLinkedQueue[GoTask] = new ConcurrentLinkedQueue[GoTask]()
- def enqueue(task : GoTask) {
- queue.offer(task)
+ var queue = TaskQueue()
+
+
+ def getAllJobIds = queue.queue.keys.toList
+
+ def getRunningTasks = queue.getRunningTasks
+
+ def getPendingTasks = queue.getPendingTasks
+
+
+ def add(task : GoTask) {
+ synchronized {
+ queue = queue.add(task)
+ }
+ }
+
+ def findNext = {
+ queue.findNext
}
- def dequeue : GoTask = {
- queue.poll()
+ def updateState(id: String, state: TaskState) = {
+ synchronized {
+ queue = queue.updateState(id, state)
+ }
}
def reset = {
- queue.clear()
+    synchronized { queue = TaskQueue() } // empty the queue as clear() did; prune's result was being discarded
}
}
diff --git a/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala b/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala
new file mode 100644
index 0000000..86eae5e
--- /dev/null
+++ b/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala
@@ -0,0 +1,7 @@
+package com.indix.mesos.common
+
+import java.util.logging.Logger
+
+trait GocdMesosLogger {
+ val logger = Logger.getLogger(this.getClass.getName)
+}
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index ff326bb..d31e05e 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -12,6 +12,8 @@ gocd-mesos {
}
go-agent {
+ min-agents: 2
+ max-agents: 12
# This key should be already shared with the go server and configured in its config.xml as autoRegisterKey
auto-register-key: "6a92073d04fc4eff99137d7f8c7e794d"
# Docker image containing the go agent
diff --git a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala
index 9890bba..0465323 100644
--- a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala
+++ b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala
@@ -3,6 +3,7 @@ package com.indix.mesos
import com.typesafe.config.ConfigFactory
import org.scalatest._
import org.mockito.Mockito._
+import play.api.libs.json.Json
class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
@@ -17,47 +18,270 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
TaskQueue.reset
}
- "GoCDPoller#pollAndAddTask" should "add a goTask after 5 polls" in {
+ "GoCDPoller#goIdleAgents" should "fetch idle agents count given a valid json" in {
+ val responseJson = Json.parse(
+ """
+ |
+ |{
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | }
+ | },
+ | "_embedded": {
+ | "agents": [
+ | {
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | },
+ | "find": {
+ | "href": "https://ci.example.com/go/api/agents/:uuid"
+ | }
+ | },
+ | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da",
+ | "hostname": "agent01.example.com",
+ | "ip_address": "10.12.20.47",
+ | "enabled": true,
+ | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent",
+ | "status": "Idle",
+ | "operating_system": "Mac OS X",
+ | "free_space": 84983328768,
+ | "resources": ["java", "linux", "firefox"],
+ | "environments": ["perf", "UAT"]
+ | }
+ | ]
+ | }
+ |}
+ |
+ """.stripMargin)
// Given
- val pollerSpy = spy(poller);
- doReturn(1).when(pollerSpy).goTaskQueueSize()
+ val pollerSpy = spy(poller)
+ doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents")
- // When
- for(i <- 0 to 5) {
- pollerSpy.pollAndAddTask()
- }
-
- // Then
- TaskQueue.queue.size() should be(1)
+ // When, then
+ pollerSpy.getIdleAgents.size should be(1)
}
-
- "GoCDPoller#pollAndAddTask" should "add a goTask after 15 polls" in {
+ "GoCDPoller#getAllAgents" should "fetch all agents count given a valid json" in {
+ val responseJson = Json.parse(
+ """
+ |
+ |{
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | }
+ | },
+ | "_embedded": {
+ | "agents": [
+ | {
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | },
+ | "find": {
+ | "href": "https://ci.example.com/go/api/agents/:uuid"
+ | }
+ | },
+ | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da",
+ | "hostname": "agent01.example.com",
+ | "ip_address": "10.12.20.47",
+ | "enabled": true,
+ | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent",
+ | "status": "Idle",
+ | "operating_system": "Mac OS X",
+ | "free_space": 84983328768,
+ | "resources": ["java", "linux", "firefox"],
+ | "environments": ["perf", "UAT"]
+ | },
+ | {
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | },
+ | "find": {
+ | "href": "https://ci.example.com/go/api/agents/:uuid"
+ | }
+ | },
+ | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da",
+ | "hostname": "agent01.example.com",
+ | "ip_address": "10.12.20.47",
+ | "enabled": true,
+ | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent",
+ | "status": "Idle",
+ | "operating_system": "Mac OS X",
+ | "free_space": 84983328768,
+ | "resources": ["java", "linux", "firefox"],
+ | "environments": ["perf", "UAT"]
+ | },
+ | {
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | },
+ | "find": {
+ | "href": "https://ci.example.com/go/api/agents/:uuid"
+ | }
+ | },
+ | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da",
+ | "hostname": "agent01.example.com",
+ | "ip_address": "10.12.20.47",
+ | "enabled": true,
+ | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent",
+ | "status": "Idle",
+ | "operating_system": "Mac OS X",
+ | "free_space": 84983328768,
+ | "resources": ["java", "linux", "firefox"],
+ | "environments": ["perf", "UAT"]
+ | }
+ | ]
+ | }
+ |}
+ |
+ """.stripMargin)
// Given
- val pollerSpy = spy(poller);
- doReturn(1).when(pollerSpy).goTaskQueueSize()
-
- // When
- for(i <- 0 to 20) {
- pollerSpy.pollAndAddTask()
- }
+ val pollerSpy = spy(poller)
+ doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents")
- // Then
- TaskQueue.queue.size() should be(3)
+ // When, then
+ pollerSpy.getAllAgents.size should be(3)
}
- "GoCDPoller#pollAndAddTask" should "add a goTask with expected attributes" in {
+ "GoCDPoller#getBuildingAgents" should "fetch building agents count given a valid json" in {
+ val responseJson = Json.parse(
+ """
+ |
+ |{
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | }
+ | },
+ | "_embedded": {
+ | "agents": [
+ | {
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | },
+ | "find": {
+ | "href": "https://ci.example.com/go/api/agents/:uuid"
+ | }
+ | },
+ | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da",
+ | "hostname": "agent01.example.com",
+ | "ip_address": "10.12.20.47",
+ | "enabled": true,
+ | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent",
+ | "status": "Idle",
+ | "operating_system": "Mac OS X",
+ | "free_space": 84983328768,
+ | "resources": ["java", "linux", "firefox"],
+ | "environments": ["perf", "UAT"]
+ | },
+ | {
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | },
+ | "find": {
+ | "href": "https://ci.example.com/go/api/agents/:uuid"
+ | }
+ | },
+ | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da",
+ | "hostname": "agent01.example.com",
+ | "ip_address": "10.12.20.47",
+ | "enabled": true,
+ | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent",
+ | "status": "Building",
+ | "operating_system": "Mac OS X",
+ | "free_space": 84983328768,
+ | "resources": ["java", "linux", "firefox"],
+ | "environments": ["perf", "UAT"]
+ | },
+ | {
+ | "_links": {
+ | "self": {
+ | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da"
+ | },
+ | "doc": {
+ | "href": "http://api.go.cd/#agents"
+ | },
+ | "find": {
+ | "href": "https://ci.example.com/go/api/agents/:uuid"
+ | }
+ | },
+ | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da",
+ | "hostname": "agent01.example.com",
+ | "ip_address": "10.12.20.47",
+ | "enabled": true,
+ | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent",
+ | "status": "Idle",
+ | "operating_system": "Mac OS X",
+ | "free_space": 84983328768,
+ | "resources": ["java", "linux", "firefox"],
+ | "environments": ["perf", "UAT"]
+ | }
+ | ]
+ | }
+ |}
+ |
+ """.stripMargin)
// Given
- val pollerSpy = spy(poller);
- doReturn(1).when(pollerSpy).goTaskQueueSize()
+ val pollerSpy = spy(poller)
+ doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents")
- // When
- for(i <- 0 to 5) {
- pollerSpy.pollAndAddTask()
- }
+ // When, then
+ pollerSpy.getBuildingAgents.size should be(1)
+ }
+
+ "GoCDPoller#getPendingJobsCount" should "fetch pending jobs count given a valid json" in {
+ val responseJson =
+ """
+ |
+ |
+ |
+ | mypipeline/5/defaultStage/1/job1
+ |
+ |
+ |
+ | mypipeline/5/defaultStage/1/job2
+ |
+ |
+ """.stripMargin
+ // Given
+ val pollerSpy = spy(poller)
+ doReturn(responseJson).when(pollerSpy).request("http://localhost:8080/go/api/jobs/scheduled.xml")
- // Then
- TaskQueue.queue.size() should be(1)
- TaskQueue.dequeue.dockerImage should be ("travix/gocd-agent:latest")
+ // When, then
+ pollerSpy.getPendingJobsCount should be(2)
}
}
diff --git a/src/test/scala/com/indix/mesos/GoCDScalarSpec.scala b/src/test/scala/com/indix/mesos/GoCDScalarSpec.scala
new file mode 100644
index 0000000..10477d2
--- /dev/null
+++ b/src/test/scala/com/indix/mesos/GoCDScalarSpec.scala
@@ -0,0 +1,63 @@
+package com.indix.mesos
+
+import com.typesafe.config.ConfigFactory
+import org.apache.mesos.Protos._
+import org.apache.mesos.MesosSchedulerDriver
+import org.apache.mesos.Protos.FrameworkID
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, FlatSpec}
+import scala.concurrent.duration._
+
+
+class GoCDScalarSpec extends FlatSpec with Matchers {
+ val conf = new FrameworkConfig(ConfigFactory.load())
+ val scheduler = new GoCDScheduler(conf)
+ val poller = new GOCDPoller(conf)
+ val frameworkInfo = FrameworkInfo.newBuilder()
+ .setId(FrameworkID.newBuilder().setValue("").build())
+ .setName("GOCD-Mesos")
+ .setUser("")
+ .setRole("*")
+ .setHostname("pattigai")
+ .setCheckpoint(true)
+ .setFailoverTimeout(60.seconds.toMillis)
+ .build()
+ val driver = new MesosSchedulerDriver(scheduler, frameworkInfo, "")
+
+ val scalar = new GOCDScalar(conf, poller, driver)
+
+ "GoCDScalar#computeScaledown" should "return correct number to scale down" in {
+ scalar.computeScaledown(9, 6, 12, 2) should be (3)
+ scalar.computeScaledown(14, 13, 12, 2) should be (2)
+ scalar.computeScaledown(4, 0, 12, 2) should be (2)
+ scalar.computeScaledown(12, 0, 12, 2) should be (10)
+ scalar.computeScaledown(12, 1, 12, 2) should be (10)
+ }
+
+ "GoCDScalar#computeScaleUp" should "return correct number to scale up" in {
+ scalar.computeScaleup(6, 9, 12, 2) should be (3)
+ scalar.computeScaleup(13, 14, 12, 2) should be (0)
+ scalar.computeScaleup(0, 4, 12, 2) should be (4)
+ scalar.computeScaleup(0, 14, 12, 2) should be (12)
+ scalar.computeScaleup(1, 12, 12, 2) should be (11)
+ }
+
+ "GoCDScalar#getSupply" should "return the supply metric" in {
+ val pollerSpy = spy(poller)
+ val scalar = new GOCDScalar(conf, pollerSpy, driver)
+ val scalarSpy = spy(scalar)
+ doReturn(Iterable(GoTask("", "", GoTaskState.Scheduled, "idle3", ""), GoTask("", "", GoTaskState.Pending, "idle4", ""))).when(scalarSpy).pendingTasks
+ doReturn(Iterable(GoTask("", "", GoTaskState.Running, "idle1", ""), GoTask("", "", GoTaskState.Running, "active1", ""))).when(scalarSpy).runningTasks
+ doReturn(List(GoAgent("idle1", "Idle"))).when(pollerSpy).getIdleAgents
+ scalarSpy.getSupply should be (3)
+ }
+
+ "GoCDScalar#getDemand" should "return the demand metric" in {
+ val pollerSpy = spy(poller)
+ val scalar = new GOCDScalar(conf, pollerSpy, driver)
+ val scalarSpy = spy(scalar)
+ doReturn(Iterable(GoAgent("active1", "Building"))).when(pollerSpy).getBuildingAgents
+ doReturn(4).when(pollerSpy).getPendingJobsCount
+ scalarSpy.getDemand should be (5)
+ }
+}
diff --git a/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala b/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala
index 4c7ed0a..e11da57 100644
--- a/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala
+++ b/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala
@@ -51,11 +51,11 @@ class GoCDSchedulerSpec extends FlatSpec with Matchers {
}
"GoCDScheduler#deployGoAgentTask" should "refuse offer if there are not enough resources" in {
- scheduler.deployGoAgentTask(GoTask("", "docker-image", ""), inadequateOffer) should be(None)
+ scheduler.deployGoAgentTask(GoTask("docker-image"), inadequateOffer) should be(None)
}
"GOCDScheduler#deployGoAgentTask" should "accept offer if there are enough resources" in {
- val taskOpt = scheduler.deployGoAgentTask(GoTask("", "docker-image",""), generoursResourceOffer)
+ val taskOpt = scheduler.deployGoAgentTask(GoTask("docker-image"), generoursResourceOffer)
taskOpt.isDefined should be(true)
taskOpt.get.getExecutor.getContainer.getDocker.getImage should be("docker-image")
}