Future
rust
// Not quite the same as the trait in the standard library, but easier to understand
trait Future {
    /// The type of the value returned when the future completes.
    type Item;
    /// The type representing errors that occurred while processing the computation.
    type Error;
    /// The function that will be repeatedly called to see if the future
    /// has completed or not. The `Async` enum can either be `Ready` or
    /// `NotReady` and indicates whether the future is ready to produce
    /// a value or not.
    fn poll(self: Pin<&mut Self>) -> Result<Async<Self::Item>, Self::Error>;
}
On success, poll returns Ok(Async::Ready(value)); if the value is not ready yet, it returns Ok(Async::NotReady).
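The return values described above can be tried out with a minimal sketch. It redefines the simplified trait locally (it is not the standard library trait), and `Delayed` and `poll_to_completion` are hypothetical names introduced only for illustration. The loop polls in a tight cycle, whereas a real runtime would park the task until it is woken.

```rust
use std::pin::Pin;

/// Readiness marker used by the simplified trait from the text.
enum Async<T> {
    Ready(T),
    NotReady,
}

/// The simplified trait from the text, redefined locally for this sketch.
trait Future {
    type Item;
    type Error;
    fn poll(self: Pin<&mut Self>) -> Result<Async<Self::Item>, Self::Error>;
}

/// Hypothetical future: reports `NotReady` for `polls_left` polls,
/// then resolves to the stored result.
struct Delayed {
    polls_left: u32,
    result: Result<u32, String>,
}

impl Future for Delayed {
    type Item = u32;
    type Error = String;

    fn poll(mut self: Pin<&mut Self>) -> Result<Async<u32>, String> {
        if self.polls_left > 0 {
            self.polls_left -= 1;
            return Ok(Async::NotReady);
        }
        // Ok(v) becomes Ok(Async::Ready(v)); Err(e) is passed through.
        self.result.clone().map(Async::Ready)
    }
}

/// Polls until the future completes; returns (number of polls, final result).
fn poll_to_completion<F: Future + Unpin>(mut fut: F) -> (u32, Result<F::Item, F::Error>) {
    let mut polls = 0;
    loop {
        polls += 1;
        match Pin::new(&mut fut).poll() {
            Ok(Async::NotReady) => continue,
            Ok(Async::Ready(value)) => return (polls, Ok(value)),
            Err(error) => return (polls, Err(error)),
        }
    }
}
```

Calling `poll_to_completion(Delayed { polls_left: 2, result: Ok(7) })` sees two `NotReady` answers before the value arrives on the third poll.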
Stream
rust
trait Stream {
    /// The type of the value yielded by the stream.
    type Item;
    /// The type representing errors that occurred while processing the computation.
    type Error;
    /// The function that will be repeatedly called to see if the stream has
    /// another value it can yield
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
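Unlike a Future, a Stream keeps yielding values, so Ready is further split into Some and None. poll therefore returns one of four cases:
- Ok(Async::Ready(Some(value))): a value is ready
- Ok(Async::NotReady): no value is ready yet
- Ok(Async::Ready(None)): the stream is exhausted
- Err(error): an error occurred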
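The four cases can be exercised with a small sketch. The trait, `Async`, and `Poll` are redefined locally so the block stands alone; `Counter` and `collect` are hypothetical names, not part of any library. The consumer simply polls again on `NotReady`, which a real runtime would replace with parking the task.

```rust
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

/// The stream trait from the text, redefined locally for this sketch.
trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical stream: yields `next..end`, answering `NotReady` once
/// before every answer, and failing when it reaches `fail_at`.
struct Counter {
    next: u32,
    end: u32,
    ready: bool,
    fail_at: Option<u32>,
}

impl Stream for Counter {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Poll<Option<u32>, String> {
        if !self.ready {
            self.ready = true;
            return Ok(Async::NotReady); // data not ready yet
        }
        self.ready = false;
        if self.next >= self.end {
            return Ok(Async::Ready(None)); // no more data
        }
        if Some(self.next) == self.fail_at {
            return Err(format!("failed at {}", self.next)); // error
        }
        let value = self.next;
        self.next += 1;
        Ok(Async::Ready(Some(value))) // data is ready
    }
}

/// Drains a stream into a Vec, stopping at the first error.
fn collect<S: Stream>(mut stream: S) -> Result<Vec<S::Item>, S::Error> {
    let mut out = Vec::new();
    loop {
        match stream.poll() {
            Ok(Async::Ready(Some(value))) => out.push(value),
            Ok(Async::NotReady) => continue,
            Ok(Async::Ready(None)) => return Ok(out),
            Err(error) => return Err(error),
        }
    }
}
```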
About Pin
Why Pin is needed
Under the hood, an async block is compiled into a state machine. Take this block:
rust
async {
fut_one.await;
fut_two.await;
}
A block like this is eventually translated into roughly the following code:
rust
// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}
// List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}
impl AsyncFuture {
    fn poll(...) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}
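The pseudocode above can be turned into something that compiles against the standard library. This is only a sketch of the idea, not what the compiler actually emits: `YieldOnce` stands in for the elided `FutOne`/`FutTwo` types, and `NoopWake` and `polls_until_ready` are hypothetical helpers that drive the state machine by hand.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical leaf future: pending on the first poll, ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

struct AsyncFuture {
    fut_one: YieldOnce,
    fut_two: YieldOnce,
    state: State,
}

impl Future for AsyncFuture {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Every field is `Unpin`, so a plain `&mut` is sound in this sketch.
        let this = &mut *self;
        loop {
            match this.state {
                State::AwaitingFutOne => match Pin::new(&mut this.fut_one).poll(cx) {
                    Poll::Ready(()) => this.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                },
                State::AwaitingFutTwo => match Pin::new(&mut this.fut_two).poll(cx) {
                    Poll::Ready(()) => this.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                },
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

/// Waker that does nothing; sufficient for a busy-polling demo loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a fresh `AsyncFuture` to completion and returns the number of polls.
fn polls_until_ready() -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = AsyncFuture {
        fut_one: YieldOnce { yielded: false },
        fut_two: YieldOnce { yielded: false },
        state: State::AwaitingFutOne,
    };
    let mut polls = 1;
    while Pin::new(&mut fut).poll(&mut cx).is_pending() {
        polls += 1;
    }
    polls
}
```

Each `Pending` from a child suspends the whole state machine, and the next poll resumes from the stored `state` rather than from the beginning.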
The key issue is that if AsyncFuture contains a self-reference, then moving AsyncFuture invalidates the pointer stored inside it.
For example:
rust
async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}
After translation this becomes:
rust
struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}
struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}
Pin is therefore needed to keep AsyncFuture from being moved.
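A minimal sketch of what pinning buys, using only the standard library: `SelfRef` is a hypothetical stand-in for the generated future, with a raw pointer playing the role of the borrowed buffer. Once the value is pinned on the heap, moving the `Pin<Box<_>>` handle around only moves the pointer to the allocation, so the internal pointer stays valid.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical self-referential type: `ptr` is meant to point at `x`.
struct SelfRef {
    x: [u8; 128],
    ptr: *const u8,
    _pin: PhantomPinned, // opts out of `Unpin`, so safe code cannot move it once pinned
}

impl SelfRef {
    /// Allocates the value on the heap, pins it, then sets up the self-reference.
    fn new_pinned() -> Pin<Box<SelfRef>> {
        let mut boxed = Box::pin(SelfRef {
            x: [0; 128],
            ptr: std::ptr::null(),
            _pin: PhantomPinned,
        });
        let ptr = boxed.x.as_ptr();
        // SAFETY: only a field is written; the value is never moved out of its heap slot.
        unsafe {
            boxed.as_mut().get_unchecked_mut().ptr = ptr;
        }
        boxed
    }

    /// True while `ptr` still points at this value's own `x`.
    fn is_consistent(&self) -> bool {
        self.ptr == self.x.as_ptr()
    }
}

/// Moves the pinned handle twice and checks the self-reference afterwards.
fn moved_handle_stays_consistent() -> bool {
    let pinned = SelfRef::new_pinned();
    let moved = pinned; // moves the Box pointer, not the heap data
    let in_vec = vec![moved]; // another move of the handle
    in_vec[0].is_consistent()
}
```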
How to use Pin
There are two ways: Box::pin (called Box::pinned before it was stabilized) and the pin_utils::pin_mut! macro.
rust
use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io
// A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { ... }
let fut = async { ... };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait
// Pinning with `Box`:
let fut = async { ... };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK
// Pinning with `pin_mut!`:
let fut = async { ... };
pin_mut!(fut);
execute_unpin_future(fut); // OK
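For a version that compiles without external crates, the sketch below swaps `pin_utils::pin_mut!` for the standard library's `std::pin::pin!` macro, which also pins on the stack. The body of `execute_unpin_future` is an assumption (the original elides it); here it busy-polls with a do-nothing waker, and `run_boxed` and `run_stack_pinned` are hypothetical helper names.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Assumed body: polls an `Unpin` future until it completes.
fn execute_unpin_future<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}

/// Heap pinning: `Pin<Box<F>>` is itself a future and is `Unpin`.
fn run_boxed() -> u32 {
    let fut = async { 1 + 2 };
    execute_unpin_future(Box::pin(fut))
}

/// Stack pinning: `pin!` yields a `Pin<&mut F>`, which is also `Unpin`.
fn run_stack_pinned() -> u32 {
    let fut = pin!(async { 40 + 2 });
    execute_unpin_future(fut)
}
```

Both wrappers are `Unpin` because moving the wrapper moves only a pointer, never the future it points at.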
tokio runtime
As hinted at earlier, the Rust asynchronous model is very different from that of other languages. Most other languages use a “completion” based model, usually built using some form of callbacks. In that model, when an asynchronous action is started, it is submitted together with a function to call once the operation completes. When the process receives the I/O notification from the operating system, it finds the function associated with it and calls it immediately. This is a push-based model because the value is pushed into the callback.
The Rust asynchronous model is pull-based. Instead of a Future being responsible for pushing the data into a callback, it relies on something else asking whether it is complete or not. In the case of Tokio, that something else is the Tokio runtime.
Using a poll-based model offers many advantages, including being a zero-cost abstraction, i.e., using Rust futures has no added overhead compared to writing the asynchronous code by hand.
rust
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
    /// The type of value produced on completion.
    type Output;
    /// Attempt to resolve the future to a final value, registering
    /// the current task for wakeup if the value is not yet available.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
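Combined with the wake-up mechanism carried in Context, this looks to me much like a push model in most cases. Perhaps I have not understood it deeply enough.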
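One way to look at the interplay is the sketch below, which uses only the standard library. The waker pushes nothing but a notification ("poll me again"); the value itself is still pulled out by the next call to poll. `CountingWaker`, `Countdown`, and `run` are hypothetical names, and the loop is a toy stand-in for a runtime, not how Tokio is implemented.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts how many wake-up notifications the task received.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical future: pending for `remaining` polls, then ready.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref(); // notification only; no value travels with it
        Poll::Pending
    }
}

/// Toy executor: returns (output, number of polls, number of wake-ups).
fn run(mut fut: Countdown) -> (&'static str, usize, usize) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    let mut seen_wakes = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls, counter.wakes.load(Ordering::SeqCst));
        }
        // Poll again only because a wake-up was recorded since the last poll.
        let now = counter.wakes.load(Ordering::SeqCst);
        assert!(now > seen_wakes, "Pending returned without scheduling a wake-up");
        seen_wakes = now;
    }
}
```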
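Composing a Future by hand
Pay attention to the use statements here: when you write this yourself, none of them can be left out. The example targets the tokio 0.1 and futures 0.1 APIs.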
rust
extern crate tokio;
extern crate bytes;
#[macro_use]
extern crate futures;
use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use bytes::{Bytes, Buf};
use futures::{Future, Async, Poll};
use std::io::{self, Cursor};
// HelloWorld has two states, namely waiting to connect to the socket
// and already connected to the socket
enum HelloWorld {
    Connecting(ConnectFuture),
    Connected(TcpStream, Cursor<Bytes>),
}
impl Future for HelloWorld {
    type Item = ();
    type Error = io::Error;
    fn poll(&mut self) -> Poll<(), io::Error> {
        use self::HelloWorld::*;
        loop {
            match self {
                Connecting(ref mut f) => {
                    let socket = try_ready!(f.poll());
                    let data = Cursor::new(Bytes::from_static(b"hello world"));
                    *self = Connected(socket, data);
                }
                Connected(ref mut socket, ref mut data) => {
                    // Keep trying to write the buffer to the socket as long as the
                    // buffer has more bytes available for consumption
                    while data.has_remaining() {
                        try_ready!(socket.write_buf(data));
                    }
                    return Ok(Async::Ready(()));
                }
            }
        }
    }
}
fn main() {
    let addr = "127.0.0.1:1234".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let hello_world = HelloWorld::Connecting(connect_future);
    // Run it, here we map the error since tokio::run expects a Future<Item=(), Error=()>
    tokio::run(hello_world.map_err(|e| println!("{0}", e)))
}
Combinators
Much like Option, iterators, and so on in the standard library, the futures crate also provides helper combinators for Future, such as:
- map
- and_then
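To give a feel for what such a combinator does internally, here is a hand-rolled `map` over the standard library `Future` trait. It is a simplified sketch, not the futures crate's implementation: `Map`, `block_on`, and `mapped_value` are hypothetical names, and the inner future is required to be `Unpin` to avoid pin projection.

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical combinator: applies `f` to the output of `fut`.
struct Map<Fut, F> {
    fut: Fut,
    f: Option<F>, // taken out once the inner future completes
}

impl<Fut, F, T> Future for Map<Fut, F>
where
    Fut: Future + Unpin,
    F: FnOnce(Fut::Output) -> T + Unpin,
{
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = &mut *self;
        match Pin::new(&mut this.fut).poll(cx) {
            Poll::Ready(value) => {
                let f = this.f.take().expect("Map polled after completion");
                Poll::Ready(f(value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor that busy-polls an `Unpin` future to completion.
fn block_on<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}

/// Maps the output of an already-ready future.
fn mapped_value() -> i32 {
    let fut = Map { fut: ready(20), f: Some(|x: i32| x * 2 + 2) };
    block_on(fut)
}
```

An `and_then`-style combinator follows the same shape, except that the closure returns a second future, which then has to be polled to completion as well.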