Last active
January 17, 2020 22:09
-
-
Save kalexmills/97cb8bb4828a93971c0678ebf27d4cef to your computer and use it in GitHub Desktop.
Rust: asynchronous CBOR channels over raw TCP using tokio-serde and serde_cbor.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Asynchronous CBOR message traffic via tokio / serde. More at https://cbor.io
 *
 * Start the server with `cargo run`, then send these test messages from a separate terminal.
 *
 * $ echo '00000017A2616364526561646161826568656C6C6F65776F726C64' | xxd -r -p | nc 127.0.0.1 6142
 * $ echo '0000000FA1616D6B68656C6C6F20776F726C64' | xxd -r -p | nc 127.0.0.1 6142
 *
 * The first 4 bytes (8 hex digits) are the big-endian length of the data frame; the rest is the
 * CBOR message.
 */
use futures::*; | |
use serde::{Deserialize, Serialize}; | |
use tokio::net::{tcp, TcpListener, TcpStream}; | |
use tokio_serde::SymmetricallyFramed; | |
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
let addr = "127.0.0.1:6142"; | |
let mut listener: TcpListener = TcpListener::bind(addr).await.unwrap(); | |
let server = async move { | |
let mut incoming = listener.incoming(); | |
while let Some(socket_res) = incoming.next().await { | |
match socket_res { | |
Err(err) => println!("accept error = {:?}", err), | |
Ok(mut sock) => { | |
tokio::spawn(async move { | |
// split the socket and wrap the read / write halves in the protocol stack. | |
let (mut w_in, mut w_out) = framed_cbor_split::<Protocol>(&mut sock); | |
while let Some(msg) = w_in.try_next().await.unwrap() { | |
println!("Got: {:?}", msg); | |
w_out.send(msg).await.unwrap(); | |
} | |
}); | |
} | |
} | |
} | |
}; | |
println!("Server running on localhost:6142"); | |
// Start the server and block this async fn until `server` spins down. | |
server.await; | |
Ok(()) | |
} | |
/** | |
* Protocol is a simple Rust enum describing possible messages crossing the channel. | |
*/ | |
#[derive(Debug, Deserialize, Serialize)] | |
#[serde(untagged)] // GOTCHA: serde(untagged) avoids extra fields but implies each message must have a unique shape | |
enum Protocol { | |
Chat { | |
#[serde(rename = "m")] // single byte names for serialization; nice names for code. | |
msg: String, | |
}, | |
Command { | |
#[serde(rename = "c")] | |
cmd: Command, | |
#[serde(rename = "a")] | |
args: Vec<String>, | |
}, | |
} | |
#[derive(Debug, Deserialize, Serialize)] | |
enum Command { | |
// TODO: currently serialized as strings. Can be made even more compact using serde(rename) as above. | |
Read, | |
Write, | |
Panic, | |
} | |
/** | |
* FramedCborRead is a wrapped socket that knows how to read length-framed CBOR values of type T | |
* serialized using Serde. | |
*/ | |
type FramedCborRead<'a, T> = | |
SymmetricallyFramed<FramedRead<tcp::ReadHalf<'a>, LengthDelimitedCodec>, T, Cbor<T, T>>; | |
type FramedCborWrite<'a, T> = | |
SymmetricallyFramed<FramedWrite<tcp::WriteHalf<'a>, LengthDelimitedCodec>, T, Cbor<T, T>>; | |
/** | |
* framed_cbor_split wraps a TcpStream, producing a pair of wrapped protocols for reading and writing CBOR | |
* values of type T | |
*/ | |
fn framed_cbor_split<'de, T: Deserialize<'de> + Serialize>( | |
sock: &'de mut TcpStream, | |
) -> (FramedCborRead<T>, FramedCborWrite<T>) { | |
let (read, write): (tcp::ReadHalf, tcp::WriteHalf) = sock.split(); | |
(framed_cbor_read::<T>(read), framed_cbor_write::<T>(write)) | |
} | |
/** | |
* framed_cbor_read wraps a TcpStream, producing a framed protocol suitable for reading CBOR values of type T. | |
*/ | |
fn framed_cbor_read<'de, T: Deserialize<'de>>(sock: tcp::ReadHalf) -> FramedCborRead<T> { | |
let length_delimited = FramedRead::new(sock, LengthDelimitedCodec::new()); | |
tokio_serde::SymmetricallyFramed::new(length_delimited, SymmetricalCbor::<T>::default()) | |
} | |
/** | |
* framed_cbor_write wraps a TcpStream, producing a framed protocol suitable for writing CBOR values of type T. | |
*/ | |
fn framed_cbor_write<T: Serialize>(sock: tcp::WriteHalf) -> FramedCborWrite<T> { | |
let length_delimited = FramedWrite::new(sock, LengthDelimitedCodec::new()); | |
tokio_serde::SymmetricallyFramed::new(length_delimited, SymmetricalCbor::<T>::default()) | |
} | |
// *** below is all tokio-serde boilerplate for setting up CBOR codecs | |
use bytes::{buf::BufExt, Bytes, BytesMut}; | |
use derivative::Derivative; | |
use std::{io, marker::PhantomData, pin::Pin}; | |
use tokio_serde::{Deserializer, Serializer}; | |
type SymmetricalCbor<Item> = Cbor<Item, Item>; | |
#[derive(Derivative)] | |
#[derivative(Default(bound = ""))] | |
struct Cbor<Item, SinkItem> { | |
ghost: PhantomData<(Item, SinkItem)>, | |
} | |
impl<Item, SinkItem> Deserializer<Item> for Cbor<Item, SinkItem> | |
where | |
for<'a> Item: Deserialize<'a>, | |
{ | |
type Error = io::Error; | |
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<Item, Self::Error> { | |
Ok(serde_cbor::from_reader(std::io::Cursor::new(src).reader()) | |
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?) | |
} | |
} | |
impl<Item, SinkItem> Serializer<SinkItem> for Cbor<Item, SinkItem> | |
where | |
SinkItem: Serialize, | |
{ | |
type Error = io::Error; | |
fn serialize(self: Pin<&mut Self>, src: &SinkItem) -> Result<Bytes, Self::Error> { | |
Ok(serde_cbor::to_vec(src) | |
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? | |
.into()) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment