1 Multiple cores at JHPCE

While SLURM provides a few different ways to request multiple resource (e.g. nodes, cores, or tasks), JHPCE admins advise using the -c/ --cpus-per-task argument to srun or sbatch to request multiple CPU cores. Here we’ll request an interactive session with 8 cores using -c 8.

srun --pty -c 8 --mem=5G --x11 -t 08:00:00 bash

2 Multiple cores in R with BiocParallel

2.1 Intro to BiocParallel

BiocParallel is a Bioconductor R package that makes parallel computation across several available CPUs straightforward. We’ll also make use of other GitHub, CRAN, and Bioconductor packages.

These can be installed if necessary:

#   GitHub package
remotes::install_github("LieberInstitute/slurmjobs")

#   Pre-requisite for Bioconductor packages
if (!requireNamespace("BiocManager", quietly = TRUE)) {
    install.packages("BiocManager")
}

#   Install CRAN and Bioconductor packages
pkgs = c("BiocParallel", "tidyverse", "spatialLIBD", "nnSVG")
for (p in pkgs) {
    if (!requireNamespace("BiocManager", quietly = TRUE)) {
        BiocManager::install(p)
    }
}

Then the packages can be loaded:

library(BiocParallel)
library(tidyverse)
library(spatialLIBD)
library(nnSVG)
library(slurmjobs)

2.2 Automatic number-of-cores detection

First, it’s useful to automatically determine the number of cores available to R, and define this at the top of our script rather than hardcode it potentially in multiple locations. Can’t BiocParallel do this for us? On many computers, yes, but on many computing clusters (including JHPCE), automatic core detection is buggy, and R will believe is has access to all cores present on the entire compute node:

parallel::detectCores()
## [1] 48

Instead, we’ll read in the SLURM_CPUS_PER_TASK environment variable, which always holds the value we specified earlier with srun -c 8.

num_cores = as.integer(Sys.getenv('SLURM_CPUS_PER_TASK'))
print(num_cores)
## [1] 8

2.3 bplapply: basic usage

With things set up, we’ll specifically dive into the bplapply function, which is invoked identically to base R’s lapply, except for the addition of a BPPPARAM parameter specifying how many cores to use.

We’ll start with a simple example to demonstrate the usage of bplapply: taking the square of a list of numbers using 8 cores. While useful as an example, exponentiation is already vectorized, and the operation is typically so fast that parallelism introduces more overhead than the additional cores help with speed! TL;DR: normally just do x**2 here, but this is how it’s done with bplapply:

We’ll define a function square(), simply taking the square of an input numeric vector. Let’s say we want to square the numbers 1 through 10000.

square = function(x) x**2
nums_to_square = 1:10000
head(nums_to_square, n = 3)
## [1] 1 2 3

To run in parallel, we’ll call bplapply just like lapply, but adding BPPARAM = MulticoreParam(num_cores). BiocParallel offers many backends, but MulticoreParam is generally recommended on Linux-like operating systems (including at JHPCE) because it uses forked processes and shared memory (meaning low overhead in many situations!).

squared_nums = bplapply(
    nums_to_square, square, BPPARAM = MulticoreParam(num_cores)
)
head(squared_nums, n = 3)
## [[1]]
## [1] 1
## 
## [[2]]
## [1] 4
## 
## [[3]]
## [1] 9

2.4 bplapply: quantifying overhead

While MulticoreParam is quite efficient in many situations, all parallel computations involve some “overhead”: it takes time to for R to communicate what work each CPU core should do, and receive a computation’s results from each worker. While we won’t cover this today, note that overhead can get worse when a central object is being modified by multiple workers, and changes must be copied between different workers. In any case, the takeaway is that adding more cores isn’t always the right choice to make a job faster.

Let’s do a small experiment to measure how number of cores influences the speed of our computation. Predicting this relationship isn’t always easy, so measuring it can be useful. We’ll do the previous task and time it for different numbers of cores: 1 through 8.

time_df = tibble(
    n_cores = 1:num_cores,
    elapsed_time = 0 # will be overwritten with actual times
)

