diff --git a/README.md b/README.md index 9610e6b..cf35156 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,8 @@ gocd-mesos { port: "8080" user-name: "random-guy" password: "random-insecure-string" + # Enable if the go server mandatorily requires user name and password + auth-enabled: false } go-agent { diff --git a/pom.xml b/pom.xml index 485dd80..1efa135 100644 --- a/pom.xml +++ b/pom.xml @@ -35,10 +35,6 @@ repo.codahale.com http://repo.codahale.com/ - - morphia.googlecode.com - http://morphia.googlecode.com/svn/mavenrepo/ - maven.obiba.org/ http://maven.obiba.org/maven2/ @@ -91,6 +87,11 @@ scala-xml 2.11.0-M4 + + com.typesafe.play + play-json_2.11 + 2.4.6 + org.scalatest scalatest_2.11 @@ -101,9 +102,12 @@ mockito-all 1.9.5 + + com.typesafe.scala-logging + scala-logging_2.11 + 3.1.0 + - - diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index ff326bb..3915b3f 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -5,10 +5,12 @@ gocd-mesos { # GO server config to poll and connect the agent go-server { - host: "localhost" - port: "8080" + host: "192.168.33.10" + port: "8153" user-name: "azhagu" password: "random-insecure-string" + # Enable if the go server mandatorily requires user name and password + auth-enabled: false } go-agent { diff --git a/src/main/scala/com/indix/mesos/FrameworkConfig.scala b/src/main/scala/com/indix/mesos/FrameworkConfig.scala index 89d35d5..6216241 100644 --- a/src/main/scala/com/indix/mesos/FrameworkConfig.scala +++ b/src/main/scala/com/indix/mesos/FrameworkConfig.scala @@ -14,8 +14,14 @@ class FrameworkConfig(config: Config) { val goServerPort = rootConfig.getString("go-server.port") - val goUserName = rootConfig.getString("go-server.user-name") - val goPassword = rootConfig.getString("go-server.password") + val goMinAgents = rootConfig.getInt("go-agent.min-agents") + + val goMaxAgents = rootConfig.getInt("go-agent.max-agents") + + lazy val goUserName = rootConfig.getString("go-server.user-name") + lazy val goPassword = rootConfig.getString("go-server.password") + + val goAuthEnabled = if(rootConfig.hasPath("go-server.auth-enabled")) rootConfig.getBoolean("go-server.auth-enabled") else false val goAgentKey = if(rootConfig.hasPath("go-agent.auto-register-key")) Some(rootConfig.getString("go-agent.auto-register-key")) else None } diff --git a/src/main/scala/com/indix/mesos/GOCDPoller.scala b/src/main/scala/com/indix/mesos/GOCDPoller.scala index e56776d..d7221c2 100644 --- a/src/main/scala/com/indix/mesos/GOCDPoller.scala +++ b/src/main/scala/com/indix/mesos/GOCDPoller.scala @@ -1,46 +1,79 @@ package com.indix.mesos -import java.net.{UnknownHostException, SocketTimeoutException} - import com.google.common.io.BaseEncoding - import scala.collection.mutable +import scala.util.{Failure, Success, Try} import scalaj.http._ +import play.api.libs.json._ -case class GOCDPoller(conf: FrameworkConfig) { +case class GoAgent(id: String, status: String) + +case class GOCDPoller(conf: FrameworkConfig) { val authToken = BaseEncoding.base64().encode(s"${conf.goUserName}:${conf.goPassword}".getBytes("UTF-8")); val responseHistory: scala.collection.mutable.MutableList[Int] = mutable.MutableList.empty[Int] - def goTaskQueueSize(): Int = { + + private[mesos] def request(url: String) = { + val request = if(conf.goAuthEnabled) { + Http(url) + .header("Authorization", s"Basic $authToken") + } else { + Http(url) + }.header("Accept", "application/vnd.go.cd.v1+json") + request.asString.body + } + + private[mesos] def jsonRequest(url: String) = { + Json.parse(request(url)) + } + + private[mesos] def xmlRequest(url: String) = { + scala.xml.XML.loadString(request(url)) + } + + private def buildUrl() = { + s"http://${conf.goServerHost}:${conf.goServerPort}" + } + + + def getPendingJobsCount: Int = { println("Polling GO Server for scheduled jobs") - try { - val response: HttpResponse[String] = Http(s"http://${conf.goServerHost}:${conf.goServerPort}" + "/go/api/jobs/scheduled.xml").asString //.header("Authorization", s"Basic ${authToken}").asString - val responseXml = scala.xml.XML.loadString(response.body) - (responseXml \\ "scheduledJobs" \\ "job").size - } catch { - case e: SocketTimeoutException => { - println("GOCD Server timed out!!") - 0 - } - } - } - - def goIdleAgentsCount() = { - } - - def pollAndAddTask() = { - val scheduled : Int = goTaskQueueSize() - println(s"Go server has ${scheduled.toString} pending jobs to be scheduled") - if(scheduled > 0) - responseHistory += scheduled - - if(responseHistory.size > 5) { - println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.") - TaskQueue.enqueue(GoTask("", conf.goAgentDocker, "")) - responseHistory.clear() - } + withRetry(3){ + val responseXml = xmlRequest(buildUrl() + "/go/api/jobs/scheduled.xml") + (responseXml \\ "scheduledJobs" \\ "job").size + }.getOrElse(0) + } + + def getAllAgents: List[GoAgent] = { + println("Polling Go server for agents") + withRetry(3) { + val responseJson = jsonRequest(buildUrl() + "/go/api/agents") + (responseJson \ "_embedded" \ "agents").toOption.map(jsValue => { + jsValue.as[JsArray] + .value + .map(agent => GoAgent((agent \ "uuid").get.as[String], (agent \ "status").get.as[String])) + .toList + }).getOrElse(List.empty) + }.getOrElse(List.empty) + } + + def getIdleAgents: List[GoAgent] = { + getAllAgents.filter(_.status.equalsIgnoreCase("idle")) + } + + def getBuildingAgents = { + getAllAgents.filter(_.status.equalsIgnoreCase("building")) } + + + private def withRetry[T](n: Int)(fn: => T): Try[T] = { + Try(fn) match { + case res: Success[T] => res + case _ if n < 1 => withRetry(n - 1)(fn) + case Failure(ex) => throw ex + } + } } diff --git a/src/main/scala/com/indix/mesos/GOCDScalar.scala b/src/main/scala/com/indix/mesos/GOCDScalar.scala new file mode 100644 index 0000000..1d72b8e --- /dev/null +++ b/src/main/scala/com/indix/mesos/GOCDScalar.scala @@ -0,0 +1,121 @@ +package com.indix.mesos + +import com.indix.mesos.common.GocdMesosLogger +import org.apache.mesos.MesosSchedulerDriver +import org.apache.mesos.Protos.{TaskState, TaskID, TaskStatus} +import scala.collection.JavaConverters._ + + +case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSchedulerDriver) extends GocdMesosLogger { + + def reconcileTasks() = { + logger.info("Going to reconcile tasks now") + val runningTasks = runningTaskIds.map(id => TaskStatus + .newBuilder() + .setTaskId(TaskID + .newBuilder() + .setValue(id) + .build()) + .setState(TaskState.TASK_RUNNING) + .build()) + logger.info(s"There are ${runningTasks.size} tasks that need to reconciled") + driver.reconcileTasks(runningTasks.asJavaCollection) + } + + def scale() = { + logger.info("SCALAR going to do autoscale operation") + val supply = getSupply + val demand = getDemand + if(demand > supply) { + logger.info(s"The demand: $demand is greater than supply: $supply. Now computing the agents needed according to min and max agents.") + val needed = computeScaleup(supply, demand, conf.goMaxAgents, conf.goMinAgents) + if(needed > 0) { + logger.info(s"Adding $needed more agents to the fleet.") + scaleUp(needed) + } + } else if (demand < supply) { + logger.info(s"The demand: $demand is less than supply: $supply. Now computing the agents not needed according to min and max agents.") + val notNeeded = computeScaledown(supply, demand, conf.goMaxAgents, conf.goMinAgents) + if(notNeeded > 0) { + logger.info(s"Removing $notNeeded agents from the fleet.") + scaleDown(notNeeded) + } + } + } + + def reconcileAndScale() = { + reconcileTasks() + scale() + } + + private[mesos] def runningTaskIds = runningTasks.map(_.goAgentUuid) + + private[mesos] def runningTasks = TaskQueue.getRunningTasks + + private[mesos] def pendingTasks = TaskQueue.getPendingTasks + + + private[mesos] def getTotalAgents = { + poller.getAllAgents.size + } + + private[mesos] def getSupply = { + // Supply is number of 'idle GoAgents that are launched via Mesos' + 'GoAgents that are waiting to be launched' + val running = runningTasks + .toList + .map(_.goAgentUuid) + .intersect(poller.getIdleAgents.map(_.id)).size + val pending = pendingTasks.size + running + pending + } + + private[mesos] def getDemand = { + // Demand is number of 'jobs pending in Go Server' + 'agents in building state' + poller.getPendingJobsCount + poller.getBuildingAgents.size + } + + private[mesos] def scaleDown(agentCount: Int) = { + val idleAgents = poller.getIdleAgents + idleAgents.take(agentCount).foreach(agent => { + TaskQueue.getRunningTasks.toList.find(_.goAgentUuid == agent.id).foreach { task => + driver.killTask(TaskID.newBuilder().setValue(task.mesosTaskId).build()) + } + }) + } + + private[mesos] def scaleUp(agentCount: Int) = { + for(_ <- 0 to agentCount) { + TaskQueue.add(GoTask(conf.goAgentDocker)) + } + } + + private[mesos] def computeScaleup(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { + assert(demand > supply) + if(supply > goMaxAgents) { + return 0 + } + val needed = demand - supply + if(supply + needed > goMaxAgents) { + goMaxAgents - supply + } else { + needed + } + } + + + private[mesos] def computeScaledown(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = { + assert(supply > demand) + if(supply < goMinAgents) { + return 0 + } + val notNeeded = supply - demand + if(supply - notNeeded < goMinAgents) { + supply - goMinAgents + } else if(supply - notNeeded > goMaxAgents) { + supply - goMaxAgents + } else { + notNeeded + } + } + +} diff --git a/src/main/scala/com/indix/mesos/GoCDScheduler.scala b/src/main/scala/com/indix/mesos/GoCDScheduler.scala index f3e5d37..b0a6b8b 100644 --- a/src/main/scala/com/indix/mesos/GoCDScheduler.scala +++ b/src/main/scala/com/indix/mesos/GoCDScheduler.scala @@ -2,6 +2,7 @@ package com.indix.mesos import java.util.UUID +import com.indix.mesos.common.GocdMesosLogger import com.typesafe.config.ConfigFactory import org.apache.mesos.Protos.ContainerInfo.DockerInfo import org.apache.mesos.Protos.Environment.Variable @@ -9,10 +10,11 @@ import org.apache.mesos.Protos._ import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver} import scala.collection.JavaConverters._ +import scala.concurrent.Future import scala.concurrent.duration._ -class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { +class GoCDScheduler(conf : FrameworkConfig) extends Scheduler with GocdMesosLogger { lazy val envForGoCDTask = Environment.newBuilder() .addVariables(Variable.newBuilder().setName("GO_SERVER").setValue(conf.goServerHost).build()) .addVariables(Variable.newBuilder().setName("AGENT_KEY").setValue(conf.goAgentKey.getOrElse(UUID.randomUUID().toString)).build()) @@ -20,19 +22,22 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { override def error(driver: SchedulerDriver, message: String) {} override def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) { - println(s"executor completed execution with status: $status") + logger.info(s"executor completed execution with status: $status") } override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID) {} override def disconnected(driver: SchedulerDriver): Unit = { - println(s"Received Disconnected message $driver") + logger.info(s"Received Disconnected message $driver") } - override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]) {} + override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = { + + } override def statusUpdate(driver: SchedulerDriver, status: TaskStatus) { - println(s"received status update $status") + logger.info(s"received status update $status") + TaskQueue.updateState(status.getTaskId.getValue, status.getState) } override def offerRescinded(driver: SchedulerDriver, offerId: OfferID) {} @@ -43,26 +48,23 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { * */ override def resourceOffers(driver: SchedulerDriver, offers: java.util.List[Offer]) { - println(s"Received resource offer size: ${offers.size()}") + logger.info(s"Received resource offer size: ${offers.size()}") //for every available offer run tasks for (offer <- offers.asScala) { - println(s"offer $offer") - val nextTask = TaskQueue.dequeue - if(nextTask != null) { - val task = deployGoAgentTask(nextTask, offer) - task match { - case Some(tt) => driver.launchTasks(List(offer.getId).asJava, List(tt).asJava) + logger.info(s"offer $offer") + TaskQueue.findNext.foreach { goTask => + val mesosTask = deployGoAgentTask(goTask, offer) + mesosTask match { + case Some(tt) => { + driver.launchTasks(List(offer.getId).asJava, List(tt).asJava) + TaskQueue.add(goTask.copy(state = GoTaskState.Scheduled)) + } case None => { - TaskQueue.enqueue(nextTask) - println(s"declining unused offer because offer is not enough") + logger.info(s"declining unused offer because offer is not enough") driver.declineOffer(offer.getId) } } } - else { - println(s"declining unused offer because there is no task") - driver.declineOffer(offer.getId) - } } } @@ -94,7 +96,13 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { .setContainer(dockerContainerInfo(goTask).build()) .setCommand(Protos.CommandInfo.newBuilder() .setEnvironment(Protos.Environment.newBuilder() - .addAllVariables(envForGoCDTask.getVariablesList)) + .addAllVariables(envForGoCDTask + .addVariables(Variable + .newBuilder() + .setName("GO_AGENT_UUID") + .setValue(goTask.goAgentUuid) + .build()) + .getVariablesList)) .setShell(false)) } @@ -104,7 +112,7 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { val available = Resources(offer) if(available.canSatisfy(needed)) { val currentTimeStamp = System.currentTimeMillis() - val taskId = "gocd-agent-task-" + currentTimeStamp + val taskId = goTask.mesosTaskId val task = TaskInfo.newBuilder .setExecutor(executorInfo(goTask, currentTimeStamp).build()) .setName(taskId) @@ -120,36 +128,21 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { } override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo) { - println(s"RE-registered with mesos master.") + logger.info(s"RE-registered with mesos master.") } override def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - println(s"registered with mesos master. Framework id is ${frameworkId.getValue}") + logger.info(s"registered with mesos master. Framework id is ${frameworkId.getValue}") } - } - object GoCDMesosFramework extends App { - val config = new FrameworkConfig(ConfigFactory.load()) - + object GoCDMesosFramework extends App with GocdMesosLogger { + val config = new FrameworkConfig(ConfigFactory.load()) val id = "GOCD-Mesos-" + System.currentTimeMillis() - - - println(s"The Framework id is $id") - + logger.info(s"The Framework id is $id") val poller = GOCDPoller(config) - val timeInterval = 2 * 60 * 1000 - val runnable = new Runnable { - override def run(): Unit = { - while(true) { - poller.pollAndAddTask - Thread.sleep(timeInterval) - } - } - } - val frameworkInfo = FrameworkInfo.newBuilder() .setId(FrameworkID.newBuilder().setValue(id).build()) .setName("GOCD-Mesos") @@ -160,21 +153,24 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { .setFailoverTimeout(60.seconds.toMillis) .build() - val thread = new Thread(runnable) - thread.start() val scheduler = new GoCDScheduler(config) val driver = new MesosSchedulerDriver(scheduler, frameworkInfo, config.mesosMaster) + val scalar = GOCDScalar(config, poller, driver) println("Starting the driver") - val status = if(driver.run() == Status.DRIVER_STOPPED) 0 else 1 - - //driver.run } - - // println(driver.join()) + val timeInterval = 3 * 60 * 1000 + val runnable = new Runnable { + override def run(): Unit = { + while(true) { + scalar.reconcileAndScale + Thread.sleep(timeInterval) + } + } + } - println(status) + val status = if(driver.run() == Status.DRIVER_STOPPED) 0 else 1 // Ensure that the driver process terminates. - driver.stop(); + driver.stop() // For this test to pass reliably on some platforms, this sleep is // required to ensure that the SchedulerDriver teardown is complete // before the JVM starts running native object destructors after @@ -183,8 +179,6 @@ class GoCDScheduler(conf : FrameworkConfig) extends Scheduler { // TODO(greg): Ideally, we would inspect the status of the driver // and its associated tasks via the Java API and wait until their // teardown is complete to exit. - Thread.sleep(500); - - System.exit(status); - println("===================================") -} \ No newline at end of file + Thread.sleep(500) + System.exit(status) + } \ No newline at end of file diff --git a/src/main/scala/com/indix/mesos/TaskQueue.scala b/src/main/scala/com/indix/mesos/TaskQueue.scala index 5fa0985..0997138 100644 --- a/src/main/scala/com/indix/mesos/TaskQueue.scala +++ b/src/main/scala/com/indix/mesos/TaskQueue.scala @@ -1,24 +1,133 @@ package com.indix.mesos -import java.util.concurrent.ConcurrentLinkedQueue +import java.util.UUID +import org.apache.mesos.Protos.TaskState + + +object GoTaskState { + abstract class GoTaskState + object Pending extends GoTaskState + object Scheduled extends GoTaskState + object Running extends GoTaskState + object Stopped extends GoTaskState +} case class GoTaskResource(cpu: Double = 1, memory: Double = 256, disk: Double = 1*1024) -case class GoTask(cmdString: String, dockerImage: String, uri: String, resource: GoTaskResource = GoTaskResource()) {} + + +case class GoTask(dockerImage: String, + uri: String, + state: GoTaskState.GoTaskState, + goAgentUuid: String, + mesosTaskId: String, + mesosFailureMessage: Option[String] = None, + resource: GoTaskResource = GoTaskResource()) { + def updateState(state: TaskState): GoTask = { + state match { + case TaskState.TASK_FAILED | TaskState.TASK_KILLED | TaskState.TASK_FINISHED => this.copy(state = GoTaskState.Stopped, mesosFailureMessage = Some("")) + case _ => this + } + } +} + +object GoTask { + def generateTaskId(): String = { + "Mesos-gocd-task-" + System.currentTimeMillis() + } + + def generateAgentUUID(): String = { + UUID.randomUUID().toString + } + + def apply(dockerImage: String) = { + new GoTask(dockerImage, "", GoTaskState.Pending, generateAgentUUID(), generateTaskId(), None) + } +} + +private[mesos] case class TaskQueue(queue: Map[String, GoTask] = Map.empty[String, GoTask]) { + + def getRunningTasks = { + queue.values.filter { task: GoTask => + task.state match { + case GoTaskState.Running => true + case _ => false + } + } + } + + def getPendingTasks = { + queue.values.filter { task: GoTask => + task.state match { + case GoTaskState.Pending => true + case GoTaskState.Scheduled => true + case _ => false + } + } + } + + + def add(task: GoTask): TaskQueue = { + this.copy(queue = queue + (task.mesosTaskId -> task)) + } + + def findNext: Option[GoTask] = { + queue.values.find(task => { + task.state match { + case GoTaskState.Pending => true + case _ => false + } + }) + } + + def updateState(id: String, state: TaskState): TaskQueue = { + this.queue.get(id).map(task => { + this.copy(queue = queue + (id -> task.updateState(state))) + }).getOrElse(this) + } + + def prune: TaskQueue = { + val updatedQueue = this.queue.filterNot(pair => { + val (id, task) = pair + task.state match { + case GoTaskState.Stopped => true + case _ => false + } + }) + this.copy(queue = updatedQueue) + } +} object TaskQueue { - val queue : ConcurrentLinkedQueue[GoTask] = new ConcurrentLinkedQueue[GoTask]() - def enqueue(task : GoTask) { - queue.offer(task) + var queue = TaskQueue() + + + def getAllJobIds = queue.queue.keys.toList + + def getRunningTasks = queue.getRunningTasks + + def getPendingTasks = queue.getPendingTasks + + + def add(task : GoTask) { + synchronized { + queue = queue.add(task) + } + } + + def findNext = { + queue.findNext } - def dequeue : GoTask = { - queue.poll() + def updateState(id: String, state: TaskState) = { + synchronized { + queue = queue.updateState(id, state) + } } def reset = { - queue.clear() + queue.prune } } diff --git a/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala b/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala new file mode 100644 index 0000000..86eae5e --- /dev/null +++ b/src/main/scala/com/indix/mesos/common/GocdMesosLogger.scala @@ -0,0 +1,7 @@ +package com.indix.mesos.common + +import java.util.logging.Logger + +trait GocdMesosLogger { + val logger = Logger.getLogger(this.getClass.getName) +} diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf index ff326bb..d31e05e 100644 --- a/src/test/resources/application.conf +++ b/src/test/resources/application.conf @@ -12,6 +12,8 @@ gocd-mesos { } go-agent { + min-agents: 2 + max-agents: 12 # This key should be already shared with the go server and configured in its config.xml as autoRegisterKey auto-register-key: "6a92073d04fc4eff99137d7f8c7e794d" # Docker image containing the go agent diff --git a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala index 9890bba..0465323 100644 --- a/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala +++ b/src/test/scala/com/indix/mesos/GoCDPollerSpec.scala @@ -3,6 +3,7 @@ package com.indix.mesos import com.typesafe.config.ConfigFactory import org.scalatest._ import org.mockito.Mockito._ +import play.api.libs.json.Json class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { @@ -17,47 +18,270 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { TaskQueue.reset } - "GoCDPoller#pollAndAddTask" should "add a goTask after 5 polls" in { + "GoCDPoller#goIdleAgents" should "fetch idle agents count given a valid json" in { + val responseJson = Json.parse( + """ + | + |{ + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | } + | }, + | "_embedded": { + | "agents": [ + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | } + | ] + | } + |} + | + """.stripMargin) // Given - val pollerSpy = spy(poller); - doReturn(1).when(pollerSpy).goTaskQueueSize() + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") - // When - for(i <- 0 to 5) { - pollerSpy.pollAndAddTask() - } - - // Then - TaskQueue.queue.size() should be(1) + // When, then + pollerSpy.getIdleAgents.size should be(1) } - - "GoCDPoller#pollAndAddTask" should "add a goTask after 15 polls" in { + "GoCDPoller#getAllAgents" should "fetch all agents count given a valid json" in { + val responseJson = Json.parse( + """ + | + |{ + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | } + | }, + | "_embedded": { + | "agents": [ + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | }, + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | }, + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | } + | ] + | } + |} + | + """.stripMargin) // Given - val pollerSpy = spy(poller); - doReturn(1).when(pollerSpy).goTaskQueueSize() - - // When - for(i <- 0 to 20) { - pollerSpy.pollAndAddTask() - } + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") - // Then - TaskQueue.queue.size() should be(3) + // When, then + pollerSpy.getAllAgents.size should be(3) } - "GoCDPoller#pollAndAddTask" should "add a goTask with expected attributes" in { + "GoCDPoller#getBuildingAgents" should "fetch building agents count given a valid json" in { + val responseJson = Json.parse( + """ + | + |{ + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | } + | }, + | "_embedded": { + | "agents": [ + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | }, + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Building", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | }, + | { + | "_links": { + | "self": { + | "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da" + | }, + | "doc": { + | "href": "http://api.go.cd/#agents" + | }, + | "find": { + | "href": "https://ci.example.com/go/api/agents/:uuid" + | } + | }, + | "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da", + | "hostname": "agent01.example.com", + | "ip_address": "10.12.20.47", + | "enabled": true, + | "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent", + | "status": "Idle", + | "operating_system": "Mac OS X", + | "free_space": 84983328768, + | "resources": ["java", "linux", "firefox"], + | "environments": ["perf", "UAT"] + | } + | ] + | } + |} + | + """.stripMargin) // Given - val pollerSpy = spy(poller); - doReturn(1).when(pollerSpy).goTaskQueueSize() + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents") - // When - for(i <- 0 to 5) { - pollerSpy.pollAndAddTask() - } + // When, then + pollerSpy.getBuildingAgents.size should be(1) + } + + "GoCDPoller#getPendingJobsCount" should "fetch pending jobs count given a valid json" in { + val responseJson = + """ + | + | + | + | mypipeline/5/defaultStage/1/job1 + | + | + | + | mypipeline/5/defaultStage/1/job2 + | + | + """.stripMargin + // Given + val pollerSpy = spy(poller) + doReturn(responseJson).when(pollerSpy).request("http://localhost:8080/go/api/jobs/scheduled.xml") - // Then - TaskQueue.queue.size() should be(1) - TaskQueue.dequeue.dockerImage should be ("travix/gocd-agent:latest") + // When, then + pollerSpy.getPendingJobsCount should be(2) } } diff --git a/src/test/scala/com/indix/mesos/GoCDScalarSpec.scala b/src/test/scala/com/indix/mesos/GoCDScalarSpec.scala new file mode 100644 index 0000000..10477d2 --- /dev/null +++ b/src/test/scala/com/indix/mesos/GoCDScalarSpec.scala @@ -0,0 +1,63 @@ +package com.indix.mesos + +import com.typesafe.config.ConfigFactory +import org.apache.mesos.Protos._ +import org.apache.mesos.MesosSchedulerDriver +import org.apache.mesos.Protos.FrameworkID +import org.mockito.Mockito._ +import org.scalatest.{Matchers, FlatSpec} +import scala.concurrent.duration._ + + +class GoCDScalarSpec extends FlatSpec with Matchers { + val conf = new FrameworkConfig(ConfigFactory.load()) + val scheduler = new GoCDScheduler(conf) + val poller = new GOCDPoller(conf) + val frameworkInfo = FrameworkInfo.newBuilder() + .setId(FrameworkID.newBuilder().setValue("").build()) + .setName("GOCD-Mesos") + .setUser("") + .setRole("*") + .setHostname("pattigai") + .setCheckpoint(true) + .setFailoverTimeout(60.seconds.toMillis) + .build() + val driver = new MesosSchedulerDriver(scheduler, frameworkInfo, "") + + val scalar = new GOCDScalar(conf, poller, driver) + + "GoCDScalar#computeScaledown" should "return correct number to scale down" in { + scalar.computeScaledown(9, 6, 12, 2) should be (3) + scalar.computeScaledown(14, 13, 12, 2) should be (2) + scalar.computeScaledown(4, 0, 12, 2) should be (2) + scalar.computeScaledown(12, 0, 12, 2) should be (10) + scalar.computeScaledown(12, 1, 12, 2) should be (10) + } + + "GoCDScalar#computeScaleUp" should "return correct number to scale up" in { + scalar.computeScaleup(6, 9, 12, 2) should be (3) + scalar.computeScaleup(13, 14, 12, 2) should be (0) + scalar.computeScaleup(0, 4, 12, 2) should be (4) + scalar.computeScaleup(0, 14, 12, 2) should be (12) + scalar.computeScaleup(1, 12, 12, 2) should be (11) + } + + "GoCDScalar#getSupply" should "return the supply metric" in { + val pollerSpy = spy(poller) + val scalar = new GOCDScalar(conf, pollerSpy, driver) + val scalarSpy = spy(scalar) + doReturn(Iterable(GoTask("", "", GoTaskState.Scheduled, "idle3", ""), GoTask("", "", GoTaskState.Pending, "idle4", ""))).when(scalarSpy).pendingTasks + doReturn(Iterable(GoTask("", "", GoTaskState.Running, "idle1", ""), GoTask("", "", GoTaskState.Running, "active1", ""))).when(scalarSpy).runningTasks + doReturn(List(GoAgent("idle1", "Idle"))).when(pollerSpy).getIdleAgents + scalarSpy.getSupply should be (3) + } + + "GoCDScalar#getDemand" should "return the demand metric" in { + val pollerSpy = spy(poller) + val scalar = new GOCDScalar(conf, pollerSpy, driver) + val scalarSpy = spy(scalar) + doReturn(Iterable(GoAgent("active1", "Building"))).when(pollerSpy).getBuildingAgents + doReturn(4).when(pollerSpy).getPendingJobsCount + scalarSpy.getDemand should be (5) + } +} diff --git a/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala b/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala index 4c7ed0a..e11da57 100644 --- a/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala +++ b/src/test/scala/com/indix/mesos/GoCDSchedulerSpec.scala @@ -51,11 +51,11 @@ class GoCDSchedulerSpec extends FlatSpec with Matchers { } "GoCDScheduler#deployGoAgentTask" should "refuse offer if there are not enough resources" in { - scheduler.deployGoAgentTask(GoTask("", "docker-image", ""), inadequateOffer) should be(None) + scheduler.deployGoAgentTask(GoTask("docker-image"), inadequateOffer) should be(None) } "GOCDScheduler#deployGoAgentTask" should "accept offer if there are enough resources" in { - val taskOpt = scheduler.deployGoAgentTask(GoTask("", "docker-image",""), generoursResourceOffer) + val taskOpt = scheduler.deployGoAgentTask(GoTask("docker-image"), generoursResourceOffer) taskOpt.isDefined should be(true) taskOpt.get.getExecutor.getContainer.getDocker.getImage should be("docker-image") }