Actors
Actors are very lightweight concurrent entities that process messages asynchronously using an event-driven receive loop. Pattern matching on incoming messages is a convenient way to express an actor's behavior. Actors raise the level of abstraction and make it much easier to write, test, understand and maintain concurrent and distributed systems. You focus on workflow (how messages flow through the system) instead of low-level primitives such as threads, locks and socket I/O.
// Scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}

// message type
case class Greeting(who: String)

class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) => log.info("Hello " + who)
  }
}

val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
greeter ! Greeting("Charlie Parker") // asynchronous, fire-and-forget send
// Java
import java.io.Serializable;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class Greeting implements Serializable {
  public final String who;
  public Greeting(String who) { this.who = who; }
}

public class GreetingActor extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  public void onReceive(Object message) throws Exception {
    if (message instanceof Greeting)
      log.info("Hello " + ((Greeting) message).who);
  }
}

ActorSystem system = ActorSystem.create("MySystem");
ActorRef greeter = system.actorOf(Props.create(GreetingActor.class), "greeter");
greeter.tell(new Greeting("Charlie Parker"), ActorRef.noSender());
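The "event-driven receive loop" described in this section can be pictured without Akka at all. The following is a minimal plain-Java sketch, not Akka's implementation: `MiniActor` is a hypothetical name, and it gives each actor its own thread for simplicity, whereas Akka schedules many actors onto a shared pool of threads. It only illustrates two properties: `tell` returns immediately, and messages are handled one at a time in the order they were enqueued.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java illustration only; MiniActor is a hypothetical name, not an Akka class.
final class MiniActor<T> {
    // The single-threaded executor's internal queue plays the role of the mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Consumer<T> behavior;

    MiniActor(Consumer<T> behavior) {
        this.behavior = behavior;
    }

    // Fire-and-forget send: enqueue the message and return immediately.
    void tell(T message) {
        mailbox.execute(() -> behavior.accept(message));
    }

    // Stop accepting messages, then wait up to one second for queued ones to be handled.
    // Returns true if the mailbox was fully drained within that time.
    boolean stop() {
        mailbox.shutdown();
        try {
            return mailbox.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A behavior such as `who -> System.out.println("Hello " + who)` would run on the actor's own thread rather than the caller's. Call `stop()` when finished, because the executor's worker thread otherwise keeps the JVM alive.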
Remoting
Actors are location transparent and distributable by design. You can write your application without hardcoding how it will be deployed and distributed, and later configure the actor system for a particular topology while retaining all of the application's semantics, including actor supervision.
// ------------------------------
// config on all machines
// (values containing ':', '@' or '//' must be quoted in HOCON)
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    deployment {
      /greeter {
        remote = "akka.tcp://MySystem@machine1:2552"
      }
    }
  }
}
// ------------------------------
// define the greeting actor and the greeting message (Scala)
case class Greeting(who: String) extends Serializable

class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) => log.info("Hello " + who)
  }
}
// ------------------------------
// on machine 1: empty system, target for deployment from machine 2
val system = ActorSystem("MySystem")
// ------------------------------
// on machine 2: Remote Deployment - deploying on machine1
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
// ------------------------------
// on machine 3: Remote Lookup (logical home of “greeter” is machine2, remote deployment is transparent)
val system = ActorSystem("MySystem")
val greeter = system.actorSelection("akka.tcp://MySystem@machine2:2552/user/greeter")
greeter ! Greeting("Sonny Rollins")
// ------------------------------
// define the greeting actor and the greeting message (Java)
public class Greeting implements Serializable {
  public final String who;
  public Greeting(String who) { this.who = who; }
}

public class GreetingActor extends UntypedActor {
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  public void onReceive(Object message) throws Exception {
    if (message instanceof Greeting)
      log.info("Hello " + ((Greeting) message).who);
  }
}
// ------------------------------
// on machine 1: empty system, target for deployment from machine 2
ActorSystem system = ActorSystem.create("MySystem");
// ------------------------------
// on machine 2: Remote Deployment - deploying on machine1
ActorSystem system = ActorSystem.create("MySystem");
ActorRef greeter = system.actorOf(Props.create(GreetingActor.class), "greeter");
// ------------------------------
// on machine 3: Remote Lookup (logical home of “greeter” is machine2, remote deployment is transparent)
ActorSystem system = ActorSystem.create("MySystem");
ActorSelection greeter = system.actorSelection("akka.tcp://MySystem@machine2:2552/user/greeter");
greeter.tell(new Greeting("Sonny Rollins"), ActorRef.noSender());
Supervision
Actors form a tree in which each actor is the parent of the actors it creates. A parent is responsible for handling its children's failures (so-called supervision), which forms a chain of responsibility all the way to the top. When an actor crashes, its parent can resume, restart or stop it, or escalate the failure up the hierarchy. This gives clean semantics for managing failures in a concurrent, distributed system and makes it possible to write highly fault-tolerant systems that self-heal.
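// Scala
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

class Supervisor extends Actor {
  // applied to each failing child individually
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException  => Resume
      case _: NullPointerException => Restart
      case _: Exception            => Escalate
    }

  val worker = context.actorOf(Props[Worker])

  def receive = {
    case n: Int => worker forward n
  }
}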
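// Java
import static akka.actor.SupervisorStrategy.escalate;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.resume;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.UntypedActor;
import akka.japi.Function;
import scala.concurrent.duration.Duration;

class Supervisor extends UntypedActor {
  private SupervisorStrategy strategy = new OneForOneStrategy(
    10, Duration.create("1 minute"), new Function<Throwable, Directive>() {
      @Override
      public Directive apply(Throwable t) {
        if (t instanceof ArithmeticException) return resume();
        else if (t instanceof NullPointerException) return restart();
        else return escalate();
      }
    });

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }

  ActorRef worker = getContext().actorOf(Props.create(Worker.class));

  public void onReceive(Object message) throws Exception {
    if (message instanceof Integer) worker.forward(message, getContext());
  }
}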
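The directives used by the supervisor strategy in this section can be illustrated in plain Java, again without Akka. This is a sketch under stated assumptions: `SupervisionSketch`, its `Parent` and its `Worker` are hypothetical stand-ins, the worker's failure conditions are invented for the example, and the retry limit (`maxNrOfRetries`, `withinTimeRange`) is left out. It shows what differs between the outcomes: resume keeps the child and its state, restart replaces the child with a fresh instance, and escalate passes the failure to the next level up.

```java
// Plain-Java illustration only; none of these types are Akka classes.
final class SupervisionSketch {
    enum Directive { RESUME, RESTART, ESCALATE }

    // Same exception-to-directive mapping as the Supervisor examples.
    static Directive decide(Throwable t) {
        if (t instanceof ArithmeticException) return Directive.RESUME;
        if (t instanceof NullPointerException) return Directive.RESTART;
        return Directive.ESCALATE;
    }

    // Hypothetical child: counts the messages it handled successfully.
    static final class Worker {
        int handled = 0;

        void handle(Integer n) {
            if (n == null) throw new NullPointerException("null message");
            if (n < 0) throw new IllegalArgumentException("negative message");
            int unused = 100 / n; // throws ArithmeticException when n == 0
            handled++;
        }
    }

    // Hypothetical parent: owns one child and reacts to its failures.
    static final class Parent {
        Worker child = new Worker();

        void send(Integer n) {
            try {
                child.handle(n);
            } catch (RuntimeException e) {
                switch (decide(e)) {
                    case RESUME:
                        break;                // keep the child and its state
                    case RESTART:
                        child = new Worker(); // replace with a fresh instance
                        break;
                    default:
                        throw e;              // escalate to this parent's caller
                }
            }
        }
    }
}
```

In this sketch a division by zero leaves the worker's counter untouched, a null message resets it to zero, and any other failure propagates out of `send`, which stands in for escalation to the grandparent.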