17 releases

0.1.22 May 6, 2023
0.1.21 May 6, 2023
0.1.20 Apr 23, 2023

#8 in #data-transformation

Download history 4/week @ 2024-09-19 3/week @ 2024-09-26

179 downloads per month

Apache-2.0

135KB
1.5K SLoC

Myval - a lightweight Apache Arrow data frame for Rust crates.io page docs.rs page

What is Myval?

Mýval (pronounced as [m'ival]) is translated from Czech as raccoon.

Why not a bear-name?

The common name for raccoon in Czech is "medvídek mýval" which can be translated as "little bear".

But there is Polars?

Myval is not a competitor of Polars. Myval is a lightweight Arrow data frame which is focused on in-place data transformation and IPC.

Because Arrow has got the standardized data layout, data frames can be converted to Polars and vice-versa with zero-copy:

let polars_df = polars::frame::DataFrame::from(myval_df);
let myval_df = myval::DataFrame::from(polars_df);

As well as Polars, Myval is based on arrow2.

Some tricks

IPC

Consider there is an Arrow stream block (Schema+Chunk) received from e.g. RPC or Pub/Sub. Convert the block into a Myval data frame:

let df = myval::DataFrame::from_ipc_block(&buf).unwrap();

Need to send a data frame back? Convert it to Arrow stream block with a single line of code:

let buf = df.into_ipc_block().unwrap();

Need to send sliced? No problem, there are methods which can easily return sliced series, sliced data frames or IPC chunks.

Overriding data types

Consider there is an i64-column "time" which contains nanosecond timestamps. Let us override its data type:

use myval::{DataType, TimeUnit};

df.set_data_type("time",
    DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap();

Parsing numbers from strings

Consider there is a utf8-column "value" which should be parsed to floats:

df.parse::<f64>("value").unwrap();

Basic in-place math

df.add("col", 1_000i64).unwrap();
df.sub("col", 1_000i64).unwrap();
df.mul("col", 1_000i64).unwrap();
df.div("col", 1_000i64).unwrap();

Custom in-place transformations

df.apply("time", |time| time.map(|t: i64| t / 1_000)).unwrap();

Horizontal join

df.join(df2).unwrap();

Concatenation

let merged = myval::concat(&[&df1, &df2, &df3]).unwrap();

Set column ordering

Consider there is a Myval data frame with columns "voltage", "temp1", "temp2", "temp3" which has received data from a server column-by-column in random ordering. Let us correct the ordering back to normal:

df.set_ordering(&["voltage", "temp1", "temp2", "temp3"]);

From/to JSON

Myval data frames can be parsed from serde_json Value (map only) or converted to Value (map/array). This requires "json" crate feature:

// create Object value from a data frame, converted to serde_json::Map
let val = serde_json::Value::Object(df.to_json_map().unwrap());
// define JSON parser
let mut parser = myval::convert::json::Parser::new()
    .with_type_mapping("name", DataType::LargeUtf8);
// add more columns if required
parser = parser.with_type_mapping("time", DataType::Int64);
parser = parser.with_type_mapping("status", DataType::Int32);
let parsed_df = parser.parse_value(val).unwrap();
  • Some data types can not be correctly parsed from Value objects (e.g. Timestamp), use DataFrame methods to correct them to the required ones.

  • If a column is defined in a json::Parser object but missing in Value, it is created as null-filled.

Others

Check the documentation: https://docs.rs/myval

Working with databases

Arrow provides several ways to work with databases. Myval additionally provides tools to work with PostgreSQL databases in the easy way via the popular sqlx crate ("postgres" feature must be enabled):

Fetching data from a database

use futures::stream::TryStreamExt;

let pool = PgPoolOptions::new()
    .connect("postgres://postgres:welcome@localhost/postgres")
    .await.unwrap();
let max_size = 100_000;
let mut stream = myval::db::postgres::fetch(
    "select * from test".to_owned(), Some(max_size), pool.clone());
// the stream returns data frames one by one with max data frame size (in
// bytes) = max_size
while let Some(df) = stream.try_next().await.unwrap() {
    // do some stuff
}

Why does the stream object require PgPool? There is one important reason: such stream objects are static and can be stored anywhere, e.g. used as cursors in a client-server architecture.

Pushing data into a database

Server

let df = DataFrame::from_ipc_block(payload).unwrap();
// The first received data frame must have "database" field in its schema
// metadata. Next data frames can go without it.
if let Some(dbparams) = df.metadata().get("database") {
    let params: myval::db::postgres::Params = serde_json::from_str(dbparams)
        .unwrap();
    let processed_rows: usize = myval::db::postgres::push(&df, &params,
        &pool).await.unwrap();
}

Client

Let us push Polars data frame into a PostgreSQL database:

use serde_json::json;

let mut df = myval::DataFrame::from(polars_df);
df.metadata_mut().insert(
    // set "database" metadata field
    "database".to_owned(),
    serde_json::to_string(&json!({
        // table, required
        "table": "test",
        // PostgreSQL schema, optional
        "postgres": { "schema": "public" },
        // keys, required if the table has got keys/unique indexes
        "keys": ["id"],
        // some field parameters
        "fields": {
            // another way to declare a key field
            //"id": { "key": true },
            // the following data frame columns contain strings which must be
            // sent to the database as JSON (for json/jsonb PostgreSQL types)
            "data1": { "json": true },
            "data2": { "json": true }
        }
    }))?,
);
// send the data frame to the server in a single or multiple chunks/blocks

PostgreSQL types supported

  • BOOL, INT2 (16-bit int), INT4 (32-bit int), INT8 (64-bit int), FLOAT4 (32-bit float), FLOAT8 (64-bit float)

  • TIMESTAMP, TIMESTAMPTZ (time zone information is discarded as Arrow arrays can not have different time zones for individual records)

  • CHAR, VARCHAR

  • JSON/JSONB (encoded to strings as LargeUtf8 when fetched)

General limitations

  • Myval is not designed for data engineering. Use Polars.

  • Myval series can contain a single chunk only and there are no plans to extend this. When a Polars data frame with multiple chunks is converted to Myval, the chunks are automatically aggregated.

  • Some features (conversion to Polars, PostgreSQL) are experimental, use at your own risk.

About

Myval is a part of EVA ICS Machine Learning kit developed by Bohemia Automation.

Bohemia Automation / Altertech is a group of companies with 15+ years of experience in the enterprise automation and industrial IoT. Our setups include power plants, factories and urban infrastructure. Largest of them have 1M+ sensors and controlled devices and the bar raises higher and higher every day.

Dependencies

~6–20MB
~314K SLoC