-
Notifications
You must be signed in to change notification settings - Fork 0
Improved the agent demand calculation logic. #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
tamizhgeek
wants to merge
5
commits into
master
Choose a base branch
from
agent_demand_logic_improvement
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
b4a0ecf
Improved the agent demand calculation logic.
tamizhgeek eebc327
Separate config to enable/disable authentication with the GoCD API
tamizhgeek 3f39e57
Slightly better demand/supply management
tamizhgeek 0684913
New Scalar implementation to orchestrate agent scaling up/down
tamizhgeek 989c5d9
Added specs for the new Scalar implementation
tamizhgeek File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,46 +1,79 @@ | ||
package com.indix.mesos | ||
|
||
import java.net.{UnknownHostException, SocketTimeoutException} | ||
|
||
import com.google.common.io.BaseEncoding | ||
|
||
import scala.collection.mutable | ||
import scala.util.{Failure, Success, Try} | ||
import scalaj.http._ | ||
import play.api.libs.json._ | ||
|
||
case class GOCDPoller(conf: FrameworkConfig) { | ||
|
||
/**
  * A GoCD agent described by its identifier and its current status string.
  *
  * @param id     identifier of the agent
  * @param status status of the agent as a plain string
  */
case class GoAgent(
  id: String,
  status: String
)
|
||
case class GOCDPoller(conf: FrameworkConfig) { | ||
// Base64 encoding of "<goUserName>:<goPassword>" (UTF-8 bytes), used as the Basic auth token.
val authToken = {
  val credentials = s"${conf.goUserName}:${conf.goPassword}"
  BaseEncoding.base64().encode(credentials.getBytes("UTF-8"))
}
|
||
val responseHistory: scala.collection.mutable.MutableList[Int] = mutable.MutableList.empty[Int] | ||
|
||
def goTaskQueueSize(): Int = { | ||
|
||
/**
  * Issues an HTTP request for `url` and returns the response body as a string.
  *
  * The `Accept: application/vnd.go.cd.v1+json` header is attached to every request.
  * The `Authorization: Basic ...` header is added only when `conf.goAuthEnabled` is true.
  *
  * @param url absolute URL to fetch
  * @return the response body
  */
private[mesos] def request(url: String) = {
  // Attach the Accept header before branching. Chaining `.header(...)` after the closing
  // brace of an `if { } else { }` binds it to the else block only, which would leave
  // authenticated requests without the Accept header.
  val base = Http(url).header("Accept", "application/vnd.go.cd.v1+json")
  val request =
    if (conf.goAuthEnabled) base.header("Authorization", s"Basic $authToken")
    else base
  request.asString.body
}
|
||
/** Fetches `url` through `request` and parses the returned body with `Json.parse`. */
private[mesos] def jsonRequest(url: String) = {
  val body = request(url)
  Json.parse(body)
}
|
||
/** Fetches `url` through `request` and loads the returned body as XML. */
private[mesos] def xmlRequest(url: String) = {
  val body = request(url)
  scala.xml.XML.loadString(body)
}
|
||
/** Builds the `http://host:port` prefix from `conf.goServerHost` and `conf.goServerPort`. */
private def buildUrl() = {
  val host = conf.goServerHost
  val port = conf.goServerPort
  s"http://$host:$port"
}
|
||
|
||
def getPendingJobsCount: Int = { | ||
println("Polling GO Server for scheduled jobs") | ||
try { | ||
val response: HttpResponse[String] = Http(s"http://${conf.goServerHost}:${conf.goServerPort}" + "/go/api/jobs/scheduled.xml").asString //.header("Authorization", s"Basic ${authToken}").asString | ||
val responseXml = scala.xml.XML.loadString(response.body) | ||
(responseXml \\ "scheduledJobs" \\ "job").size | ||
} catch { | ||
case e: SocketTimeoutException => { | ||
println("GOCD Server timed out!!") | ||
0 | ||
} | ||
} | ||
} | ||
|
||
def goIdleAgentsCount() = { | ||
} | ||
|
||
def pollAndAddTask() = { | ||
val scheduled : Int = goTaskQueueSize() | ||
println(s"Go server has ${scheduled.toString} pending jobs to be scheduled") | ||
if(scheduled > 0) | ||
responseHistory += scheduled | ||
|
||
if(responseHistory.size > 5) { | ||
println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.") | ||
TaskQueue.enqueue(GoTask("", conf.goAgentDocker, "")) | ||
responseHistory.clear() | ||
} | ||
withRetry(3){ | ||
val responseXml = xmlRequest(buildUrl() + "/go/api/jobs/scheduled.xml") | ||
(responseXml \\ "scheduledJobs" \\ "job").size | ||
}.getOrElse(0) | ||
} | ||
|
||
/**
  * Lists the agents returned by `/go/api/agents`.
  *
  * Each element of the `_embedded.agents` JSON array becomes a [[GoAgent]] built from its
  * `uuid` and `status` fields. A missing `_embedded.agents` path yields an empty list, and
  * so does a failed result from `withRetry`.
  */
def getAllAgents: List[GoAgent] = {
  println("Polling Go server for agents")
  val attempt = withRetry(3) {
    val payload = jsonRequest(buildUrl() + "/go/api/agents")
    (payload \ "_embedded" \ "agents").toOption match {
      case Some(agents) =>
        agents.as[JsArray].value.map { entry =>
          val uuid = (entry \ "uuid").get.as[String]
          val status = (entry \ "status").get.as[String]
          GoAgent(uuid, status)
        }.toList
      case None =>
        List.empty
    }
  }
  attempt.getOrElse(List.empty)
}
|
||
/** Agents from `getAllAgents` whose status equals "idle", ignoring case. */
def getIdleAgents: List[GoAgent] = {
  val everyAgent = getAllAgents
  everyAgent.filter(agent => agent.status.equalsIgnoreCase("idle"))
}
|
||
/** Agents from `getAllAgents` whose status equals "building", ignoring case. */
def getBuildingAgents = {
  val everyAgent = getAllAgents
  everyAgent.filter(agent => agent.status.equalsIgnoreCase("building"))
}
|
||
|
||
|
||
/**
  * Evaluates `fn` up to `n` times and returns the first successful result.
  *
  * If every attempt fails, the last failure is returned as a `Failure` (not rethrown),
  * so callers can apply `getOrElse` / `recover` on the result. A value of `n` below 1
  * still performs a single attempt.
  *
  * @param n  maximum number of attempts
  * @param fn computation to run; re-evaluated on every attempt
  * @tparam T result type of the computation
  * @return `Success` with the first successful value, otherwise the last `Failure`
  */
private def withRetry[T](n: Int)(fn: => T): Try[T] = {
  Try(fn) match {
    case res: Success[T] => res
    // Retry only while attempts remain; this bounds the recursion at `n` evaluations.
    case _: Failure[T] if n > 1 => withRetry(n - 1)(fn)
    case failure => failure
  }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package com.indix.mesos | ||
|
||
import com.indix.mesos.common.GocdMesosLogger | ||
import org.apache.mesos.MesosSchedulerDriver | ||
import org.apache.mesos.Protos.{TaskState, TaskID, TaskStatus} | ||
import scala.collection.JavaConverters._ | ||
|
||
|
||
case class GOCDScalar(conf: FrameworkConfig, poller: GOCDPoller, driver: MesosSchedulerDriver) extends GocdMesosLogger { | ||
|
||
/**
  * Asks the driver to reconcile every id in `runningTaskIds`.
  *
  * One `TaskStatus` in state `TASK_RUNNING` is built per id and the whole collection is
  * handed to `driver.reconcileTasks`.
  */
def reconcileTasks() = {
  logger.info("Going to reconcile tasks now")
  val statuses = runningTaskIds.map { taskId =>
    val id = TaskID.newBuilder().setValue(taskId).build()
    TaskStatus
      .newBuilder()
      .setTaskId(id)
      .setState(TaskState.TASK_RUNNING)
      .build()
  }
  logger.info(s"There are ${statuses.size} tasks that need to reconciled")
  driver.reconcileTasks(statuses.asJavaCollection)
}
|
||
/**
  * Compares the current supply with the current demand and adjusts the fleet.
  *
  * When demand exceeds supply, `computeScaleup` decides how many agents to add.
  * When supply exceeds demand, `computeScaledown` decides how many to remove.
  * Nothing happens when the two are equal or the computed count is not positive.
  */
def scale() = {
  logger.info("SCALAR going to do autoscale operation")
  val supply = getSupply
  val demand = getDemand

  // Grow the fleet by whatever computeScaleup allows.
  def grow(): Unit = {
    logger.info(s"The demand: $demand is greater than supply: $supply. Now computing the agents needed according to min and max agents.")
    val needed = computeScaleup(supply, demand, conf.goMaxAgents, conf.goMinAgents)
    if (needed > 0) {
      logger.info(s"Adding $needed more agents to the fleet.")
      scaleUp(needed)
    }
  }

  // Shrink the fleet by whatever computeScaledown allows.
  def shrink(): Unit = {
    logger.info(s"The demand: $demand is less than supply: $supply. Now computing the agents not needed according to min and max agents.")
    val notNeeded = computeScaledown(supply, demand, conf.goMaxAgents, conf.goMinAgents)
    if (notNeeded > 0) {
      logger.info(s"Removing $notNeeded agents from the fleet.")
      scaleDown(notNeeded)
    }
  }

  if (demand > supply) grow()
  else if (demand < supply) shrink()
}
|
||
/** Runs `reconcileTasks()` first and then `scale()`. */
def reconcileAndScale() = {
  reconcileTasks()
  scale()
}
|
||
// The `goAgentUuid` of every running task.
// NOTE(review): `scaleDown` addresses Mesos tasks by `mesosTaskId`, whereas the ids returned
// here are `goAgentUuid`s — confirm that this is the id reconciliation should use.
private[mesos] def runningTaskIds = runningTasks.map(task => task.goAgentUuid)
|
||
// Running tasks as reported by `TaskQueue.getRunningTasks`.
private[mesos] def runningTasks = {
  TaskQueue.getRunningTasks
}
|
||
// Pending tasks as reported by `TaskQueue.getPendingTasks`.
private[mesos] def pendingTasks = {
  TaskQueue.getPendingTasks
}
|
||
|
||
/** Number of agents returned by `poller.getAllAgents`. */
private[mesos] def getTotalAgents = {
  val agents = poller.getAllAgents
  agents.size
}
|
||
/**
  * Current supply of agents.
  *
  * Supply is the number of running tasks whose `goAgentUuid` matches the id of an idle
  * agent reported by the poller, plus the number of pending tasks.
  */
private[mesos] def getSupply = {
  val launchedUuids = runningTasks.toList.map(_.goAgentUuid)
  val idleUuids = poller.getIdleAgents.map(_.id)
  val idleLaunched = launchedUuids.intersect(idleUuids).size
  val waitingToLaunch = pendingTasks.size
  idleLaunched + waitingToLaunch
}
|
||
/**
  * Current demand for agents.
  *
  * Demand is the pending job count reported by the poller plus the number of agents
  * in the building state.
  */
private[mesos] def getDemand = {
  val queuedJobs = poller.getPendingJobsCount
  val busyAgents = poller.getBuildingAgents.size
  queuedJobs + busyAgents
}
|
||
/**
  * Kills up to `agentCount` running tasks whose agent is currently idle.
  *
  * Idle agents are matched to running tasks by `goAgentUuid` before the count is applied,
  * so idle agents that have no running task do not use up any of the requested
  * `agentCount` slots.
  *
  * @param agentCount maximum number of tasks to kill; non-positive values kill nothing
  */
private[mesos] def scaleDown(agentCount: Int) = {
  // Take one snapshot of the running tasks instead of re-reading it for every agent.
  val running = TaskQueue.getRunningTasks.toList
  val killable = poller.getIdleAgents.flatMap(agent => running.find(_.goAgentUuid == agent.id))
  killable.take(agentCount).foreach { task =>
    driver.killTask(TaskID.newBuilder().setValue(task.mesosTaskId).build())
  }
}
|
||
/**
  * Adds exactly `agentCount` new agent tasks to the `TaskQueue`.
  *
  * @param agentCount number of tasks to add; non-positive values add nothing
  */
private[mesos] def scaleUp(agentCount: Int) = {
  // `until` is exclusive: `0 to agentCount` would add one task too many.
  for (_ <- 0 until agentCount) {
    TaskQueue.add(GoTask(conf.goAgentDocker))
  }
}
|
||
/**
  * Number of agents to add when demand exceeds supply.
  *
  * Returns 0 when supply is already above `goMaxAgents`. Otherwise returns the gap
  * between demand and supply, limited so that supply does not grow beyond `goMaxAgents`.
  *
  * @param supply      current supply; must be lower than `demand`
  * @param demand      current demand
  * @param goMaxAgents upper limit for the fleet size
  * @param goMinAgents lower limit for the fleet size (not used by this computation)
  * @return number of agents to add
  */
private[mesos] def computeScaleup(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = {
  assert(demand > supply)
  if (supply > goMaxAgents) 0
  else if (demand > goMaxAgents) goMaxAgents - supply
  else demand - supply
}
|
||
|
||
/**
  * Number of agents to remove when supply exceeds demand.
  *
  * Returns 0 when supply is already below `goMinAgents`. Otherwise the fleet is reduced
  * towards demand, but not below `goMinAgents`; when demand itself is above `goMaxAgents`
  * the fleet is reduced to `goMaxAgents`.
  *
  * @param supply      current supply; must be greater than `demand`
  * @param demand      current demand
  * @param goMaxAgents upper limit for the fleet size
  * @param goMinAgents lower limit for the fleet size
  * @return number of agents to remove
  */
private[mesos] def computeScaledown(supply: Int, demand: Int, goMaxAgents: Int, goMinAgents: Int): Int = {
  assert(supply > demand)
  demand match {
    case _ if supply < goMinAgents => 0
    case wanted if wanted < goMinAgents => supply - goMinAgents
    case wanted if wanted > goMaxAgents => supply - goMaxAgents
    case wanted => supply - wanted
  }
}
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tamizhgeek I know the api.go.cd response says this is the schema of the response, but can you quickly check the same on our servers? Our main build is running the latest 16.2.0. They look different actually. We might want to file a bug against it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ashwanthkumar The responses differ because of the Accept header we send. Compare these two requests:
curl -X GET http://build.gocd.io/go/api/agents -H "Authorization: Basic asdadsadasdasd="
and
curl -X GET http://build.gocd.io/go/api/agents -H "Authorization: Basic adsdasdasda=" -H "Accept: application/vnd.go.cd.v1+json"
:)