R parallelisation and large data frames

The following is a naive benchmark of several of R's parallelisation approaches applied to a large data frame.

Problem: apply a function to each row of a data frame. The data frame is >10 million rows.

Here I subset the data frame to different numbers of rows to compare the speed of different parallelisation options. Times are measured with R's tictoc package and reported in seconds. The runs are not replicated, so no uncertainty estimates are provided (to my chagrin, but a single run with 10 million rows takes more than 2 hours).
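
For reference, this is all the timing machinery involved: a minimal tictoc sketch (not from the original, with Sys.sleep standing in for the parallel work):

In [ ]:
# minimal tictoc usage: elapsed wall-clock time in seconds
library(tictoc)
tic()
Sys.sleep(2)  # stand-in for the parallel work
toc()         # prints e.g. "2.005 sec elapsed"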

Results

The numbers below refer to the seven options detailed in the Functions section:

  • Shared memory: options 2, 3, 7
  • Distributed memory: options 1, 4, 5, 6
  • Progress bars: options 1, 2, 3, 5
  • Splitting the data frame: options 3, 4, 5, 6

  • This indicates that while shared memory is superior for data frames of about 100,000 rows or fewer, the distributed-memory options come into their own beyond that.
  • The progress bars have very little, if any, impact on the computation time (thank goodness, because they are the best thing since sliced bread).
  • Splitting the data frame is key for these large data sets, even before we consider the memory cost of pushing a 5 GB+ data frame to every core.

Functions

The functions are as follows. Note the following variable names:

  • coord.table is the data frame
  • kNoCores ( = 25 in this case) is the number of cores
  • there are a couple of different versions of the function QueryOSRM. This function simply wraps an sapply that applies another function to each row of the subset of the data frame passed to it by the parallelisation. The versions differ only in how the data frame is passed; a hypothetical sketch of its shape follows this list.
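
Since the post doesn't include QueryOSRM itself, here is a minimal hypothetical sketch of its shape. The body below is an assumption, not the original code; only the names QueryOSRM, GetSingleTravelInfo, coord.table, and query_group come from the post.

In [ ]:
# hypothetical sketch only: the real QueryOSRM is not shown in this post
QueryOSRM <- function(j, osrm.url) {
  # rows assigned to split j (query_group is set before the cluster starts)
  rows <- coord.table[query_group == j]
  # sapply another function over each row of the subset
  sapply(seq_len(nrow(rows)), function(i) GetSingleTravelInfo(rows[i], osrm.url))
}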

1. Distributed memory with pbapply (the progress bar version of parSapply)

In this option the entire data frame is exported to every node. Each task then loops through a 50th (defined by num.splits) of the rows, and the 50 tasks are spread across the kNoCores nodes.
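
As a toy illustration (not part of the original), the query_group assignment used in the cell below splits the row indices into num.splits roughly equal groups:

In [ ]:
# toy example: 10 rows into 5 groups of 2
library(data.table)
dt <- data.table(id = 1:10)
num.splits <- 5
dt[, query_group := ceiling(.I / (.N / num.splits))]
dt$query_group # 1 1 2 2 3 3 4 4 5 5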

In [ ]:
library(pbapply)

# what fraction should each node operate on
num.splits <- 50
coord.table[, query_group := ceiling(.I / (.N / num.splits))]

tic()
# parallelize queries
cl <- makeCluster(kNoCores, outfile = outfile.progress)
# seed the workers' RNG streams for reproducibility
parallel::clusterSetRNGStream(cl, iseed = 0L)
# export variables and functions to the cluster
clusterExport(cl, c("osrm.url", "int.results.file", "coord.table", "GetSingleTravelInfo", "QueryOSRM"), envir = environment())
# attach the required libraries on each node
clusterEvalQ(cl, {library(httr); library(pbapply); library(data.table)})
# conduct the parallelisation
travel.queries <- pbsapply(seq_len(num.splits), function(j) QueryOSRM(j, osrm.url), cl = cl)
# close the cluster
stopCluster(cl)
toc()

2. Shared memory with pbapply

This is the same as above except that we're using shared memory, so the data frame is visible to all of the workers rather than being exported to them.
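
Passing an integer as cl makes pbsapply fork the R process (via mclapply, so Unix-alikes only), and the forked children inherit the parent's memory. A minimal sketch of why no clusterExport is needed; the names here are made up:

In [ ]:
# minimal sketch of forked shared memory (not from the original)
library(pbapply)
big.df <- data.frame(x = runif(1e6)) # stand-in for coord.table
# the forked children see big.df without it being exported
res <- pbsapply(1:4, function(i) sum(big.df$x) + i, cl = 2L)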

In [ ]:
library(pbapply)

# what fraction should each node operate on
num.splits <- 50
coord.table[, query_group := ceiling(.I / (.N / num.splits))]

tic()
# conduct the parallelisation
travel.queries <- pbsapply(seq_len(num.splits), function(j) QueryOSRM_MC(j, osrm.url, coord.table), cl = as.integer(kNoCores))
toc()

3. Shared memory with pbapply and by cutting the data frame

Here cut() assigns each row index to one of the chunks, and each task receives only its chunk of the data frame rather than an index into the whole table.

In [ ]:
# divide the rows into chunks
cut.factor <- 1 # increase to create more chunks than cores; extra chunks queue on the cores
cuts <- cut(1:nrow(coord.table), kNoCores * cut.factor)

tic()
# conduct the parallelisation
travel.queries <- pbsapply(levels(cuts), function(l) QueryOSRM_cut(coord.table[cuts == l,], osrm.url), cl = as.integer(kNoCores))
toc()
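
As a toy illustration (not from the original), cut() turns the row indices into a factor whose levels define the chunks:

In [ ]:
# toy example: 10 rows into 2 chunks
cuts <- cut(1:10, 2)
levels(cuts) # two interval labels, one per chunk
which(cuts == levels(cuts)[1]) # rows 1..5 fall in the first chunk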

4. Distributed (Do Par) by cutting the data frame

Here we cut the data frame so that each parallel task operates on only its own subset of the rows.

In [ ]:
library(doParallel)
library(itertools)

tic()
# initialise the cluster
cl <- makePSOCKcluster(kNoCores)
registerDoParallel(cl)
# conduct the parallelisation
travel.queries <- foreach(m=isplitRows(coord.table, chunks=kNoCores), .combine='cbind',
            .packages=c('httr','data.table'), .export=c("QueryOSRM_dopar", "GetSingleTravelInfo")) %dopar% {
              QueryOSRM_dopar(m,osrm.url,int.results.file)
            }
# end cluster
stopCluster(cl)
toc()
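
itertools::isplitRows wraps the data frame in an iterator that hands one row-chunk to each foreach task. A toy illustration (not from the original):

In [ ]:
# toy example: iterate over 3 row-chunks of a small data frame
library(itertools)
it <- isplitRows(data.frame(x = 1:10), chunks = 3)
nextElem(it) # rows 1..4, the first chunk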

5. Distributed (SNOW) by cutting the data frame, with progress bar

In [ ]:
library(doSNOW)
library(itertools)

# if the chunks are too large for the cores' memory, increase the chunk factor
chunk.factor <- 1
chunk.num <- kNoCores * chunk.factor
tic()
# init the cluster
cl <- makePSOCKcluster(kNoCores)
registerDoSNOW(cl)
# init the progress bar
pb <- txtProgressBar(max = chunk.num, style = 3) # one tick per completed chunk
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
# conduct the parallelisation
travel.queries <- foreach(m=isplitRows(coord.table, chunks=chunk.num),
                          .combine='cbind',
                          .packages=c('httr','data.table'),
                          .export=c("QueryOSRM_dopar", "GetSingleTravelInfo"), 
                          .options.snow = opts) %dopar% {
                            QueryOSRM_dopar(m,osrm.url,int.results.file)
                          }
# close progress bar
close(pb)
# stop cluster
stopCluster(cl) 
toc()

6. Distributed (SNOW) by cutting the data frame, without a progress bar (Option 5 minus the bar)

In [ ]:
library(doSNOW)
library(itertools)

tic()
# init the cluster
cl <- makePSOCKcluster(kNoCores)
registerDoSNOW(cl)
# conduct the parallelisation
travel.queries <- foreach(m=isplitRows(coord.table, chunks=kNoCores), .combine='cbind',
                        .packages=c('httr','data.table'),
                        .export=c("QueryOSRM_dopar", "GetSingleTravelInfo")) %dopar% {
                          QueryOSRM_dopar(m,osrm.url,int.results.file)
                        }
# stop cluster
stopCluster(cl) 
toc()

7. Shared memory without a progress bar (Option 2 using mclapply directly)

In [ ]:
library(pbapply)

# what fraction should each node operate on
num.splits <- 50
coord.table[, query_group := ceiling(.I / (.N / num.splits))]

tic()
# conduct the parallelisation
travel.queries <- parallel::mclapply(seq_len(num.splits), function(j) QueryOSRM_MC(j, osrm.url, coord.table), mc.cores = as.integer(kNoCores))
toc()

Packages

In [ ]:
library(parallel)
library(data.table)
library(tictoc)
Created with Jupyter, by Tom Logan.