Appearance
消息格式
服务器和客户端来往的消息只有三种,分别是订阅(SUB),发布(PUB),推送消息(MSG). 其中前两种是从客户端向服务端推送,最后一种则是服务端向客户端推送.
服务端需要解析的消息格式
pub
PUB <subject> <size>\r\n
<message>\r\n
sub
SUB <subject> <sid>\r\n
SUB <subject> <queue> <sid>\r\n
客户端需要解析的消息格式
MSG
MSG <subject> <sid> <size>\r\n
<message>\r\n
消息格式解析的思路
出于性能考虑,应该考虑如下问题:
- 尽可能的避免内存分配
- 尽可能的避免内存复制(zero copy)
- 不要使用正则表达式去匹配
实现
根据上述原则,我最终选择使用状态机,这是最灵活的方式,虽然代码绍魏复杂一点,但是可以调整以尽可能满足上述三个原则. 注意这里的实现只针对服务端,相关代码都位于我的github
错误处理
错误处理这是在所有的系统中都要处理的事情,这里我先把可能发生的错误都列在这里,然后定义.
rust
#[derive(Debug)]
pub const ERROR_PARSE: i32 = 1;
pub const ERROR_MESSAGE_SIZE_TOO_LARGE: i32 = 2;
pub const ERROR_INVALID_SUBJECT: i32 = 3;
pub const ERROR_SUBSCRIBTION_NOT_FOUND: i32 = 4;
pub const ERROR_CONNECTION_CLOSED: i32 = 5;
pub const ERROR_UNKOWN_ERROR: i32 = 1000;
pub struct NError {
pub err_code: i32,
}
impl NError {
pub fn new(err_code: i32) -> Self {
Self { err_code }
}
pub fn error_description(&self) -> &'static str {
match self.err_code {
ERROR_PARSE => return "parse error",
...
_ => return "unkown error",
}
}
}
状态定义
这里采用的是逐个byte解析的方式. 只处理pub和sub两种消息. 其中sub支持可选的queue来做负载均衡.
rust
#[derive(Debug, Clone)]
enum ParseState {
OpStart,
OpS,
OpSu,
OpSub,
OPSubSpace,
OpSubArg,
OpP,
OpPu,
OpPub, //pub argument
OpPubSpace,
OpPubArg,
OpMsg, //pub message
OpMsgFull,
}
Parser以及parse结果
在和汉东老师聊的过程中,他和我都认为rust代码写之前最好先定义清楚数据结构. 当然其他语言也需要这样,不过我感觉rust语言如果不限这么做,后续调整起来会更麻烦.
返回结果
parse的结果不外乎四种情况
- 出错了
- 到目前为止还没有收到完整的消息 比如只收到了SUB SUBJECT ,消息不完整,当然不能处理
- 一条PUB消息
- 一条SUB消息
rust
#[derive(Debug, PartialEq)]
pub struct SubArg<'a> {
pub subject: &'a str, //为什么是str而不是String,就是为了避免内存分配,
pub sid: &'a str,
pub queue: Option<&'a str>,
}
#[derive(Debug, PartialEq)]
pub struct PubArg<'a> {
pub subject: &'a str,
pub size_buf: &'a str, // 1024 字符串形式,避免后续再次转换
pub size: usize, //1024 整数形式
pub msg: &'a [u8],
}
#[derive(Debug, PartialEq)]
pub enum ParseResult<'a> {
NoMsg, //buf="sub top.stevenbai.blog" sub消息不完整,我肯定不能处理
Sub(SubArg<'a>),
Pub(PubArg<'a>),
}
Parser
Parser的定义这个版本我们尽量去满足上述三个原则,但是考虑到第二条zero-copy会让代码中 到处都是if-else,所以暂时先不考虑. 后续我们在做优化的时候会进行benchmark.
rust
/*
这个长度很有关系,必须能够将一个完整的主题以及参数放进去,
所以要限制subject的长度
*/
const BUF_LEN: usize = 512;
pub struct Parser {
state: ParseState,
buf: [u8; BUF_LEN], //消息解析缓冲区,如果消息体+消息头不超过512,直接用这个,超过了就必须另分配
arg_len: usize,
msg_buf: Option<Vec<u8>>,
//解析过程中受到新消息,那么 新消息的总长度是msg_total_len,已收到部分应该是msg_len
msg_total_len: usize,
msg_len: usize,
debug: bool,
}
消息解析
有了这些定义以后,真正的消息解析过程就会清晰很多.
parse 函数的定义
rust
/**
对收到的字节序列进行解析,解析完毕后得到pub或者sub消息,
同时有可能没有消息或者缓冲区里面还有其他消息
返回结果中的usize指的是消耗了缓冲区中多少字节
*/
pub fn parse(&mut self, buf: &[u8]) -> Result<(ParseResult, usize)>
parse函数的使用
rust
fn test_sub2() {
let mut p = Parser::new();
let mut buf = "SUB subject 1\r\nSUB subject2 2\r\n".as_bytes();
loop {
let r = p.parse(buf);
assert!(!r.is_err());
let r = r.unwrap();
buf = &buf[r.1..];
match r.0 {
ParseResult::Sub(sub) => {
println!("sub.subect={}", sub.subject);
}
_ => panic!(),
}
if buf.len() == 0 {
break;
}
}
}
完整parse的实现
rust
impl Parser {
pub fn new() -> Self {
Self {
state: ParseState::OpStart,
buf: [0; BUF_LEN],
arg_len: 0,
msg_buf: None,
msg_total_len: 0,
msg_len: 0,
debug: true,
}
}
/**
对收到的字节序列进行解析,解析完毕后得到pub或者sub消息,
同时有可能没有消息或者缓冲区里面还有其他消息
*/
pub fn parse(&mut self, buf: &[u8]) -> Result<(ParseResult, usize)> {
let mut b;
let mut i = 0;
if self.debug {
println!(
"parse string:{},state={:?}",
unsafe { std::str::from_utf8_unchecked(buf) },
self.state
);
}
while i < buf.len() {
use ParseState::*;
b = buf[i] as char;
// println!("state={:?},b={}", self.state, b);
match self.state {
OpStart => match b {
'S' => self.state = OpS,
'P' => self.state = OpP,
_ => parse_error!(),
},
OpS => match b {
'U' => self.state = OpSu,
_ => parse_error!(),
},
OpSu => match b {
'B' => self.state = OpSub,
_ => parse_error!(),
},
OpSub => match b {
//sub stevenbai.top 3 是ok的,但是substevenbai.top 3就不允许
' ' | '\t' => self.state = OPSubSpace,
_ => parse_error!(),
},
OPSubSpace => match b {
' ' | '\t' => {}
_ => {
self.state = OpSubArg;
self.arg_len = 0;
continue;
}
},
OpSubArg => match b {
'\r' => {}
'\n' => {
self.state = OpStart;
let r = self.process_sub()?;
return Ok((r, i + 1));
}
_ => {
self.add_arg(b as u8)?;
}
},
OpP => match b {
'U' => self.state = OpPu,
_ => parse_error!(),
},
OpPu => match b {
'B' => self.state = OpPub,
_ => parse_error!(),
},
OpPub => match b {
' ' | '\t' => self.state = OpPubSpace,
_ => parse_error!(),
},
OpPubSpace => match b {
' ' | '\t' => {}
_ => {
self.state = OpPubArg;
self.arg_len = 0;
continue;
}
},
OpPubArg => match b {
'\r' => {}
'\n' => {
//PUB top.stevenbai 5\r\n
self.state = OpMsg;
let size = self.get_message_size()?;
if size == 0 || size > 1 * 1024 * 1024 {
//消息体长度不应该超过1M,防止Dos攻击
return Err(NError::new(ERROR_MESSAGE_SIZE_TOO_LARGE));
}
if size + self.arg_len > BUF_LEN {
self.msg_buf = Some(Vec::with_capacity(size));
}
self.msg_total_len = size;
}
_ => {
self.add_arg(b as u8)?;
}
},
OpMsg => {
//涉及消息长度
if self.msg_len < self.msg_total_len {
self.add_msg(b as u8);
} else {
self.state = OpMsgFull;
}
}
OpMsgFull => match b {
'\r' => {}
'\n' => {
self.state = OpStart;
let r = self.process_msg()?;
return Ok((r, i + 1));
}
_ => {
parse_error!();
}
},
// _ => panic!("unkown state {:?}", self.state),
}
i += 1;
}
Ok((ParseResult::NoMsg, 0))
}
//一种是消息体比较短,可以直接放在buf中,无需另外分配内存
//另一种是消息体很长,无法放在buf中,额外分配了msg_buf空间
fn add_msg(&mut self, b: u8) {
if let Some(buf) = self.msg_buf.as_mut() {
buf.push(b);
} else {
//消息体比较短的情况
if self.arg_len + self.msg_total_len > BUF_LEN {
panic!("message should allocate space");
}
self.buf[self.arg_len + self.msg_len] = b;
}
self.msg_len += 1;
}
fn add_arg(&mut self, b: u8) -> Result<()> {
//太长的subject
if self.arg_len >= self.buf.len() {
parse_error!();
}
self.buf[self.arg_len] = b;
self.arg_len += 1;
Ok(())
}
//解析缓冲区中的形如stevenbai.top queue 3
fn process_sub(&self) -> Result<ParseResult> {
let buf = &self.buf[0..self.arg_len];
//有可能客户端恶意发送一些无效的utf8字符,这会导致错误.
let ss = unsafe { std::str::from_utf8_unchecked(buf) };
let mut arg_buf = [""; 3]; //如果没有queue,长度就是2,否则长度是3
let mut arg_len = 0;
for s in ss.split(' ') {
if s.len() == 0 {
continue;
}
if arg_len >= 3 {
parse_error!();
}
arg_buf[arg_len] = s;
arg_len += 1;
}
let mut sub_arg = SubArg {
subject: "",
sid: "",
queue: None,
};
sub_arg.subject = arg_buf[0];
//长度为2时不包含queue,为3包含queue,其他都说明格式错误
match arg_len {
2 => {
sub_arg.sid = arg_buf[1];
}
3 => {
sub_arg.sid = arg_buf[2];
sub_arg.queue = Some(arg_buf[1]);
}
_ => parse_error!(),
}
Ok(ParseResult::Sub(sub_arg))
}
//解析缓冲区中以及msg_buf中的形如stevenbai.top 5hello
fn process_msg(&self) -> Result<ParseResult> {
let msg = if self.msg_buf.is_some() {
self.msg_buf.as_ref().unwrap().as_slice()
} else {
&self.buf[self.arg_len..self.arg_len + self.msg_total_len]
};
let mut arg_buf = [""; 2];
let mut arg_len = 0;
let ss = unsafe { std::str::from_utf8_unchecked(&self.buf[0..self.arg_len]) };
for s in ss.split(' ') {
if s.len() == 0 {
continue;
}
if arg_len >= 2 {
parse_error!()
}
arg_buf[arg_len] = s;
arg_len += 1;
}
let pub_arg = PubArg {
subject: arg_buf[0],
size_buf: arg_buf[1],
size: self.msg_total_len,
msg,
};
Ok(ParseResult::Pub(pub_arg))
}
pub fn clear_msg_buf(&mut self) {
self.msg_buf = None;
self.msg_len = 0;
self.msg_total_len = 0;
}
//从接收到的pub消息中提前解析出来消息的长度
fn get_message_size(&self) -> Result<usize> {
//缓冲区中形如top.stevenbai.top 5
let arg_buf = &self.buf[0..self.arg_len];
let pos = arg_buf
.iter()
.rev()
.position(|b| *b == ' ' as u8 || *b == '\t' as u8);
if pos.is_none() {
parse_error!();
}
let pos = pos.unwrap();
let size_buf = &arg_buf[arg_buf.len() - pos..];
let szb = unsafe { std::str::from_utf8_unchecked(size_buf) };
szb.parse::<usize>().map_err(|_| NError::new(ERROR_PARSE))
}
}
其他
在实现Parser的过程中,我们展示了Rust中enum的使用,错误的处理,字符串的处理等常见的问题. 接下来我会继续实现另一个组件订阅的管理.
相关代码都在我的github rnats 欢迎围观