Running Pipeline Stages in Parallel

A frequent need in Bioinformatics pipelines is to execute several tasks at the same time. There are two main cases where you want to do this:

  1. you have one set of data (eg. a sample) that needs to undergo several independent operations that can be done at the same time
  2. your data is made up of many separate samples which can be processed independently through part or all of your pipeline

In both cases you can save a lot of time by doing the operations in parallel instead of sequentially. Bpipe supports this kind of parallel execution with a simple syntax that helps you declare which parts of your pipeline can execute at the same time and what inputs should go to them.

Branches

Key to all parallelism in Bpipe is the concept of "branches". Each parallel portion of a pipeline is considered to be "branch" which is one of the split processing paths from a parent path.

Bpipe has several different types of branches, which are described below. Central to all of these different branches is that files are isolated between branches. This is very important to help ensure that you do not have input file sets that should be separated crossing over and mixing with each other. For example, imagine if you are processing multiple samples within a single pipeline: it is very important that data files for one sample are not fed into a step for a different sample. For this reason, when you use "input" variables within Bpipe, they can only resolve files that are associated to the current branch, and will not see files from outside the branch.

Data Driven Branching

Suppose you had a very simple "hello world" pipeline as illustrated below:


run {
  hello + world
}

Now suppose you wanted to add a second "mars" stage that would execute simultaneously with the "world" pipeline stage. All you need to do is place all the stages that execute together in square brackets and separate them with commas:


run {
  hello + [ world,mars ]
}

Note: if you are familiar with Groovy syntax, you will notice that the square bracket notation is how you define a list in Groovy. Thus all we are saying is that if you give Bpipe a list of stages to process, it executes them in parallel.

You can execute multiple stages in parallel too:


Bpipe.run {
  hello + [ blue + world, red + mars ]
}

Here "blue + world" and "red + mars" form sub-pipelines that execute in parallel with each other. You can have more stages at the end that are sequential:


Bpipe.run {
  hello + [ blue + world, red + mars ] + nice_to_see_you
}

Note that in this case the last stage nice_to_see_you won't execute until all of the preceding stages executing in parallel have finished. It will receive all the outputs combined from both the "blue + world" and "red + mars" stages as input.

You can also nest parallel tasks if you wish:

Bpipe.run {
  hello + [ blue + world, red + [mars,venus] ] + nice_to_see_you
}

In this case the mars and venus stages will execute simultaneously, but only after the red stage has finished executing.

Parallelizing Based on Chromosome

In bioinformatics it is often possible to run operations simultaneously across multiple chromosomes. Bpipe makes this easy to achieve using a special syntax as follows:


Bpipe.run {
  chr(1..5) * [ hello ]
}

This will run 5 parallel instances of the 'hello' pipeline stage, each receiving the same file(s) as input. Each stage will receive an implicit chr variable that can be used to refer to the chromosome that is to be processed by the stage. This can be used with many tools that accept the chromosome as an input to specify the region to process. For example, with samtools:


hello = {
    exec """samtools view test.bam $chr | some_other_tool """
}

Multiple ranges or single chromosomes can be specified:


Bpipe.run {
  chr(1..10, 'X','Y') * [ hello ]
}

This would run 12 parallel stages, passing 'chr1' through to 'chr10' and 'chrX' and 'chrY' as the the chr variable to all the different stages.

Note: Bpipe also makes available the $region variable. If no genome is declared (see below) Bpipe will emit a region in the form: chrN:0-0 for statements inside the pipeline. Many tools accept this as meaning to include the entire chromosome. However if you declare a genome (see below) then Bpipe will emit a region that includes the entire chromosome with the correct range for that chromosome.

Alternative Genomes

When parallelizing by chromosome the format of the reference sequence (eg: chromosome) can vary according to which underlying reference genome is used. The default configuration of Bpipe is that it uses the UCSC style convention of including 'chr' as a prefix before reference sequences. To omit this, as well as inform Bpipe about other aspects of the genomes, declare the genome you are using at the start of your pipeline:

genome 'GRCh37'

