在 Rust 中操作套接字时,推荐使用 std/tokio 提供的 socket 类型,并配合 socket2 的 SockRef 来设置底层套接字选项:
// Bind a tokio TCP listener on `addr` and wrap it in an `Arc`.
let listner = Arc::new(tokio::net::TcpListener::bind(addr.clone()).await?);
// Convert a reference to the listener into a `socket2::SockRef` so that
// socket2's option setters can be called on it.
let sck_ref: socket2::SockRef = (&listner).into();
sck_ref.set_reuse_address(true)?;
// `set_reuse_port` is compiled out on Windows targets.
// NOTE(review): both options are applied after `bind` has already run —
// confirm that this ordering has the intended effect.
#[cfg(not(target_os = "windows"))]
sck_ref.set_reuse_port(true)?;
多发多收套接字
根据 Zmq 的 Socket 接口,我们可以模拟出来一个多发多收的 socket:
use std::sync::Arc;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::Mutex,
};
/// Socket options that can be applied to an [`AsyncTcpSocket`] after it has
/// been bound or connected.
///
/// The type is a plain value, so it derives `Clone`, `PartialEq` and `Eq` in
/// addition to `Debug`; callers can keep an option around, reuse it and
/// compare it in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TcpSocketOption {
    /// Value passed to `set_ttl` on the underlying socket.
    Ttl(u32),
}
/// Internal state of an `AsyncTcpSocket`.
#[derive(Debug)]
enum SocketInner {
    /// A connected client stream.
    Stream(tokio::net::TcpStream),
    /// A bound listener together with the sending half of the channel whose
    /// receiving half is handed to the per-connection tasks.
    Listener(Arc<tokio::net::TcpListener>, async_channel::Sender<Vec<u8>>),
    /// No socket has been created yet.
    None,
}
impl SocketInner {
    /// Borrows the wrapped socket as a `socket2::SockRef`.
    ///
    /// # Panics
    ///
    /// Panics when the state is `SocketInner::None`, i.e. no socket exists.
    fn get_ref(&self) -> socket2::SockRef {
        match self {
            SocketInner::None => panic!("There has no a socket inside {self:?}"),
            SocketInner::Stream(stream) => socket2::SockRef::from(stream),
            SocketInner::Listener(listener, _) => socket2::SockRef::from(listener),
        }
    }

