5 unstable releases

new 0.3.1 Jan 26, 2025
0.3.0 Jan 26, 2025
0.2.1 Jan 23, 2025
0.2.0 Jan 23, 2025
0.1.0 Jan 20, 2025


MIT license

71KB
1.5K SLoC

Coppice: dynamic programming for acyclic analytical queries expressed as nested map/reduce

Don't use this for real.


lib.rs:

Coppice is a dynamic programming library for acyclic analytical queries expressed as nested map/reduce computations over the union of smaller data sets (tablets). These map/reduce computations are automatically cached and parallelised when executed with the [map_reduce()] higher-order function.

Of course, since we know we're only interested in final [map_reduce()] results, we actually memoise fully aggregated results for each "tablet" (small set of rows) and opaque params.
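As a rough sketch of that memoisation strategy (hypothetical names and types, not Coppice's actual internals), a cache keyed on (tablet identifier, params) can store the fully aggregated result, so a repeated query skips the per-row scan entirely:

```rust
use std::collections::HashMap;

// Hypothetical sketch: memoise the fully aggregated result for each
// (tablet, params) pair, instead of caching per-row map outputs.
#[derive(Clone, Copy, Default, PartialEq, Debug)]
struct Counter {
    count: u64,
}

impl Counter {
    fn merge(self, other: Counter) -> Counter {
        Counter { count: self.count + other.count }
    }
}

#[derive(Default)]
struct TabletCache {
    // Key: (tablet identifier, opaque params).
    memo: HashMap<(String, u64), Counter>,
}

impl TabletCache {
    // Scans `rows` and evaluates `map` only on a cache miss; a hit returns
    // the previously aggregated result without touching the tablet at all.
    fn aggregate(
        &mut self,
        tablet: &str,
        rows: &[u64],
        params: u64,
        map: impl Fn(u64, u64) -> Counter,
    ) -> Counter {
        *self.memo.entry((tablet.to_owned(), params)).or_insert_with(|| {
            rows.iter()
                .map(|&row| map(row, params))
                .fold(Counter::default(), Counter::merge)
        })
    }
}

fn main() {
    let mut cache = TabletCache::default();
    let rows = [10u64, 20, 30];
    // Count rows strictly greater than `params` (15 here).
    let total = cache.aggregate("tablet-a", &rows, 15, |row, params| Counter {
        count: (row > params) as u64,
    });
    println!("{} rows above threshold", total.count);
}
```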

In addition to these standard inputs (and cache keys), Coppice also passes "join keys" to the mapped functions. This third type of inputs (in addition to rows and opaque params) enables Coppice to offer asymptotically superior performance compared to pure memoisation: [map_reduce()] essentially executes mapped functions "in reverse," for join keys.

The [map_reduce()] function ends up evaluating the mapped function for each row and params, but extracts all join keys that, combined with the row and params, yield a non-trivial Aggregate. We thus extract, for each row and params, a branching function from join keys to Aggregate, and reduce (merge::Merge) these branching functions together for all rows in a tablet. The maximum number of join keys that yield non-trivial results for a given row (should) depend on the mapped function, but not on the rows or params... i.e., it's a constant.
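A toy illustration of this inversion step, with hypothetical names, a `HashMap` standing in for Coppice's trie, and an explicit candidate-key list standing in for its backtracking search over join keys:

```rust
use std::collections::HashMap;

type JoinKey = u32;
type Aggregate = u64; // toy aggregate merged by addition; 0 is "trivial"

// Invert `map` for one row: which join keys produce a non-trivial aggregate?
fn invert_row(
    row: u64,
    params: u64,
    candidate_keys: &[JoinKey],
    map: impl Fn(u64, u64, JoinKey) -> Aggregate,
) -> HashMap<JoinKey, Aggregate> {
    candidate_keys
        .iter()
        .filter_map(|&key| {
            let agg = map(row, params, key);
            (agg != 0).then_some((key, agg))
        })
        .collect()
}

// Merge the branching functions of all rows in a tablet into one
// join key -> aggregate mapping.
fn merge_rows(
    rows: &[u64],
    params: u64,
    candidate_keys: &[JoinKey],
    map: impl Fn(u64, u64, JoinKey) -> Aggregate + Copy,
) -> HashMap<JoinKey, Aggregate> {
    let mut merged = HashMap::new();
    for &row in rows {
        for (key, agg) in invert_row(row, params, candidate_keys, map) {
            *merged.entry(key).or_insert(0) += agg;
        }
    }
    merged
}

fn main() {
    let rows = [2u64, 3, 4];
    let keys: Vec<JoinKey> = (0..5).collect();
    // Mapped function: count 1 when the join key equals `row % 3`.
    let branching = merge_rows(&rows, 0, &keys, |row, _params, key| {
        (u64::from(key) == row % 3) as u64
    });
    println!("{} join keys yield non-trivial results", branching.len());
}
```

Each row contributes a non-trivial aggregate for only one join key here, so the merged branching function stays small regardless of the candidate key space.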

For analytical queries, the number of scans over data files is often what we really care about. Pure memoisation gives us scans proportional to |tablets| * |params| * |join_keys|; that's often unrealistically large, which forces people to come up with ad hoc indexing schemes. Because Coppice caches branching functions instead of raw values, the number of scans instead scales with |tablets| * |params|. When the join keys have a large cardinality, shaving that |join_keys| multiplicative factor in I/O can be a real practical win... hopefully enough to justify the CPU overhead of Coppice's function inversion approach.
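With made-up toy numbers, the difference between the two scan counts looks like this:

```rust
// Toy arithmetic for the scan counts above (the numbers are made up).
// Pure memoisation keys the cache on (tablet, params, join_key), so a cold
// cache scans each tablet once per (params, join_key) combination.
fn scans_pure_memoisation(tablets: u64, params: u64, join_keys: u64) -> u64 {
    tablets * params * join_keys
}

// Caching branching functions keys only on (tablet, params); join keys are
// handled inside each cached branching function.
fn scans_with_branching_functions(tablets: u64, params: u64) -> u64 {
    tablets * params
}

fn main() {
    // 10 tablets, 5 distinct params, 1000 distinct join keys:
    assert_eq!(scans_pure_memoisation(10, 5, 1_000), 50_000);
    assert_eq!(scans_with_branching_functions(10, 5), 50);
}
```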

Two key ideas underlie Coppice.

The first is that backtracking search over join keys represented as bounded arrays of bits gives us enough finite domain powers to weakly emulate logic programming. That's not a lot, but enough to automatically build a bit-trie index from an opaque function.
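A much-simplified sketch of such a bit-trie, assuming fixed-width u8 keys walked most-significant-bit first; the "don't care" branches a real index built from an opaque function would need are omitted:

```rust
// Simplified trie over bounded bit arrays (here a single u8).
#[derive(Default)]
struct BitTrie {
    value: Option<u64>,
    children: [Option<Box<BitTrie>>; 2], // index 0: bit is 0, index 1: bit is 1
}

impl BitTrie {
    fn insert(&mut self, key: u8, value: u64) {
        let mut node = self;
        for i in (0..8).rev() {
            let bit = ((key >> i) & 1) as usize;
            // Create the child on demand and descend.
            node = &mut **node.children[bit].get_or_insert_with(Default::default);
        }
        node.value = Some(value);
    }

    fn lookup(&self, key: u8) -> Option<u64> {
        let mut node = self;
        for i in (0..8).rev() {
            let bit = ((key >> i) & 1) as usize;
            node = node.children[bit].as_deref()?;
        }
        node.value
    }
}

fn main() {
    let mut trie = BitTrie::default();
    trie.insert(5, 50);
    trie.insert(9, 90);
    assert_eq!(trie.lookup(5), Some(50));
    assert_eq!(trie.lookup(7), None);
}
```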

The second is that many analytical queries use only hierarchical joins (a well known fact), and that writing these queries as regular code implicitly gives us the tree necessary for Yannakakis's algorithm (maybe less well known).

In short, Coppice is just an API trick to coerce Rust coders into writing plain code that can be executed with a version of Yannakakis's algorithm simplified for the hierarchical subset of acyclic queries.

Examples

See examples/ny_philharmonic.rs for an executable example that processes sets of JSON files, each of which contains metadata for the New York Philharmonic Orchestra's performances over different periods.

Counting the total number of programs is a simple map/reduce function:

fn load_json_dump(path: impl AsRef<Path>) -> Result<Vec<Program>, &'static str>;

fn count_programs(files: &[PathBuf]) -> Result<u64, &'static str> {
    // Create a query cache where each dataset is identified by a PathBuf,
    // and the summary is just a counter
    coppice::query!(
        CACHE(path: PathBuf) -> Counter,
        load_json_dump(path),  // Load the data in each path with `load_json_dump`
        _program => Counter::new(1) // And count 1 for each Program in the json dump
    );

    Ok(CACHE.nullary_query(files)?.count)
}

Counting the number of performances for each composer isn't hard either:

fn count_composer_occurrences(files: &[PathBuf]) -> Result<Vec<(String, u64)>, &'static str> {
    coppice::query!(
        // This time, we go from data in PathBuf to a Histogram keyed on String names
        CACHE(path: PathBuf) -> Histogram<String>,
        load_json_dump(path),  // Again, load the Programs in each json file
        row => {
            let mut ret: Histogram<String> = Default::default();

            // For each work in the row (in the program), add 1
            // for each occurrence of a composer.
            for work in row.works.iter() {
                if let Some(composer) = &work.composer_name {
                    ret.observe(composer.to_owned(), Counter::new(1));
                }
            }

            ret
        }
    );

    Ok(CACHE.nullary_query(files)?.into_popularity_sorted_vec())
}

It's nice that the above is automatically cached and parallelised, but that's nothing super interesting. The next example should better motivate the approach: we filter the programs down to only those that occurred at a given venue, and accept an optional "root" composer. The histogram counts composer occurrences for programs that occurred at the given venue and in which the root composer was also featured.

fn count_composer_cooccurrences(
    files: &[PathBuf],
    venue: String,
    root_composer: Option<String>,
) -> Result<Vec<(String, u64)>, &'static str> {
    coppice::query!(
        // We take a PathBuf, a venue, and maybe a root composer, and return a histogram keyed on composer names.
        COOCCURRENCES(path: PathBuf, venue: +String, root_composer: -Option<String>) -> Histogram<String>,
        load_json_dump(path),  // Again, load each `PathBuf` with `load_json_dump`.
        rows => {
            use rayon::iter::IntoParallelIterator;
            use rayon::iter::ParallelIterator;

            let venue = venue.clone();  // Post-process the `Vec<Program>` returned by load_json_dump
            Ok(rows
                .into_par_iter()
                // Make sure the target venue appears in at least one of the concerts
                .filter(move |row| row.concerts.iter().any(|concert| concert.venue == venue))
                // extract the composer names.
                .map(|row| {
                    row.works
                        .iter()
                        .map(|work| work.composer_name.clone())
                        .collect::<Vec<Option<String>>>()
                }))
        },
        token, composers => {
            let _ = venue;  // We don't use the venue here, it was already handled above
            let mut ret: Histogram<String> = Default::default();

            let mut maybe_composers: Vec<&Option<String>> = vec![&None];
            maybe_composers.extend(composers.iter());

            let (mut token, root_composer) = token.focus(root_composer);

            // If `root_composer` is `None`, or if it matches one of the
            // composers in the program...
            let any_match = token.eql(root_composer, &None) || composers.iter().any(|composer| token.eql(root_composer, composer));

            if any_match {
                // Count occurrences in the histogram
                for composer in composers.iter().flatten().cloned() {
                    ret.observe(composer, Counter::new(1));
                }
            }

            ret
        }
    );

    Ok(COOCCURRENCES
        .query(files, &venue, &root_composer)?
        .into_popularity_sorted_vec())
}

This more complex example shows what's interesting about Coppice: the query call scans the files once, regardless of how many different root_composer values we pass.

On my laptop, the first call to count_composer_cooccurrences takes 2 seconds. Subsequent calls with various root composers (e.g., counting how many times works by each composer were played in the same program as Wagner) take ~100 microseconds, without any file I/O. This is possible because COOCCURRENCES.query enumerates all possible values of root_composer that would result in a non-trivial result for each row, and caches the result in a (bad) trie.

Dependencies

~1.4–2MB
~39K SLoC