Partition all the datas!

By: Jacob Quinn

Re-posted from: https://quinnj.home.blog/2020/11/13/partition-all-the-datas/

It’s been a while, but I wanted to put up a quick post about some new functionality in the Tables.jl package: as of the v1.1 release, a new function, Tables.partitions, has been introduced into the Tables.jl interfaces. Tables.partitions adds a “batch processing” layer on top of the already established Tables.rows/Tables.columns interfaces, allowing input source tables to signal that they have natural “batches” or partitions. If a source overloads Tables.partitions, each iteration should return a valid “table”, i.e. an object that satisfies the rows or columns interfaces.
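
To make that concrete, here’s a minimal sketch of the source side; Partitioned is a made-up type purely for illustration, where each partition is just a NamedTuple of vectors (itself a valid Tables.jl table):

    using Tables

    # A toy partitioned source; each element of `parts` is a NamedTuple
    # of vectors, which already satisfies the Tables.jl columns interface.
    struct Partitioned
        parts::Vector{<:NamedTuple}
    end

    # overload Tables.partitions: each iteration yields one valid table
    Tables.partitions(x::Partitioned) = x.parts

    p = Partitioned([(a = [1, 2], b = [3, 4]), (a = [5, 6], b = [7, 8])])
    for t in Tables.partitions(p)
        @show Tables.columntable(t)
    end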

Why partitions? Certain workflows and algorithms work best on “batches” of data, whether by employing the popular map-reduce methodology or, for extremely large datasets, by simply processing such tables chunk by chunk.
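
On the consuming side, a simple map-reduce over partitions might look like the following sketch; partitioned_sum is a hypothetical helper, not part of Tables.jl:

    using Tables

    # sum column `col` across all partitions, materializing only one
    # batch of columns at a time
    function partitioned_sum(table, col::Symbol)
        total = 0.0
        for part in Tables.partitions(table)           # each `part` is a valid table
            cols = Tables.columns(part)                # columns for this batch only
            total += sum(Tables.getcolumn(cols, col))  # map + reduce per batch
        end
        return total
    end

(A fallback definition of Tables.partitions treats any table as a single partition, so a consumer written this way also works on non-partitioned sources.)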

What are some concrete uses already out there? Here are a few examples:

  • In the Arrow.jl package, data is naturally stored in “record batches”. The package provides the Arrow.Stream type to allow iterating record batches from arrow data, and the Arrow.write function allows writing input table partitions out as separate record batches, using separate threads to write each batch (see the sketch after this list).
  • In the CSV.jl package, the CSV.Chunks type is provided for processing extremely large files in “chunks”, where each iteration is a CSV.File object. Because CSV.File is a valid Tables.jl table, CSV.Chunks naturally satisfies the Tables.partitions interface by returning itself (also sketched below). CSV.write also now supports the partition=true keyword argument, which allows writing input partitions out to separate files at once using multiple threads.
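
Usage of both readers might look something like this; the file names are placeholders, and CSV.Chunks’ ntasks keyword controls (roughly) how many chunks the file is split into:

    using Arrow, CSV

    # iterate arrow record batches lazily; each `batch` is a Tables.jl table
    for batch in Arrow.Stream("data.arrow")
        # process one record batch at a time...
    end

    # read a huge csv file in chunks; each `chunk` is a CSV.File
    for chunk in CSV.Chunks("big.csv"; ntasks = 8)
        # ...and process each chunk independently
    end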

There have also been efforts to provide several convenience functions that make working with partitions easier. For example:

  • Tables.partitioner: basically a lazy version of map, where each element returned must satisfy the Tables.jl source interface. This allows for easy “partitioning” of multiple tables, where each table is a partition. For example, Tables.partitioner(CSV.File, array_of_csv_filenames) will return an object where Tables.partitions applies CSV.File to each element of the array_of_csv_filenames array. This allows treating a bunch of csv files as a single “long” table, as long as each csv file has the same schema (see the combined sketch after this list).
  • TableOperations.joinpartitions: takes a partitioned input table and returns a single “table” that satisfies the Tables.columns interface; it does this by taking the columns of each partition and “appending” them together using the ChainedVector array type from the SentinelArrays.jl package.
  • TableOperations.makepartitions: takes an input Tables.jl-compatible table source, along with an integer number of rows, and returns an object that implements Tables.partitions by returning the specified number of rows in each partition.
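
Here’s a combined sketch of all three; the csv file names are placeholders, and each file is assumed to have the same schema:

    using Tables, CSV, TableOperations

    files = ["part1.csv", "part2.csv", "part3.csv"]  # assumed to share a schema

    # lazily treat the csv files as one "long" partitioned table
    longtable = Tables.partitioner(CSV.File, files)

    # chain each partition's columns together into a single columns-table
    joined = TableOperations.joinpartitions(longtable)

    # re-partition any table into batches of (up to) 10_000 rows
    for part in Tables.partitions(TableOperations.makepartitions(joined, 10_000))
        # each `part` has at most 10_000 rows
    end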

Anyway, it’s exciting to already hear about people adopting these behaviors/functionality and enjoying the benefits of partitioning data seamlessly across packages. As always, feel free to ping me in the JuliaLang #data channel, or hit me up on twitter.