nf-parquet is a Nextflow plugin for reading and writing Parquet files

This plugin provides several functions to work with Parquet files, such as fromParquet, which emits the content of a file, or writeRecords, which writes a Collection of records

WARNING

This plugin relies heavily on the Java record concept introduced in recent versions of Java, so it requires Java 19 as a minimum

CSV vs Parquet

A CSV file is a text file where each line represents a record and fields are separated by a special character (";" for example)

Parquet, by contrast, is a binary format that can't be opened with a simple text editor, but its files are smaller and offer better read performance
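
For illustration, the three records written in the basic example below would look something like this in CSV form (the numeric values here are just placeholders for the random doubles):

id;name;size;value;percentile
1;test1;11;0.42;0.17
2;test2;12;0.73;0.55
3;test3;13;0.08;0.91

Parquet stores the same data in a compressed, column-oriented binary layout instead.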

Schema preparation

The first thing to do (only once, as long as your schema doesn't change) is to define and compile the schemas to use, represented as Java record classes

In your Nextflow pipeline repository, create a folder schemas (for example) with a subfolder myrecords (for example)

Create two Java records:

CustomRecord.java
package myrecords;
record CustomRecord(long id, String name, int size, double value, double percentile) {
}
SingleRecord.java
package myrecords;
record SingleRecord(long id, String name) {
}
INFO

As you can see, they’re pure Java records

CustomRecord represents the "full" record we want to write to and read from a Parquet file, while SingleRecord represents a projection, a subset of the fields of CustomRecord. Using projections can reduce the CPU and time spent reading a huge file, as the Parquet reader is able to skip the columns that aren't needed

Now create a module-info.java file:

module-info.java
module myrecords {
    opens myrecords;
}

This file is required to make our schemas accessible to all modules (and to avoid classpath loader issues)

Now compile your schemas with:

javac --release 17 -d lib/ schemas/myrecords/*

If all goes well, you'll have 3 classes in your lib folder. Nextflow will attach these classes to the classpath so nf-parquet will be able to inspect them.
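
Assuming the folder layout above, the lib folder should contain something like this (the exact placement of module-info.class can vary with javac options):

lib/
├── module-info.class
└── myrecords/
    ├── CustomRecord.class
    └── SingleRecord.class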

WARNING

You only need to repeat this step when your schemas change. If you add/remove fields or create new schemas (Java records), you need to run javac again

TIP

Remember to add the schema files to the repository. You may also want to add the lib folder with the compiled binaries

Configuration

plugins {
     id "nf-parquet@LATEST_VERSION"
}

// Nothing configurable at the moment

Basic example

In this example, we'll create a Parquet file and then process each of its records

include { fromParquet; writeRecords } from 'plugin/nf-parquet' //(1)

import myrecords.*  //(2)

writeRecords("test.parquet", [  //(3)
        new CustomRecord(1,"test1",11, new Random().nextDouble(), new Random().nextDouble()),
        new CustomRecord(2,"test2",12, new Random().nextDouble(), new Random().nextDouble()),
        new CustomRecord(3,"test3",13, new Random().nextDouble(), new Random().nextDouble()),
])

process HELLO {
    input: val(customer)
    output: stdout
    """
    echo hello ${customer.name()}
    """
}

workflow {
    channel.fromParquet("test.parquet", CustomRecord) //(4)
            | HELLO
            | view
}
  1. Import the nf-parquet functions

  2. Import our schemas

  3. Use the writeRecords function to write a Collection of records

  4. Read the file and emit every record to a process
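
If everything is wired correctly, the pipeline should print something like the following (the order may differ, as process tasks run in parallel):

hello test1
hello test2
hello test3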

Projection example

In the previous example we saw how to write and read CustomRecord instances. If all went well you'll have a test.parquet file with three records

In this example, we'll read the same records, but only a subset of their fields

include { fromParquet } from 'plugin/nf-parquet'

import myrecords.*

channel.fromParquet( "test.parquet", SingleRecord ) //(1)
        | view
  1. Read only the id and name fields (as defined in SingleRecord)
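
The view operator prints each emitted record using the default Java record toString, so the output should look something like:

SingleRecord[id=1, name=test1]
SingleRecord[id=2, name=test2]
SingleRecord[id=3, name=test3]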

Complex schema

Imagine you have a "complex" schema where a Person has an Address and a Job

record Address(String street, String zip, String city) { }
record Job(String company, String position, int years){ }
record Person(long id, Job job, Address address) { }

record SingleAddress(String street) { }
record SinglePerson(long id, SingleAddress address) { }

Create these new Java records in the schemas/myrecords folder and recompile them using the javac command

Similarly to the previous examples, you can generate a presidents.parquet file:

include {
        writeRecords
} from 'plugin/nf-parquet'

import myrecords.*

writeRecords(
        "presidents.parquet",
        [
                new Person(1010101,
                        new Job("USA", "POTUS", 3),
                        new Address("1600 Pennsylvania Av.", "20500", "Washington")),
                new Person(1010102,
                        new Job("Spain", "POSPAIN", 12),
                        new Address("Moncloa.", "28000", "Madrid")),
        ])

or read a projection of them:

include {
    fromParquet
} from 'plugin/nf-parquet'

import myrecords.*

channel.fromParquet( "presidents.parquet", SinglePerson )
        | map { it.address().street() }
        | view
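
Only the street field is materialized from each record, so this pipeline should print something like:

1600 Pennsylvania Av.
Moncloa.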

Schemaless

If you don't want to deal with defining (and compiling) record schemas, and you only need to read Parquet files, you can use the fromRawParquet function

This function reads the Parquet file and emits each record as a Map, so you don't need to define any schema

include {
    fromRawParquet
} from 'plugin/nf-parquet'

channel.fromRawParquet( "presidents.parquet" )
        | map { it.address.street }
        | view

In this case the map operator receives a Map<Object,Object>, so you can navigate it using Groovy syntax
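
As a minimal sketch, assuming the presidents.parquet file from the previous example, you can combine nested property access with Groovy's safe-navigation operator to build values from several fields:

include { fromRawParquet } from 'plugin/nf-parquet'

channel.fromRawParquet( "presidents.parquet" )
        | map { record ->
            // nested maps can be navigated with plain property syntax;
            // ?. avoids a NullPointerException if a field is missing
            "${record.id}: ${record?.job?.position} at ${record.address.city}"
        }
        | view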