From b4a0ecfef3323c8601c928d9095bb93a42f16786 Mon Sep 17 00:00:00 2001 From: Azhagu Selvan Date: Sat, 20 Feb 2016 18:38:29 +0530 Subject: [PATCH 1/5] Improved the agent demand calculation logic. A new agent is launched only when all of the above are true: 1. The scheduled pending jobs should be either positive or constantly greater than 1, for last 5 times. 2. The idle agents in the server should be 0. --- pom.xml | 7 +- .../com/indix/mesos/FrameworkConfig.scala | 2 + .../scala/com/indix/mesos/GOCDPoller.scala | 69 ++++++++-- .../com/indix/mesos/GoCDPollerSpec.scala | 123 +++++++++++++++++- 4 files changed, 187 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index 485dd80..d5f0f1b 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,11 @@ scala-xml 2.11.0-M4 + + com.typesafe.play + play-json_2.11 + 2.4.6 + org.scalatest scalatest_2.11 @@ -102,8 +107,6 @@ 1.9.5 - - diff --git a/src/main/scala/com/indix/mesos/FrameworkConfig.scala b/src/main/scala/com/indix/mesos/FrameworkConfig.scala index 89d35d5..8f1f940 100644 --- a/src/main/scala/com/indix/mesos/FrameworkConfig.scala +++ b/src/main/scala/com/indix/mesos/FrameworkConfig.scala @@ -17,5 +17,7 @@ class FrameworkConfig(config: Config) { val goUserName = rootConfig.getString("go-server.user-name") val goPassword = rootConfig.getString("go-server.password") + val goAuthEnabled = if(rootConfig.hasPath("go-server.auth-enabled")) rootConfig.getBoolean("go-server.auth-enabled") else false + val goAgentKey = if(rootConfig.hasPath("go-agent.auto-register-key")) Some(rootConfig.getString("go-agent.auto-register-key")) else None } diff --git a/src/main/scala/com/indix/mesos/GOCDPoller.scala b/src/main/scala/com/indix/mesos/GOCDPoller.scala index e56776d..23b289f 100644 --- a/src/main/scala/com/indix/mesos/GOCDPoller.scala +++ b/src/main/scala/com/indix/mesos/GOCDPoller.scala @@ -6,6 +6,7 @@ import com.google.common.io.BaseEncoding import scala.collection.mutable import scalaj.http._ +import play.api.libs.json._ case class GOCDPoller(conf: FrameworkConfig) { @@ -13,11 +14,34 @@ case class GOCDPoller(conf: FrameworkConfig) { val responseHistory: scala.collection.mutable.MutableList[Int] = mutable.MutableList.empty[Int] + + private[mesos] def request(url: String) = { + val request = if(conf.goAuthEnabled) { + Http(url) + .header("Authorization", s"Basic $authToken") + } else { + Http(url) + }.header("Accept", "application/vnd.go.cd.v1+json") + request.asString.body + } + + private[mesos] def jsonRequest(url: String) = { + Json.parse(request(url)) + } + + private[mesos] def xmlRequest(url: String) = { + scala.xml.XML.loadString(request(url)) + } + + private def buildUrl() = { + s"http://${conf.goServerHost}:${conf.goServerPort}" + } + + def goTaskQueueSize(): Int = { println("Polling GO Server for scheduled jobs") try { - val response: HttpResponse[String] = Http(s"http://${conf.goServerHost}:${conf.goServerPort}" + "/go/api/jobs/scheduled.xml").asString //.header("Authorization", s"Basic ${authToken}").asString - val responseXml = scala.xml.XML.loadString(response.body) + val responseXml = xmlRequest(buildUrl() + "/go/api/jobs/scheduled.xml") (responseXml \\ "scheduledJobs" \\ "job").size } catch { case e: SocketTimeoutException => { @@ -28,19 +52,48 @@ case class GOCDPoller(conf: FrameworkConfig) { } def goIdleAgentsCount() = { + println("Polling Go server for idle agents") + try { + val responseJson = jsonRequest(buildUrl() + "/go/api/agents") + (responseJson \ "_embedded" \ "agents").toOption.map(jsValue => { + jsValue.as[JsArray].value.map(agent => (agent \ "status").get.as[String]).count(_.equalsIgnoreCase("idle")) + }).getOrElse(0) + } catch { + case e: SocketTimeoutException => { + println("GOCD Server timed out!!") + 1 + } + } + } + + // Return true when the demand remains, same or increased during the last five attempts. + private[mesos] def isDemandPositive(): Boolean = { + val demandTrend = responseHistory.foldRight(0)((h1, h2) => { + if(h2 >= h1) { + 1 + } else { + -1 + } + }) + demandTrend == 1 } def pollAndAddTask() = { val scheduled : Int = goTaskQueueSize() println(s"Go server has ${scheduled.toString} pending jobs to be scheduled") - if(scheduled > 0) + if(scheduled > 0) { responseHistory += scheduled - + } if(responseHistory.size > 5) { - println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.") - TaskQueue.enqueue(GoTask("", conf.goAgentDocker, "")) - responseHistory.clear() + println(s"More than 5 jobs pending in the GOCD. checking if any agents are idle.") + if(goIdleAgentsCount() == 0 && isDemandPositive()) { + println("No idle agents found. Launching a new agent launch now.") + TaskQueue.enqueue(GoTask("", conf.goAgentDocker, "")) + responseHistory.clear() + } else { + println("Idle agents found. Not launching any new Go Agent now.") + responseHistory.drop(0) + } } } - } diff --git a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala index 9890bba..6f1f693 100644 --- a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala +++ b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala @@ -3,6 +3,7 @@ package com.indix.mesos import com.typesafe.config.ConfigFactory import org.scalatest._ import org.mockito.Mockito._ +import play.api.libs.json.Json class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { @@ -19,8 +20,10 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { "GoCDPoller#pollAndAddTask" should "add a goTask after 5 polls" in { // Given - val pollerSpy = spy(poller); + val pollerSpy = spy(poller) doReturn(1).when(pollerSpy).goTaskQueueSize() + doReturn(0).when(pollerSpy).goIdleAgentsCount() + doReturn(true).when(pollerSpy).isDemandPositive() // When for(i <- 0 to 5) { @@ -31,11 +34,47 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { TaskQueue.queue.size() should be(1) } + "GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if Idle agents count is > 0" in { + // Given + val pollerSpy = spy(poller) + doReturn(1).when(pollerSpy).goTaskQueueSize() + doReturn(2).when(pollerSpy).goIdleAgentsCount() + doReturn(true).when(pollerSpy).isDemandPositive() + + // When + for(i <- 0 to 5) { + pollerSpy.pollAndAddTask() + } + + // Then + TaskQueue.queue.size() should be(0) + } + + "GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if isDemandPositive returns false" in { + // Given + val pollerSpy = spy(poller) + doReturn(1).when(pollerSpy).goTaskQueueSize() + doReturn(0).when(pollerSpy).goIdleAgentsCount() + doReturn(false).when(pollerSpy).isDemandPositive() + + // When + for(i <- 0 to 5) { + pollerSpy.pollAndAddTask() + } + + // Then + TaskQueue.queue.size() should be(0) + } + + "GoCDPoller#pollAndAddTask" should "add a goTask after 15 polls" in { // Given - val pollerSpy = spy(poller); + val pollerSpy = spy(poller) doReturn(1).when(pollerSpy).goTaskQueueSize() + doReturn(0).when(pollerSpy).goIdleAgentsCount() + doReturn(true).when(pollerSpy).isDemandPositive() + // When for(i <- 0 to 20) { @@ -43,13 +82,16 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { } // Then - TaskQueue.queue.size() should be(3) + TaskQueue.queue.size() should be(4) } "GoCDPoller#pollAndAddTask" should "add a goTask with expected attributes" in { // Given - val pollerSpy = spy(poller); + val pollerSpy = spy(poller) doReturn(1).when(pollerSpy).goTaskQueueSize() + doReturn(0).when(pollerSpy).goIdleAgentsCount() + doReturn(true).when(pollerSpy).isDemandPositive() + // When for(i <- 0 to 5) { @@ -60,4 +102,77 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { TaskQueue.queue.size() should be(1) TaskQueue.dequeue.dockerImage should be ("travix/gocd-agent:latest") } + + "GoCDPoller#goIdleAgentsCount" should "fetch idle agents count given a valid json" in { + val responseJson = Json.parse( + """ + | + |{ + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | } + | }, + | "_embedded": { + | "agents": [ + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | } + | ] + | } + |} + | + """.stripMargin) + // Given + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") + + // When, then + pollerSpy.goIdleAgentsCount() should be(1) + } + + "GoCDPoller#goTaskQueueSize" should "fetch pending jobs count given a valid json" in { + val responseJson = + """ + | + | + | + | mypipeline/5/defaultStage/1/job1 + | + | + | + | mypipeline/5/defaultStage/1/job2 + | + | + """.stripMargin + // Given + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).request("http://localhost:8080/go/api/jobs/scheduled.xml") + + // When, then + pollerSpy.goTaskQueueSize() should be(2) + } } From eebc3275daf1f0b65b4ab7ef3c0892745296a230 Mon Sep 17 00:00:00 2001 From: Azhagu Selvan Date: Sat, 20 Feb 2016 18:49:01 +0530 Subject: [PATCH 2/5] Separate config to enable/disable authentication with the GoCD API --- README.md | 2 ++ src/main/resources/application.conf | 2 ++ src/main/scala/com/indix/mesos/FrameworkConfig.scala | 4 ++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9610e6b..cf35156 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,8 @@ gocd-mesos { port: "8080" user-name: "random-guy" password: "random-insecure-string" + # Enable if the go server mandatorily requires user name and password + auth-enabled: false } go-agent { diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index ff326bb..f41a640 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -9,6 +9,8 @@ gocd-mesos { port: "8080" user-name: "azhagu" password: "random-insecure-string" + # Enable if the go server mandatorily requires user name and password + auth-enabled: false } go-agent { diff --git a/src/main/scala/com/indix/mesos/FrameworkConfig.scala b/src/main/scala/com/indix/mesos/FrameworkConfig.scala index 8f1f940..d38fcc5 100644 --- a/src/main/scala/com/indix/mesos/FrameworkConfig.scala +++ b/src/main/scala/com/indix/mesos/FrameworkConfig.scala @@ -14,8 +14,8 @@ class FrameworkConfig(config: Config) { val goServerPort = rootConfig.getString("go-server.port") - val goUserName = rootConfig.getString("go-server.user-name") - val goPassword = rootConfig.getString("go-server.password") + lazy val goUserName = rootConfig.getString("go-server.user-name") + lazy val goPassword = rootConfig.getString("go-server.password") val goAuthEnabled = if(rootConfig.hasPath("go-server.auth-enabled")) rootConfig.getBoolean("go-server.auth-enabled") else false From 3f39e5746f0e2c099e035f39b0728b944fc4025d Mon Sep 17 00:00:00 2001 From: tamizhgeek Date: Thu, 17 Mar 2016 18:22:38 +0530 Subject: [PATCH 3/5] Slightly better demand/supply management --- pom.xml | 9 +- src/main/resources/application.conf | 4 +- .../com/indix/mesos/FrameworkConfig.scala | 4 + .../scala/com/indix/mesos/GOCDPoller.scala | 84 ++-- .../scala/com/indix/mesos/GOCDScalar.scala | 93 +++++ .../scala/com/indix/mesos/GoCDScheduler.scala | 65 ++-- .../scala/com/indix/mesos/TaskQueue.scala | 125 +++++- src/test/resources/application.conf | 2 + .../com/indix/mesos/GoCDPollerSpec.scala | 367 +++++++++--------- .../com/indix/mesos/GoCDSchedulerSpec.scala | 4 +- .../scala/com/indix/mesos/GoScalarSpec.scala | 35 ++ 11 files changed, 507 insertions(+), 285 deletions(-) create mode 100644 src/main/scala/com/indix/mesos/GOCDScalar.scala create mode 100644 src/test/scala/com/indix/mesos/GoScalarSpec.scala diff --git a/pom.xml b/pom.xml index d5f0f1b..1efa135 100644 --- a/pom.xml +++ b/pom.xml @@ -35,10 +35,6 @@ repo.codahale.com http://repo.codahale.com/ - - morphia.googlecode.com - http://morphia.googlecode.com/svn/mavenrepo/ - maven.obiba.org/ http://maven.obiba.org/maven2/ @@ -106,6 +102,11 @@ mockito-all 1.9.5 + + com.typesafe.scala-logging + scala-logging_2.11 + 3.1.0 + diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index f41a640..3915b3f 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -5,8 +5,8 @@ gocd-mesos { # GO server config to poll and connect the agent go-server { - host: "localhost" - port: "8080" + host: "192.168.33.10" + port: "8153" user-name: "azhagu" password: "random-insecure-string" # Enable if the go server mandatorily requires user name and password diff --git a/src/main/scala/com/indix/mesos/FrameworkConfig.scala b/src/main/scala/com/indix/mesos/FrameworkConfig.scala index d38fcc5..6216241 100644 --- a/src/main/scala/com/indix/mesos/FrameworkConfig.scala +++ b/src/main/scala/com/indix/mesos/FrameworkConfig.scala @@ -14,6 +14,10 @@ class FrameworkConfig(config: Config) { val goServerPort = rootConfig.getString("go-server.port") + val goMinAgents = rootConfig.getInt("go-agent.min-agents") + + val goMaxAgents = rootConfig.getInt("go-agent.max-agents") + lazy val goUserName = rootConfig.getString("go-server.user-name") lazy val goPassword = rootConfig.getString("go-server.password") diff --git a/src/main/scala/com/indix/mesos/GOCDPoller.scala b/src/main/scala/com/indix/mesos/GOCDPoller.scala index 23b289f..def3d79 100644 --- a/src/main/scala/com/indix/mesos/GOCDPoller.scala +++ b/src/main/scala/com/indix/mesos/GOCDPoller.scala @@ -1,15 +1,15 @@ package com.indix.mesos -import java.net.{UnknownHostException, SocketTimeoutException} - import com.google.common.io.BaseEncoding - import scala.collection.mutable +import scala.util.{Failure, Success, Try} import scalaj.http._ import play.api.libs.json._ -case class GOCDPoller(conf: FrameworkConfig) { +case class GoAgent(id: String, status: String) + +case class GOCDPoller(conf: FrameworkConfig) { val authToken = BaseEncoding.base64().encode(s"${conf.goUserName}:${conf.goPassword}".getBytes("UTF-8")); val responseHistory: scala.collection.mutable.MutableList[Int] = mutable.MutableList.empty[Int] @@ -38,62 +38,42 @@ case class GOCDPoller(conf: FrameworkConfig) { } - def goTaskQueueSize(): Int = { + def getPendingJobsCount: Int = { println("Polling GO Server for scheduled jobs") - try { - val responseXml = xmlRequest(buildUrl() + "/go/api/jobs/scheduled.xml") - (responseXml \\ "scheduledJobs" \\ "job").size - } catch { - case e: SocketTimeoutException => { - println("GOCD Server timed out!!") - 0 - } - } + withRetry(3){ + val responseXml = xmlRequest(buildUrl() + "/go/api/jobs/scheduled.xml") + (responseXml \\ "scheduledJobs" \\ "job").size + }.getOrElse(0) } - def goIdleAgentsCount() = { - println("Polling Go server for idle agents") - try { + def getGoAgents: List[GoAgent] = { + println("Polling Go server for agents") + withRetry(3) { val responseJson = jsonRequest(buildUrl() + "/go/api/agents") (responseJson \ "_embedded" \ "agents").toOption.map(jsValue => { - jsValue.as[JsArray].value.map(agent => (agent \ "status").get.as[String]).count(_.equalsIgnoreCase("idle")) - }).getOrElse(0) - } catch { - case e: SocketTimeoutException => { - println("GOCD Server timed out!!") - 1 - } - } + jsValue.as[JsArray] + .value + .map(agent => GoAgent((agent \ "uuid").get.as[String], (agent \ "status").get.as[String])) + .toList + }).getOrElse(List.empty) + }.getOrElse(List.empty) } - // Return true when the demand remains, same or increased during the last five attempts. - private[mesos] def isDemandPositive(): Boolean = { - val demandTrend = responseHistory.foldRight(0)((h1, h2) => { - if(h2 >= h1) { - 1 - } else { - -1 - } - }) - demandTrend == 1 + def goIdleAgents: List[GoAgent] = { + getGoAgents.filter(_.status.equalsIgnoreCase("idle")) } - def pollAndAddTask() = { - val scheduled : Int = goTaskQueueSize() - println(s"Go server has ${scheduled.toString} pending jobs to be scheduled") - if(scheduled > 0) { - responseHistory += scheduled - } - if(responseHistory.size > 5) { - println(s"More than 5 jobs pending in the GOCD. checking if any agents are idle.") - if(goIdleAgentsCount() == 0 && isDemandPositive()) { - println("No idle agents found. Launching a new agent launch now.") - TaskQueue.enqueue(GoTask("", conf.goAgentDocker, "")) - responseHistory.clear() - } else { - println("Idle agents found. Not launching any new Go Agent now.") - responseHistory.drop(0) - } - } + def getBuildingAgents = { + getGoAgents.filter(_.status.equalsIgnoreCase("building")) + } + + + + private def withRetry[T](n: Int)(fn: => T): Try[T] = { + Try(fn) match { + case res: Success[T] => res + case _ if n < 1 => withRetry(n - 1)(fn) + case Failure(ex) => throw ex + } } } diff --git a/src/main/scala/com/indix/mesos/GOCDScalar.scala b/src/main/scala/com/indix/mesos/GOCDScalar.scala new file mode 100644 index 0000000..58e521f --- /dev/null +++ b/src/main/scala/com/indix/mesos/GOCDScalar.scala @@ -0,0 +1,93 @@ +package com.indix.mesos + +import org.apache.mesos.MesosSchedulerDriver +import org.apache.mesos.Protos.{TaskState, TaskID, TaskStatus} +import scala.collection.JavaConverters._ + + +case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSchedulerDriver) { + + + def runningTaskIds = TaskQueue.getAllJobIds + + + def reconcileTasks() = { + val runningTasks = runningTaskIds.map(id => TaskStatus + .newBuilder() + .setTaskId(TaskID + .newBuilder() + .setValue(id) + .build()) + .setState(TaskState.TASK_RUNNING) + .build()) + driver.reconcileTasks(runningTasks.asJavaCollection) + } + + def getTotalAgents = { + poller.getGoAgents.size + } + + def getSupply = { + TaskQueue + .getRunningJobs + .toList + .map(_.goAgentUuid) + .union(poller.goIdleAgents.map(_.id)).size + + TaskQueue.getPendingJobs.size + } + + def getDemand = { + poller.getPendingJobsCount + poller.getBuildingAgents.size + } + + def scaleDown(agentCount: Int) = { + val idleAgents = poller.goIdleAgents + idleAgents.take(agentCount).foreach(agent => { + TaskQueue.getRunningJobs.toList.find(_.goAgentUuid == agent.id).foreach { task => + driver.killTask(TaskID.newBuilder().setValue(task.mesosTaskId).build()) + } + }) + } + + def scaleUp(agentCount: Int) = { + for(_ <- 0 to agentCount) { + TaskQueue.add(GoTask(conf.goAgentDocker)) + } + } + + def computeScaleup(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { + assert(demand > supply) + val needed = demand - supply + if(supply + needed > goMaxAgents) { + goMaxAgents - supply + } else { + needed + } + } + + + def computeScaledown(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { + assert(supply > demand) + val notNeeded = supply - demand + if(supply - notNeeded < goMinAgents) { + supply - goMinAgents + } else if(supply - notNeeded > goMaxAgents) { + supply - goMaxAgents + } else { + notNeeded + } + } + + + def scale = { + val supply = getSupply + val demand = getDemand + if(demand > supply) { + computeScaleup(supply, demand, conf.goMaxAgents, conf.goMinAgents) + } else if (demand < supply) { + computeScaledown(supply, demand, conf.goMaxAgents, conf.goMinAgents) + } else { + // do Nothing + } + } +} diff --git a/src/main/scala/com/indix/mesos/GoCDScheduler.scala b/src/main/scala/com/indix/mesos/GoCDScheduler.scala index f3e5d37..8447489 100644 --- a/src/main/scala/com/indix/mesos/GoCDScheduler.scala +++ b/src/main/scala/com/indix/mesos/GoCDScheduler.scala @@ -29,10 +29,13 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { println(s"Received Disconnected message $driver") } - override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]) {} + override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = { + + } override def statusUpdate(driver: SchedulerDriver, status: TaskStatus) { println(s"received status update $status") + TaskQueue.updateState(status.getTaskId.getValue, status.getState) } override def offerRescinded(driver: SchedulerDriver, offerId: OfferID) {} @@ -47,22 +50,19 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { //for every available offer run tasks for (offer <- offers.asScala) { println(s"offer $offer") - val nextTask = TaskQueue.dequeue - if(nextTask != null) { - val task = deployGoAgentTask(nextTask, offer) - task match { - case Some(tt) => driver.launchTasks(List(offer.getId).asJava, List(tt).asJava) + TaskQueue.findNext.foreach { goTask => + val mesosTask = deployGoAgentTask(goTask, offer) + mesosTask match { + case Some(tt) => { + driver.launchTasks(List(offer.getId).asJava, List(tt).asJava) + TaskQueue.add(goTask.copy(state = GoTaskState.Scheduled)) + } case None => { - TaskQueue.enqueue(nextTask) println(s"declining unused offer because offer is not enough") driver.declineOffer(offer.getId) } } } - else { - println(s"declining unused offer because there is no task") - driver.declineOffer(offer.getId) - } } } @@ -94,7 +94,13 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { .setContainer(dockerContainerInfo(goTask).build()) .setCommand(Protos.CommandInfo.newBuilder() .setEnvironment(Protos.Environment.newBuilder() - .addAllVariables(envForGoCDTask.getVariablesList)) + .addAllVariables(envForGoCDTask + .addVariables(Variable + .newBuilder() + .setName("GO_AGENT_UUID") + .setValue(goTask.goAgentUuid) + .build()) + .getVariablesList)) .setShell(false)) } @@ -104,7 +110,7 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { val available = Resources(offer) if(available.canSatisfy(needed)) { val currentTimeStamp = System.currentTimeMillis() - val taskId = "gocd-agent-task-" + currentTimeStamp + val taskId = goTask.mesosTaskId val task = TaskInfo.newBuilder .setExecutor(executorInfo(goTask, currentTimeStamp).build()) .setName(taskId) @@ -126,7 +132,6 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { override def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { println(s"registered with mesos master. Framework id is ${frameworkId.getValue}") } - } @@ -139,16 +144,11 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { println(s"The Framework id is $id") + val goTaskQueue = TaskQueue() + val poller = GOCDPoller(config) - val timeInterval = 2 * 60 * 1000 - val runnable = new Runnable { - override def run(): Unit = { - while(true) { - poller.pollAndAddTask - Thread.sleep(timeInterval) - } - } - } + // val timeInterval = 2 * 60 * 1000 + val frameworkInfo = FrameworkInfo.newBuilder() .setId(FrameworkID.newBuilder().setValue(id).build()) @@ -160,21 +160,11 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { .setFailoverTimeout(60.seconds.toMillis) .build() - val thread = new Thread(runnable) - thread.start() val scheduler = new GoCDScheduler(config) val driver = new MesosSchedulerDriver(scheduler, frameworkInfo, config.mesosMaster) println("Starting the driver") - - val status = if(driver.run() == Status.DRIVER_STOPPED) 0 else 1 - - //driver.run } - - // println(driver.join()) - - println(status) // Ensure that the driver process terminates. - driver.stop(); + driver.stop() // For this test to pass reliably on some platforms, this sleep is // required to ensure that the SchedulerDriver teardown is complete // before the JVM starts running native object destructors after @@ -183,8 +173,5 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { // TODO(greg): Ideally, we would inspect the status of the driver // and its associated tasks via the Java API and wait until their // teardown is complete to exit. - Thread.sleep(500); - - System.exit(status); - println("===================================") -} \ No newline at end of file + Thread.sleep(500) + } \ No newline at end of file diff --git a/src/main/scala/com/indix/mesos/TaskQueue.scala b/src/main/scala/com/indix/mesos/TaskQueue.scala index 5fa0985..764fa8e 100644 --- a/src/main/scala/com/indix/mesos/TaskQueue.scala +++ b/src/main/scala/com/indix/mesos/TaskQueue.scala @@ -1,24 +1,133 @@ package com.indix.mesos -import java.util.concurrent.ConcurrentLinkedQueue +import java.util.UUID +import org.apache.mesos.Protos.TaskState + + +object GoTaskState { + abstract class GoTaskState + object Pending extends GoTaskState + object Scheduled extends GoTaskState + object Running extends GoTaskState + object Stopped extends GoTaskState +} case class GoTaskResource(cpu: Double = 1, memory: Double = 256, disk: Double = 1*1024) -case class GoTask(cmdString: String, dockerImage: String, uri: String, resource: GoTaskResource = GoTaskResource()) {} + + +case class GoTask(dockerImage: String, + uri: String, + state: GoTaskState.GoTaskState, + goAgentUuid: String, + mesosTaskId: String, + mesosFailureMessage: Option[String] = None, + resource: GoTaskResource = GoTaskResource()) { + def updateState(state: TaskState): GoTask = { + state match { + case TaskState.TASK_FAILED | TaskState.TASK_KILLED | TaskState.TASK_FINISHED => this.copy(state = GoTaskState.Stopped, mesosFailureMessage = Some("")) + case _ => this + } + } +} + +object GoTask { + def generateTaskId(): String = { + "Mesos-gocd-task-" + System.currentTimeMillis() + } + + def generateAgentUUID(): String = { + UUID.randomUUID().toString + } + + def apply(dockerImage: String) = { + new GoTask(dockerImage, "", GoTaskState.Pending, generateAgentUUID(), generateTaskId(), None) + } +} + +case class TaskQueue(queue: Map[String, GoTask] = Map.empty[String, GoTask]) { + + def getRunningJobs = { + queue.values.filter { task: GoTask => + task.state match { + case GoTaskState.Running => true + case _ => false + } + } + } + + def getPendingJobs = { + queue.values.filter { task: GoTask => + task.state match { + case GoTaskState.Pending => true + case GoTaskState.Scheduled=> true + case _ => false + } + } + } + + + def add(task: GoTask): TaskQueue = { + this.copy(queue = queue + (task.mesosTaskId -> task)) + } + + def findNext: Option[GoTask] = { + queue.values.find(task => { + task.state match { + case GoTaskState.Pending => true + case _ => false + } + }) + } + + def updateState(id: String, state: TaskState): TaskQueue = { + this.queue.get(id).map(task => { + this.copy(queue = queue + (id -> task.updateState(state))) + }).getOrElse(this) + } + + def prune: TaskQueue = { + val updatedQueue = this.queue.filterNot(pair => { + val (id, task) = pair + task.state match { + case GoTaskState.Stopped => true + case _ => false + } + }) + this.copy(queue = updatedQueue) + } +} object TaskQueue { - val queue : ConcurrentLinkedQueue[GoTask] = new ConcurrentLinkedQueue[GoTask]() - def enqueue(task : GoTask) { - queue.offer(task) + var queue = TaskQueue() + + + def getAllJobIds = queue.queue.keys.toList + + def getRunningJobs = queue.getRunningJobs + + def getPendingJobs = queue.getPendingJobs + + + def add(task : GoTask) { + synchronized { + queue = queue.add(task) + } + } + + def findNext = { + queue.findNext } - def dequeue : GoTask = { - queue.poll() + def updateState(id: String, state: TaskState) = { + synchronized { + queue = queue.updateState(id, state) + } } def reset = { - queue.clear() + queue.prune } } diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf index ff326bb..d31e05e 100644 --- a/src/test/resources/application.conf +++ b/src/test/resources/application.conf @@ -12,6 +12,8 @@ gocd-mesos { } go-agent { + min-agents: 2 + max-agents: 12 # This key should be already shared with the go server and configured in its config.xml as autoRegisterKey auto-register-key: "6a92073d04fc4eff99137d7f8c7e794d" # Docker image containing the go agent diff --git a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala index 6f1f693..bbc19f2 100644 --- a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala +++ b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala @@ -1,178 +1,189 @@ -package com.indix.mesos - -import com.typesafe.config.ConfigFactory -import org.scalatest._ -import org.mockito.Mockito._ -import play.api.libs.json.Json - -class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - - - val poller = new GOCDPoller(new FrameworkConfig(ConfigFactory.load())) - - override def beforeEach() = { - TaskQueue.reset - } - - override def afterEach() = { - TaskQueue.reset - } - - "GoCDPoller#pollAndAddTask" should "add a goTask after 5 polls" in { - // Given - val pollerSpy = spy(poller) - doReturn(1).when(pollerSpy).goTaskQueueSize() - doReturn(0).when(pollerSpy).goIdleAgentsCount() - doReturn(true).when(pollerSpy).isDemandPositive() - - // When - for(i <- 0 to 5) { - pollerSpy.pollAndAddTask() - } - - // Then - TaskQueue.queue.size() should be(1) - } - - "GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if Idle agents count is > 0" in { - // Given - val pollerSpy = spy(poller) - doReturn(1).when(pollerSpy).goTaskQueueSize() - doReturn(2).when(pollerSpy).goIdleAgentsCount() - doReturn(true).when(pollerSpy).isDemandPositive() - - // When - for(i <- 0 to 5) { - pollerSpy.pollAndAddTask() - } - - // Then - TaskQueue.queue.size() should be(0) - } - - "GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if isDemandPositive returns false" in { - // Given - val pollerSpy = spy(poller) - doReturn(1).when(pollerSpy).goTaskQueueSize() - doReturn(0).when(pollerSpy).goIdleAgentsCount() - doReturn(false).when(pollerSpy).isDemandPositive() - - // When - for(i <- 0 to 5) { - pollerSpy.pollAndAddTask() - } - - // Then - TaskQueue.queue.size() should be(0) - } - - - - "GoCDPoller#pollAndAddTask" should "add a goTask after 15 polls" in { - // Given - val pollerSpy = spy(poller) - doReturn(1).when(pollerSpy).goTaskQueueSize() - doReturn(0).when(pollerSpy).goIdleAgentsCount() - doReturn(true).when(pollerSpy).isDemandPositive() - - - // When - for(i <- 0 to 20) { - pollerSpy.pollAndAddTask() - } - - // Then - TaskQueue.queue.size() should be(4) - } - - "GoCDPoller#pollAndAddTask" should "add a goTask with expected attributes" in { - // Given - val pollerSpy = spy(poller) - doReturn(1).when(pollerSpy).goTaskQueueSize() - doReturn(0).when(pollerSpy).goIdleAgentsCount() - doReturn(true).when(pollerSpy).isDemandPositive() - - - // When - for(i <- 0 to 5) { - pollerSpy.pollAndAddTask() - } - - // Then - TaskQueue.queue.size() should be(1) - TaskQueue.dequeue.dockerImage should be ("travix/gocd-agent:latest") - } - - "GoCDPoller#goIdleAgentsCount" should "fetch idle agents count given a valid json" in { - val responseJson = Json.parse( - """ - | - |{ - | "_links": { - | "self": { - | "href": "https://ci.example.com/go/api/agents" - | }, - | "doc": { - | "href": "http://api.go.cd/#agents" - | } - | }, - | "_embedded": { - | "agents": [ - | { - | "_links": { - | "self": { - | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" - | }, - | "doc": { - | "href": "http://api.go.cd/#agents" - | }, - | "find": { - | "href": "https://ci.example.com/go/api/agents/:uuid" - | } - | }, - | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", - | "hostname": "agent01.example.com", - | "ip_address": "10.12.20.47", - | "enabled": true, - | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", - | "status": "Idle", - | "operating_system": "Mac OS X", - | "free_space": 84983328768, - | "resources": ["java", "linux", "firefox"], - | "environments": ["perf", "UAT"] - | } - | ] - | } - |} - | - """.stripMargin) - // Given - val pollerSpy = spy(poller) - doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") - - // When, then - pollerSpy.goIdleAgentsCount() should be(1) - } - - "GoCDPoller#goTaskQueueSize" should "fetch pending jobs count given a valid json" in { - val responseJson = - """ - | - | - | - | mypipeline/5/defaultStage/1/job1 - | - | - | - | mypipeline/5/defaultStage/1/job2 - | - | - """.stripMargin - // Given - val pollerSpy = spy(poller) - doReturn(responseJson).when(pollerSpy).request("http://localhost:8080/go/api/jobs/scheduled.xml") - - // When, then - pollerSpy.goTaskQueueSize() should be(2) - } -} +//package com.indix.mesos +// +//import com.typesafe.config.ConfigFactory +//import org.scalatest._ +//import org.mockito.Mockito._ +//import play.api.libs.json.Json +// +//class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { +// +// +// val poller = new GOCDPoller(new FrameworkConfig(ConfigFactory.load())) +// +// override def beforeEach() = { +// TaskQueue.reset +// } +// +// override def afterEach() = { +// TaskQueue.reset +// } +// +// "GoCDPoller#pollAndAddTask" should "add a goTask after 5 polls" in { +// // Given +// val pollerSpy = spy(poller) +// doReturn(1).when(pollerSpy).goTaskQueueSize() +// doReturn(0).when(pollerSpy).goIdleAgentsCount() +// doReturn(true).when(pollerSpy).isDemandPositive() +// +// +// // When +// for(i <- 0 to 5) { +// pollerSpy.pollAndAddTask() +// } +// +// // Then +// TaskQueue.queue.size() should be(1) +// } +// +// "GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if Idle agents count is > 0" in { +// // Given +// val pollerSpy = spy(poller) +// doReturn(1).when(pollerSpy).getPendingJobs() +// doReturn(2).when(pollerSpy).goIdleAgentsCount() +// doReturn(true).when(pollerSpy).isDemandPositive() +// +// // When +// for(i <- 0 to 5) { +// pollerSpy.pollAndAddTask() +// } +// +// // Then +// TaskQueue.queue.size() should be(0) +// } +// +// "GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if isDemandPositive returns false" in { +// // Given +// val pollerSpy = spy(poller) +// doReturn(1).when(pollerSpy).getPendingJobs() +// doReturn(0).when(pollerSpy).goIdleAgentsCount() +// doReturn(false).when(pollerSpy).isDemandPositive() +// +// // When +// for(i <- 0 to 5) { +// pollerSpy.pollAndAddTask() +// } +// +// // Then +// TaskQueue.queue.size() should be(0) +// } +// +// +// +// "GoCDPoller#pollAndAddTask" should "add a goTask after 15 polls" in { +// // Given +//<<<<<<< Updated upstream +// val pollerSpy = spy(poller) +// doReturn(1).when(pollerSpy).goTaskQueueSize() +// doReturn(0).when(pollerSpy).goIdleAgentsCount() +// doReturn(true).when(pollerSpy).isDemandPositive() +// +//======= +// val pollerSpy = spy(poller); +// doReturn(1).when(pollerSpy).getPendingJobs() +//>>>>>>> Stashed changes +// +// // When +// for(i <- 0 to 20) { +// pollerSpy.pollAndAddTask() +// } +// +// // Then +// TaskQueue.queue.size() should be(4) +// } +// +// "GoCDPoller#pollAndAddTask" should "add a goTask with expected attributes" in { +// // Given +//<<<<<<< Updated upstream +// val pollerSpy = spy(poller) +// doReturn(1).when(pollerSpy).goTaskQueueSize() +// doReturn(0).when(pollerSpy).goIdleAgentsCount() +// doReturn(true).when(pollerSpy).isDemandPositive() +// +//======= +// val pollerSpy = spy(poller); +// doReturn(1).when(pollerSpy).getPendingJobs() +//>>>>>>> Stashed changes +// +// // When +// for(i <- 0 to 5) { +// pollerSpy.pollAndAddTask() +// } +// +// // Then +// TaskQueue.queue.size() should be(1) +// TaskQueue.find.dockerImage should be ("travix/gocd-agent:latest") +// } +// +// "GoCDPoller#goIdleAgentsCount" should "fetch idle agents count given a valid json" in { +// val responseJson = Json.parse( +// """ +// | +// |{ +// | "_links": { +// | "self": { +// | "href": "https://ci.example.com/go/api/agents" +// | }, +// | "doc": { +// | "href": "http://api.go.cd/#agents" +// | } +// | }, +// | "_embedded": { +// | "agents": [ +// | { +// | "_links": { +// | "self": { +// | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" +// | }, +// | "doc": { +// | "href": "http://api.go.cd/#agents" +// | }, +// | "find": { +// | "href": "https://ci.example.com/go/api/agents/:uuid" +// | } +// | }, +// | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", +// | "hostname": "agent01.example.com", +// | "ip_address": "10.12.20.47", +// | "enabled": true, +// | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", +// | "status": "Idle", +// | "operating_system": "Mac OS X", +// | "free_space": 84983328768, +// | "resources": ["java", "linux", "firefox"], +// | "environments": ["perf", "UAT"] +// | } +// | ] +// | } +// |} +// | +// """.stripMargin) +// // Given +// val pollerSpy = spy(poller) +// doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") +// +// // When, then +// pollerSpy.goIdleAgentsCount() should be(1) +// } +// +// "GoCDPoller#goTaskQueueSize" should "fetch pending jobs count given a valid json" in { +// val responseJson = +// """ +// | +// | +// | +// | mypipeline/5/defaultStage/1/job1 +// | +// | +// | +// | mypipeline/5/defaultStage/1/job2 +// | +// | +// """.stripMargin +// // Given +// val pollerSpy = spy(poller) +// doReturn(responseJson).when(pollerSpy).request("http://localhost:8080/go/api/jobs/scheduled.xml") +// +// // When, then +// pollerSpy.getPendingJobs() should be(2) +// } +//} diff --git a/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala b/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala index 4c7ed0a..e11da57 100644 --- a/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala +++ b/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala @@ -51,11 +51,11 @@ class GoCDSchedulerSpec extends FlatSpec with Matchers { } "GoCDScheduler#deployGoAgentTask" should "refuse offer if there are not enough resources" in { - scheduler.deployGoAgentTask(GoTask("", "docker-image", ""), inadequateOffer) should be(None) + scheduler.deployGoAgentTask(GoTask("docker-image"), inadequateOffer) should be(None) } "GOCDScheduler#deployGoAgentTask" should "accept offer if there are enough resources" in { - val taskOpt = scheduler.deployGoAgentTask(GoTask("", "docker-image",""), generoursResourceOffer) + val taskOpt = scheduler.deployGoAgentTask(GoTask("docker-image"), generoursResourceOffer) taskOpt.isDefined should be(true) taskOpt.get.getExecutor.getContainer.getDocker.getImage should be("docker-image") } diff --git a/src/test/scala/com/indix/mesos/GoScalarSpec.scala b/src/test/scala/com/indix/mesos/GoScalarSpec.scala new file mode 100644 index 0000000..6a3c7da --- /dev/null +++ b/src/test/scala/com/indix/mesos/GoScalarSpec.scala @@ -0,0 +1,35 @@ +package com.indix.mesos + +import com.typesafe.config.ConfigFactory +import org.apache.mesos.Protos._ +import org.apache.mesos.MesosSchedulerDriver +import org.apache.mesos.Protos.FrameworkID +import org.scalatest.{Matchers, FlatSpec} +import scala.concurrent.duration._ + + +class GoScalarSpec extends FlatSpec with Matchers { + val conf = new FrameworkConfig(ConfigFactory.load()) + val scheduler = new GoCDScheduler(conf) + val poller = new GOCDPoller(conf) + val frameworkInfo = FrameworkInfo.newBuilder() + .setId(FrameworkID.newBuilder().setValue("").build()) + .setName("GOCD-Mesos") + .setUser("") + .setRole("*") + .setHostname("pattigai") + .setCheckpoint(true) + .setFailoverTimeout(60.seconds.toMillis) + .build() + val driver = new MesosSchedulerDriver(scheduler, frameworkInfo, "") + + val scalar = new GOCDScalar(conf, poller, driver) + + "GoCDScalar#computeScaledown" should "return correct number to scale up" in { + scalar.computeScaledown(9, 6, 12, 2) should be (3) + scalar.computeScaledown(14, 13, 12, 2) should be (2) + scalar.computeScaledown(4, 0, 12, 2) should be (2) + scalar.computeScaledown(12, 0, 12, 2) should be (10) + scalar.computeScaledown(12, 1, 12, 2) should be (10) + } +} From 0684913535b726e2b0bfbc0891d1f4c7b6a852d1 Mon Sep 17 00:00:00 2001 From: tamizhgeek Date: Sun, 27 Mar 2016 20:19:51 +0530 Subject: [PATCH 4/5] New Scalar implementation to orchestrate agent scaling up/down 1. The scalar runs in a thread instead of the poller and supports scale down and scaleup. 2. The agents are remembered using a unique UUID after launch. This helps to find the agents launched by us vs other systems. --- .../scala/com/indix/mesos/GOCDPoller.scala | 8 +- .../scala/com/indix/mesos/GOCDScalar.scala | 43 +- .../scala/com/indix/mesos/GoCDScheduler.scala | 49 +- .../scala/com/indix/mesos/TaskQueue.scala | 2 +- .../indix/mesos/common/GocdMesosLogger.scala | 7 + .../com/indix/mesos/GoCDPollerSpec.scala | 476 +++++++++++------- .../scala/com/indix/mesos/GoScalarSpec.scala | 10 +- 7 files changed, 364 insertions(+), 231 deletions(-) create mode 100644 src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala diff --git a/src/main/scala/com/indix/mesos/GOCDPoller.scala b/src/main/scala/com/indix/mesos/GOCDPoller.scala index def3d79..d7221c2 100644 --- a/src/main/scala/com/indix/mesos/GOCDPoller.scala +++ b/src/main/scala/com/indix/mesos/GOCDPoller.scala @@ -46,7 +46,7 @@ case class GOCDPoller(conf: FrameworkConfig) { }.getOrElse(0) } - def getGoAgents: List[GoAgent] = { + def getAllAgents: List[GoAgent] = { println("Polling Go server for agents") withRetry(3) { val responseJson = jsonRequest(buildUrl() + "/go/api/agents") @@ -59,12 +59,12 @@ case class GOCDPoller(conf: FrameworkConfig) { }.getOrElse(List.empty) } - def goIdleAgents: List[GoAgent] = { - getGoAgents.filter(_.status.equalsIgnoreCase("idle")) + def getIdleAgents: List[GoAgent] = { + getAllAgents.filter(_.status.equalsIgnoreCase("idle")) } def getBuildingAgents = { - getGoAgents.filter(_.status.equalsIgnoreCase("building")) + getAllAgents.filter(_.status.equalsIgnoreCase("building")) } diff --git a/src/main/scala/com/indix/mesos/GOCDScalar.scala b/src/main/scala/com/indix/mesos/GOCDScalar.scala index 58e521f..3e7dec2 100644 --- a/src/main/scala/com/indix/mesos/GOCDScalar.scala +++ b/src/main/scala/com/indix/mesos/GOCDScalar.scala @@ -23,25 +23,44 @@ case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSc driver.reconcileTasks(runningTasks.asJavaCollection) } + def scale() = { + val supply = getSupply + val demand = getDemand + if(demand > supply) { + val needed = computeScaleup(supply, demand, conf.goMaxAgents, conf.goMinAgents) + scaleUp(needed) + } else if (demand < supply) { + val notNeeded = computeScaledown(supply, demand, conf.goMaxAgents, conf.goMinAgents) + scaleDown(notNeeded) + } + } + + def reconcileAndScale() = { + reconcileTasks() + scale() + } + def getTotalAgents = { - poller.getGoAgents.size + poller.getAllAgents.size } def getSupply = { + // Supply is number of 'idle GoAgents that are launched via Mesos' + 'GoAgents that are waiting to be launched' TaskQueue .getRunningJobs .toList .map(_.goAgentUuid) - .union(poller.goIdleAgents.map(_.id)).size + .union(poller.getIdleAgents.map(_.id)).size + TaskQueue.getPendingJobs.size } def getDemand = { + // Demand is number of 'jobs pending in Go Server' + 'agents in building state' poller.getPendingJobsCount + poller.getBuildingAgents.size } def scaleDown(agentCount: Int) = { - val idleAgents = poller.goIdleAgents + val idleAgents = poller.getIdleAgents idleAgents.take(agentCount).foreach(agent => { TaskQueue.getRunningJobs.toList.find(_.goAgentUuid == agent.id).foreach { task => driver.killTask(TaskID.newBuilder().setValue(task.mesosTaskId).build()) @@ -57,6 +76,9 @@ case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSc def computeScaleup(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { assert(demand > supply) + if(supply > goMaxAgents) { + return 0 + } val needed = demand - supply if(supply + needed > goMaxAgents) { goMaxAgents - supply @@ -68,6 +90,9 @@ case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSc def computeScaledown(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { assert(supply > demand) + if(supply < goMinAgents) { + return 0 + } val notNeeded = supply - demand if(supply - notNeeded < goMinAgents) { supply - goMinAgents @@ -78,16 +103,4 @@ case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSc } } - - def scale = { - val supply = getSupply - val demand = getDemand - if(demand > supply) { - computeScaleup(supply, demand, conf.goMaxAgents, conf.goMinAgents) - } else if (demand < supply) { - computeScaledown(supply, demand, conf.goMaxAgents, conf.goMinAgents) - } else { - // do Nothing - } - } } diff --git a/src/main/scala/com/indix/mesos/GoCDScheduler.scala b/src/main/scala/com/indix/mesos/GoCDScheduler.scala index 8447489..b0a6b8b 100644 --- a/src/main/scala/com/indix/mesos/GoCDScheduler.scala +++ b/src/main/scala/com/indix/mesos/GoCDScheduler.scala @@ -2,6 +2,7 @@ package com.indix.mesos import java.util.UUID +import com.indix.mesos.common.GocdMesosLogger import com.typesafe.config.ConfigFactory import org.apache.mesos.Protos.ContainerInfo.DockerInfo import org.apache.mesos.Protos.Environment.Variable @@ -9,10 +10,11 @@ import org.apache.mesos.Protos._ import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver} import scala.collection.JavaConverters._ +import scala.concurrent.Future import scala.concurrent.duration._ -class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { +class GoCDScheduler(conf : FrameworkConfig) extends Scheduler with GocdMesosLogger { lazy val envForGoCDTask = Environment.newBuilder() .addVariables(Variable.newBuilder().setName("GO_SERVER").setValue(conf.goServerHost).build()) .addVariables(Variable.newBuilder().setName("AGENT_KEY").setValue(conf.goAgentKey.getOrElse(UUID.randomUUID().toString)).build()) @@ -20,13 +22,13 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { override def error(driver: SchedulerDriver, message: String) {} override def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) { - println(s"executor completed execution with status: $status") + logger.info(s"executor completed execution with status: $status") } override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID) {} override def disconnected(driver: SchedulerDriver): Unit = { - println(s"Received Disconnected message $driver") + logger.info(s"Received Disconnected message $driver") } override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = { @@ -34,7 +36,7 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { } override def statusUpdate(driver: SchedulerDriver, status: TaskStatus) { - println(s"received status update $status") + logger.info(s"received status update $status") TaskQueue.updateState(status.getTaskId.getValue, status.getState) } @@ -46,10 +48,10 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { * */ override def resourceOffers(driver: SchedulerDriver, offers: java.util.List[Offer]) { - println(s"Received resource offer size: ${offers.size()}") + logger.info(s"Received resource offer size: ${offers.size()}") //for every available offer run tasks for (offer <- offers.asScala) { - println(s"offer $offer") + logger.info(s"offer $offer") TaskQueue.findNext.foreach { goTask => val mesosTask = deployGoAgentTask(goTask, offer) mesosTask match { @@ -58,7 +60,7 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { TaskQueue.add(goTask.copy(state = GoTaskState.Scheduled)) } case None => { - println(s"declining unused offer because offer is not enough") + logger.info(s"declining unused offer because offer is not enough") driver.declineOffer(offer.getId) } } @@ -126,30 +128,21 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { } override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo) { - println(s"RE-registered with mesos master.") + logger.info(s"RE-registered with mesos master.") } override def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - println(s"registered with mesos master. Framework id is ${frameworkId.getValue}") + logger.info(s"registered with mesos master. Framework id is ${frameworkId.getValue}") } } - object GoCDMesosFramework extends App { - val config = new FrameworkConfig(ConfigFactory.load()) - + object GoCDMesosFramework extends App with GocdMesosLogger { + val config = new FrameworkConfig(ConfigFactory.load()) val id = "GOCD-Mesos-" + System.currentTimeMillis() - - - println(s"The Framework id is $id") - - - val goTaskQueue = TaskQueue() + logger.info(s"The Framework id is $id") val poller = GOCDPoller(config) - // val timeInterval = 2 * 60 * 1000 - - val frameworkInfo = FrameworkInfo.newBuilder() .setId(FrameworkID.newBuilder().setValue(id).build()) .setName("GOCD-Mesos") @@ -162,7 +155,20 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { val scheduler = new GoCDScheduler(config) val driver = new MesosSchedulerDriver(scheduler, frameworkInfo, config.mesosMaster) + val scalar = GOCDScalar(config, poller, driver) println("Starting the driver") + + val timeInterval = 3 * 60 * 1000 + val runnable = new Runnable { + override def run(): Unit = { + while(true) { + scalar.reconcileAndScale + Thread.sleep(timeInterval) + } + } + } + + val status = if(driver.run() == Status.DRIVER_STOPPED) 0 else 1 // Ensure that the driver process terminates. driver.stop() // For this test to pass reliably on some platforms, this sleep is @@ -174,4 +180,5 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { // and its associated tasks via the Java API and wait until their // teardown is complete to exit. Thread.sleep(500) + System.exit(status) } \ No newline at end of file diff --git a/src/main/scala/com/indix/mesos/TaskQueue.scala b/src/main/scala/com/indix/mesos/TaskQueue.scala index 764fa8e..3c58ca2 100644 --- a/src/main/scala/com/indix/mesos/TaskQueue.scala +++ b/src/main/scala/com/indix/mesos/TaskQueue.scala @@ -45,7 +45,7 @@ object GoTask { } } -case class TaskQueue(queue: Map[String, GoTask] = Map.empty[String, GoTask]) { +private[mesos] case class TaskQueue(queue: Map[String, GoTask] = Map.empty[String, GoTask]) { def getRunningJobs = { queue.values.filter { task: GoTask => diff --git a/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala b/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala new file mode 100644 index 0000000..86eae5e --- /dev/null +++ b/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala @@ -0,0 +1,7 @@ +package com.indix.mesos.common + +import java.util.logging.Logger + +trait GocdMesosLogger { + val logger = Logger.getLogger(this.getClass.getName) +} diff --git a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala index bbc19f2..0465323 100644 --- a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala +++ b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala @@ -1,189 +1,287 @@ -//package com.indix.mesos -// -//import com.typesafe.config.ConfigFactory -//import org.scalatest._ -//import org.mockito.Mockito._ -//import play.api.libs.json.Json -// -//class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { -// -// -// val poller = new GOCDPoller(new FrameworkConfig(ConfigFactory.load())) -// -// override def beforeEach() = { -// TaskQueue.reset -// } -// -// override def afterEach() = { -// TaskQueue.reset -// } -// -// "GoCDPoller#pollAndAddTask" should "add a goTask after 5 polls" in { -// // Given -// val pollerSpy = spy(poller) -// doReturn(1).when(pollerSpy).goTaskQueueSize() -// doReturn(0).when(pollerSpy).goIdleAgentsCount() -// doReturn(true).when(pollerSpy).isDemandPositive() -// -// -// // When -// for(i <- 0 to 5) { -// pollerSpy.pollAndAddTask() -// } -// -// // Then -// TaskQueue.queue.size() should be(1) -// } -// -// "GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if Idle agents count is > 0" in { -// // Given -// val pollerSpy = spy(poller) -// doReturn(1).when(pollerSpy).getPendingJobs() -// doReturn(2).when(pollerSpy).goIdleAgentsCount() -// doReturn(true).when(pollerSpy).isDemandPositive() -// -// // When -// for(i <- 0 to 5) { -// pollerSpy.pollAndAddTask() -// } -// -// // Then -// TaskQueue.queue.size() should be(0) -// } -// -// "GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if isDemandPositive returns false" in { -// // Given -// val pollerSpy = spy(poller) -// doReturn(1).when(pollerSpy).getPendingJobs() -// doReturn(0).when(pollerSpy).goIdleAgentsCount() -// doReturn(false).when(pollerSpy).isDemandPositive() -// -// // When -// for(i <- 0 to 5) { -// pollerSpy.pollAndAddTask() -// } -// -// // Then -// TaskQueue.queue.size() should be(0) -// } -// -// -// -// "GoCDPoller#pollAndAddTask" should "add a goTask after 15 polls" in { -// // Given -//<<<<<<< Updated upstream -// val pollerSpy = spy(poller) -// doReturn(1).when(pollerSpy).goTaskQueueSize() -// doReturn(0).when(pollerSpy).goIdleAgentsCount() -// doReturn(true).when(pollerSpy).isDemandPositive() -// -//======= -// val pollerSpy = spy(poller); -// doReturn(1).when(pollerSpy).getPendingJobs() -//>>>>>>> Stashed changes -// -// // When -// for(i <- 0 to 20) { -// pollerSpy.pollAndAddTask() -// } -// -// // Then -// TaskQueue.queue.size() should be(4) -// } -// -// "GoCDPoller#pollAndAddTask" should "add a goTask with expected attributes" in { -// // Given -//<<<<<<< Updated upstream -// val pollerSpy = spy(poller) -// doReturn(1).when(pollerSpy).goTaskQueueSize() -// doReturn(0).when(pollerSpy).goIdleAgentsCount() -// doReturn(true).when(pollerSpy).isDemandPositive() -// -//======= -// val pollerSpy = spy(poller); -// doReturn(1).when(pollerSpy).getPendingJobs() -//>>>>>>> Stashed changes -// -// // When -// for(i <- 0 to 5) { -// pollerSpy.pollAndAddTask() -// } -// -// // Then -// TaskQueue.queue.size() should be(1) -// TaskQueue.find.dockerImage should be ("travix/gocd-agent:latest") -// } -// -// "GoCDPoller#goIdleAgentsCount" should "fetch idle agents count given a valid json" in { -// val responseJson = Json.parse( -// """ -// | -// |{ -// | "_links": { -// | "self": { -// | "href": "https://ci.example.com/go/api/agents" -// | }, -// | "doc": { -// | "href": "http://api.go.cd/#agents" -// | } -// | }, -// | "_embedded": { -// | "agents": [ -// | { -// | "_links": { -// | "self": { -// | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" -// | }, -// | "doc": { -// | "href": "http://api.go.cd/#agents" -// | }, -// | "find": { -// | "href": "https://ci.example.com/go/api/agents/:uuid" -// | } -// | }, -// | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", -// | "hostname": "agent01.example.com", -// | "ip_address": "10.12.20.47", -// | "enabled": true, -// | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", -// | "status": "Idle", -// | "operating_system": "Mac OS X", -// | "free_space": 84983328768, -// | "resources": ["java", "linux", "firefox"], -// | "environments": ["perf", "UAT"] -// | } -// | ] -// | } -// |} -// | -// """.stripMargin) -// // Given -// val pollerSpy = spy(poller) -// doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") -// -// // When, then -// pollerSpy.goIdleAgentsCount() should be(1) -// } -// -// "GoCDPoller#goTaskQueueSize" should "fetch pending jobs count given a valid json" in { -// val responseJson = -// """ -// | -// | -// | -// | mypipeline/5/defaultStage/1/job1 -// | -// | -// | -// | mypipeline/5/defaultStage/1/job2 -// | -// | -// """.stripMargin -// // Given -// val pollerSpy = spy(poller) -// doReturn(responseJson).when(pollerSpy).request("http://localhost:8080/go/api/jobs/scheduled.xml") -// -// // When, then -// pollerSpy.getPendingJobs() should be(2) -// } -//} +package com.indix.mesos + +import com.typesafe.config.ConfigFactory +import org.scalatest._ +import org.mockito.Mockito._ +import play.api.libs.json.Json + +class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { + + + val poller = new GOCDPoller(new FrameworkConfig(ConfigFactory.load())) + + override def beforeEach() = { + TaskQueue.reset + } + + override def afterEach() = { + TaskQueue.reset + } + + "GoCDPoller#goIdleAgents" should "fetch idle agents count given a valid json" in { + val responseJson = Json.parse( + """ + | + |{ + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | } + | }, + | "_embedded": { + | "agents": [ + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | } + | ] + | } + |} + | + """.stripMargin) + // Given + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") + + // When, then + pollerSpy.getIdleAgents.size should be(1) + } + + "GoCDPoller#getAllAgents" should "fetch all agents count given a valid json" in { + val responseJson = Json.parse( + """ + | + |{ + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | } + | }, + | "_embedded": { + | "agents": [ + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | }, + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | }, + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | } + | ] + | } + |} + | + """.stripMargin) + // Given + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") + + // When, then + pollerSpy.getAllAgents.size should be(3) + } + + "GoCDPoller#getBuildingAgents" should "fetch building agents count given a valid json" in { + val responseJson = Json.parse( + """ + | + |{ + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | } + | }, + | "_embedded": { + | "agents": [ + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | }, + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Building", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | }, + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | } + | ] + | } + |} + | + """.stripMargin) + // Given + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") + + // When, then + pollerSpy.getBuildingAgents.size should be(1) + } + + "GoCDPoller#getPendingJobsCount" should "fetch pending jobs count given a valid json" in { + val responseJson = + """ + | + | + | + | mypipeline/5/defaultStage/1/job1 + | + | + | + | mypipeline/5/defaultStage/1/job2 + | + | + """.stripMargin + // Given + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).request("http://localhost:8080/go/api/jobs/scheduled.xml") + + // When, then + pollerSpy.getPendingJobsCount should be(2) + } +} diff --git a/src/test/scala/com/indix/mesos/GoScalarSpec.scala b/src/test/scala/com/indix/mesos/GoScalarSpec.scala index 6a3c7da..c64ee49 100644 --- a/src/test/scala/com/indix/mesos/GoScalarSpec.scala +++ b/src/test/scala/com/indix/mesos/GoScalarSpec.scala @@ -25,11 +25,19 @@ class GoScalarSpec extends FlatSpec with Matchers { val scalar = new GOCDScalar(conf, poller, driver) - "GoCDScalar#computeScaledown" should "return correct number to scale up" in { + "GoCDScalar#computeScaledown" should "return correct number to scale down" in { scalar.computeScaledown(9, 6, 12, 2) should be (3) scalar.computeScaledown(14, 13, 12, 2) should be (2) scalar.computeScaledown(4, 0, 12, 2) should be (2) scalar.computeScaledown(12, 0, 12, 2) should be (10) scalar.computeScaledown(12, 1, 12, 2) should be (10) } + + "GoCDScalar#computeScaleUp" should "return correct number to scale up" in { + scalar.computeScaleup(6, 9, 12, 2) should be (3) + scalar.computeScaleup(13, 14, 12, 2) should be (0) + scalar.computeScaleup(0, 4, 12, 2) should be (4) + scalar.computeScaleup(0, 14, 12, 2) should be (12) + scalar.computeScaleup(1, 12, 12, 2) should be (11) + } } From 989c5d986a750cd6be5a8fd30c2fbbbf1c81e048 Mon Sep 17 00:00:00 2001 From: tamizhgeek Date: Sun, 27 Mar 2016 22:28:26 +0530 Subject: [PATCH 5/5] Added specs for the new Scalar implementation --- .../scala/com/indix/mesos/GOCDScalar.scala | 53 ++++++++++++------- .../scala/com/indix/mesos/TaskQueue.scala | 10 ++-- ...oScalarSpec.scala => GoCDScalarSpec.scala} | 22 +++++++- 3 files changed, 60 insertions(+), 25 deletions(-) rename src/test/scala/com/indix/mesos/{GoScalarSpec.scala => GoCDScalarSpec.scala} (58%) diff --git a/src/main/scala/com/indix/mesos/GOCDScalar.scala b/src/main/scala/com/indix/mesos/GOCDScalar.scala index 3e7dec2..1d72b8e 100644 --- a/src/main/scala/com/indix/mesos/GOCDScalar.scala +++ b/src/main/scala/com/indix/mesos/GOCDScalar.scala @@ -1,17 +1,15 @@ package com.indix.mesos +import com.indix.mesos.common.GocdMesosLogger import org.apache.mesos.MesosSchedulerDriver import org.apache.mesos.Protos.{TaskState, TaskID, TaskStatus} import scala.collection.JavaConverters._ -case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSchedulerDriver) { - - - def runningTaskIds = TaskQueue.getAllJobIds - +case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSchedulerDriver) extends GocdMesosLogger { def reconcileTasks() = { + logger.info("Going to reconcile tasks now") val runningTasks = runningTaskIds.map(id => TaskStatus .newBuilder() .setTaskId(TaskID @@ -20,18 +18,28 @@ case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSc .build()) .setState(TaskState.TASK_RUNNING) .build()) + logger.info(s"There are ${runningTasks.size} tasks that need to reconciled") driver.reconcileTasks(runningTasks.asJavaCollection) } def scale() = { + logger.info("SCALAR going to do autoscale operation") val supply = getSupply val demand = getDemand if(demand > supply) { + logger.info(s"The demand: $demand is greater than supply: $supply. Now computing the agents needed according to min and max agents.") val needed = computeScaleup(supply, demand, conf.goMaxAgents, conf.goMinAgents) - scaleUp(needed) + if(needed > 0) { + logger.info(s"Adding $needed more agents to the fleet.") + scaleUp(needed) + } } else if (demand < supply) { + logger.info(s"The demand: $demand is less than supply: $supply. Now computing the agents not needed according to min and max agents.") val notNeeded = computeScaledown(supply, demand, conf.goMaxAgents, conf.goMinAgents) - scaleDown(notNeeded) + if(notNeeded > 0) { + logger.info(s"Removing $notNeeded agents from the fleet.") + scaleDown(notNeeded) + } } } @@ -40,41 +48,48 @@ case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSc scale() } - def getTotalAgents = { + private[mesos] def runningTaskIds = runningTasks.map(_.goAgentUuid) + + private[mesos] def runningTasks = TaskQueue.getRunningTasks + + private[mesos] def pendingTasks = TaskQueue.getPendingTasks + + + private[mesos] def getTotalAgents = { poller.getAllAgents.size } - def getSupply = { + private[mesos] def getSupply = { // Supply is number of 'idle GoAgents that are launched via Mesos' + 'GoAgents that are waiting to be launched' - TaskQueue - .getRunningJobs + val running = runningTasks .toList .map(_.goAgentUuid) - .union(poller.getIdleAgents.map(_.id)).size - + TaskQueue.getPendingJobs.size + .intersect(poller.getIdleAgents.map(_.id)).size + val pending = pendingTasks.size + running + pending } - def getDemand = { + private[mesos] def getDemand = { // Demand is number of 'jobs pending in Go Server' + 'agents in building state' poller.getPendingJobsCount + poller.getBuildingAgents.size } - def scaleDown(agentCount: Int) = { + private[mesos] def scaleDown(agentCount: Int) = { val idleAgents = poller.getIdleAgents idleAgents.take(agentCount).foreach(agent => { - TaskQueue.getRunningJobs.toList.find(_.goAgentUuid == agent.id).foreach { task => + TaskQueue.getRunningTasks.toList.find(_.goAgentUuid == agent.id).foreach { task => driver.killTask(TaskID.newBuilder().setValue(task.mesosTaskId).build()) } }) } - def scaleUp(agentCount: Int) = { + private[mesos] def scaleUp(agentCount: Int) = { for(_ <- 0 to agentCount) { TaskQueue.add(GoTask(conf.goAgentDocker)) } } - def computeScaleup(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { + private[mesos] def computeScaleup(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { assert(demand > supply) if(supply > goMaxAgents) { return 0 @@ -88,7 +103,7 @@ case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSc } - def computeScaledown(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { + private[mesos] def computeScaledown(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { assert(supply > demand) if(supply < goMinAgents) { return 0 diff --git a/src/main/scala/com/indix/mesos/TaskQueue.scala b/src/main/scala/com/indix/mesos/TaskQueue.scala index 3c58ca2..0997138 100644 --- a/src/main/scala/com/indix/mesos/TaskQueue.scala +++ b/src/main/scala/com/indix/mesos/TaskQueue.scala @@ -47,7 +47,7 @@ object GoTask { private[mesos] case class TaskQueue(queue: Map[String, GoTask] = Map.empty[String, GoTask]) { - def getRunningJobs = { + def getRunningTasks = { queue.values.filter { task: GoTask => task.state match { case GoTaskState.Running => true @@ -56,11 +56,11 @@ private[mesos] case class TaskQueue(queue: Map[String, GoTask] = Map.empty[Strin } } - def getPendingJobs = { + def getPendingTasks = { queue.values.filter { task: GoTask => task.state match { case GoTaskState.Pending => true - case GoTaskState.Scheduled=> true + case GoTaskState.Scheduled => true case _ => false } } @@ -105,9 +105,9 @@ object TaskQueue { def getAllJobIds = queue.queue.keys.toList - def getRunningJobs = queue.getRunningJobs + def getRunningTasks = queue.getRunningTasks - def getPendingJobs = queue.getPendingJobs + def getPendingTasks = queue.getPendingTasks def add(task : GoTask) { diff --git a/src/test/scala/com/indix/mesos/GoScalarSpec.scala b/src/test/scala/com/indix/mesos/GoCDScalarSpec.scala similarity index 58% rename from src/test/scala/com/indix/mesos/GoScalarSpec.scala rename to src/test/scala/com/indix/mesos/GoCDScalarSpec.scala index c64ee49..10477d2 100644 --- a/src/test/scala/com/indix/mesos/GoScalarSpec.scala +++ b/src/test/scala/com/indix/mesos/GoCDScalarSpec.scala @@ -4,11 +4,12 @@ import com.typesafe.config.ConfigFactory import org.apache.mesos.Protos._ import org.apache.mesos.MesosSchedulerDriver import org.apache.mesos.Protos.FrameworkID +import org.mockito.Mockito._ import org.scalatest.{Matchers, FlatSpec} import scala.concurrent.duration._ -class GoScalarSpec extends FlatSpec with Matchers { +class GoCDScalarSpec extends FlatSpec with Matchers { val conf = new FrameworkConfig(ConfigFactory.load()) val scheduler = new GoCDScheduler(conf) val poller = new GOCDPoller(conf) @@ -40,4 +41,23 @@ class GoScalarSpec extends FlatSpec with Matchers { scalar.computeScaleup(0, 14, 12, 2) should be (12) scalar.computeScaleup(1, 12, 12, 2) should be (11) } + + "GoCDScalar#getSupply" should "return the supply metric" in { + val pollerSpy = spy(poller) + val scalar = new GOCDScalar(conf, pollerSpy, driver) + val scalarSpy = spy(scalar) + doReturn(Iterable(GoTask("", "", GoTaskState.Scheduled, "idle3", ""), GoTask("", "", GoTaskState.Pending, "idle4", ""))).when(scalarSpy).pendingTasks + doReturn(Iterable(GoTask("", "", GoTaskState.Running, "idle1", ""), GoTask("", "", GoTaskState.Running, "active1", ""))).when(scalarSpy).runningTasks + doReturn(List(GoAgent("idle1", "Idle"))).when(pollerSpy).getIdleAgents + scalarSpy.getSupply should be (3) + } + + "GoCDScalar#getDemand" should "return the demand metric" in { + val pollerSpy = spy(poller) + val scalar = new GOCDScalar(conf, pollerSpy, driver) + val scalarSpy = spy(scalar) + doReturn(Iterable(GoAgent("active1", "Building"))).when(pollerSpy).getBuildingAgents + doReturn(4).when(pollerSpy).getPendingJobsCount + scalarSpy.getDemand should be (5) + } }