在 Rust 中操作套接字的推荐方式是:使用 std/tokio 提供的 socket,并配合 socket2 的 SockRef 来设置底层套接字选项:

// Bind a Tokio TCP listener on `addr` and share it behind an `Arc`.
let listner = Arc::new(tokio::net::TcpListener::bind(addr.clone()).await?);
// Borrow the listener as a `socket2::SockRef` to reach socket-level options.
let sck_ref: socket2::SockRef = (&listner).into();
// NOTE(review): the options below are applied after `bind` has already run;
// confirm they still have the intended effect on the target platforms.
sck_ref.set_reuse_address(true)?;
// Port reuse is only requested on non-Windows targets.
#[cfg(not(target_os = "windows"))]
sck_ref.set_reuse_port(true)?;

多发多收套接字

参照 ZeroMQ(zmq)的 Socket 接口,我们可以模拟出一个多发多收的 socket:

use std::sync::Arc;

use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    sync::Mutex,
};

/// Socket-level options that can be applied to an `AsyncTcpSocket`.
#[derive(Debug)]
pub enum TcpSocketOption {
    /// Value passed to `set_ttl` on the underlying socket.
    Ttl(u32),
}

/// Internal state of an `AsyncTcpSocket`.
#[derive(Debug)]
enum SocketInner {
    /// A connected stream; stored by `AsyncTcpSocket::connect`.
    Stream(tokio::net::TcpStream),
    /// A bound listener plus the sending half of the channel that carries
    /// outgoing messages; stored by `AsyncTcpSocket::listen`.
    Listener(Arc<tokio::net::TcpListener>, async_channel::Sender<Vec<u8>>),
    /// No socket has been created yet (the state produced by `AsyncTcpSocket::new`).
    None,
}

impl SocketInner {
    /// Borrows the wrapped stream or listener as a `socket2::SockRef`.
    ///
    /// # Panics
    /// Panics when no socket is stored (the `None` state).
    fn get_ref(&self) -> socket2::SockRef {
        match self {
            Self::Stream(stream) => socket2::SockRef::from(stream),
            Self::Listener(listener, _) => socket2::SockRef::from(listener),
            Self::None => panic!("There has no a socket inside {self:?}"),
        }
    }

    /// Applies `option` to the wrapped socket.
    ///
    /// # Errors
    /// Returns the error reported when setting the option fails.
    ///
    /// # Panics
    /// Panics when no socket is stored, because it goes through `get_ref`.
    fn set_option(&self, option: TcpSocketOption) -> anyhow::Result<()> {
        let socket = self.get_ref();

        match option {
            TcpSocketOption::Ttl(hops) => Ok(socket.set_ttl(hops)?),
        }
    }
}

/// A TCP socket that starts out empty and is later turned into either a
/// listener (`listen`) or a connected stream (`connect`).
#[derive(Debug)]
pub struct AsyncTcpSocket {
    /// Current socket state, guarded by an async mutex so it can be set
    /// and used through `&self`.
    inner: Mutex<SocketInner>,
}

