#frame #byte #codec #decoder #future #byte-string #io

yanked futures-codec2

Utilities for encoding and decoding frames forked from tokio-util

0.1.0 Dec 31, 2020

#79 in #byte-string

MIT license

93KB
1K SLoC

futures-codec2

Utilities for encoding and decoding frames forked from tokio-util.

License

This project is licensed under the MIT license.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Tokio by you, shall be licensed as MIT, without any additional terms or conditions.


lib.rs:

Adaptors from AsyncRead/AsyncWrite to Stream/Sink

Raw I/O objects work with byte sequences, but higher-level code usually wants to batch these into meaningful chunks, called "frames".

This module contains adapters to go from streams of bytes, AsyncRead and AsyncWrite, to framed streams implementing Sink and Stream. Framed streams are also known as transports.

The Decoder trait

A Decoder is used together with FramedRead or Framed to turn an AsyncRead into a Stream. The job of the decoder trait is to specify how sequences of bytes are turned into a sequence of frames, and to determine where the boundaries between frames are. The job of the FramedRead is to repeatedly switch between reading more data from the IO resource, and asking the decoder whether we have received enough data to decode another frame of data.

The main method on the Decoder trait is the decode method. This method takes as argument the data that has been read so far, and when it is called, it will be in one of the following situations:

  1. The buffer contains less than a full frame.
  2. The buffer contains exactly a full frame.
  3. The buffer contains more than a full frame.

In the first situation, the decoder should return Ok(None).

In the second situation, the decoder should clear the provided buffer and return Ok(Some(the_decoded_frame)).

In the third situation, the decoder should use a method such as split_to or advance to modify the buffer such that the frame is removed from the buffer, but any data in the buffer after that frame should still remain in the buffer. The decoder should also return Ok(Some(the_decoded_frame)) in this case.

Finally the decoder may return an error if the data is invalid in some way. The decoder should not return an error just because it has yet to receive a full frame.

It is guaranteed that, from one call to decode to another, the provided buffer will contain the exact same data as before, except that if more data has arrived through the IO resource, that data will have been appended to the buffer. This means that reading frames from a FramedRead is essentially equivalent to the following loop:

use futures::io::AsyncBufReadExt;

let mut buf = bytes::BytesMut::new();
loop {
    // The read_buf call will append to buf rather than overwrite existing data.
    let len = {
        let rbuf = io_resource.fill_buf().await?;
        buf.extend_from_slice(rbuf);
        rbuf.len()
    };
    io_resource.consume_unpin(len);

    if len == 0 {
        while let Some(frame) = decoder.decode_eof(&mut buf)? {
            yield frame;
        }
        break;
    }

    while let Some(frame) = decoder.decode(&mut buf)? {
        yield frame;
    }
}

The example above uses yield whenever the Stream produces an item.

Example decoder

As an example, consider a protocol that can be used to send strings where each frame is a four byte integer that contains the length of the frame, followed by that many bytes of string data. The decoder fails with an error if the string data is not valid utf-8 or too long.

Such a decoder can be written like this:

use futures_codec2::Decoder;
use bytes::{BytesMut, Buf};

struct MyStringDecoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Decoder for MyStringDecoder {
    type Item = String;
    type Error = std::io::Error;

    fn decode(
        &mut self,
        src: &mut BytesMut
    ) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 4 {
            // Not enough data to read length marker.
            return Ok(None);
        }

        // Read length marker.
        let mut length_bytes = [0u8; 4];
        length_bytes.copy_from_slice(&src[..4]);
        let length = u32::from_le_bytes(length_bytes) as usize;

        // Check that the length is not too large to avoid a denial of
        // service attack where the server runs out of memory.
        if length > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", length)
            ));
        }

        if src.len() < 4 + length {
            // The full string has not yet arrived.
            //
            // We reserve more space in the buffer. This is not strictly
            // necessary, but is a good idea performance-wise.
            src.reserve(4 + length - src.len());

            // We inform the Framed that we need more bytes to form the next
            // frame.
            return Ok(None);
        }

        // Use advance to modify src such that it no longer contains
        // this frame.
        let data = src[4..4 + length].to_vec();
        src.advance(4 + length);

        // Convert the data to a string, or fail if it is not valid utf-8.
        match String::from_utf8(data) {
            Ok(string) => Ok(Some(string)),
            Err(utf8_error) => {
                Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    utf8_error.utf8_error(),
                ))
            },
        }
    }
}

The Encoder trait

An Encoder is used together with FramedWrite or Framed to turn an AsyncWrite into a Sink. The job of the encoder trait is to specify how frames are turned into a sequences of bytes. The job of the FramedWrite is to take the resulting sequence of bytes and write it to the IO resource.

The main method on the Encoder trait is the encode method. This method takes an item that is being written, and a buffer to write the item to. The buffer may already contain data, and in this case, the encoder should append the new frame the to buffer rather than overwrite the existing data.

It is guaranteed that, from one call to encode to another, the provided buffer will contain the exact same data as before, except that some of the data may have been removed from the front of the buffer. Writing to a FramedWrite is essentially equivalent to the following loop:

use futures::future::FutureExt;
use futures::io::AsyncWriteExt;
use bytes::Buf; // for advance

const MAX: usize = 8192;

let mut buf = bytes::BytesMut::new();
loop {
    futures::select! {
        num_written = io_resource.write(&buf).fuse() => {
            if !buf.is_empty() {
                buf.advance(num_written?);
            }
        },
        frame = next_frame().fuse() => {
            if buf.len() < MAX {
                encoder.encode(frame, &mut buf)?;
            }
        },
        _ = no_more_frames().fuse() => {
            io_resource.write_all(&buf).await?;
            io_resource.close().await?;
            return Ok(());
        },
    }
}

Here the next_frame method corresponds to any frames you write to the FramedWrite. The no_more_frames method corresponds to closing the FramedWrite with SinkExt::close.

Example encoder

As an example, consider a protocol that can be used to send strings where each frame is a four byte integer that contains the length of the frame, followed by that many bytes of string data. The encoder will fail if the string is too long.

Such an encoder can be written like this:

use futures_codec2::Encoder;
use bytes::BytesMut;

struct MyStringEncoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Encoder<String> for MyStringEncoder {
    type Error = std::io::Error;

    fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // Don't send a string if it is longer than the other end will
        // accept.
        if item.len() > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", item.len())
            ));
        }

        // Convert the length into a byte array.
        // The cast to u32 cannot overflow due to the length check above.
        let len_slice = u32::to_le_bytes(item.len() as u32);

        // Reserve space in the buffer.
        dst.reserve(4 + item.len());

        // Write the length and string to the buffer.
        dst.extend_from_slice(&len_slice);
        dst.extend_from_slice(item.as_bytes());
        Ok(())
    }
}

Dependencies

~0.7–1.4MB
~26K SLoC