    /// Applies `option` to the wrapped socket.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying setter.
    ///
    /// # Panics
    ///
    /// Panics (through `get_ref`) when no socket has been created yet.
    fn set_option(&self, option: TcpSocketOption) -> anyhow::Result<()> {
        // Resolve the socket first so the panic on a missing socket happens
        // before the option is inspected.
        let socket = self.get_ref();
        let outcome = match option {
            TcpSocketOption::Ttl(hops) => socket.set_ttl(hops),
        };
        Ok(outcome?)
    }
}
/// A TCP socket wrapper that is either a listener (which can `send`) or a
/// connected stream (which can `read`), chosen at runtime by calling
/// `listen` or `connect`.
#[derive(Debug)]
pub struct AsyncTcpSocket {
    /// Current socket state, guarded by an async mutex so that every method
    /// can take `&self`.
    inner: Mutex<SocketInner>,
}
impl AsyncTcpSocket {
    /// Creates a socket in the uninitialised state; call [`Self::listen`] or
    /// [`Self::connect`] before using it.
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(SocketInner::None),
        }
    }

    /// Binds a listener on `addr` and starts a background task that accepts
    /// connections.
    ///
    /// Every accepted connection gets its own task that pulls messages queued
    /// by [`Self::send`] from a shared channel and writes them to the peer.
    ///
    /// # Errors
    ///
    /// Returns an error if binding fails or a socket option cannot be set.
    ///
    /// # Panics
    ///
    /// Panics if this socket has already been initialised by `listen` or
    /// `connect`.
    pub async fn listen(&self, addr: String) -> anyhow::Result<()> {
        // Take the lock and validate the state *before* acquiring any
        // resources, so a misuse panic cannot leave a bound listener and a
        // running accept task behind.
        let mut inner = self.inner.lock().await;
        assert!(matches!(*inner, SocketInner::None));

        let listener = Arc::new(tokio::net::TcpListener::bind(addr).await?);
        {
            // NOTE(review): these options are applied after `bind` has
            // already run — confirm that this ordering has the intended
            // effect.
            let sck_ref: socket2::SockRef = (&listener).into();
            sck_ref.set_reuse_address(true)?;
            #[cfg(not(target_os = "windows"))]
            sck_ref.set_reuse_port(true)?;
        }

        let (tx, rx) = async_channel::unbounded::<Vec<u8>>();
        let acceptor = Arc::clone(&listener);
        // NOTE(review): this task owns a clone of the listener `Arc` and only
        // stops when `accept` fails, so it keeps running after the
        // `AsyncTcpSocket` is dropped.
        tokio::spawn(async move {
            while let Ok((mut stream, _peer)) = acceptor.accept().await {
                let rx = rx.clone();
                tokio::spawn(async move {
                    // Stop when the channel is closed instead of retrying:
                    // `recv` on a closed channel fails immediately, so
                    // looping on the error would spin forever.
                    while let Ok(data) = rx.recv().await {
                        // A failed write ends this connection's task; the
                        // message that failed is not re-queued.
                        if stream.write_all(&data).await.is_err() {
                            break;
                        }
                    }
                });
            }
        });

        *inner = SocketInner::Listener(listener, tx);
        Ok(())
    }

    /// Connects to `addr` and stores the resulting stream.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection fails or a socket option cannot be
    /// set.
    ///
    /// # Panics
    ///
    /// Panics if this socket has already been initialised by `listen` or
    /// `connect`.
    pub async fn connect(&self, addr: String) -> anyhow::Result<()> {
        // Validate the state before connecting so that a misuse panic does
        // not open (and then discard) a connection.
        let mut inner = self.inner.lock().await;
        assert!(matches!(*inner, SocketInner::None));

        let stream = tokio::net::TcpStream::connect(addr).await?;
        {
            // NOTE(review): these options are applied to an already
            // connected stream — confirm that they have the intended effect.
            let sck_ref: socket2::SockRef = (&stream).into();
            sck_ref.set_reuse_address(true)?;
            #[cfg(not(target_os = "windows"))]
            sck_ref.set_reuse_port(true)?;
        }

        *inner = SocketInner::Stream(stream);
        Ok(())
    }

    /// Applies `option` to the underlying socket.
    ///
    /// # Errors
    ///
    /// Returns the error reported while setting the option.
    ///
    /// # Panics
    ///
    /// Panics if neither `listen` nor `connect` has been called yet.
    pub async fn set_option(&self, option: TcpSocketOption) -> anyhow::Result<()> {
        let guard = self.inner.lock().await;
        guard.set_option(option)
    }

    /// Queues `msg` for delivery to a connected peer.
    ///
    /// The message goes into the channel shared by the per-connection writer
    /// tasks. With `async_channel`, each message is taken by one receiver,
    /// so it is written to one connection rather than to all of them.
    ///
    /// # Errors
    ///
    /// Returns an error if the channel is closed.
    ///
    /// # Panics
    ///
    /// Panics if this socket is not in the listener state.
    pub async fn send(&self, msg: Vec<u8>) -> anyhow::Result<()> {
        let guard = self.inner.lock().await;
        match &*guard {
            SocketInner::Listener(_, tx) => {
                tx.send(msg).await?;
                Ok(())
            }
            other => panic!("Only TcpListener can send msg, but there is {other:?}"),
        }
    }

    /// Reads available bytes from the connected stream into `buffer` and
    /// returns the number of bytes read.
    ///
    /// The internal lock is held while waiting for data.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the stream.
    ///
    /// # Panics
    ///
    /// Panics if this socket is not in the connected-stream state.
    pub async fn read(&self, buffer: &mut [u8]) -> anyhow::Result<usize> {
        let mut guard = self.inner.lock().await;
        match &mut *guard {
            SocketInner::Stream(stream) => Ok(stream.read(buffer).await?),
            other => panic!("Only TcpStream can receive msg, but there is {other:?}"),
        }
    }

    /// Reads from the connected stream until `buffer` is completely filled
    /// and returns the value reported by the stream's `read_exact`.
    ///
    /// The internal lock is held while waiting for data.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the stream, for example when the
    /// peer closes the connection before `buffer` is full.
    ///
    /// # Panics
    ///
    /// Panics if this socket is not in the connected-stream state.
    pub async fn read_exact(&self, buffer: &mut [u8]) -> anyhow::Result<usize> {
        let mut guard = self.inner.lock().await;
        match &mut *guard {
            SocketInner::Stream(stream) => Ok(stream.read_exact(buffer).await?),
            other => panic!("Only TcpStream can receive msg, but there is {other:?}"),
        }
    }
}
impl Default for AsyncTcpSocket {
fn default() -> Self {
Self::new()
}
}