impl AsyncTcpSocket {
    /// Creates a socket that is neither listening nor connected yet.
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(SocketInner::None),
        }
    }

    /// Binds a TCP listener on `addr` and starts accepting peers in the background.
    ///
    /// A detached task accepts connections. Every accepted connection gets its
    /// own detached task that takes messages queued by [`AsyncTcpSocket::send`]
    /// and writes them to that peer; the task ends when a write fails or when
    /// the message channel is closed.
    ///
    /// NOTE(review): all peer tasks share clones of one `async_channel`
    /// receiver, so a queued message is presumably consumed by a single peer
    /// rather than broadcast to every peer — confirm this is intended.
    ///
    /// # Errors
    /// Returns an error if binding fails or a socket option cannot be set.
    ///
    /// # Panics
    /// Panics if this socket is already listening or connected.
    pub async fn listen(&self, addr: String) -> anyhow::Result<()> {
        let listner = Arc::new(tokio::net::TcpListener::bind(addr).await?);
        let sck_ref: socket2::SockRef = (&listner).into();
        // NOTE(review): these options are applied after `bind` has already
        // run; confirm they still have the intended effect.
        sck_ref.set_reuse_address(true)?;
        #[cfg(not(target_os = "windows"))]
        sck_ref.set_reuse_port(true)?;

        // Validate the state before spawning anything, so a failed check does
        // not leave a detached accept task holding the bound listener.
        let mut inner = self.inner.lock().await;
        assert!(matches!(*inner, SocketInner::None));

        let (tx, rx) = async_channel::unbounded::<Vec<u8>>();

        let sub_listener = Arc::clone(&listner);
        tokio::spawn(async move {
            // Stop accepting as soon as `accept` reports an error.
            while let Ok((mut stream, _peer_addr)) = sub_listener.accept().await {
                let rx = rx.clone();
                tokio::spawn(async move {
                    // `recv` only fails once the channel is closed; leave the
                    // loop then instead of spinning on the error forever.
                    while let Ok(data) = rx.recv().await {
                        if stream.write_all(&data).await.is_err() {
                            break;
                        }
                    }
                });
            }
        });

        *inner = SocketInner::Listener(listner, tx);

        Ok(())
    }

    /// Connects to `addr` and stores the resulting stream.
    ///
    /// # Errors
    /// Returns an error if connecting fails or a socket option cannot be set.
    ///
    /// # Panics
    /// Panics if this socket is already listening or connected.
    pub async fn connect(&self, addr: String) -> anyhow::Result<()> {
        let sck = tokio::net::TcpStream::connect(addr).await?;
        let sck_ref: socket2::SockRef = (&sck).into();
        sck_ref.set_reuse_address(true)?;
        #[cfg(not(target_os = "windows"))]
        sck_ref.set_reuse_port(true)?;

        let mut inner = self.inner.lock().await;
        assert!(matches!(*inner, SocketInner::None));

        *inner = SocketInner::Stream(sck);

        Ok(())
    }

    /// Applies `option` to the underlying socket.
    ///
    /// # Errors
    /// Returns an error if the option cannot be set.
    ///
    /// # Panics
    /// Panics if the socket is neither listening nor connected.
    pub async fn set_option(&self, option: TcpSocketOption) -> anyhow::Result<()> {
        self.inner.lock().await.set_option(option)
    }

    /// Queues `msg` for delivery by the peer tasks started in
    /// [`AsyncTcpSocket::listen`].
    ///
    /// # Errors
    /// Returns an error if the message channel rejects the message.
    ///
    /// # Panics
    /// Panics unless the socket is in the listening state.
    pub async fn send(&self, msg: Vec<u8>) -> anyhow::Result<()> {
        match *self.inner.lock().await {
            SocketInner::Listener(_, ref tx) => Ok(tx.send(msg).await?),
            ref var => panic!("Only TcpListener can send msg, but there is {var:?}"),
        }
    }

    /// Reads from the connected stream into `buffer` and returns the value
    /// reported by the stream's `read`.
    ///
    /// The state mutex stays locked while the read is pending.
    ///
    /// # Errors
    /// Returns the I/O error reported by the stream.
    ///
    /// # Panics
    /// Panics unless the socket is in the connected state.
    pub async fn read(&self, buffer: &mut [u8]) -> anyhow::Result<usize> {
        match *self.inner.lock().await {
            SocketInner::Stream(ref mut stream) => Ok(stream.read(buffer).await?),
            ref var => panic!("Only TcpStream can receive msg, but there is {var:?}"),
        }
    }

    /// Reads from the connected stream via `read_exact` into `buffer` and
    /// returns the value it reports.
    ///
    /// The state mutex stays locked while the read is pending.
    ///
    /// # Errors
    /// Returns the I/O error reported by the stream.
    ///
    /// # Panics
    /// Panics unless the socket is in the connected state.
    pub async fn read_exact(&self, buffer: &mut [u8]) -> anyhow::Result<usize> {
        match *self.inner.lock().await {
            SocketInner::Stream(ref mut stream) => Ok(stream.read_exact(buffer).await?),
            ref var => panic!("Only TcpStream can receive msg, but there is {var:?}"),
        }
    }
}

impl Default for AsyncTcpSocket {
    fn default() -> Self {
        Self::new()
    }
}
Last modified: 2023-08-21 09:07:06
Build time:2025-07-18 09:41:42
Powered By asphinx