## ----include = FALSE----------------------------------------------------------
# Fix the random seed and set the knitr chunk defaults used for every chunk:
# output lines are prefixed with "#>" and collapsed together with the source.
set.seed(1014)
knitr::opts_chunk$set(comment = "#>", collapse = TRUE)

## ----setup--------------------------------------------------------------------
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(nycflights13)

## -----------------------------------------------------------------------------
# Create a multidplyr cluster of size 2 and print it.
cluster <- new_cluster(n = 2)
cluster

## -----------------------------------------------------------------------------
# Group the flights by destination, then partition the grouped data across
# the cluster; `flights1` is the resulting partitioned data frame.
flights1 <- partition(group_by(flights, dest), cluster)
flights1

## -----------------------------------------------------------------------------
# Make a fresh temporary directory to hold the per-month files.
path <- tempfile()
dir.create(path)

# Write one file per month, named "month-NN.csv", into `path`.
# `.x` is the rows for the group and `.y` is the one-row key holding `month`.
group_walk(
  group_by(flights, month),
  function(.x, .y) {
    out_file <- file.path(path, sprintf("month-%02i.csv", .y$month))
    vroom::vroom_write(.x, out_file)
  }
)

## -----------------------------------------------------------------------------
# Collect the full paths of everything written to `path` and assign them to
# the cluster under the name `files`.
files <- list.files(path, full.names = TRUE)
cluster_assign_partition(cluster, files = files)

## -----------------------------------------------------------------------------
# Run the read on the cluster: the expression is evaluated remotely, creating
# a `flights2` object there from the worker's `files`.
cluster_send(cluster, flights2 <- vroom::vroom(files))

# Create a local handle that refers to the remote `flights2` objects by name.
flights2 <- party_df(cluster, "flights2")
flights2

## -----------------------------------------------------------------------------
# Summarise the partitioned, destination-grouped flights (mean departure
# delay, ignoring NAs) and collect the result back into the local session.
collect(
  summarise(flights1, dep_delay = mean(dep_delay, na.rm = TRUE))
)

## -----------------------------------------------------------------------------
# Compare the cost of a simple per-destination summary computed locally with
# the cost of the same summary computed on the cluster, timing each remote
# stage (partition, compute, collect) separately.
by_dest <- flights %>% group_by(dest)

# Local computation
system.time(by_dest %>% summarise(mean(dep_delay, na.rm = TRUE)))

# Remote: partitioning
# Partition the grouped data (not the ungrouped `flights`) so that the remote
# summary below is computed per destination, matching the local computation.
system.time(flights2 <- by_dest %>% partition(cluster))
# Remote: computation
system.time(flights3 <- flights2 %>% summarise(mean(dep_delay, na.rm = TRUE)))
# Remote: retrieve results
system.time(flights3 %>% collect())

## -----------------------------------------------------------------------------
# Destinations with at least 365 flights in the data set.
daily_flights <- filter(count(flights, dest), n >= 365)

# Keep only flights to those destinations, add the day of the year (`yday`)
# derived from the year/month/day columns, and group by destination.
common_dest <- group_by(
  mutate(
    semi_join(flights, daily_flights, by = "dest"),
    yday = lubridate::yday(ISOdate(year, month, day))
  ),
  dest
)

# Number of flights retained.
nrow(common_dest)

## -----------------------------------------------------------------------------
# Spread the destination-grouped `common_dest` across the cluster and print
# the resulting partitioned data frame.
by_dest <- partition(common_dest, cluster)
by_dest

## ----message = FALSE----------------------------------------------------------
# Attach mgcv on the cluster so that `gam()` and `s()` are available to the
# expression evaluated remotely below.
cluster_library(cluster, "mgcv")
# Time fitting one GAM per group of the partitioned `by_dest`, modelling
# departure delay as smooth functions of day of year and departure time.
# Inside `do()`, `.` stands for the data of the current group.
system.time({
  models <- by_dest %>% 
    do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})

## -----------------------------------------------------------------------------
# Time the same per-destination GAM fit in the local session for comparison.
# mgcv is attached only on the cluster (via `cluster_library()`), never
# locally, so `gam()` is called through its namespace here.
# `common_dest` is already grouped by `dest`, so no extra `group_by()` is
# needed. Inside `do()`, `.` stands for the data of the current group.
system.time({
  models <- common_dest %>% 
    do(mod = mgcv::gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})

