How do I write a snakemake input when not all jobs successfully output files from previous rule?

Basically, I have three snakemake rules (other than rule all) and cannot figure this problem out, despite the checkpoint resources.

Rule one has my first and only input file. It will have x outputs (the number varies depending on the input file). Each of those x outputs needs to be processed separately in rule 2, meaning that rule 2 will run x jobs. However, only some subset, y, of these jobs will produce outputs (the software only writes out files for inputs that pass a certain threshold). So, while I want each of those outputs to run as a separate job in rule 3, I don't know how many files will come out of rule 2. Rule three will also run y jobs, one for each successful output from rule 2.

I have two questions. The first is: how do I write the input for rule 3, not knowing how many files will come out of rule 2? The second is: how can I "tell" rule 2 it is done, when the number of output files does not correspond to the number of input files? If I add a fourth rule, I imagine it would try to re-run rule 2 on jobs that didn't get an output file, which would never make an output. Maybe I am missing something with setting up the checkpoints?

something like:

rule a:
    input: "file.vcf"
    output: "dummy.txt"
    shell: """
        ... make unknown number of output files (x): x_1, x_2, ..., x_n
        """

# run a separate job for each output of rule a
rule b:
    input: x_1  # not sure how many inputs there will be here
    output: y_1  # not sure how many output files there will be here
    shell: """
        some of the x inputs will produce their corresponding y, but others will have no output
        """

# run a separate job for each output of rule b
rule c:
    input: y_1  # not sure how many input files here
    output: z_1

Answer

You should change rule a to a checkpoint as mentioned in the comments. Rule b will generate one output for each input and can be left as is, same for rule c in this example.

Eventually, you will have an aggregate-like rule that decides which outputs are required. It could be rule d, or it may end up being rule all. Either way, the aggregate rule needs an input function that invokes the checkpoint to determine which files are present. If you follow along with the example, you would have something like:

checkpoint a:
    input: "file.vcf"
    output: directory("output_dir")
    shell: """
        mkdir {output}  # then put all the output files here!
        ... make unknown number of output files (x): x_1.txt, x_2.txt, ..., x_n.txt
        """

# run a separate job for each output of checkpoint a
rule b:
    input: "output_dir/x_{n}.txt"
    output: "y_{n}"
    shell: """
        some of the x inputs will produce their corresponding y, but others will have no output
        """

# run a separate job for each output of rule b
rule c:
    input: "y_{n}"
    output: "z_{n}"

# input function for the rule aggregate
# (requires `import os` at the top of the Snakefile)
def aggregate_input(wildcards):
    checkpoint_output = checkpoints.a.get(**wildcards).output[0]
    return expand("z_{i}",
                  i=glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i)

rule aggregate:  # what do you do with all the z files?  could be all
    input: aggregate_input

If you think of your workflow as a tree, rule a branches into a variable number of branches. Rules b and c are one-to-one mappings. The aggregate rule brings all the branches back together AND is responsible for checking how many branches are present. Rules b and c only see one input/output each and don't care how many other branches there are.
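As an aside, the expand call in aggregate_input is essentially a formatted list comprehension over the wildcard values that glob_wildcards found. A minimal pure-Python stand-in (not snakemake's actual implementation, just an illustration of the single-wildcard case) shows what the aggregate input resolves to when the checkpoint produced only x_2 and x_3:

```python
# Pure-Python stand-in for snakemake's expand(); illustration only,
# handling a single wildcard rather than the full product expand() supports.
def expand_like(pattern, **wildcards):
    key, values = next(iter(wildcards.items()))
    return [pattern.replace("{" + key + "}", str(v)) for v in values]

# Suppose glob_wildcards found only x_2.txt and x_3.txt:
found = ["2", "3"]
print(expand_like("z_{i}", i=found))  # ['z_2', 'z_3']
```

So the aggregate rule ends up requesting exactly z_2 and z_3, never a z_1 that could not be built.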

EDIT to answer the question in the comments and fix several bugs in my code:

I still get confused here though, because rule b will not have as many outputs as inputs, so won’t rule aggregate never run until all of the wildcards from the output of checkpoint a are present in z_{n}, which they never would be?

This is confusing because it's not how snakemake usually works, and it leads to a lot of questions on SO. What you need to remember is that when checkpoints.<rule>.get is run, DAG evaluation for that step effectively pauses. Consider the simple case of three possible values, i in [1, 2, 3], where only i == 2 and 3 are created by checkpoint a. We know the final DAG will look like this:

rule             file
input           file.vcf
                 /    \
a              x_2    x_3
                |      |
b              y_2    y_3
                |      |
c              z_2    z_3
                 \    /
aggregate        OUTPUT

Where x_1 is missing from checkpoint a's output. But snakemake doesn't know how checkpoint a will behave, just that it will make a directory as output and (because it is a checkpoint) that once it has completed, the DAG will be reevaluated. So if you ran snakemake -nq you would see that checkpoint a and aggregate will run, but no mention of b or c. At that point, those are the only rules snakemake knows about and plans to run. Calling checkpoints.<rule>.get basically says "wait here; after this rule you will have to see what was made".

So when snakemake first starts running your workflow, the DAG looks like this:

rule             file
input           file.vcf
                   |     
a                 ...
                   |     
????              ...
                   |     
aggregate        OUTPUT

Snakemake doesn’t know what goes between rule a and aggregate, just that it needs to run a before it can tell.

rule             file
input           file.vcf
                 /    \
a              x_2    x_3

????             ...
                  |
aggregate        OUTPUT

Checkpoint a gets scheduled and run, and now the DAG is reevaluated. The rest of aggregate_input looks at the files that are present with glob_wildcards, then uses that information to decide which files it needs. Note that the expand is requesting outputs from rule c, which requires outputs from rule b, which require the x_{n} files, and those exist now that the checkpoint has run. Now snakemake can construct the DAG you expect.

Here’s the input function with more comments to hopefully make it clear:

import os

def aggregate_input(wildcards):
    # say this rule depends on a checkpoint.  DAG evaluation pauses here
    checkpoint_output = checkpoints.a.get(**wildcards).output[0]
    # at this point, checkpoint a has completed and the output (directory)
    # is in checkpoint_output.  Some number of files are there

    # use glob_wildcards to find the x_{i} files that actually exist
    found_files = glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i
    # now we know we need a z file to be created *if* an x file exists
    return expand("z_{i}", i=found_files)
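To see the glob_wildcards step in isolation, here is a sketch that mimics it with plain os.listdir and re against a throwaway checkpoint directory (the file names and the regex stand-in are illustrative assumptions, not snakemake's actual implementation):

```python
import os
import re
import tempfile

# Fake checkpoint output directory containing only x_2.txt and x_3.txt,
# i.e. x_1 failed the threshold and was never written.
checkpoint_output = tempfile.mkdtemp()
for i in ("2", "3"):
    open(os.path.join(checkpoint_output, f"x_{i}.txt"), "w").close()

# Rough equivalent of:
#   glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i
found = sorted(
    m.group(1)
    for name in os.listdir(checkpoint_output)
    if (m := re.fullmatch(r"x_(.+)\.txt", name))
)
print(found)                      # ['2', '3']
print([f"z_{i}" for i in found])  # ['z_2', 'z_3']
```

Only the z files whose x counterparts actually exist are requested, which is why rules b and c never get asked to build z_1.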
