Actor 创建并发送和接收消息
struct MyActor{
count:usize
}
///
/// Actor
impl Actor for MyActor {
// 每个actor都有一个context
type Context = Context<Self>;
}
#[derive(Message)]
#[rtype(result = "usize")]
struct Ping(usize);
impl Handler<Ping> for MyActor {
type Result = usize;
///
/// 接受Ping类型的消息 然后返回usize
fn handle(&mut self, msg: Ping, ctx: &mut Self::Context) -> Self::Result {
self.count += msg.0;
self.count
}
}
发送,接收处理
#[actix::test]
async fn test1(){
// 开启新的actor并且返回地址也就近似于akka 中的 ActorRef
let addr = MyActor { count:10}.start();
// send 然后handler处理返回
let res = addr.send(Ping(10)).await;
print!("Res : {}\n",res.unwrap());
let id = System::current().id();
print!("id:{} will stop",id);
System::current().stop();
}
生命周期函数
///
/// 生命周期有
/// + Started
/// + Running
/// + Stopping
/// + Stopped
///
/// 重写生命周期函数started,stopped
impl Actor for MineActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("started");
}
fn stopped(&mut self, ctx: &mut Self::Context) {
println!("stopped")
}
}
可Response的Message
///
/// 为了可以返回Responses 我们为Responses实现MessageResponse
impl<A,M> MessageResponse<A,M> for Responses
where A:Actor,
M:Message<Result = Responses> {
fn handle(self, ctx: &mut <A as Actor>::Context, tx: Option<actix::dev::OneshotSender<<M as Message>::Result>>) {
if let Some(tx) = tx {
tx.send(self);
}
}
}
两个Actor互相发的结构
use actix::prelude::*;
use std::time::Duration;
#[derive(Message)]
#[rtype(result = "()")]
struct Ping {
pub id: usize,
}
struct Game {
counter: usize,
name: String,
// 给其他actor发送
recipient: Recipient<Ping>,
}
impl Actor for Game {
type Context = Context<Game>;
}
impl Handler<Ping> for Game {
type Result = ();
fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) {
self.counter += 1;
if self.counter > 10 {
System::current().stop();
} else {
println!("[{0}] Ping received {1}", self.name, msg.id);
ctx.run_later(Duration::new(0, 100), move |act, _| {
// 给recipient发 在这个例子里就是 另一个Game Actor
act.recipient.do_send(Ping { id: msg.id + 1 });
});
}
}
}
示例互啄术
///
/// game 互啄
fn main() {
let mut system = System::new();
let addr = system.block_on(async {
Game::create(|ctx| {
// game1 的 addr
let addr = ctx.address();
// game2
let addr2 = Game {
counter: 0,
name: String::from("Game 2"),
// game1 的 recipient
recipient: addr.recipient(),
}
.start();
// game2 先发送
addr2.do_send(Ping { id: 10 });
Game {
counter: 0,
name: String::from("Game 1"),
recipient: addr2.recipient(),
}
});
});
system.run();
}
Arbiter
let sys = System::new();
let exec = async {
TheActor.start();
};
// 使用Arbiter管理Actor
let arbiter = Arbiter::new();
Arbiter::spawn(&arbiter, exec);
System::current().stop();
sys.run();
SyncArbiter
use actix::prelude::*;
struct MySyncActor;
impl Actor for MySyncActor {
type Context = SyncContext<Self>;
}
// 线程数2则可以有同时两个Actor在处理
let addr = SyncArbiter::start(2, || MySyncActor);