Skip to content
On this page

Future

rust
//虽然和标准库中的不要太一样,但是更好理解
trait Future {
    /// The type of the value returned when the future completes.
    type Item;

    /// The type representing errors that occurred while processing the computation.
    type Error;

    /// The function that will be repeatedly called to see if the future
    /// has completed or not. The `Async` enum can either be `Ready` or
    /// `NotReady` and indicates whether the future is ready to produce
    /// a value or not.
    fn poll(Pin<&mut Self>) -> Result<Async<Self::Item>, Self::Error>;
}

成功的时候返回Ok(Async::Ready(value)),如果没有准备好则返回Async::NotReady.

Stream

rust
trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// The type representing errors that occurred while processing the computation.
    type Error;

    /// The function that will be repeatedly called to see if the stream has
    /// another value it can yield
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

和Future不一样的是他要持续返回数据,所以Ready中又分了Some和None两种情况. 分成四种情况返回:

  1. Ok(Async::Ready(Some(value))) 表示数据准备好了
  2. Ok(Async::NotReady) 数据没准备好
  3. Ok(Async::Ready(None)) 没数据了
  4. Err(error) 出错了

关于Pin

为什么需要Pin

async关键字本质上是一个状态机,

rust
async {
     fut_one.await;
     fut_two.await;
}

这么一段代码最终会被翻译成如下代码

rust
// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}
 // List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}
 impl AsyncFuture {
    fn poll(...) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

这里面的关键就是如果AsyncFuture包含自引用,那么一旦AsyncFuture被移动,就会导致里面的指针失效.

比如:

rust
async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    await!(read_into_buf_fut);
    println!("{:?}", x);
}

被翻译后为:

rust
struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}
 struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}

因此需要Pin来固定AsyncFuture不被移动.

如何使用Pin

分别是Box::pinned和pin_utils::pin_mut!宏.

rust
use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io
 // A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { ... }
 let fut = async { ... };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait
 // Pinning with `Box`:
let fut = async { ... };
let fut = Box::pinned(fut);
execute_unpin_future(fut); // OK
 // Pinning with `pin_mut!`:
let fut = async { ... };
pin_mut!(fut);
execute_unpin_future(fut); // OK

tokio runtime

As hinted at earlier, the Rust asynchronous model is very different than that of other languages. Most other languages use a “completion” based model, usually built using some form of callbacks. In this case, when an asynchronous action is started, it is submitted with a function to call once the operation completes. When the process receives the I/O notification from the operating system, it finds the function associated with it and calls it immediately. This is a push based model because the value is pushed into the callback.

The rust asynchronous model is pull based. Instead of a Future being responsible for pushing the data into a callback, it relies on something else asking if it is complete or not. In the case of Tokio, that something else is the Tokio runtime.

Using a poll based model offers many advantages, including being a zero cost abstraction, i.e., using Rust futures has no added overhead compared to writing the asynchronous code by hand.
rust
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    /// The type of value produced on completion.
    type Output;

    /// Attempt to resolve the future to a final value, registering
    /// the current task for wakeup if the value is not yet available.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

配合Context中的awake,我觉得大多数情况下和push模型差不多啊. 可能是我理解的不深.

自己组合Future

注意这里面的use, 自己写的时候是缺一不可的.

rust
extern crate tokio;
extern crate bytes;
#[macro_use]
extern crate futures;

use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use bytes::{Bytes, Buf};
use futures::{Future, Async, Poll};
use std::io::{self, Cursor};

// HelloWorld has two states, namely waiting to connect to the socket
// and already connected to the socket
enum HelloWorld {
    Connecting(ConnectFuture),
    Connected(TcpStream, Cursor<Bytes>),
}

impl Future for HelloWorld {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        use self::HelloWorld::*;

        loop {
            match self {
                Connecting(ref mut f) => {
                    let socket = try_ready!(f.poll());
                    let data = Cursor::new(Bytes::from_static(b"hello world"));
                    *self = Connected(socket, data);
                }
                Connected(ref mut socket, ref mut data) => {
                    // Keep trying to write the buffer to the socket as long as the
                    // buffer has more bytes available for consumption
                    while data.has_remaining() {
                        try_ready!(socket.write_buf(data));
                    }
                    return Ok(Async::Ready(()));
                }
            }
        }
    }
}

fn main() {
    let addr = "127.0.0.1:1234".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let hello_world = HelloWorld::Connecting(connect_future);

    // Run it, here we map the error since tokio::run expects a Future<Item=(), Error=()>
    tokio::run(hello_world.map_err(|e| println!("{0}", e)))
}

组合工具

有点像标准库中的Option,iter等,futures也为Future实现了一些辅助工具

  1. map
  2. and_then