For genomes recognized by Bpipe that do not include the 'chr' prefix, Bpipe will use the correct style in when evaluating the $chr and $region variable.

Note: Bpipe will attempt to download the chromosome sizes for genomes it recognises from UCSC.

Parallelising over Arbitrary Genomic Regions

Bpipe also supports splitting up any arbitrary set of genomic regions to process them in parallel. This can be especially useful when you want to parallelize in a more fine grained way than per-chromosome (often necessary because chromosome sizes are quite uneven). There are two ways to split up a set of genomic regions:

  • partition to even sized parts by size of interval (allowing number of parts to be dynamic)
  • split to a specific number of parts (allowing resulting interval size to be dynamic)

Bpipe supports both of these, through two commands called partition and split.

Defining Regions

If you want to process a whole "genome" you can declare the genome using the genome command and then that genome will be available as a set of regions. However if you want to process a subset of the genome (eg. a set of exome capture regions), then you can load a custom BED file:

my_regions = bed("/some/path/to/a/bed/file.bed")

Partitioning

To partition regions, use the partition command, followed by a list of stages to parallelis over:

To process all of hg19 in 1mb blocks:

hg19.partition(1000000) * [ stage1 + stage2 ]

To process custom regions it is exactly the same:

my_regions.partition(1000000) * [ stage1 + stage2 ]

Splitting

To split regions into a specific number of parts, use the split command:

hg19.split(40) * [ stage1 + stage2 ]

Bpipe will try to create 40 even parts, but it will not try too hard. That is, if you are using a custom set of regions, it will try to bisect the gaps between the regions and will never break them in two unless there is a greater than 2:1 ratio that cannot be reduced. This way, unless you split to an extremely fine grained level, you can usually rely on Bpipe to preserve whole exons and target regions for targeted capture definitions.

When splitting a whole genome, Bpipe will attempt to download definitions of genes and exons from UCSC. It will then avoid bisecting the middle of an exon or gene in the created regions (again, unless the required granularity cannot be achieved to within a 2:1 ratio between resulting region sizes).

Branch Names

When you split or partition a set of regions, and parallelise over them using Bpipe's branch syntax (eg: the * operator), Bpipe assigns a branch name to each parallel path automatically:

  • if the region is a whole chromosome / contig, the branch name is the name of the chromosome / contig
  • otherwise, Bpipe calculates the sha1 hash of the regions and assigns the first 8 characters as the branch name

Note that when Bpipe parallelises over branches in general, the output files within the branch will automatically have the id of the branch inserted. This ensures that parallel branches do not try to write the same file. In the case where sub-chromosomal regions are parallelised, this results in files containing sha1 fragments to distinguish their names.

Accessing Regions

The actual regions to be processed can be accessed inside pipeline stages using the $region variable, or if needed as a BED file, using $region.bed.

Merging Results

See the merge points operator to understand specific support that Bpipe provides for merging the outputs from parallel segments.

Executing Multiple Stages Simultaneously on Different Data

In the above examples each parallel stage received the same input files and operated on them together. Sometimes however what you really want is to have each input file or groups of your input files processed independently through the same stage (or stages). Bpipe calls this input splitting and gives you a concise and simple way to achieve it.

Suppose we have 10 input files and we want all 10 files named input_1.txt to input_10.txt to be processed at the same time. Here is how it looks:


Bpipe.run {
   "input_%.txt" * [ hello + world ] + nice_to_see_you
}

There are two things to notice here: 1. The pipeline starts with an input splitting pattern containing a % character that shows which part of the file name should be used to split the input files into groups 1. The pipeline uses a ** (or multiplication) operator in your pipeline definition instead of the usual +

Note that Bpipe still requires you to specify the files to match against on the command line when you run your pipeline; the matching is not done on files in the file system, but on files that are part of the pipeline. So if you saved it in a file called 'helloworld.pipe' then you might run this example using something like this:


bpipe run helloworld.pipe input*.txt

Input Splitting Patterns

Splitting

