nf-parquet is a Nextflow plugin able to read and write parquet files.
This plugin provides several functions to work with parquet files, such as fromParquet, to emit the content of a file, or writeRecords, to write a Collection of records.
WARNING: This plugin relies heavily on the Record Java concept introduced in recent versions of Java, so it requires Java 19 as a minimum.
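As a quick check (a standard JDK command, nothing specific to this plugin), you can verify the Java version available on your path:
java -version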
CSV vs Parquet
A csv file is a text file where each line represents a record and fields are separated by some special character (";" for example).
Parquet, by contrast, is a binary format that can't be opened with a simple text editor, but its file size is smaller and it has better read performance.
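For illustration (hypothetical data, not a file used by the plugin), a csv file with two records could look like this:
id;name;value
1;test1;0.75
2;test2;0.21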
Schemas preparation
First thing to do (only once, as long as your schema doesn't change) is to define and compile the schemas to use, represented as Record Java classes.
In your Nextflow pipeline repository create a folder schemas (for example) and a subfolder myrecords (for example).
Create two Java records:
package myrecords;
record CustomRecord(long id, String name, int sizell, double value, double percentile) {
}
package myrecords;
record SingleRecord(long id, String name) {
}
INFO: As you can see, they're pure Java records.
CustomRecord will represent a "full" record we want to write/read from a parquet file, while SingleRecord will represent a projection, a subset of fields from CustomRecord.
Using projections can reduce the CPU and time spent reading a huge file, as the parquet reader is able to skip the non-interesting fields.
Now create a module-info.java file:
module myrecords {
opens myrecords;
}
This file is required to allow all modules to access our schemas (and to avoid classpath loader issues).
Now compile your schemas with:
javac --release 17 -d lib/ schemas/myrecords/*
If all goes well you'll have 3 classes in your lib folder. Nextflow will attach these classes to the classpath, so nf-parquet will be able to inspect them.
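For reference, given the layout above the compiled output should look roughly like this (module-info.class at the root of lib, the record classes under the myrecords package folder):
lib/module-info.class
lib/myrecords/CustomRecord.class
lib/myrecords/SingleRecord.class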
WARNING: This step is only required while your schemas don't change. In case you need to add/remove fields or create new schemas (Java records), you need to run javac again.
TIP: Remember to add the schema files to the repository. You may also want to add the lib folder with the compiled binaries.
Configuration
plugins {
id "nf-parquet@LATEST_VERSION"
}
// Nothing configurable at the moment
Basic example
In this example, we'll create a parquet file, and then we'll process each record:
include {fromParquet; writeRecords} from 'plugin/nf-parquet' //(1)
import myrecords.* //(2)
writeRecords("test.parquet", [ //(3)
new CustomRecord(1,"test1",11, new Random().nextDouble(), new Random().nextDouble()),
new CustomRecord(2,"test2",12, new Random().nextDouble(), new Random().nextDouble()),
new CustomRecord(3,"test3",13, new Random().nextDouble(), new Random().nextDouble()),
])
process HELLO {
input: val(customer)
output: stdout
"""
echo hello ${customer.name()}
"""
}
workflow {
channel.fromParquet("test.parquet", CustomRecord) //(4)
| HELLO
| view
}
(1) import nf-parquet functions
(2) import our schemas
(3) use the writeRecords function to write a Collection of records
(4) read the file and emit every record to a process
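Running this pipeline should print something along these lines (the order may vary, since process tasks run in parallel):
hello test1
hello test2
hello test3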
Projection example
In the previous example we've seen how to write and read a CustomRecord. If all went well you'll have a test.parquet file with three records.
In this example, we'll read these records but only a subset of fields:
include { fromParquet } from 'plugin/nf-parquet'
import myrecords.*
channel.fromParquet( "test.parquet", SingleRecord ) //(1)
| view
(1) Read only id and name (defined in SingleRecord)
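Using the default toString() of Java records, the output should look something like this:
SingleRecord[id=1, name=test1]
SingleRecord[id=2, name=test2]
SingleRecord[id=3, name=test3]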
Complex schema
Imagine you have a "complex" schema where a Person has an Address and a Job:
record Address(String street, String zip, String city) { }
record Job(String company, String position, int years){ }
record Person(long id, Job job, Address address) { }
record SingleAddress(String street) { }
record SinglePerson(long id, SingleAddress address) { }
Create these new Java records in the schemas/myrecords folder and recompile them using the javac command.
Similar to the previous examples, you can generate a presidents.parquet file:
include {
writeRecords
} from 'plugin/nf-parquet'
import myrecords.*
writeRecords(
"presidents.parquet",
[
new Person(1010101,
new Job("USA", "POTUS", 3),
new Address("1600 Pennsylvania Av.", "20500", "Washington")),
new Person(1010102,
new Job("Spain", "POSPAIN", 12),
new Address("Moncloa.", "28000", "Madrid")),
])
or read a projection of them:
include {
fromParquet
} from 'plugin/nf-parquet'
import myrecords.*
channel.fromParquet( "presidents.parquet", SinglePerson )
| map { it.address().street() }
| view
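If all goes well, this prints the projected street of each person, something like:
1600 Pennsylvania Av.
Moncloa.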
Schemaless
In case you don't want to deal with the definition (and compilation) of record schemas, and you only want to read parquet files, you can use the fromRawParquet function.
This function reads the parquet file and emits each record as a Map, so you don't need to define any schema:
include {
fromRawParquet
} from 'plugin/nf-parquet'
channel.fromRawParquet( "presidents.parquet" )
| map { it.address.street }
| view
In this case the map operator receives a Map<Object,Object>, so you can navigate it using Groovy syntax.
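As a minimal sketch of what this enables (assuming the presidents.parquet file generated in the previous section), you can combine standard Nextflow operators with Groovy map navigation:
include { fromRawParquet } from 'plugin/nf-parquet'
channel.fromRawParquet( "presidents.parquet" )
    | filter { it.job.years > 5 }                       // nested fields are plain Maps
    | map { "${it.job.position} - ${it.address.city}" } // navigate with dot notation
    | view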