How to split a large .csv file using dask?

I am trying to use dask in order to split a huge tab-delimited file into smaller chunks on an AWS Batch array of 100,000 cores.

In AWS Batch each core has a unique environment variable AWS_BATCH_JOB_ARRAY_INDEX ranging from 0 to 99,999 (which is copied into the idx variable in the snippet below). Thus, I am trying to use the following code:

import os
import dask.dataframe as dd

idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])

df = dd.read_csv(f"s3://main-bucket/workdir/huge_file.tsv", sep='t')
df = df.repartition(npartitions=100_000)
df = df.partitions[idx]

df = df.persist() # this call isn't needed before calling df.to_csv (see comment by Sultan)
df = df.compute() # this call isn't needed before calling df.to_csv (see comment by Sultan)
df.to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False)
print(idx, df.shape, df.head(5))

Do I need to call persist and/or compute before calling df.to_csv?

Answer

When I have to split a big file into multiple smaller ones, I simply run the following code.

Read and repartition

import dask.dataframe as dd

df = dd.read_csv("file.csv")
df = df.repartition(npartitions=100)

Save to csv

o = df.to_csv("out_csv/part_*.csv", index=False)

Save to parquet

o = df.to_parquet("out_parquet/")

Here you can pass write_metadata_file=False if you want to skip writing the _metadata file.
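
For example, the same call with the metadata file turned off:

o = df.to_parquet("out_parquet/", write_metadata_file=False)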

A few notes:

  • I don’t think you really need persist and compute, since you can save directly to disk. When you run into problems like memory errors, it is safer to save to disk rather than compute (see the sketch after this list).
  • I found the parquet format at least 3x faster than csv at write time.
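
Applied to your setup, a minimal sketch (assuming the same bucket path and AWS_BATCH_JOB_ARRAY_INDEX variable from your question) would write the selected partition directly, with no persist or compute:

import os
import dask.dataframe as dd

idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])

df = dd.read_csv("s3://main-bucket/workdir/huge_file.tsv", sep="\t")
df = df.repartition(npartitions=100_000)

# Select the single partition for this array index and write it straight to disk;
# to_csv triggers the computation itself, so no persist()/compute() is required.
df.partitions[idx].to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False, single_file=True)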