for (this_num_cores in time_df$n_cores) {
    time_df[this_num_cores, 'elapsed_time'] = system.time(
        {
            temp_result = bplapply(
                nums_to_square,
                square,
                BPPARAM = MulticoreParam(this_num_cores)
            )
        }
    )[3] # the third value is elapsed / "wall clock" time
}

With ggplot2 the general relationship can be sketched out. Note that we measured elapsed time, which is likely responsible for most of the noise seen in the plot. While there’s a slightly decreasing trend to the runtime as cores increases, the overhead is so severe that we’re better off not parallelizing at all!

ggplot(time_df) +
    geom_point(aes(x = n_cores, y = elapsed_time)) +
    coord_cartesian(ylim = c(0, max(time_df$elapsed_time)))

2.5 bplapply: full example

Here, we’ll do a complete example of an analysis using bplapply. We’ll use spatial transcriptomics data of the human dorsolateral prefrontal cortex, collected with the Visium Spatial Proteogenomics platform, as made available through the spatialLIBD R package. We’ll use nnSVG to find spatially variable genes. Since we’re working with a dataset consisting of the same brain region repeated across many donors, the natural unit of parallelism will be the donor. In other words, we’re repeating the same analysis (using nnSVG) for several donors independently; each of these donors can be processed in parallel. Let’s grab the data and show that we have 4 donors:

spe = fetch_data(type = "spatialDLPFC_Visium_SPG")
unique(spe$sample_id)
## [1] "Br2720_Ant_IF"  "Br6432_Ant_IF"  "Br6522_Ant_IF"  "Br8667_Post_IF"

Since bplapply must iterate over a function, we’ll define the nnSVG workflow as a function. It takes a SpatialExperiment object “spe” and the ID for one donor “sample_id”. The order of parameters here is important: the first parameter must be the variable over which we’re iterating.

run_nnSVG = function(sample_id, spe) {
    spe <- spe[, spe$sample_id == sample_id]

    #---------------------------------------------------------------------------
    #   Filter lowly expressed and mitochondrial genes, and take spots with at
    #   least some nonzero counts
    #---------------------------------------------------------------------------

    message(Sys.time(), " | Filtering genes and spots")
    spe <- filter_genes(
        spe,
        filter_genes_ncounts = 3,
        filter_genes_pcspots = 0.5,
        filter_mito = TRUE
    )
    spe <- spe[rowSums(assays(spe)$counts) > 0, colSums(assays(spe)$counts) > 0]
    message("Dimensions of spe after filtering:")
    print(dim(spe))

    #---------------------------------------------------------------------------
    #   Recompute logcounts (library-size normalization as recommended in
    #   https://bioconductor.org/packages/release/bioc/vignettes/nnSVG/inst/doc/nnSVG.html)
    #---------------------------------------------------------------------------

    message(Sys.time(), " | Re-computing logcounts")
    spe <- computeLibraryFactors(spe)
    spe <- logNormCounts(spe)

    #---------------------------------------------------------------------------
    #   Run nnSVG
    #---------------------------------------------------------------------------

    spe <- nnSVG(spe)

    return(spe)
}

Finally, we can run nnSVG on all 4 donors in parallel and retrieve a list of SpatialExperiment objects as output.

spe_list_out = bplapply(
    unique(spe$sample_id),
    run_nnSVG,
    BPPARAM = MulticoreParam(num_cores),
    #   Additional arguments to the 'run_nnSVG' function
    spe = spe
)

3 Parallelism by array job

3.1 Theory: embarassingly parallel tasks

The above example with nnSVG was an example of an embarassingly parallel task: parallelism could be introduced simply be dividing up the data and processing as independent units; no additional sophisticated code was needed to produce a parallel workflow.

For embarassingly parallel tasks, I almost always use array jobs instead of BiocParallel. Array jobs are a SLURM feature that allows the same shell script to be executed as a series of separate jobs, called tasks. An environment variable called “SLURM_ARRAY_TASK_ID” is available to each task, and can be used to differentiate how each task behaves. Typically, I will use each task to process a separate unit of data, just as we processed donors as separate units with BiocParallel for nnSVG earlier.

3.2 Theory: BiocParallel vs. array jobs

