Akka (Actor)
https://akka.io/
基本用法 创建、传递消息
创建
import akka.actor.AbstractActor;
import akka.actor.Props;
import java.time.Duration;
public class DemoRev extends AbstractActor {
public DemoRev(){
// 设置接收消息的超时时间
getContext().setReceiveTimeout(Duration.ofSeconds(10));
}
/**
* 用于创建ActorRef
**/
public static Props props(){
return Props.create(DemoRev.class, DemoRev::new);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class ,//如果是string类型
r->{
System.out.println("rev :" + r);
getSender().tell("rev done",getSelf());
}).match(Integer.class, // 如果是Interger类型
r->{
getSender().tell("give me more!",getSelf());
}).matchAny(a->{ // 其他
System.out.println("any");
}).build();
}
}
传递消息
ActorSystem system = ActorSystem.create("linux");
// 创建
ActorRef p1 = system.actorOf(DemoRev.props());
ActorRef s1 = system.actorOf(DemoSend.props());
// s1 -> p1
p1.tell("hello",s1);
system.terminate();
Inbox 消息
ActorSystem system = ActorSystem.create("linux");
ActorRef p1 = system.actorOf(DemoRev.props());
final Inbox inbox = Inbox.create(system);
// inbox也是一个actor
inbox.send(p1,"hello");
System.out.println(inbox.receive(Duration.ofSeconds(1)));
system.terminate();
周期性消息
import akka.actor.AbstractActorWithTimers;
import java.time.Duration;
public class DemoTimer extends AbstractActorWithTimers {
private static Object TICK_KEY = "TickKey";
private static final class FirstTick {}
private static final class Tick {}
public DemoTimer(){
// 相当于settimeout
getTimers().startSingleTimer(TICK_KEY,new FirstTick(), Duration.ofMillis(500));
}
@Override
public Receive createReceive() {
return receiveBuilder().match(
FirstTick.class,
message -> {
// 周期执行
getTimers().startPeriodicTimer(TICK_KEY,new Tick(),Duration.ofSeconds(1));
}
).match(Tick.class,message -> {
System.out.println(message);
}).build();
}
}
生命周期
import akka.actor.AbstractActor;
import akka.actor.Props;
public class StartStopActor1 extends AbstractActor {
static Props props() {
return Props.create(StartStopActor1.class, StartStopActor1::new);
}
// 启动hock
@Override
public void preStart() throws Exception {
System.out.printf("start %s \n",getSelf().path().toSerializationFormat());
getContext().actorOf(StartStopActor2.props(),"second");
}
// 停止hock
@Override
public void postStop() throws Exception {
System.out.printf("stop %s \n",getSelf().path().toSerializationFormat());
}
/*
也可以用信号停止
victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals("stop",s->{
getContext().stop(getSelf());
}).build();
}
}
Receive
// 支持动态改变receive方法
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"init",
m1 -> {
initializeMe = "Up and running";
getContext()
.become(
receiveBuilder()
.matchEquals(
"U OK?",
m2 -> {
getSender().tell(initializeMe, getSelf());
})
.build());
})
.build();
ask , pipie
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import scala.Tuple2;
import tech.realcpf.sendrev.DemoRev;
import tech.realcpf.sendrev.DemoSend;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
public class AskDemo {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("sys");
ActorRef actorA = system.actorOf(DemoRev.props());
ActorRef actorB = system.actorOf(DemoRev.props());
ActorRef actorC = system.actorOf(DemoRev.props());
CompletableFuture<Object> future1 =
ask(actorA,"hi A", Duration.ofMillis(1000)).toCompletableFuture();
CompletableFuture<Object> future2 =
ask(actorB,"hi B", Duration.ofMillis(1000)).toCompletableFuture();
CompletableFuture<Tuple2<String,String>> transformed =
CompletableFuture.allOf(future1,future2)
.thenApply(v->{
String x = (String) future1.join();
String s = (String) future2.join();
return new Tuple2(x,s);
});
pipe(transformed,system.dispatcher()).to(actorC);
system.terminate();
}
}