Bpipe uses a very simple wildcard pattern syntax to let you indicate how your files should be split into groups for processing. In these patterns you simply replace the portion of file names that indicates what group the file belongs to with the percent character which acts as a wildcard (matches any number of characters). Files that share the same grouping portion will be passed together to the the parallel pipeline stages to process.

The pattern matching used for grouping files is a substring match. Therefore you only need to supply enough of the input file name to uniquely identify where the grouping character is. For example, the following pipeline is equivalent to the one above:


run {
   "_%." * [ hello + world ] + nice_to_see_you
}

This means Bpipe will look for the first (and shortest) token in the file name that is flanked by an underscore on the left and a period (. character) on the right. This may be useful if your files have portions of their names that differ but are not related to how you wish to group them.

Note: files that mismatch the grouping operator pattern will be filtered out of the inputs altogether. This feature can be useful by allowing you to have a directory full of files that you provide as input even if some of them are not real input files - Bpipe will filter out only the ones it needs based on the pattern you specified.

Ordering

Bpipe supports one other special character in its input splitting patterns: the * wildcard. This also acts as a wildcard match but it does not split the input into groups. Instead, it affects ordering within the groups that are split. When Bpipe matches a * character in an input splitting pattern it first splits the files into their groups (based on the % match) and then sorts them based on the portions that match the * character. This helps you ensure that even after splitting, your files are still in a sensible order. For example, consider the following input files

  • input_1_1.txt
  • input_1_2.txt
  • input_2_2.txt
  • input_2_1.txt

You can split and sort the inputs using a pattern:

"input_%_*.txt"

This pattern will then split and order the files like so:

Group 1 - input_1_1.txt, input_1_2.txt

Group 2 - input_2_1.txt, input_2_2.txt

Notice that the second group had its files reversed in order because Bpipe sorted them.

Explicitly Specifying Parallel Paths

If you don't get the flexibility you need from the above mechanisms, you can set the branch paths yourself explicitly by specifying a Groovy List or a Map that tells Bpipe what paths you want. When you specify a Map, the keys in the map are interpreted as branch names and the values in the Map are interpreted as files, or lists of files, that are supplied to the branch as input.

For example:


// Create a data structure (Map) that maps branches to files
def branches = [
    sample1: ["sample1_2.fastq.gz"],
    sample2: ["sample2_2.fastq.gz"],
    sample3: ["sample3_2.fastq.gz"]
]

align = {
   // keep the sample name as a branch variable
   branch.sample = branch.name 
   ...
}

run { branches * [ align ] }

In this example the align stage will run three times in parallel and the files specified for each branch will be explicitly provided to it. Of course, in normal usage this technique would not be best applied by specifying them statically, but rather for when you want to read the information from a file or database or other source and construct the branch => file mapping from that.

Splitting Dynamically on Arbitrary Metadata

A particular challenge occurs when your parallelisation is dynamically data driven. When this manifests as a parallel path per file or groups of files, you can usually use an input splitting pattern (see above) to create the parallel branches dynamically.

If, however, you find you need parallelisation that is not driven by files (or groups of them), you can use a pipeline stage to define downstream splitting strategy by utilising the forwardSplit(...) command.

In this context, you pass a Map to the function where:

  • keys are the branch names
  • values are metadata that is accessible on the branches via a metadata property

Consider the following example:

hello = {
    forwardSplit([ a: 'mars', b: 'jupiter' ])
}

world = {
    exec """
        echo "hello $branch.metadata from branch $branch.name"
    """
}

run {  hello * [ world ] }

This example will create two branches, names a and b. These will then have metadata mars and jupiter. The metadata can be any type of object or data structure you create yourself in Groovy. Bpipe will not attempt to resolve it as a file. This way you can split to create arbitrary branches with metadata for downstream processing as required.

NOTE: Unlike other branch properties, metadata is not inherited by child branches. If you would like it to be available in grand child and lower branches then you must manually set properties on the child branch object yourself inside the child branches.

Channels