What makes an array job preferable to BiocParallel?

  • Trivial re-execution of failed tasks: When many units of data are processed in parallel, it’s common for some units to have issues and others to complete successfully. slurmjobs::array_submit makes re-execution of failed jobs simple, whereas all tasks would have to be re-executed using BiocParallel if even one fails!
  • Reduced overhead: Since array tasks are independent jobs, no computational time is spent on sending instructions or data to, from, or between workers. This typically means that an array job with N tasks will complete almost N times faster than its non-parallel, full-dataset counterpart.
  • Independent use of resources: When one task in an array completes, its associated CPU(s) and memory are freed, resulting in efficient use of resources. In contrast, parallel work in R reserves memory and CPUs until all tasks are complete.
  • Simpler logging: While BiocParallel does have ways of coherently producing logs for parallel tasks, it’s more difficult than array jobs, whereas simply putting %s somewhere in the log name produces clean, separate logs in real time for all array tasks.

Is there ever a case where BiocParallel is better?

  • Parallel tasks that depend on or must communicate with eachother
  • Work with relatively fast, repetitive tasks: Suppose we had a problem where it took 15 minutes to load and preprocess the data, and 1000 units of data that could be independently processed in 3 seconds each. An array job would involve loading the data 1000 times, only to compute results for 3 seconds afterward. With BiocParallel, the data could be loaded once, and only the faster, repeated portion could be parallelized.

3.3 Practice: creating the shell script

To write the nnSVG example as an array job, (almost) the entire R script will be written to operate on one donor at a time, much like the run_nnSVG function over which bplapply was iterating in the BiocParallel example. There will be a couple more lines at the beginning dedicated to loading the full dataset and subsetting to one donor, depending on the array task ID.

Array jobs are fundamentally shell scripts, and to construct the shell-script portion, we’ll use job_single from the slurmjobs R package I built. This function is designed to reduce the need to memorize or repeatedly type SLURM-specific syntax; instead, a shell script is created by specifying key variables like memory, number of cores, and time limit. It also includes code to print helpful reproducibility information.

Note how we specify 1 core below! This means each of the four array tasks will use one core– this effectively means the dataset as a whole is still parallelized across four cores, just like our earlier BiocParallel example. While it’s theoretically possible to specify more than one core and nest BiocParallel parallelism within an array job, I find that if you get to the situation where this feels necessary, generally finding smaller units of data to loop over with just an array job is a simpler solution (and better, for the reasons I outlined above when comparing array jobs with BiocParallel).

job_single by default prints the script it will create:

job_single(
    name = "nnSVG_array", memory = "20G", cores = 1, task_num = 4,
    create_logdir = FALSE
)
## #!/bin/bash
## #SBATCH -p shared
## #SBATCH --mem=20G
## #SBATCH --job-name=nnSVG_array
## #SBATCH -c 1
## #SBATCH -t 1-00:00:00
## #SBATCH -o logs/nnSVG_array.%a.txt
## #SBATCH -e logs/nnSVG_array.%a.txt
## #SBATCH --mail-type=ALL
## #SBATCH --array=1-4%20
## 
## set -e
## 
## echo "**** Job starts ****"
## date
## 
## echo "**** JHPCE info ****"
## echo "User: ${USER}"
## echo "Job id: ${SLURM_JOB_ID}"
## echo "Job name: ${SLURM_JOB_NAME}"
## echo "Node name: ${SLURMD_NODENAME}"
## echo "Task id: ${SLURM_ARRAY_TASK_ID}"
## 
## ## Load the R module
## module load conda_R/4.3
## 
## ## List current modules for reproducibility
## module list
## 
## ## Edit with your job command
## Rscript -e "options(width = 120); sessioninfo::session_info()"
## 
## echo "**** Job ends ****"
## date
## 
## ## This script was made using slurmjobs version 1.2.1
## ## available from http://research.libd.org/slurmjobs/

We’ll re-run with create_shell = TRUE to write to disk this time:

job_single(
    name = "nnSVG_array", memory = "20G", cores = 1, task_num = 4,
    create_logdir = FALSE, create_shell = TRUE
)
## 2024-03-05 11:26:53.53031 creating the shell file nnSVG_array.sh
## To submit the job use: sbatch nnSVG_array.sh