8000 Improved the agent demand calculation logic. by tamizhgeek · Pull Request #3 · indix/gocd-mesos · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Improved the agent demand calculation logic. #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ gocd-mesos {
port: "8080"
user-name: "random-guy"
password: "random-insecure-string"
# Set to true if the Go server requires a user name and password
auth-enabled: false
}

go-agent {
Expand Down
16 changes: 10 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@
<id>repo.codahale.com</id>
<url>http://repo.codahale.com/</url>
</repository>
<repository>
<id>morphia.googlecode.com</id>
<url>http://morphia.googlecode.com/svn/mavenrepo/</url>
</repository>
<repository>
<id>maven.obiba.org/</id>
<url>http://maven.obiba.org/maven2/</url>
Expand Down Expand Up @@ -91,6 +87,11 @@
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
</dependency>
<dependency>
<groupId>com.typesafe.play</groupId>
<artifactId>play-json_2.11</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
Expand All @@ -101,9 +102,12 @@
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.11</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>


<build>
<plugins>
<plugin>
Expand Down
6 changes: 4 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ gocd-mesos {

# GO server config to poll and connect the agent
go-server {
host: "localhost"
port: "8080"
host: "192.168.33.10"
port: "8153"
user-name: "azhagu"
password: "random-insecure-string"
# Set to true if the Go server requires a user name and password
auth-enabled: false
}

go-agent {
Expand Down
10 changes: 8 additions & 2 deletions src/main/scala/com/indix/mesos/FrameworkConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ class FrameworkConfig(config: Config) {

val goServerPort = rootConfig.getString("go-server.port")

val goUserName = rootConfig.getString("go-server.user-name")
val goPassword = rootConfig.getString("go-server.password")
val goMinAgents = rootConfig.getInt("go-agent.min-agents")

val goMaxAgents = rootConfig.getInt("go-agent.max-agents")

lazy val goUserName = rootConfig.getString("go-server.user-name")
lazy val goPassword = rootConfig.getString("go-server.password")

val goAuthEnabled = if(rootConfig.hasPath("go-server.auth-enabled")) rootConfig.getBoolean("go-server.auth-enabled") else false

val goAgentKey = if(rootConfig.hasPath("go-agent.auto-register-key")) Some(rootConfig.getString("go-agent.auto-register-key")) else None
}
95 changes: 64 additions & 31 deletions src/main/scala/com/indix/mesos/GOCDPoller.scala
1E0A
Original file line number Diff line number Diff line change
@@ -1,46 +1,79 @@
package com.indix.mesos

import java.net.{UnknownHostException, SocketTimeoutException}

import com.google.common.io.BaseEncoding

import scala.collection.mutable
import scala.util.{Failure, Success, Try}
import scalaj.http._
import play.api.libs.json._

case class GOCDPoller(conf: FrameworkConfig) {

/** A Go agent as seen by the poller: an identifier plus the agent's current status string (e.g. compared against "idle" / "building" elsewhere in this file). */
case class GoAgent(id: String, status: String)

case class GOCDPoller(conf: FrameworkConfig) {
// Base64-encoded "user:password" pair used for the HTTP Basic Authorization header.
// Kept lazy so the (lazily read) credentials in FrameworkConfig are only touched when a
// request actually needs them, i.e. when conf.goAuthEnabled is true; an eager val would
// force both settings to be read while constructing the poller even with auth disabled.
lazy val authToken = BaseEncoding.base64().encode(s"${conf.goUserName}:${conf.goPassword}".getBytes("UTF-8"))

val responseHistory: scala.collection.mutable.MutableList[Int] = mutable.MutableList.empty[Int]

def goTaskQueueSize(): Int = {

/**
 * Performs a GET against `url` and returns the raw response body.
 *
 * The versioned Accept header is sent on every request. The Basic Authorization header is
 * added only when `conf.goAuthEnabled` is set.
 *
 * @param url absolute URL of the Go server endpoint to call
 * @return the response body as a string
 */
private[mesos] def request(url: String) = {
  // Attach Accept before branching: written as `if {..} else {..}.header(..)` the header call
  // binds to the else-block only, so authenticated requests would go out without it.
  val baseRequest = Http(url).header("Accept", "application/vnd.go.cd.v1+json")
  val request = if (conf.goAuthEnabled) {
    baseRequest.header("Authorization", s"Basic $authToken")
  } else {
    baseRequest
  }
  request.asString.body
}

/** Fetches `url` through [[request]] and parses the response body as JSON. */
private[mesos] def jsonRequest(url: String) = {
  val body = request(url)
  Json.parse(body)
}

/** Fetches `url` through [[request]] and parses the response body as XML. */
private[mesos] def xmlRequest(url: String) = {
  val body = request(url)
  scala.xml.XML.loadString(body)
}

/** Base URL (scheme, host and port) of the configured Go server, without a trailing slash. */
private def buildUrl() = {
  val host = conf.goServerHost
  val port = conf.goServerPort
  s"http://$host:$port"
}


def getPendingJobsCount: Int = {
println("Polling GO Server for scheduled jobs")
try {
val response: HttpResponse[String] = Http(s"http://${conf.goServerHost}:${conf.goServerPort}" + "/go/api/jobs/scheduled.xml").asString //.header("Authorization", s"Basic ${authToken}").asString
val responseXml = scala.xml.XML.loadString(response.body)
(responseXml \\ "scheduledJobs" \\ "job").size
} catch {
case e: SocketTimeoutException => {
println("GOCD Server timed out!!")
0
}
}
}

def goIdleAgentsCount() = {
}

def pollAndAddTask() = {
val scheduled : Int = goTaskQueueSize()
println(s"Go server has ${scheduled.toString} pending jobs to be scheduled")
if(scheduled > 0)
responseHistory += scheduled

if(responseHistory.size > 5) {
println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.")
TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
responseHistory.clear()
}
withRetry(3){
val responseXml = xmlRequest(buildUrl() + "/go/api/jobs/scheduled.xml")
(responseXml \\ "scheduledJobs" \\ "job").size
}.getOrElse(0)
}

def getAllAgents: List[GoAgent] = {
println("Polling Go server for agents")
withRetry(3) {
val responseJson = jsonRequest(buildUrl() + "/go/api/agents")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tamizhgeek I know the api.go.cd response says this is the schema of the response, but can you quickly check the same on our servers? Our main build is running the latest 16.2.0. They look different actually. We might want to file a bug against it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashwanthkumar The responses differ because of the Accept header we send. Compare these two requests:

curl -X GET http://build.gocd.io/go/api/agents -H "Authorization: Basic asdadsadasdasd="

and

curl -X GET http://build.gocd.io/go/api/agents -H "Authorization: Basic adsdasdasda=" -H "Accept: application/vnd.go.cd.v1+json"

:)

(responseJson \ "_embedded" \ "agents").toOption.map(jsValue => {
jsValue.as[JsArray]
.value
.map(agent => GoAgent((agent \ "uuid").get.as[String], (agent \ "status").get.as[String]))
.toList
}).getOrElse(List.empty)
}.getOrElse(List.empty)
}

/** All agents returned by [[getAllAgents]] whose status equals "idle", ignoring case. */
def getIdleAgents: List[GoAgent] = {
  for {
    agent <- getAllAgents
    if agent.status.equalsIgnoreCase("idle")
  } yield agent
}

/** All agents returned by [[getAllAgents]] whose status equals "building", ignoring case. */
def getBuildingAgents = {
  for {
    agent <- getAllAgents
    if agent.status.equalsIgnoreCase("building")
  } yield agent
}



/**
 * Evaluates `fn` up to `n` times, stopping at the first attempt that succeeds.
 *
 * @param n  maximum number of attempts; values below 1 still result in a single attempt
 * @param fn by-name computation, re-evaluated on every attempt
 * @return `Success` with the first successful result, or the `Failure` of the last attempt
 *         once all attempts are used up, so callers can apply `getOrElse` / `recover`
 */
private def withRetry[T](n: Int)(fn: => T): Try[T] = {
  Try(fn) match {
    case success @ Success(_) => success
    // Retry only while attempts remain. The earlier guard (`n < 1`) never retried for a
    // positive n and recursed without bound for n < 1.
    case Failure(_) if n > 1 => withRetry(n - 1)(fn)
    // Hand the failure back instead of rethrowing, as the Try[T] return type promises.
    case failure => failure
  }
}
}
121 changes: 121 additions & 0 deletions src/main/scala/com/indix/mesos/GOCDScalar.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.indix.mesos

import com.indix.mesos.common.GocdMesosLogger
import org.apache.mesos.MesosSchedulerDriver
import org.apache.mesos.Protos.{TaskState, TaskID, TaskStatus}
import scala.collection.JavaConverters._


case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSchedulerDriver) extends GocdMesosLogger {

/**
 * Asks the Mesos driver to reconcile every task id in `runningTaskIds`, reporting each one
 * with state TASK_RUNNING.
 */
def reconcileTasks() = {
  logger.info("Going to reconcile tasks now")
  val statuses = runningTaskIds.map { id =>
    val statusBuilder = TaskStatus.newBuilder()
    val taskId = TaskID.newBuilder().setValue(id).build()
    statusBuilder
      .setTaskId(taskId)
      .setState(TaskState.TASK_RUNNING)
      .build()
  }
  logger.info(s"There are ${statuses.size} tasks that need to reconciled")
  driver.reconcileTasks(statuses.asJavaCollection)
}

/**
 * Runs one autoscaling pass: reads supply, then demand, and scales the fleet up or down by
 * the amount computed from the configured agent limits. Does nothing when they are equal.
 */
def scale() = {
  logger.info("SCALAR going to do autoscale operation")
  val supply = getSupply
  val demand = getDemand

  // Demand exceeds supply: add as many agents as the limits allow.
  def grow(): Unit = {
    logger.info(s"The demand: $demand is greater than supply: $supply. Now computing the agents needed according to min and max agents.")
    val needed = computeScaleup(supply, demand, conf.goMaxAgents, conf.goMinAgents)
    if (needed > 0) {
      logger.info(s"Adding $needed more agents to the fleet.")
      scaleUp(needed)
    }
  }

  // Supply exceeds demand: remove as many agents as the limits allow.
  def shrink(): Unit = {
    logger.info(s"The demand: $demand is less than supply: $supply. Now computing the agents not needed according to min and max agents.")
    val notNeeded = computeScaledown(supply, demand, conf.goMaxAgents, conf.goMinAgents)
    if (notNeeded > 0) {
      logger.info(s"Removing $notNeeded agents from the fleet.")
      scaleDown(notNeeded)
    }
  }

  if (demand > supply) grow()
  else if (demand < supply) shrink()
}

// Single entry point for one cycle: reconcile task state with Mesos first, then autoscale.
def reconcileAndScale() = {
reconcileTasks()
scale()
}

// Go agent UUIDs of the tasks returned by `runningTasks`.
private[mesos] def runningTaskIds = runningTasks.map(_.goAgentUuid)

// Whatever TaskQueue.getRunningTasks currently returns (re-read on every call).
private[mesos] def runningTasks = TaskQueue.getRunningTasks

// Whatever TaskQueue.getPendingTasks currently returns (re-read on every call).
private[mesos] def pendingTasks = TaskQueue.getPendingTasks


/** Number of agents the poller currently reports, regardless of their status. */
private[mesos] def getTotalAgents =
  poller.getAllAgents.size

/**
 * Supply = idle Go agents that were launched via Mesos + Go agents still waiting to be
 * launched (pending tasks).
 */
private[mesos] def getSupply = {
  val mesosAgentIds = runningTasks.toList.map(_.goAgentUuid)
  val idleAgentIds = poller.getIdleAgents.map(_.id)
  // Only agents that are both running as a Mesos task and idle on the Go server count.
  val idleMesosAgents = mesosAgentIds.intersect(idleAgentIds).size
  val awaitingLaunch = pendingTasks.size
  idleMesosAgents + awaitingLaunch
}

/** Demand = jobs pending on the Go server + agents currently in the building state. */
private[mesos] def getDemand = {
  val pendingJobs = poller.getPendingJobsCount
  val buildingAgents = poller.getBuildingAgents.size
  pendingJobs + buildingAgents
}

/**
 * Kills up to `agentCount` Mesos tasks whose Go agent is currently idle.
 *
 * Idle agents are matched to running tasks before the limit is applied, so idle agents
 * with no matching running task do not use up the quota.
 *
 * @param agentCount maximum number of tasks to kill
 */
private[mesos] def scaleDown(agentCount: Int) = {
  val idleAgents = poller.getIdleAgents
  // One snapshot of the running tasks for the whole pass instead of one lookup per agent.
  val mesosTasks = TaskQueue.getRunningTasks.toList
  idleAgents
    .flatMap(agent => mesosTasks.find(_.goAgentUuid == agent.id))
    .take(agentCount)
    .foreach { task =>
      driver.killTask(TaskID.newBuilder().setValue(task.mesosTaskId).build())
    }
}

/**
 * Queues exactly `agentCount` new Go agent tasks; nothing is queued when `agentCount` is
 * zero or negative.
 *
 * @param agentCount number of agent tasks to add to the TaskQueue
 */
private[mesos] def scaleUp(agentCount: Int) = {
  // `until` is exclusive; the former inclusive `0 to agentCount` queued one task too many.
  for (_ <- 0 until agentCount) {
    TaskQueue.add(GoTask(conf.goAgentDocker))
  }
}

/**
 * Number of agents to add so that supply moves toward demand without exceeding
 * `goMaxAgents`.
 *
 * @param supply      current supply; must be strictly less than `demand` (asserted)
 * @param demand      current demand
 * @param goMaxAgents upper bound on the number of agents
 * @param goMinAgents not used by this computation
 * @return 0 when supply is already above the maximum, otherwise the shortfall capped at
 *         the remaining headroom
 */
private[mesos] def computeScaleup(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = {
  assert(demand > supply)
  val shortfall = demand - supply
  supply match {
    case s if s > goMaxAgents => 0
    case s if s + shortfall > goMaxAgents => goMaxAgents - s
    case _ => shortfall
  }
}


/**
 * Number of agents to remove so that supply moves toward demand while staying within
 * `goMinAgents` and `goMaxAgents`.
 *
 * @param supply      current supply; must be strictly greater than `demand` (asserted)
 * @param demand      current demand
 * @param goMaxAgents upper bound on the number of agents
 * @param goMinAgents lower bound on the number of agents
 * @return 0 when supply is already below the minimum; otherwise the surplus, adjusted so
 *         the remaining fleet is neither below the minimum nor above the maximum
 */
private[mesos] def computeScaledown(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = {
  assert(supply > demand)
  val surplus = supply - demand
  // Fleet size that would remain if the whole surplus were removed.
  val remaining = supply - surplus
  if (supply < goMinAgents) 0
  else if (remaining < goMinAgents) supply - goMinAgents
  else if (remaining > goMaxAgents) supply - goMaxAgents
  else surplus
}

}
Loading
0