multidplyr

A dplyr backend that partitions a data frame over multiple processes

Lifecycle: experimental

Overview

multidplyr is a backend for dplyr that partitions a data frame across multiple cores. You tell multidplyr how to split the data up with partition(), and then the data stays on each node until you explicitly retrieve it with collect(). This minimises the amount of time spent moving data around and maximises parallel performance. The idea is inspired by partools by Norm Matloff and distributedR by the Vertica Analytics team.

Due to the overhead of communicating between the nodes, you won't see much performance improvement on basic dplyr verbs with fewer than ~10 million observations, so for smaller data you may want to try dtplyr, which uses data.table instead. multidplyr's strength is conveniently parallelising the kind of more complex operation often found in do().

(Note that unlike other packages in the tidyverse, multidplyr requires R 3.5 or greater. We hope to relax this requirement in the future.)

Installation

To install from GitHub:

# install.packages("devtools")
devtools::install_github("tidyverse/multidplyr")

Usage

To use multidplyr, you first create a cluster of the desired number of workers. Each one of these workers is a separate R process, and the operating system will spread their execution across multiple cores:

library(multidplyr)
library(dplyr, warn.conflicts = FALSE)

cluster <- new_cluster(4)
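
Each worker starts as a fresh R session, so packages and helper objects that your own code relies on are not automatically available there. A minimal sketch of setting up the workers, assuming your later code uses stringr and a hypothetical helper function normalise_delay():

# Load a package on every worker (each worker is a separate R process)
cluster_library(cluster, "stringr")

# Copy a helper function from the main session to every worker
normalise_delay <- function(x) (x - mean(x, na.rm = TRUE)) / sd(x, na.rm = TRUE)
cluster_assign(cluster, normalise_delay = normalise_delay)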

There are two primary ways to use multidplyr. The first, and most efficient, way is to read different files on each worker:

# Create a filename vector containing different values on each worker
cluster_assign_each(cluster, filename = c("a.csv", "b.csv", "c.csv", "d.csv"))

# Use vroom to quickly load the csvs
cluster_send(cluster, my_data <- vroom::vroom(filename))
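
To treat the per-worker my_data objects as a single partitioned data frame, you can then wrap them with party_df() (a sketch, assuming the files above loaded cleanly into my_data on every worker):

# Create a party_df from the my_data variable that now lives on each worker
my_data <- party_df(cluster, "my_data")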

Alternatively, if you already have the data loaded in the main session, you can use partition() to automatically spread it across the workers. Use group_by() to ensure that all of the observations belonging to a group end up on the same worker.
library(nycflights13)

flight_dest <- flights %>% group_by(dest) %>% partition(cluster)
flight_dest
#> Source: party_df [336,776 x 19]
#> Groups: dest
#> Shards: 4 [81,594--86,548 rows]
#>
#>    year month   day dep_time sched_dep_time dep_delay arr_time
#>   <int> <int> <int>    <int>          <int>     <dbl>    <int>
#> 1  2013     1     1      544            545        -1     1004
#> 2  2013     1     1      558            600        -2      923
#> 3  2013     1     1      559            600        -1      854
#> 4  2013     1     1      602            610        -8      812
#> 5  2013     1     1      602            605        -3      821
#> 6  2013     1     1      611            600        11      945
#> # … with 3.368e+05 more rows, and 12 more variables: sched_arr_time <int>,
#> #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
#> #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> #   minute <dbl>, time_hour <dttm>

Now you can work with it like a regular data frame, but the computations will be spread across multiple cores. Once you've finished your computation, use collect() to bring the data back to the host session:
flight_dest %>% 
  summarise(delay = mean(dep_delay, na.rm = TRUE), n = n()) %>% 
  collect()
#> # A tibble: 105 x 3
#>    dest  delay     n
#>    <chr> <dbl> <int>
#>  1 ABQ    13.7   254
#>  2 AUS    13.0  2439
#>  3 BQN    12.4   896
#>  4 BTV    13.6  2589
#>  5 BUF    13.4  4681
#>  6 CLE    13.4  4573
#>  7 CMH    12.2  3524
#>  8 DEN    15.2  7266
#>  9 DSM    26.2   569
#> 10 DTW    11.8  9384
#> # … with 95 more rows

Note that there is some overhead associated with copying data from the worker nodes back to the host node (and vice versa), so you're best off using multidplyr for more complex operations. See vignette("multidplyr") for more details.
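
For instance, here is a hedged sketch (not from the original README) of the kind of heavier per-group work where the parallel overhead tends to pay off: fitting a small model within each group on the workers and collecting only the results. The data frame big_df and its columns g, x, and y are hypothetical.

# Hypothetical data: big_df with grouping column g and numeric columns x and y
fits <- big_df %>%
  group_by(g) %>%
  partition(cluster) %>%
  summarise(n = n(), slope = coef(lm(y ~ x))[[2]]) %>%
  collect()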
