Adding YARNContainerFactory. This allows OpenWhisk to run actions on Apache Hadoop clusters. by SamHjelmfelt · Pull Request #4129 · apache/openwhisk · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Adding YARNContainerFactory. This allows OpenWhisk to run actions on Apache Hadoop clusters. #4129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments. Retry
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,18 @@ whisk {
}
}

// Settings for the YARN container factory. This object lives under `whisk`, i.e. at the path whisk.yarn.
yarn {
master-url="http://localhost:8088" //YARN Resource Manager endpoint to be accessed from the invoker
yarn-link-log-message=true //If true, display a link to YARN in the static log message, otherwise do not include a link to YARN.
service-name="openwhisk-action-service" //Name of the YARN Service created by the invoker. The invoker number will be appended.
auth-type="simple" //Authentication type for YARN (simple or kerberos)
kerberos-principal="" //Kerberos principal to use for the YARN service. Note: must include a hostname
kerberos-keytab="" //Location of keytab accessible by all node managers
queue="default" //Name of the YARN queue where the service will be created
memory=256 //Memory used by each YARN container (presumably in MB, the YARN Services API default unit -- TODO confirm)
cpus=1 //CPUs used by each YARN container
}

logstore {
#SplunkLogStore configuration
#splunk {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ object ConfigKeys {
val logStoreElasticSearch = s"$logStore.elasticsearch"

val mesos = "whisk.mesos"
val yarn = "whisk.yarn"

val containerProxy = "whisk.container-proxy"
val containerProxyTimeouts = s"$containerProxy.timeouts"
Expand Down
8000
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.yarn

import akka.actor.{Actor, ActorSystem}
import akka.http.scaladsl.model.{HttpMethods, StatusCodes}
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.yarn.YARNComponentActor.{CreateContainerAsync, RemoveContainer}
import spray.json.{JsArray, JsNumber, JsObject, JsString}

import scala.concurrent.ExecutionContext

/**
* Message protocol understood by the [[YARNComponentActor]] class, which submits
* create and decommission commands to YARN.
*/
object YARNComponentActor {
// Asks the actor to add one container to its component; the actor replies once the request has been submitted.
case object CreateContainerAsync
// Asks the actor to decommission the component instance with the given name.
case class RemoveContainer(component_instance_name: String)
}

/**
 * Actor that scales one component of a YARN service up or down through the YARN REST API.
 *
 * One instance exists per action image. It answers two messages:
 *  - `CreateContainerAsync`: flexes the component to `containerCount + 1` containers.
 *  - `RemoveContainer(name)`: decommissions the named component instance.
 *
 * Both handlers reply to the sender with `()` once the REST call has returned. Any other
 * message raises an `IllegalArgumentException` inside the actor.
 *
 * @param actorSystem actor system providing the dispatcher and materializer
 * @param logging     logger used for progress and error messages
 * @param yarnConfig  YARN settings (master URL and authentication type are read here)
 * @param serviceName name of the YARN service that owns the component
 * @param imageName   action image this actor manages
 */
class YARNComponentActor(actorSystem: ActorSystem,
                         logging: Logging,
                         yarnConfig: YARNConfig,
                         serviceName: String,
                         imageName: ImageName)
    extends Actor {

  implicit val as: ActorSystem = actorSystem
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = actorSystem.dispatcher

  //Adding a container via the YARN REST API is actually done by flexing the component's container pool to a certain size.
  // This actor must track the current containerCount in order to make the correct scale-up request.
  var containerCount: Int = 0

  // The service definition registers each component under the image name with '.' replaced by '-'
  // (component names must be [a-z][a-z0-9-]*), so REST calls must address the component the same way.
  private val componentName: String = imageName.name.replace('.', '-')

  def receive: PartialFunction[Any, Unit] = {
    case CreateContainerAsync =>
      sender ! createContainerAsync()

    case RemoveContainer(component_instance_name) =>
      sender ! removeContainer(component_instance_name)

    case input =>
      // No reply is sent for unknown messages: the exception is raised in the actor itself.
      throw new IllegalArgumentException("Unknown input: " + input)
  }

  /**
   * Requests one additional container by flexing the component to `containerCount + 1`.
   * The counter is only incremented when YARN answers with 200 OK; any other status is
   * delegated to `YARNRESTUtil.handleYARNRESTError`.
   */
  def createContainerAsync(): Unit = {
    logging.info(this, s"Using YARN to create a container with image ${imageName.name}...")

    val flexRequest = JsObject("number_of_containers" -> JsNumber(containerCount + 1)).compactPrint
    val flexUrl = s"${yarnConfig.masterUrl}/app/v1/services/$serviceName/components/$componentName"

    YARNRESTUtil.submitRequestWithAuth(yarnConfig.authType, HttpMethods.PUT, flexUrl, flexRequest) match {
      case httpresponse(StatusCodes.OK, content) =>
        logging.info(this, s"Added container: ${imageName.name}. Response: $content")
        containerCount += 1

      case httpresponse(_, _) => YARNRESTUtil.handleYARNRESTError(logging)
    }
  }

  /**
   * Decommissions a single component instance.
   * Does nothing except log a warning when the tracked container count is already zero.
   *
   * @param component_instance_name name of the component instance to decommission
   */
  def removeContainer(component_instance_name: String): Unit = {
    logging.info(this, s"Removing ${imageName.name} container: $component_instance_name ")
    if (containerCount <= 0) {
      logging.warn(this, "Already at 0 containers")
    } else {
      val decommissionRequest = JsObject(
        "components" -> JsArray(
          JsObject(
            "name" -> JsString(componentName),
            "decommissioned_instances" -> JsArray(JsString(component_instance_name))))).compactPrint
      val serviceUrl = s"${yarnConfig.masterUrl}/app/v1/services/$serviceName"

      YARNRESTUtil.submitRequestWithAuth(yarnConfig.authType, HttpMethods.PUT, serviceUrl, decommissionRequest) match {
        case httpresponse(StatusCodes.OK, content) =>
          logging.info(
            this,
            s"Successfully removed ${imageName.name} container: $component_instance_name. Response: $content")
          containerCount -= 1

        case httpresponse(_, _) => YARNRESTUtil.handleYARNRESTError(logging)
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.yarn

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.http.scaladsl.model.{HttpMethods, StatusCodes}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId}
import org.apache.openwhisk.core.yarn.YARNComponentActor.CreateContainerAsync
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import pureconfig.loadConfigOrThrow
import spray.json._

import scala.collection.immutable.HashMap
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.concurrent.duration._
import YARNJsonProtocol._
import akka.stream.ActorMaterializer

/**
* Settings for the YARN container factory, loaded from the `whisk.yarn` configuration section.
*
* Field names mirror the kebab-case keys of that section (`master-url`, `auth-type`, ...).
* Note that `memory` is declared as a String here even though the default configuration
* supplies a bare number.
*/
case class YARNConfig(masterUrl: String,
yarnLinkLogMessage: Boolean,
serviceName: String,
authType: String,
kerberosPrincipal: String,
kerberosKeytab: String,
queue: String,
memory: String,
cpus: Int)

/** Provider that builds a [[YARNContainerFactory]], leaving its optional settings at their defaults. */
object YARNContainerFactoryProvider extends ContainerFactoryProvider {

  /** Creates the YARN-backed container factory for the given invoker instance. */
  override def instance(actorSystem: ActorSystem,
                        logging: Logging,
                        config: WhiskConfig,
                        instance: InvokerInstanceId,
                        parameters: Map[String, Set[String]]): ContainerFactory = {
    val factory = new YARNContainerFactory(
      actorSystem = actorSystem,
      logging = logging,
      config = config,
      instance = instance,
      parameters = parameters)
    factory
  }
}

/**
 * [[ContainerFactory]] that runs action containers as components of a YARN service.
 *
 * `init()` (re)creates one YARN service per invoker, containing one component per runtime
 * image with zero containers. `createContainer` then asks the per-image component actor to
 * add a container and the per-image info actor to report it. `cleanup()` deletes the service
 * and stops the actors.
 *
 * @param actorSystem   actor system used to spawn the per-image actors
 * @param logging       logger used for progress and error messages
 * @param config        OpenWhisk configuration (not read directly by this class)
 * @param instance      invoker instance; its number is appended to the service name
 * @param parameters    extra container parameters (not read directly by this class)
 * @param containerArgs container argument configuration, loaded from config by default
 * @param yarnConfig    YARN settings, loaded from `whisk.yarn` by default
 */
class YARNContainerFactory(actorSystem: ActorSystem,
                           logging: Logging,
                           config: WhiskConfig,
                           instance: InvokerInstanceId,
                           parameters: Map[String, Set[String]],
                           containerArgs: ContainerArgsConfig =
                             loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs),
                           yarnConfig: YARNConfig = loadConfigOrThrow[YARNConfig](ConfigKeys.yarn))
    extends ContainerFactory {

  val images: Set[ImageName] = ExecManifest.runtimesManifest.runtimes.flatMap(a => a.versions.map(b => b.image))

  //One actor of each type per image for parallelism
  private var yarnComponentActors: Map[ImageName, ActorRef] = HashMap[ImageName, ActorRef]()
  private var YARNContainerInfoActors: Map[ImageName, ActorRef] = HashMap[ImageName, ActorRef]()

  val serviceStartTimeoutMS = 60000
  val retryWaitMS = 1000
  val runCommand = ""
  val version = "1.0.0"
  val description = "OpenWhisk Action Service"

  //Allows for invoker HA
  val serviceName: String = yarnConfig.serviceName + "-" + instance.toInt

  val containerStartTimeoutMS = 60000

  implicit val as: ActorSystem = actorSystem
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = actorSystem.dispatcher

  /** Spawns one actor per runtime image, named `<namePrefix>-<image name>`. */
  private def spawnActorPerImage(namePrefix: String)(props: ImageName => Props): Map[ImageName, ActorRef] =
    images.map(image => image -> actorSystem.actorOf(props(image), name = s"$namePrefix-${image.name}")).toMap

  /**
   * Starts the per-image actors and (re)creates the YARN service.
   * A service left over under the same name is removed first.
   */
  override def init(): Unit = {
    yarnComponentActors = spawnActorPerImage("YARNComponentActor")(image =>
      Props(new YARNComponentActor(actorSystem, logging, yarnConfig, serviceName, image)))
    YARNContainerInfoActors = spawnActorPerImage("YARNComponentInfoActor")(image =>
      Props(new YARNContainerInfoActor(actorSystem, logging, yarnConfig, serviceName, image)))

    blocking {
      implicit val timeout: Timeout = Timeout(serviceStartTimeoutMS.milliseconds)

      //Remove service if it already exists
      val existingService =
        YARNRESTUtil.downloadServiceDefinition(yarnConfig.authType, serviceName, yarnConfig.masterUrl)(logging)

      // downloadServiceDefinition signals "no such service" with null
      Option(existingService).foreach(_ => removeService())

      createService()
    }
  }

  /**
   * Creates a container for `actionImage`. All other arguments are ignored because container
   * sizing comes from the YARN configuration.
   *
   * @return the created container, or a failed future (`NoSuchElementException`) when the
   *         image has no actors, i.e. it is not part of the runtimes manifest or `init()`
   *         has not run yet
   */
  override def createContainer(
    unusedtid: TransactionId,
    unusedname: String,
    actionImage: ExecManifest.ImageName,
    unuseduserProvidedImage: Boolean,
    unusedmemory: ByteSize,
    unusedcpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
    implicit val timeout: Timeout = Timeout(containerStartTimeoutMS.milliseconds)

    (yarnComponentActors.get(actionImage), YARNContainerInfoActors.get(actionImage)) match {
      case (Some(componentActor), Some(infoActor)) =>
        //First send the create command to YARN, then with a different actor, wait for the container to be ready
        ask(componentActor, CreateContainerAsync).flatMap(_ =>
          ask(infoActor, GetContainerInfo(componentActor)).mapTo[Container])

      case _ =>
        // Report the unknown image through the returned future instead of throwing in the caller's thread
        Future.failed(
          new NoSuchElementException(s"No YARN component registered for image ${actionImage.name}"))
    }
  }

  /** Deletes the YARN service and stops every per-image actor. */
  override def cleanup(): Unit = {
    removeService()
    yarnComponentActors.values.foreach(ref => actorSystem.stop(ref))
    YARNContainerInfoActors.values.foreach(ref => actorSystem.stop(ref))
  }

  /**
   * Submits the service definition (one component per image, zero containers each) and polls
   * once per `retryWaitMS` until the service reports STABLE or STARTED.
   *
   * @throws Exception if the service is not stable after `serviceStartTimeoutMS` milliseconds
   */
  def createService(): Unit = {
    logging.info(this, "Creating Service with images: " + images.map(i => i.publicImageName).mkString(", "))

    val componentList = images
      .map(
        i =>
          ComponentDefinition(
            i.name.replace('.', '-'), //name must be [a-z][a-z0-9-]*
            Some(0), //start with zero containers
            Some(runCommand),
            Option.empty,
            Some(ArtifactDefinition(i.publicImageName, "DOCKER")),
            Some(ResourceDefinition(yarnConfig.cpus, yarnConfig.memory)),
            Some(ConfigurationDefinition(Map(("YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE", "true")))),
            List[String]()))
      .toList

    //Add kerberos def if necessary
    val kerberosDef: Option[KerberosPrincipalDefinition] =
      if (yarnConfig.authType.equals(YARNRESTUtil.KERBEROSAUTH))
        Some(KerberosPrincipalDefinition(Some(yarnConfig.kerberosPrincipal), Some(yarnConfig.kerberosKeytab)))
      else
        None

    val service = ServiceDefinition(
      Some(serviceName),
      Some(version),
      Some(description),
      Some("STABLE"),
      Some(yarnConfig.queue),
      componentList,
      kerberosDef)

    //Submit service
    val response =
      YARNRESTUtil.submitRequestWithAuth(
        yarnConfig.authType,
        HttpMethods.POST,
        s"${yarnConfig.masterUrl}/app/v1/services",
        service.toJson.compactPrint)

    //Handle response
    response match {
      case httpresponse(StatusCodes.OK | StatusCodes.Accepted, content) =>
        logging.info(this, s"Service submitted. Response: $content")

      case httpresponse(_, _) => YARNRESTUtil.handleYARNRESTError(logging)
    }

    //Wait for service start (up to serviceStartTimeoutMS milliseconds)
    var started = false
    var retryCount = 0
    val maxRetryCount = serviceStartTimeoutMS / retryWaitMS
    while (!started && retryCount < maxRetryCount) {
      val serviceDef =
        YARNRESTUtil.downloadServiceDefinition(yarnConfig.authType, serviceName, yarnConfig.masterUrl)(logging)

      // downloadServiceDefinition signals "no such service" with null
      Option(serviceDef) match {
        case None =>
          logging.info(this, "Service not found yet")
          Thread.sleep(retryWaitMS)

        case Some(definition) =>
          definition.state match {
            case Some("STABLE") | Some("STARTED") =>
              logging.info(this, "YARN service achieved stable state")
              started = true

            case other =>
              // A missing state is logged as "None", any other state by its name
              val state = other.getOrElse("None")
              logging.info(
                this,
                s"YARN service is not in stable state yet ($retryCount/$maxRetryCount). Current state: $state")
              Thread.sleep(retryWaitMS)
          }
      }
      retryCount += 1
    }
    if (!started)
      throw new Exception(s"After ${serviceStartTimeoutMS}ms YARN service did not achieve stable state")
  }

  /**
   * Deletes the YARN service. A 404 or 400 answer is treated as "service did not exist" and only
   * logged; any other non-OK status is delegated to `YARNRESTUtil.handleYARNRESTError`.
   */
  def removeService(): Unit = {
    val response: httpresponse =
      YARNRESTUtil.submitRequestWithAuth(
        yarnConfig.authType,
        HttpMethods.DELETE,
        s"${yarnConfig.masterUrl}/app/v1/services/$serviceName",
        "")

    response match {
      case httpresponse(StatusCodes.OK, _) =>
        logging.info(this, "YARN service Removed")

      case httpresponse(StatusCodes.NotFound | StatusCodes.BadRequest, _) =>
        logging.warn(this, "YARN service did not exist")

      case httpresponse(_, _) =>
        YARNRESTUtil.handleYARNRESTError(logging)
    }
  }
}
Loading
0