Skip to content
On this page

这部分主要说的是服务器端对于来自client连接的数据的处理. 主要功能包括

  1. 接收消息
  2. 收到sub消息,就记录到全局列表中
  3. 收到pub消息,就发送给相关订阅的client
  4. 出错,删除订阅,关闭连接

数据结构定义

Client中除了cid以外,其他两项都使用了Mutex进行保护,上一篇讲到过,凡是多线程读写的都需要Arc<Mutex>保护.

  • srv: 主要还是pub sub的时候都需要访问全局的sublist.
  • msg_sender: 之所以用Mutex保护是因为除了client自己要发送消息,当其他client pub消息的时候也要通过这个ClientMessageSender发送消息 ClientMessageSender在我们这个版本中则非常简单,就是一个TcpStream的writer.
rust
#[derive(Debug)]
pub struct Client<T: SubListTrait> {
    pub srv: Arc<Mutex<ServerState<T>>>,
    pub cid: u64,
    pub msg_sender: Arc<Mutex<ClientMessageSender>>,
}
#[derive(Debug)]
pub struct ClientMessageSender {
    writer: WriteHalf<TcpStream>,
}

代码实现

process_connection

  • 创建Client以及可以共享使用的ClientMessageSender
  • 启动client_task
rust
 impl<T: SubListTrait + Send + 'static> Client<T> {

    pub fn process_connection(
        cid: u64,
        srv: Arc<Mutex<ServerState<T>>>,
        conn: TcpStream,
    ) -> Arc<Mutex<ClientMessageSender>> {
        let (reader, writer) = tokio::io::split(conn);
        let msg_sender = Arc::new(Mutex::new(ClientMessageSender::new(writer)));
        let c = Client {
            srv: srv,
            cid,
            msg_sender: msg_sender.clone(),
        };
        tokio::spawn(Client::client_task(c, reader));
        msg_sender
    }
     
    ...
 }

client_task

主要功能:

  • 读取,解析消息
  • 分发消息给相应的处理函数
    • process_error
    • process_sub
    • process_pub

这个其实就是一个tcp连接的主循环,说到这里我想把tokio::spawn 和 go语言中的go关键字做一个类比. 在go中TcpServer接收到一个连接以后,紧接着就是单独起一个goroutine来处理.类似于go client.processConnection(),而到了Rust中基本上可以等价为

rust
tokio::spawn(async move{
 Client::process_connection();
});

当然Rust重要复杂很多,涉及到所有权,生命周期等一系列问题.

rust
 async fn client_task(self, mut reader: ReadHalf<TcpStream>) {
        let mut buf = [0; 1024];
        let mut parser = Parser::new();
        let mut subs = HashMap::new();
        loop {
            let r = reader.read(&mut buf[..]).await;
            if r.is_err() {
                let e = r.unwrap_err();
                self.process_error(e, subs).await;
                return;
            }
            let r = r.unwrap();
            let n = r;
            if n == 0 {
                self.process_error(NError::new(ERROR_CONNECTION_CLOSED), subs)
                    .await;
                return;
            }
            let mut buf = &buf[0..n];
            loop {
                let r = parser.parse(&buf[..]);
                if r.is_err() {
                    self.process_error(r.unwrap_err(), subs).await;
                    return;
                }
                let (result, left) = r.unwrap();

                match result {
                    ParseResult::NoMsg => {
                        break;
                    }
                    ParseResult::Sub(ref sub) => {
                        if let Err(e) = self.process_sub(sub, &mut subs).await {
                            self.process_error(e, subs).await;
                            return;
                        }
                    }
                    ParseResult::Pub(ref pub_arg) => {
                        if let Err(e) = self.process_pub(pub_arg).await {
                            self.process_error(e, subs).await;
                            return;
                        }
                    }
                }
                if left == buf.len() {
                    break;
                }
                buf = &buf[left..];
            }
        }
 }

从整个代码中也可以看出client_task的主要工作就是接受消息,并处理.

process_error

  1. 删除所有订阅
  2. 关闭连接
rust
 async fn process_error<E: Error>(&self, 
    err: E, subs: HashMap<String, ArcSubscription>) {
        println!("client {} process err {:?}", self.cid, err);
        {
            let mut sublist = &mut self.srv.lock().await.sublist;
            for (_, sub) in subs {
                sublist.remove(sub);
            }
        }
        let r = self.msg_sender.lock().await.writer.shutdown().await;
        if r.is_err() {
            println!("shutdown err {:?}", r.unwrap_err());
        }
    }

process_sub

对于收到的sub则是

  1. 全局订阅列表中保存一份
  2. 本地连接保存一份,这样连接断开的时候好删除 为了避免内存分配,我们的SubArg里面使用的还是Parer缓冲区中的内存,当我们需要在连接之外访问这些信息的时候,我们就必须单独保存一份了,这里我们用的是sub.subject.to_string()来分配一个新的内存.
rust
 async fn process_sub(
        &self,
        sub: &SubArg<'_>,
        subs: &mut HashMap<String, ArcSubscription>,
    ) -> crate::error::Result<()> {
    let sub = Subscription {
            subject: sub.subject.to_string(),
            queue: sub.queue.map(|q| q.to_string()),
            sid: sub.sid.to_string(),
            msg_sender: self.msg_sender.clone(),
        };
        let sub = Arc::new(sub);
        subs.insert(sub.subject.clone(), sub.clone());
        let sublist = &mut self.srv.lock().await.sublist;
        sublist.insert(sub);
        Ok(())
    }

process_pub

收到pub消息,

  1. 查找所有的订阅
  2. 将消息逐一转发给他们 转发的过程中要稍微麻烦一点,因为考虑到设计中的负载均衡问题,qsubs则是从同一个queue中随机选择一个来推送消息.
rust
async fn process_pub(&self, pub_arg: &PubArg<'_>) -> crate::error::Result<()> {
    let sub_result = {
            let sub_list = &mut self.srv.lock().await.sublist;
            sub_list.match_subject(pub_arg.subject)?
        };
        for sub in sub_result.subs.iter() {
            self.send_message(sub.as_ref(), pub_arg)
                .await
                .map_err(|e| NError::new(ERROR_CONNECTION_CLOSED))?;
        }
        //qsubs 要考虑负载均衡问题
        let mut rng = rand::rngs::StdRng::from_entropy();
        for qsubs in sub_result.qsubs.iter() {
            let n = rng.next_u32();
            let n = n as usize % qsubs.len();
            let sub = qsubs.get(n).unwrap();
            self.send_message(sub.as_ref(), pub_arg)
                .await
                .map_err(|e| NError::new(ERROR_CONNECTION_CLOSED))?;
        }
        Ok(())
}

send_message

就是拼装消息格式 因为是第一个版本,也是展示关键api的使用,里面用到了大量的await,实际上没有必要. 实际项目中,肯定会使用缓冲区来做.

rust
///消息格式
///```
/// MSG <subject> <sid> <size>\r\n
/// <message>\r\n
/// ```
async fn send_message(&self, sub: &Subscription, pub_arg: &PubArg<'_>) -> std::io::Result<()> {
    let writer = &mut sub.msg_sender.lock().await.writer;
    writer.write("MSG ".as_bytes()).await?;
    writer.write(sub.subject.as_bytes()).await?;
    writer.write(" ".as_bytes()).await?;
    writer.write(sub.sid.as_bytes()).await?;
    writer.write(" ".as_bytes()).await?;
    writer.write(pub_arg.size_buf.as_bytes()).await?;
    writer.write("\r\n".as_bytes()).await?;
    writer.write(pub_arg.msg).await?;
    writer.write("\r\n".as_bytes()).await?;
    Ok(())
}

其他

相关代码都在我的github rnats 欢迎围观