Understanding the Actor Model by Examples

ActorCommand.java (the commands exchanged between the server and agent actors):

/**
 * Commands exchanged between the server-side and agent-side actors.
 *
 * <p>Program: akka. Description: Actor Command.
 * Author: CJ Sun. Created: 2015-05-01 12:10.
 */
public enum ActorCommand {

    /** Heart beat request; {@code ServerActor} answers it with {@link #HeartBeatOK}. */
    HeartBeat,

    /** Heart beat acknowledgement; {@code AgentActor} stops itself when it receives this. */
    HeartBeatOK,

    /** Task request; {@code AgentActor} answers it with {@link #TaskDone}. */
    ExecuteTask,

    /** Task completion notice; {@code ServerActor} stops itself when it receives this. */
    TaskDone;
}
Server.java (the server actor and its entry point):
import akka.actor.*;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* @program: akka
* @description: Server
* @author: CJ Sun
* @create: 2015-05-01 12:10
**/

/**
 * Server-side actor.
 *
 * <ul>
 *   <li>On {@link ActorCommand#HeartBeat} it logs the sender and replies with
 *       {@link ActorCommand#HeartBeatOK}.</li>
 *   <li>On {@link ActorCommand#TaskDone} it logs the sender and stops itself.</li>
 *   <li>Any other message is passed to {@code unhandled(message)} instead of being
 *       silently dropped.</li>
 * </ul>
 */
class ServerActor extends UntypedActor {

    /**
     * Dispatches an incoming message by identity comparison against the
     * {@link ActorCommand} constants.
     *
     * @param message the received message
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    public void onReceive(Object message) throws Exception {
        if (message == ActorCommand.HeartBeat) {
            System.out.println("Heart beat request from " + senderHost());
            getSender().tell(ActorCommand.HeartBeatOK, getSelf());
        } else if (message == ActorCommand.TaskDone) {
            System.out.println(senderHost() + " Task Done!");
            getContext().stop(getSelf());
        } else {
            // Let Akka publish an UnhandledMessage rather than ignoring the message.
            unhandled(message);
        }
    }

    /**
     * Returns the host of the current sender's address. The host is an optional value
     * and is absent for senders without a remote address, so fall back to the full
     * address text instead of calling {@code get()} on an empty option.
     */
    private String senderHost() {
        if (getSender().path().address().host().isDefined()) {
            return getSender().path().address().host().get();
        }
        return getSender().path().address().toString();
    }
}

/**
 * Entry point of the server process. It starts an actor system, registers a
 * {@code ServerActor} under the name {@code serverActor}, and periodically sends
 * {@link ActorCommand#ExecuteTask} to a remote actor selection.
 */
public class Server {

    /**
     * Runnable that creates a fresh {@code ServerActor} and sends
     * {@link ActorCommand#ExecuteTask} to the remote actor with that new actor as sender.
     * {@link #main(String[])} does not schedule this class itself; it is kept for callers
     * that want the logging variant of the task.
     */
    static class ServerActorTask implements Runnable {
        ActorSystem actorSystem;
        ActorSelection remoteActor;

        /**
         * @param actorSystem system used to create the per-request reply actor
         * @param remoteActor selection that receives the task request
         */
        public ServerActorTask(ActorSystem actorSystem, ActorSelection remoteActor) {
            this.actorSystem = actorSystem;
            this.remoteActor = remoteActor;
        }

        @Override
        public void run() {
            ActorRef serverActor = actorSystem.actorOf(Props.create(ServerActor.class));
            System.out.println("Send task request");
            remoteActor.tell(ActorCommand.ExecuteTask, serverActor);
        }
    }

