24 releases (12 breaking)

new 0.13.1 Nov 24, 2024
0.12.0 Nov 14, 2024
0.7.0 Jul 28, 2024
0.5.1 Mar 31, 2024
0.3.0 Sep 5, 2023

MIT/Apache

155KB
3K SLoC

stream-download-rs

stream-download is a library for streaming content from a remote location to a local cache and using it as a readable and seekable source. The requested content is downloaded in the background, and read or seek operations are allowed before the download is finished. If the download is still in progress, a seek may cause the stream to be restarted from the requested position. This is useful for media applications that need to stream large files that may take a long time to download.

This library makes heavy use of the adapter pattern to allow for pluggable transports and storage implementations.
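
To picture what "pluggable transports and storage" means, here is a minimal, std-only sketch. The names `Transport`, `Storage`, `VecTransport`, `MemoryStorage`, and `download_all` are hypothetical and exist only for this illustration; the crate's real abstractions (such as the `SourceStream` trait) are async and differ in shape.

```rust
use std::io::{self, Write};

/// Hypothetical transport abstraction: yields chunks of bytes until exhausted.
trait Transport {
    fn next_chunk(&mut self) -> io::Result<Option<Vec<u8>>>;
}

/// Hypothetical storage abstraction: a writable sink that reports its size.
trait Storage: Write {
    fn len(&self) -> usize;
}

/// A fake transport that replays chunks held in memory.
struct VecTransport {
    chunks: std::vec::IntoIter<Vec<u8>>,
}

impl VecTransport {
    fn new(chunks: Vec<Vec<u8>>) -> Self {
        Self { chunks: chunks.into_iter() }
    }
}

impl Transport for VecTransport {
    fn next_chunk(&mut self) -> io::Result<Option<Vec<u8>>> {
        Ok(self.chunks.next())
    }
}

/// In-memory storage backed by a Vec.
#[derive(Default)]
struct MemoryStorage {
    data: Vec<u8>,
}

impl Write for MemoryStorage {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Storage for MemoryStorage {
    fn len(&self) -> usize {
        self.data.len()
    }
}

/// Copies every chunk from any transport into any storage and returns the stored size.
fn download_all<T: Transport, S: Storage>(transport: &mut T, storage: &mut S) -> io::Result<usize> {
    while let Some(chunk) = transport.next_chunk()? {
        storage.write_all(&chunk)?;
    }
    Ok(storage.len())
}
```

Because `download_all` only depends on the two traits, either side can be swapped without touching the other, which is the property the adapter pattern is meant to provide.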

Installation

cargo add stream-download

Feature Flags

  • http - adds an HTTP-based implementation of the SourceStream trait (enabled by default).
  • reqwest - enables streaming content over HTTP using reqwest (enabled by default).
  • reqwest-native-tls - enables reqwest's native-tls feature. Also enables the reqwest feature.
  • reqwest-rustls - enables reqwest's rustls feature. Also enables the reqwest feature.
  • reqwest-middleware - enables integration with reqwest-middleware. Can be used to add retry policies and additional observability. Also enables the reqwest feature.
  • open-dal - adds a SourceStream implementation that uses Apache OpenDAL as the backend.
  • async-read - adds a SourceStream implementation for any type implementing AsyncRead.
  • process - adds a SourceStream implementation for external processes. Also enables the async-read feature.
  • temp-storage - adds a temporary file-based storage backend (enabled by default).
  • registry - adds a method for routing inputs to different stream handlers based on a set of rules.

NOTE: One of reqwest-native-tls or reqwest-rustls is required if you wish to use HTTPS streams.

reqwest exposes additional TLS-related feature flags beyond the two that we re-export. If you want greater control over the TLS configuration, add a direct dependency on reqwest and enable the features you need.

Usage

use std::error::Error;
use std::io;
use std::io::Read;
use std::result::Result;

use stream_download::source::DecodeError;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut reader = match StreamDownload::new_http(
        "https://some-cool-url.com/some-file.mp3".parse()?,
        TempStorageProvider::new(),
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    tokio::task::spawn_blocking(move || {
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf)?;
        Ok::<_, io::Error>(())
    })
    .await??;

    Ok(())
}

Examples

See examples.

Transports

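Transports implement the SourceStream trait. A few types of transports are provided out of the box, each behind the feature flag of the same name:

  • http - for typical HTTP-based sources.
  • open-dal - for services supported by Apache OpenDAL.
  • async-read - for any source implementing AsyncRead.
  • process - for reading data from an external process.

Only http is enabled by default. You can provide a custom transport by implementing SourceStream yourself.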

Streams with Unknown Length

Resources such as standalone songs or videos have a finite length that is used to support certain seeking functionality. Live streams or those that otherwise don't have a known length are still supported, but attempting to seek from the end of the stream will return an error. This may cause issues with certain audio or video libraries that attempt to perform such seek operations. If it's necessary to explicitly check for an infinite stream, you can check the stream's content length ahead of time.

use std::error::Error;
use std::io;
use std::io::Read;
use std::result::Result;

use stream_download::http::reqwest::Client;
use stream_download::http::HttpStream;
use stream_download::source::DecodeError;
use stream_download::source::SourceStream;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let stream =
        HttpStream::<Client>::create("https://some-cool-url.com/some-stream".parse()?).await?;
    let content_length = stream.content_length();
    let is_infinite = content_length.is_none();
    println!("Infinite stream = {is_infinite}");

    let mut reader = match StreamDownload::from_stream(
        stream,
        TempStorageProvider::default(),
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    tokio::task::spawn_blocking(move || {
        let mut buf = [0; 256];
        reader.read_exact(&mut buf)?;
        Ok::<_, io::Error>(())
    })
    .await??;

    Ok(())
}
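
To see why seeking from the end needs a known length, consider how a seek request is turned into an absolute offset. The sketch below is std-only and illustrative: `resolve_seek` is a hypothetical helper, and the error kind is an assumption rather than what the crate actually returns.

```rust
use std::io::{self, SeekFrom};

/// Resolves a seek request to an absolute byte offset.
/// `content_length` is `None` for streams of unknown length.
fn resolve_seek(pos: SeekFrom, current: u64, content_length: Option<u64>) -> io::Result<u64> {
    // Assumption: InvalidInput is used here purely for illustration.
    let invalid = |msg: &str| io::Error::new(io::ErrorKind::InvalidInput, msg.to_string());
    match pos {
        // Absolute positions never need the length.
        SeekFrom::Start(offset) => Ok(offset),
        // Relative positions only need the current offset.
        SeekFrom::Current(delta) => current
            .checked_add_signed(delta)
            .ok_or_else(|| invalid("seek before start or overflow")),
        // End-relative positions cannot be computed without a length.
        SeekFrom::End(delta) => {
            let len = content_length
                .ok_or_else(|| invalid("cannot seek from end: length unknown"))?;
            len.checked_add_signed(delta)
                .ok_or_else(|| invalid("seek before start or overflow"))
        }
    }
}
```

With a known length of 100, `SeekFrom::End(-16)` resolves to offset 84; with `None`, the same request is an error, while `SeekFrom::Start` and `SeekFrom::Current` keep working.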

Icecast/Shoutcast Streams

If you're using this library to handle Icecast streams or one of its derivatives, check out the icy-metadata crate. There are examples of how to use it with stream-download in the repo.

Streaming from YouTube and Similar Sites

Some websites with embedded audio or video streams can be tricky to handle directly. For these cases, it's easier to use a dedicated program such as yt-dlp which can parse media from specific websites (it supports more websites than just YouTube, despite the name). If you enable the process feature, you can integrate with external programs like yt-dlp that can send their output to stdout. Some helpers for interacting with yt-dlp and ffmpeg (for post-processing) are also included.

See youtube_simple for a simple way to stream audio from a YouTube video.

See yt_dlp for a more complex example of handling different kinds of URLs with yt-dlp.

Registry

Some applications may need to use multiple types of stream handlers. For example, you may want to handle YouTube URLs with yt-dlp, normal HTTP URLs with reqwest, and file:// URLs with a simple BufReader. You can use a registry to handle these cases.

See the example for more info.
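
The general idea of rule-based routing can be sketched without the crate. Everything below (`Handler`, `Rule`, `Registry`, `default_registry`) is a hypothetical, std-only illustration and not the API exposed by the registry feature; the URL patterns are assumptions chosen for the example.

```rust
/// Hypothetical handler kinds, mirroring the example in the text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Handler {
    YtDlp,
    Http,
    File,
}

/// A rule pairs a predicate on the input with the handler to use.
struct Rule {
    matches: Box<dyn Fn(&str) -> bool>,
    handler: Handler,
}

/// Hypothetical registry: rules are checked in insertion order and the first match wins.
#[derive(Default)]
struct Registry {
    rules: Vec<Rule>,
}

impl Registry {
    fn register(mut self, matches: impl Fn(&str) -> bool + 'static, handler: Handler) -> Self {
        self.rules.push(Rule { matches: Box::new(matches), handler });
        self
    }

    fn route(&self, input: &str) -> Option<Handler> {
        self.rules
            .iter()
            .find(|rule| (rule.matches)(input))
            .map(|rule| rule.handler)
    }
}

/// Registers the specific YouTube rule before the generic HTTP rule so it takes priority.
fn default_registry() -> Registry {
    Registry::default()
        .register(|url| url.contains("youtube.com/") || url.contains("youtu.be/"), Handler::YtDlp)
        .register(|url| url.starts_with("http://") || url.starts_with("https://"), Handler::Http)
        .register(|url| url.starts_with("file://"), Handler::File)
}
```

Rule order matters in this design: a YouTube URL also starts with `https://`, so the more specific rule has to be registered first. Inputs that match no rule yield `None`, leaving the caller to decide how to report them.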

Storage

The storage module provides ways to customize how the stream is cached locally. Pre-configured implementations are available for memory and temporary file-based storage. Typically you'll want to use temporary file-based storage to prevent using too much memory, but memory-based storage may be preferable if you know the stream size is small or you need to run your application on a read-only filesystem.

use std::error::Error;
use std::io::Read;
use std::result::Result;

use stream_download::source::DecodeError;
use stream_download::storage::memory::MemoryStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut reader = match StreamDownload::new_http(
        "https://some-cool-url.com/some-file.mp3".parse()?,
        // buffer will be stored in memory instead of on disk
        MemoryStorageProvider,
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    Ok(())
}

Bounded Storage

When using infinite streams which don't need to support seeking, it usually isn't desirable to let the underlying cache grow indefinitely if the stream may be running for a while. For these cases, you may want to use bounded storage. Bounded storage uses a circular buffer which will overwrite the oldest contents once it fills up. If the reader falls too far behind the writer, the writer will pause so the reader can catch up.

use std::error::Error;
use std::io::Read;
use std::num::NonZeroUsize;
use std::result::Result;

use stream_download::source::DecodeError;
use stream_download::storage::bounded::BoundedStorageProvider;
use stream_download::storage::memory::MemoryStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut reader = match StreamDownload::new_http(
        "https://some-cool-url.com/some-file.mp3".parse()?,
        // use bounded storage to keep the underlying size from growing indefinitely
        BoundedStorageProvider::new(
            // you can use any other kind of storage provider here
            MemoryStorageProvider,
            // be liberal with the buffer size, you need to make sure it holds enough space to
            // prevent any out-of-bounds reads
            NonZeroUsize::new(512 * 1024).unwrap(),
        ),
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    Ok(())
}
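
The circular-buffer behavior described above can be sketched in a few lines of std-only Rust. `RingBuffer` is a hypothetical type for illustration, not the crate's implementation: slots the reader has already consumed are overwritten by new data, and a short write signals that the writer should pause until the reader catches up.

```rust
/// Fixed-capacity ring buffer. Slots the reader has consumed are reused by new
/// writes; unread bytes are never overwritten.
struct RingBuffer {
    buf: Vec<u8>,
    write_pos: usize, // total bytes ever written
    read_pos: usize,  // total bytes ever read
}

impl RingBuffer {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { buf: vec![0; capacity], write_pos: 0, read_pos: 0 }
    }

    /// Writes as much of `data` as fits without clobbering unread bytes.
    /// Returning less than `data.len()` tells the writer to pause.
    fn write(&mut self, data: &[u8]) -> usize {
        let unread = self.write_pos - self.read_pos;
        let free = self.buf.len() - unread;
        let n = data.len().min(free);
        for &byte in &data[..n] {
            let idx = self.write_pos % self.buf.len();
            self.buf[idx] = byte;
            self.write_pos += 1;
        }
        n
    }

    /// Reads up to `out.len()` unread bytes, freeing their slots for the writer.
    fn read(&mut self, out: &mut [u8]) -> usize {
        let n = out.len().min(self.write_pos - self.read_pos);
        for slot in &mut out[..n] {
            *slot = self.buf[self.read_pos % self.buf.len()];
            self.read_pos += 1;
        }
        n
    }
}
```

Note that once a slot has been reused, the old bytes are gone, which is why seeking backwards on a bounded stream is only possible within whatever the buffer still holds.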

Adaptive Storage

When you need to support both finite and infinite streams, you may want to use adaptive storage. This is a convenience wrapper that will use bounded storage when the stream has no content length and unbounded storage when the stream does return a content length.

use std::error::Error;
use std::io::Read;
use std::num::NonZeroUsize;
use std::result::Result;

use stream_download::source::DecodeError;
use stream_download::storage::adaptive::AdaptiveStorageProvider;
use stream_download::storage::temp::TempStorageProvider;
use stream_download::{Settings, StreamDownload};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut reader = match StreamDownload::new_http(
        "https://some-cool-url.com/some-file.mp3".parse()?,
        // use adaptive storage to keep the underlying size from growing indefinitely
        // when the content length is not known
        AdaptiveStorageProvider::new(
            // you can use any other kind of storage provider here
            TempStorageProvider::default(),
            // be liberal with the buffer size, you need to make sure it holds enough space to
            // prevent any out-of-bounds reads
            NonZeroUsize::new(512 * 1024).unwrap(),
        ),
        Settings::default(),
    )
    .await
    {
        Ok(reader) => reader,
        Err(e) => Err(e.decode_error().await)?,
    };

    Ok(())
}
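
The selection rule behind adaptive storage is small enough to write out. This is a std-only sketch of the decision as described above; `StorageMode` and `choose_storage` are hypothetical names and not part of the crate.

```rust
use std::num::NonZeroUsize;

#[derive(Debug, PartialEq, Eq)]
enum StorageMode {
    /// Known length: cache the whole stream.
    Unbounded,
    /// Unknown length: cap the cache at this many bytes.
    Bounded(NonZeroUsize),
}

/// Picks bounded storage only when the stream reports no content length.
fn choose_storage(content_length: Option<u64>, bound: NonZeroUsize) -> StorageMode {
    match content_length {
        Some(_) => StorageMode::Unbounded,
        None => StorageMode::Bounded(bound),
    }
}
```

The buffer size passed to the wrapper therefore only matters for streams without a content length; finite streams ignore it.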

Handling Errors and Reconnects

Some automatic support is available for retrying stalled streams. See the docs for the StreamDownload struct for more details.

If using reqwest-middleware, a retry policy can be used to handle transient server errors. See retry_middleware for an example of adding retry middleware.
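
For readers unfamiliar with retry policies, the underlying idea is a bounded loop with a growing delay. The sketch below is a blocking, std-only illustration; `retry_with_backoff` is a hypothetical helper and is not how reqwest-middleware or this crate implements retries.

```rust
use std::thread;
use std::time::Duration;

/// Retries `op` up to `max_attempts` times, doubling the delay after each failure.
/// `op` receives the 1-based attempt number and is always called at least once.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

A real policy would usually also distinguish transient errors (timeouts, 5xx responses) from permanent ones and retry only the former.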

Authentication and Other Customization

It's possible to customize your HTTP requests if you need to perform authentication or change other settings.

See client_options for customizing the HTTP client builder.

See custom_client for dynamically modifying each HTTP request.

Supported Rust Versions

The MSRV is currently 1.75.0.
