Skip to content
On this page

一个client对应多少线程/任务

这里面有tokio 异步任务,所以只要是异步执行的,都任务是独立的. 一个client至少会对应两个独立线程(不包含tokio创建的线程).

session 中的任务

async responses

一个client对应一个session,一个session 有一个后台线程 thread::spawn(move || Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx)); 周期性的去从message_queue中读取async_responses.

  1. ServiceFault 错误通知
  2. PublishResponse 订阅拉的结果,这些结果会通过OnSubscriptionNotification 告诉客户端使用者.

这个线程的轮询周期是10ms.

session_activity_task

通过发送一个空的ReadRequest来模拟ping消息,保持和服务器的连接. 这是一个tokio任务,周期是1秒钟.

tcp transport

tcp连接处理相关.

1. 总任务

建立连接,send hello以后就老等下面三个任务(都在connection_task中)完成后结束. 注意这是一个独立的线程.

2. spawn_finished_monitor_task

监控连接是否结束,这个很奇怪,没有使用通知,居然是轮训模式 轮询周期是200ms

3. spawn_reading_task

从tcp连接上读取服务器发送过来的消息,消息主要有三种类型:

  1. Ack 对Hello消息的确认,其他消息都是Request/Reponse?
  2. Error
  3. Response

Server从不主动给client发消息,除了Error以外.

收到Response以后,放到message_queue的responses中.等待其他任务来取.

4. spawn_writing_task

通过message_queue中的request_channel接受请求,然后发送给服务器.

subscription相关

一个client只有一个session,但是一个session可以有多个subscription,每个subscription有一个唯一的整数id.

1. timer_task

周期性的向服务器发送PublishRequest,不管服务器有没有响应. 除非收到了服务器的ServiceFault(BadTooManyPublishRequests).

主线程

send_request是一个同步的操作.

  1. 通过async方式将request放入message_queue的队列中
  2. 不停的轮询message_queue的response中是否有想要的响应
  3. 拿到响应后返回.

这里有一个问题,就是将request放入message_queue中会锁住session_state,但是这个数据结构非常关键. 到处都在用,是否会造成死锁? 如果一个请求迟迟发送不出去呢? 即使这时候收到了服务器端的其他publish Response消息,也是无法处理的,因为处理的过程要锁住session_state.

Server 端统计

http server

这是一个通过http接口展示server内部统计信息的工具. 比较独立,可以不用管,需要的时候才会启动.

PollingAction

一个定时执行函数的工具,只不过他会检查server是否在运行.

tcp_transport

和客户端通过tcp进行通信的地方, 以下任务是每一个连接都要创建的.

spawn_hello_timeout_task

检查连接建立后,是否收到hello,如果收到呢就结束这个task,否则后台一直周期性的检查.

  1. 超时, 发送Message::Quit,断开连接
  2. 收到,结束.

spawn_subscriptions_task

周期性的,这个周期取决于采样周期和订阅周期. 两个任务:

收集pulish responses
  1. 清理过期的publish requests
  2. 收集publish responses,并通过channel 发送.
接收publish reponses

收到以后通过sender发送出去

我的问题: *为什么这是两个独立的任务? 合成一个不更好么?

spawn_finished_monitor_task

一个独立的任务,周期性检查server是否退出. 思路和client是完全一样的. 感觉这种周期性的任务很奇怪,也不合理.

spawn_reading_loop_task

读取来自客户端的消息,这里面主要有两个状态:

  1. WaitingHello 这个时候什么消息都不处理,只处理Hello,处理以后,才能正常处理其他消息.

  2. ProcessMessages 处理任何消息.核心在process_chunk中.

其他的就是如何处理错误以及结束.

spawn_writing_loop_task

向客户端发送消息,发送消息的请求来自以上三个task:

  • spawn_hello_timeout_task
  • spawn_subscriptions_task
  • spawn_reading_loop_task
  1. 发送之前检查连接的可用性
  2. 发送之前检查消息的正确性
  3. 将要发送消息放入buffer中.
  4. 通过write_bytes_task发送出去
  5. 发送后再次检查是否需要关闭.

** shutdown没有await,需要加上.

Server 层的task

discovery server相关

可以不用,暂时忽略.

add_polling_action

这是一个pub api,让调用者可以周期性的在server内部做一些事情.

但是这个只能添加,不能删除.

交叉编译

针对raspberry进行交叉编译

sudo apt-get install gcc-arm-linux-gnueabihf
rustup target add armv7-unknown-linux-gnueabihf     # armv7
cargo build --target armv7-unknown-linux-gnueabihf --release 

.cargo/config

[target.arm-unknown-linux-musleabihf]
linker = "arm-linux-musleabihf-ld"

[target.armv7-unknown-linux-musleabihf]
linker = "arm-linux-musleabihf-ld"

[target.arm-unknown-linux-gnueabihf]
linker = "arm-linux-gnueabihf-gcc"

[target.armv7-unknown-linux-gnueabihf]
linker = "arm-linux-gnueabihf-gcc"

openssl 依赖问题

cd /tmp

wget https://www.openssl.org/source/openssl-1.0.1t.tar.gz
tar xzf openssl-1.0.1t.tar.gz
export MACHINE=armv7
export ARCH=arm
export CC=arm-linux-musleabi-gcc
cd openssl-1.0.1t && ./config shared && make && cd -

export OPENSSL_LIB_DIR=~/openssl-1.0.1t/
export OPENSSL_INCLUDE_DIR=~/openssl-1.0.1t/include

如果想静态编译就要使用arm-linux-musleabi-gcc, 用arm-linux-musleabihf-gcc的话,会出现-mfloat-abi=hard不支持错误.