    /**
     * Boots the actor system from the {@code ServerSys} section of
     * {@code master-application.conf} and schedules a task request every 10 seconds
     * (first run after 10 seconds).
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Config config = ConfigFactory.load("master-application.conf");

        final ActorSystem actorSystem = ActorSystem.create("ServerSystem", config.getConfig("ServerSys"));
        actorSystem.actorOf(Props.create(ServerActor.class), "serverActor");

        String hostname = "127.0.0.1";
        // NOTE(review): presumably the port the agent process listens on; confirm against agent-application.conf.
        int port = 2553;

        String uri = "akka.tcp://ServerSystem@" + hostname + ":" + port + "/user/serverActor";
        final ActorSelection remoteActor = actorSystem.actorSelection(uri);

        // Same value serves as the initial delay and the repeat interval.
        final FiniteDuration interval = Duration.create(10, SECONDS);

        actorSystem.scheduler().schedule(interval, interval,
                new Runnable() {
                    @Override
                    public void run() {
                        // A new actor per request receives the TaskDone reply and then stops itself.
                        final ActorRef actor = actorSystem.actorOf(Props.create(ServerActor.class));
                        remoteActor.tell(ActorCommand.ExecuteTask, actor);
                    }
                }, actorSystem.dispatcher());

        try {
            // Park the main thread for 100,000,000 ms (roughly 27.8 hours).
            Thread.sleep(100000000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
Agent.java (the agent actor and its entry point):
import akka.actor.*;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* @program: akka
* @description: Agent
* @author: CJ Sun
* @create: 2015-05-01 12:10
**/

/**
 * Agent-side actor.
 *
 * <ul>
 *   <li>On {@link ActorCommand#HeartBeatOK} it logs the sender and stops itself.</li>
 *   <li>On {@link ActorCommand#ExecuteTask} it logs the execution and replies with
 *       {@link ActorCommand#TaskDone}.</li>
 *   <li>Any other message is passed to {@code unhandled(message)} instead of being
 *       silently dropped.</li>
 * </ul>
 */
class AgentActor extends UntypedActor {

    /**
     * Dispatches an incoming message by identity comparison against the
     * {@link ActorCommand} constants.
     *
     * @param message the received message
     */
    @Override
    public void onReceive(Object message) {
        if (message == ActorCommand.HeartBeatOK) {
            System.out.println("Heart beat response from " + senderHost());
            getContext().stop(getSelf());
        } else if (message == ActorCommand.ExecuteTask) {
            System.out.println("Executing Task!");
            getSender().tell(ActorCommand.TaskDone, getSelf());
        } else {
            // Let Akka publish an UnhandledMessage rather than ignoring the message.
            unhandled(message);
        }
    }

    /**
     * Returns the host of the current sender's address. The host is an optional value
     * and is absent for senders without a remote address, so fall back to the full
     * address text instead of calling {@code get()} on an empty option.
     */
    private String senderHost() {
        if (getSender().path().address().host().isDefined()) {
            return getSender().path().address().host().get();
        }
        return getSender().path().address().toString();
    }
}

/**
 * Entry point of the agent process. It starts an actor system, registers an
 * {@code AgentActor} under the name {@code serverActor}, and periodically sends
 * {@link ActorCommand#HeartBeat} to a remote actor selection.
 */
public class Agent {

    /**
     * Runnable that creates a fresh {@code AgentActor} and sends
     * {@link ActorCommand#HeartBeat} to the remote actor with that new actor as sender,
     * so the {@link ActorCommand#HeartBeatOK} reply goes to the new actor.
     */
    static class AgentActorTask implements Runnable {

        ActorSystem actorSystem;
        ActorSelection remoteActor;

        /**
         * @param actorSystem system used to create the per-request reply actor
         * @param remoteActor selection that receives the heart beat
         */
        public AgentActorTask(ActorSystem actorSystem, ActorSelection remoteActor) {
            this.actorSystem = actorSystem;
            this.remoteActor = remoteActor;
        }

        @Override
        public void run() {
            ActorRef clientActor = actorSystem.actorOf(Props.create(AgentActor.class));
            remoteActor.tell(ActorCommand.HeartBeat, clientActor);
        }
    }

    /**
     * Boots the actor system from the {@code ServerSys} section of
     * {@code agent-application.conf} and schedules a heart beat every 5 seconds
     * (first run after 5 seconds).
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Config config = ConfigFactory.load("agent-application.conf");

        // The system and actor names must stay "ServerSystem"/"serverActor": the remote
        // URIs built in this file address the peer by exactly these names.
        ActorSystem actorSystem = ActorSystem.create("ServerSystem", config.getConfig("ServerSys"));
        actorSystem.actorOf(Props.create(AgentActor.class), "serverActor");

        String hostname = "127.0.0.1";
        // NOTE(review): presumably the port the server process listens on; confirm against master-application.conf.
        int port = 2552;

        String uri = "akka.tcp://ServerSystem@" + hostname + ":" + port + "/user/serverActor";
        ActorSelection remoteActor = actorSystem.actorSelection(uri);

        // Same value serves as the initial delay and the repeat interval.
        FiniteDuration interval = Duration.create(5, SECONDS);

        actorSystem.scheduler().schedule(interval, interval,
                new AgentActorTask(actorSystem, remoteActor), actorSystem.dispatcher());

        try {
            // Park the main thread for 100,000,000 ms (roughly 27.8 hours).
            Thread.sleep(100000000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}