I am trying to use dask to split a huge tab-delimited file into smaller chunks on an AWS Batch array of 100,000 cores. In AWS Batch, each job in the array gets a unique environment variable AWS_BATCH_JOB_ARRAY_INDEX ranging from 0 to 99,999 (which is copied into the idx variable in the snippet below). So I am trying to use the following code:
import os
import dask.dataframe as dd

idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])

df = dd.read_csv("s3://main-bucket/workdir/huge_file.tsv", sep="\t")
df = df.repartition(npartitions=100_000)
df = df.partitions[idx]
df = df.persist()  # this call isn't needed before calling df.to_csv (see comment by Sultan)
df = df.compute()  # this call isn't needed before calling df.to_csv (see comment by Sultan)
df.to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False)
print(idx, df.shape, df.head(5))
Do I need to call persist and/or compute before calling df.to_csv?
Answer
When I have to split a big file into multiple smaller ones, I simply run the following code.
Read and repartition
import dask.dataframe as dd

df = dd.read_csv("file.csv")
df = df.repartition(npartitions=100)
Save to csv
o = df.to_csv("out_csv/part_*.csv", index=False)
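The '*' in the path is replaced by the partition index, so this produces out_csv/part_0.csv, out_csv/part_1.csv, and so on. If you want different numbering, dd.to_csv also accepts a name_function; a minimal sketch (the zero-padding scheme and the file name are just illustrations):

import dask.dataframe as dd

df = dd.read_csv("file.csv").repartition(npartitions=100)

# Replace the '*' with a zero-padded partition index, e.g. out_csv/part_007.csv
df.to_csv(
    "out_csv/part_*.csv",
    index=False,
    name_function=lambda i: f"{i:03d}",
)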
Save to parquet
o = df.to_parquet("out_parquet/")
Here you can pass write_metadata_file=False if you want to avoid writing the _metadata file.
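For example, a minimal sketch of the parquet variant (the input and output paths are just placeholders):

import dask.dataframe as dd

df = dd.read_csv("file.csv").repartition(npartitions=100)

# Write one parquet file per partition; skip the global _metadata file
df.to_parquet("out_parquet/", write_metadata_file=False)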
A few notes:
- I don’t think you really need persist and compute, since you can save straight to disk. When you are hitting problems like memory errors, it is safer to write to disk than to compute (see the sketch after these notes for how that applies to your per-index case).
- I found writing to parquet at least 3x faster than writing to csv.
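Applied to your snippet, the idea would look roughly like this. It is a sketch, not something I have run on AWS Batch: the bucket path is copied from your question, and single_file=True is used because, without compute(), df is still a (single-partition) Dask DataFrame, so to_csv has to be told to write one file.

import os
import dask.dataframe as dd

idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])

df = dd.read_csv("s3://main-bucket/workdir/huge_file.tsv", sep="\t")
df = df.repartition(npartitions=100_000)

# Select this job's partition and write it directly; no persist()/compute() needed,
# since to_csv triggers the computation itself.
df.partitions[idx].to_csv(
    f"/tmp/split_{idx}.tsv",
    sep="\t",
    index=False,
    single_file=True,
)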