Created
September 14, 2024 04:56
-
-
Save sandeshbhusal/d500f60d1b2178016e7db8460f0fae8b to your computer and use it in GitHub Desktop.
Simple actor framework definition in Rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
future::Future, | |
io::{Error, Read, Write}, | |
net::UdpSocket, | |
}; | |
use anyhow::Result; | |
use async_channel::{Receiver, Sender}; | |
use serde::{Deserialize, Serialize}; | |
use tokio::{select, task::JoinHandle}; | |
#[derive(Debug, Serialize, Deserialize)] | |
enum Message { | |
Normal(String), | |
HeartBeat, | |
ShutDown, | |
} | |
enum MessageReply { | |
Ok, | |
Fail(Error), | |
NotImplemented, | |
} | |
enum Transport { | |
Udp(UdpSocket), | |
File(std::fs::File), | |
AsyncChannel { | |
sender: Sender<Message>, | |
receiver: Receiver<Message>, | |
}, | |
} | |
struct MailBox { | |
in_box: Transport, | |
out_box: Option<Transport>, | |
} | |
impl MailBox { | |
async fn send_message(&mut self, message: Message) { | |
if let Some(out_box) = &mut self.out_box { | |
match out_box { | |
Transport::Udp(socket) => { | |
socket | |
.send(bincode::serialize(&message).unwrap().as_ref()) | |
.unwrap(); | |
} | |
Transport::File(file) => { | |
file.write_all(bincode::serialize(&message).unwrap().as_ref()) | |
.unwrap(); | |
} | |
Transport::AsyncChannel { sender, .. } => { | |
sender.send(message).await.unwrap(); | |
} | |
} | |
} else { | |
println!("No outbox found for module"); | |
} | |
} | |
async fn receive_message(&mut self) -> Result<Option<Message>> { | |
let message = match &mut self.in_box { | |
Transport::Udp(socket) => { | |
let mut buf = [0; 1024]; | |
let (size, _) = socket.recv_from(&mut buf)?; | |
if size == 0 { | |
None | |
} else { | |
let message = std::str::from_utf8(&buf[..size])?; | |
Some(Message::Normal(message.to_string())) | |
} | |
} | |
// Do stuff like inotify. | |
Transport::File(ref mut file) => { | |
let mut buf = String::new(); | |
file.read_to_string(&mut buf)?; | |
Some(Message::Normal(buf)) | |
} | |
Transport::AsyncChannel { receiver, .. } => { | |
Some(receiver.recv().await.expect("Async failed")) | |
} | |
}; | |
Ok(message) | |
} | |
} | |
trait Actor: Send + 'static { | |
async fn run(self, mut mailbox: MailBox) -> JoinHandle<()> | |
where | |
Self: Sized, | |
{ | |
tokio::spawn(async move { | |
let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(1)); | |
loop { | |
select! { | |
message = mailbox.receive_message() => { | |
if let Ok(Some(message)) = message { | |
let _ = self.handle_message(message, &mut mailbox).await; | |
} else { | |
// We either got an empty message, or an error. | |
// Send an error to outbox. | |
mailbox.send_message(Message::ShutDown).await; | |
} | |
}, | |
_ = ticker.tick() => { | |
mailbox.send_message(Message::HeartBeat).await; | |
} | |
} | |
} | |
}) | |
} | |
fn handle_message( | |
&self, | |
message: Message, | |
mailbox: &mut MailBox, | |
) -> impl Future<Output = MessageReply> + Send; | |
} | |
struct Ping; | |
impl Actor for Ping { | |
async fn handle_message(&self, message: Message, mailbox: &mut MailBox) -> MessageReply { | |
match message { | |
Message::Normal(message) => { | |
println!("Received message: {}", message); | |
std::io::stdout().flush().unwrap(); | |
// Send a reply back. | |
mailbox | |
.send_message(Message::Normal("Ping".to_string())) | |
.await; | |
} | |
Message::HeartBeat => { | |
return MessageReply::NotImplemented; | |
} | |
Message::ShutDown => { | |
return MessageReply::NotImplemented; | |
} | |
} | |
MessageReply::Ok | |
} | |
} | |
struct Pong; | |
impl Actor for Pong { | |
async fn handle_message(&self, message: Message, mailbox: &mut MailBox) -> MessageReply { | |
match message { | |
Message::Normal(message) => { | |
println!("Received message: {}", message); | |
std::io::stdout().flush().unwrap(); | |
// Send a reply back. | |
mailbox | |
.send_message(Message::Normal("Pong".to_string())) | |
.await; | |
} | |
Message::HeartBeat => { | |
return MessageReply::NotImplemented; | |
} | |
Message::ShutDown => { | |
return MessageReply::NotImplemented; | |
} | |
} | |
MessageReply::Ok | |
} | |
} | |
#[tokio::main] | |
async fn main() { | |
let (tx, rx) = async_channel::unbounded(); | |
let (tx2, rx2) = async_channel::unbounded(); | |
let mailbox = MailBox { | |
in_box: Transport::AsyncChannel { | |
sender: tx.clone(), | |
receiver: rx.clone(), | |
}, | |
out_box: Some(Transport::AsyncChannel { | |
sender: tx2.clone(), | |
receiver: rx2.clone(), | |
}), | |
}; | |
let pong_mailbox = MailBox { | |
in_box: Transport::AsyncChannel { | |
sender: tx2.clone(), | |
receiver: rx2.clone(), | |
}, | |
out_box: Some(Transport::AsyncChannel { | |
sender: tx.clone(), | |
receiver: rx.clone(), | |
}), | |
}; | |
let ping = Ping; | |
let pong = Pong; | |
let jh = ping.run(mailbox); | |
let kh = pong.run(pong_mailbox); | |
// Ping should receive one message :) | |
let send = tx.send(Message::Normal("Hello!".to_string())).await.unwrap(); | |
println!("Starting.."); | |
let _ = tokio::join!(jh, kh); | |
loop{} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment