Appearance
该部分功能相对比较简单,主要是
- listen & accept
- new client 当然真实的Server中会做其他很多工作,比如配置选项等等,我们这里暂时都不考虑了.
数据结构定义
rust
#[derive(Debug, Default)]
pub struct Server<T: SubListTrait> {
state: Arc<Mutex<ServerState<T>>>,
}
#[derive(Debug, Default)]
pub struct ServerState<T: SubListTrait> {
clients: HashMap<u64, Arc<Mutex<ClientMessageSender>>>,
pub sublist: T,
pub gen_cid: u64,
}
其中最核心的就是ServerState,注意到他被放在了 Arc<Mutex>中,这也就意味着
- 他要多线程访问
- 多线程中读写 如果一个复杂结构体,需要多线程读,我们可以使用Arc包裹,避免多次内存分配如果一个变量,需要多线程读写,我们必须使用Mutex包裹,否则肯定无法编译 这里的SubListTrait就是上节课从零实现消息中间件-sublist中讲到的.
关于channel和mutex
标准库中有channel和mutex,tokio也另外提供了一套,他们的接口使用起来差不多. 最大的区别就是标准库里的阻塞是会导致整个线程阻塞,而tokio提供的只是阻塞当前task. 不要在tokio框架中使用标准库中的channel和mutex
泛型 & async
因为ServerState中的sublist,他需要在多个tokio的task之间传递,所以我们要求他除了实现SubListTrait这个我们要求的功能性trait之外,还要满足tokio的要求. 也就是Send+'static. 顺便说一句'static这个生命周期,简单理解他的意思就是我这个struct中不包含任何借用.
rust
impl<T: SubListTrait + Send + 'static> Server<T> {
}
接口设计
从功能上来说,Server这个结构体很简单,就是
- 主要任务就是listen & accept
- 创建Client实例, 后续需要的时候好利用起来. 这分别对应下面的start和new_client两个函数.
rust
impl<T: SubListTrait + Send + 'static> Server<T> {
pub async fn start(&self) -> Result<(), Box<dyn Error>> {
}
async fn new_client(&self, conn: TcpStream) {
}
}
如何使用
这里用到了tokio的使用方式,为了简化使用,tokio提供了两个宏main和test,他们位于tokio-macros这个crate下面. 有了这两个宏,我们的main函数可以简化很多.看起来就和普通的main函数差别不大,只是多了一个async关键字.
rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
println!("server start..");
let s: Server<SimpleSubList> = Server::default();
s.start().await
}
代码实现
rust
use crate::client::*;
use crate::simple_sublist::SubListTrait;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
#[derive(Debug, Default)]
pub struct Server<T: SubListTrait> {
state: Arc<Mutex<ServerState<T>>>,
}
#[derive(Debug, Default)]
pub struct ServerState<T: SubListTrait> {
clients: HashMap<u64, Arc<Mutex<ClientMessageSender>>>,
pub sublist: T,
pub gen_cid: u64,
}
impl<T: SubListTrait + Send + 'static> Server<T> {
pub async fn start(self) -> Result<(), Box<dyn Error>> {
let addr = "127.0.0.1:4222";
let mut listener = TcpListener::bind(addr).await?;
//go func(){}
loop {
let (conn, _) = listener.accept().await?;
self.new_client(conn).await;
}
Ok(())
}
async fn new_client(&self, conn: TcpStream) {
let state = self.state.clone();
let cid = {
let mut state = state.lock().await;
state.gen_cid += 1;
state.gen_cid
};
let c = Client::process_connection(cid, state, conn);
self.state.lock().await.clients.insert(cid, c);
}
}
其他
相关代码都在我的github rnats 欢迎围观