#parquet #csv #dataset #feather #arrow-ipc

dataset-writer

Utilities to write CSV/Arrow/Parquet files concurrently

3 stable releases

1.1.0 Oct 29, 2024
1.0.1 Sep 12, 2024
1.0.0 Sep 3, 2024

#386 in Filesystem

Download history 38/week @ 2024-09-25 37/week @ 2024-10-02 39/week @ 2024-10-09 25/week @ 2024-10-16 103/week @ 2024-10-23 103/week @ 2024-10-30 52/week @ 2024-11-06 43/week @ 2024-11-13 47/week @ 2024-11-20 41/week @ 2024-11-27 39/week @ 2024-12-04 43/week @ 2024-12-11 32/week @ 2024-12-18 17/week @ 2024-12-25 27/week @ 2025-01-01 44/week @ 2025-01-08

128 downloads per month
Used in swh_graph_topology

GPL-3.0-or-later

27KB
453 lines

dataset-writer-rs

Utilities to write CSV/Arrow/Parquet files concurrently

CSV example

use tempfile::TempDir;
use rayon::prelude::*;

use dataset_writer::*;

// Temporary directory under which the example dataset is written.
let tmp_dir = TempDir::new().unwrap();
let dataset_path = tmp_dir.path().join("dataset");

let mut dataset_writer = ParallelDatasetWriter::<CsvZstTableWriter>::new(dataset_path)
    .expect("Could not create directory");

// Initializer handed to `try_for_each_init`: obtains a table writer from the
// shared dataset writer.
let new_thread_writer = || dataset_writer.get_thread_writer().unwrap();

(0..100000)
    .into_par_iter()
    .try_for_each_init(
        new_thread_writer,
        |table_writer, number| -> Result<(), csv::Error> {
            // Each number becomes a record with a single field.
            let record = [number.to_string()];
            table_writer.write_record(&record)
        },
    )
    .expect("Failed to write table");

Parquet example

use std::sync::Arc;

use anyhow::Result;
use arrow::array::{Array, ArrayBuilder, StructArray, UInt64Builder};
use arrow::datatypes::{Field, Schema};
use arrow::datatypes::DataType::UInt64;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
use rayon::prelude::*;

use dataset_writer::*;

// Temporary directory under which the example dataset is written.
let tmp_dir = TempDir::new().unwrap();

/// Builds the Arrow schema used by this example: a single non-nullable
/// `UInt64` column named `id`.
fn schema() -> Schema {
    let id_field = Field::new("id", UInt64, false);
    Schema::new(vec![id_field])
}
// Parquet writer properties, built without setting any option on the builder.
let writer_properties = WriterProperties::builder().build();

/// Array builder for this example; a newtype around Arrow's `UInt64Builder`.
#[derive(Debug)]
pub struct Builder(UInt64Builder);

impl Default for Builder {
    fn default() -> Self {
        Builder(UInt64Builder::new_from_buffer(
            Default::default(),
            None, // Values are not nullable -> validity buffer not needed
        ))
    }
}

impl StructArrayBuilder for Builder {
    /// Number of values appended to the wrapped builder so far.
    fn len(&self) -> usize {
        self.0.len()
    }

    /// Size in bytes of the buffered values.
    fn buffer_size(&self) -> usize {
        // Only the values are counted (one `u64` each); there is no validity slice.
        std::mem::size_of::<u64>() * self.len()
    }

    /// Drains the wrapped builder into a one-column `StructArray` whose
    /// fields come from `schema()`.
    fn finish(&mut self) -> Result<StructArray> {
        let ids: Arc<dyn Array> = Arc::new(self.0.finish());
        let fields = schema().fields().clone();

        // The last argument is the struct-level null buffer; none is used.
        Ok(StructArray::new(fields, vec![ids], None))
    }
}

let dataset_path = tmp_dir.path().join("dataset");

// The second argument pairs the shared schema with the Parquet writer properties.
let mut dataset_writer = ParallelDatasetWriter::<ParquetTableWriter<Builder>>::with_schema(
    dataset_path,
    (Arc::new(schema()), writer_properties),
)
.expect("Could not create directory");

// Initializer handed to `try_for_each_init`: obtains a table writer from the
// shared dataset writer.
let new_thread_writer = || dataset_writer.get_thread_writer().unwrap();

(0..100000)
    .into_par_iter()
    .try_for_each_init(
        new_thread_writer,
        |table_writer, number| -> Result<()> {
            // Append the number to the `id` column through the wrapped builder.
            table_writer.builder()?.0.append_value(number);
            Ok(())
        },
    )
    .expect("Failed to write table");

Dependencies

~2–11MB
~120K SLoC