Bpipe also supports a separate type of branch known as a "channel" (this is different to notification channels). Channels behave like branches in that they represent split processing paths within a pipeline. However, they are different to branches in that they retain their state even when their processing path terminates (by ending or merging back into the main processing path). Therefore, Channels can be "resumed" at a later point in processing, allowing you to retain the context that was associated to the processing path.

For example, if your pipeline has several phases that each have parallel processing of samples within them, you can create a sample "channel" that persists throughout the whole end to end.

Unlike branches, channels are created explicitly using the channel function.

Consider the following example:

samples = ['a','b','c']

sample_channel = channel(samples).named('sample')

sample_channel * [ 
   analyse_raw data
] + create_report  + sample_channel * [
   produce_final_results
]

Here the a, b, and c samples get processed in parallel in two separate phases. The use of the channel mechanism means that files resolved in produce_final_results will be those output for the correct sample in analyse_raw_data

Allocating Threads to Commands

Sometimes you know in advance exactly how many threads you wish to use with a command. In that case, it makes sense to specify it using the procs attribute in a configuration, or to specify it directly in the pipeline stage with the Uses clause.

Other times, however, you want to be more flexible, and allow resources to be assigned more dynamically. This means that if more compute power is available, you can take advantage of it, and when less is available, your pipeline can scale down to run on what is available. Bpipe offers some capability for this through the special $threads variable. This variable can behave in two different ways:

  • when a procs configuration is available from the bpipe.config file, the $threads variable will take that value.
  • when no procs configuration is specified by configuration, Bpipe will decide on a sensible value for $threads based on the currently unutilised concurrency slots specified to Bpipe via the -n flag (defaulting to the number of cores on the computer Bpipe is running on). Once assigned, $threads maps to the procs configuration value for any commands that it is specified in.

Example:

align_bwa = {
  exec "bwa aln -t $threads ref.fa $input.fq"
}

Here Bpipe will assign a value to $threads that tries to best utilise the total available concurrency. For example, if there are 32 cores on the computer you are using and there are 4 of these stages that execute in parallel, each one should get allocated 8 cores, as long as they start at approximately the same time.

Limiting Dynamic Concurrency

Bpipe implements dynamic concurrency in a somewhat subtle manner. When a command asks for a value for $threads, Bpipe needs to decide what other tasks the current pool of available threads should be shared with. If it simply calculates this value immediately then the first command that tries to use $threads will get allocated all the remaining concurrency slots and others will have none available. To avoid this, when a command asks for $threads, Bpipe pauses the current branch and waits until all concurrently executing paths in the pipeline are either executing tasks or have also requested $threads. Then the threads are divided up among all the requestors, and allocated fairly.

In general, the above process results in a "fair" allocation of threads to competing tasks, but you should be aware that "greedy" behavior can still emerge for tasks that are scattered apart in time. For this reason, it can be useful to set an upper limit on how many threads Bpipe will give to any one task. You can do this by setting the max_per_command_threads variable in bpipe.config. This will limit the total number of threads that can be allocated.

Another approach to this is to specify thread allocation ranges in the configuration of your commands via the procs variable. For example, we can reserve between 2 and 8 threads for bwa in the previous example by specifying in bpipe.config:

commands {
    bwa {
        procs=2..8
    }
}

This will allow Bpipe to set $threads to anything between 2 and 8.

Limitations

  1. When you run stages in parallel, you should always use the Bpipe specified output file (defined for you as the $output variable) rather than hard coding the file names. This is needed because when you define output files yourself Bpipe detects the creation of the files and interprets them as outputs of whatever pipeline stage is currently executing. However with multiple stages executing this detection can assign the the output to the wrong pipeline stage or even the wrong parallel instance of the correct pipeline stage. If you wish to "hard code" the file name that is output from a stage (or part of a stage) you can still do so, but you should do it by wrapping the command that creates that output with a Produce statement, for example:

hello = {
  produce("hello.txt") {
    exec "cp $input $output"
  }
}

Even this is not recommended because you may end up overwriting your output files from multiple parallel threads if you are not careful. In general, whenever you can, let Bpipe manage the names of your files and just give it hints to make them look the way you want.