forked from rstudio/bigdataclass
---
title: "Distributed R"
output: html_notebook
---
## Class catch up
```{r}
library(dplyr)
library(purrr)
library(readr)
library(sparklyr)
library(lubridate)
# Read a few rows only, to discover the column names
top_rows <- read.csv("/usr/share/flights/data/flight_2008_1.csv", nrows = 5)

# Named list mapping every lower-cased column name to the "character" type
file_columns <- top_rows %>%
  rename_all(tolower) %>%
  map(function(x) "character")

# Local Spark settings: cores, driver memory and memory fraction
conf <- spark_config()
conf$`sparklyr.cores.local` <- 4
conf$`sparklyr.shell.driver-memory` <- "8G"
conf$spark.memory.fraction <- 0.9

sc <- spark_connect(master = "local", config = conf, version = "2.0.0")

# Map the CSV files without caching them (memory = FALSE)
spark_flights <- spark_read_csv(
  sc,
  name = "flights",
  path = "/usr/share/flights/data/",
  memory = FALSE,
  columns = file_columns,
  infer_schema = FALSE
)
```
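
The `file_columns` object built above is a named list that maps each lower-cased column name to the string `"character"`; it is what `spark_read_csv()` receives as `columns` when `infer_schema = FALSE`. The chunk below is a minimal base R sketch of the same structure. The `toy` data frame and its columns are hypothetical stand-ins, not the flights file.

```{r}
# Hypothetical stand-in for the first rows of a CSV file
toy <- data.frame(Year = 2008, Month = 1, ArrDelay = "NA", stringsAsFactors = FALSE)

# Base R equivalent of rename_all(tolower) %>% map(function(x) "character"):
# one "character" entry per column, named with the lower-cased column name
toy_columns <- setNames(
  as.list(rep("character", ncol(toy))),
  tolower(names(toy))
)

str(toy_columns)
```

Reading every field as character avoids schema inference over the whole folder; numeric fields are then converted only where they are needed, as the later exercises do with `as.numeric()`.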
## 7.1 - Basic distribution
1. Cache a sample of *flights*
```{r}
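flights_sample <- spark_flights %>%
  sample_frac(0.01) %>%
  mutate(arrdelay = as.numeric(arrdelay)) %>%
  # Flag arrival delays above 15 minutes as 1, all others as 0.
  # Current sparklyr releases name these arguments input_col / output_col.
  ft_binarizer(
    input_col = "arrdelay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  compute("flights_sample")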
```
2. Navigate to the Storage page in the Spark UI
3. Pass `nrow` to `spark_apply()` to get the row count by partition
```{r}
spark_apply(flights_sample, nrow)
```
4. Pass a function to calculate the average distance in each partition
```{r}
spark_apply(
  flights_sample,
  function(x) mean(as.numeric(x$distance))
)
```
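
The steps in this section can be imitated locally to see what "per partition" means: `spark_apply()` hands each partition to the function as an ordinary data frame and collects one result per partition. The chunk below is a rough sketch of that idea in base R. The `toy_flights` data and the two-partition split are made up for illustration, and the real partitioning is decided by Spark.

```{r}
# Hypothetical local data standing in for `flights_sample`
toy_flights <- data.frame(
  month    = c("1", "1", "2", "2", "2", "3"),
  distance = c("100", "300", "200", "400", "600", "500"),
  arrdelay = c(5, 20, 15, 40, -3, 16),
  stringsAsFactors = FALSE
)

# Local equivalent of the binarizer step: 1 when arrdelay > 15, otherwise 0
toy_flights$delayed <- as.numeric(toy_flights$arrdelay > 15)

# Pretend the rows are stored in two partitions of three rows each
partition_id <- rep(1:2, each = 3)
partitions <- split(toy_flights, partition_id)

# One row count per partition, as in spark_apply(flights_sample, nrow)
rows_per_partition <- sapply(partitions, nrow)

# Average distance per partition; distance is character, hence as.numeric()
avg_distance_per_partition <- sapply(
  partitions,
  function(x) mean(as.numeric(x$distance))
)

rows_per_partition
avg_distance_per_partition
```

Because each partition is processed independently, a per-partition mean is not the overall mean unless the results are combined with their row counts.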
## 7.2 - Use group_by
1. Use the `group_by` argument to partition by the *month* field
```{r}
spark_apply(flights_sample, nrow, group_by = "month", columns = "count")
```
2. Pass the same function from the previous exercise to calculate the average distance by month
```{r}
spark_apply(
  flights_sample,
  function(x) mean(as.numeric(x$distance)),
  group_by = "month",
  columns = "avg_distance"
)
```
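
With `group_by`, the function receives one data frame per group value instead of one per partition, and the group column is returned next to the function's output. A minimal local sketch of that behaviour in base R follows. The `toy_by_month` data is hypothetical, and `split()` plus `sapply()` only approximates what Spark does across workers.

```{r}
# Hypothetical local data standing in for `flights_sample`
toy_by_month <- data.frame(
  month    = c("1", "1", "2", "2", "2", "3"),
  distance = c("100", "300", "200", "400", "600", "500"),
  stringsAsFactors = FALSE
)

# One data frame per month, the local analogue of group_by = "month"
groups <- split(toy_by_month, toy_by_month$month)

# Apply the same function used in the exercise to every group
avg_distance <- sapply(groups, function(x) mean(as.numeric(x$distance)))

# Group column first, then the named output column
result <- data.frame(
  month = names(avg_distance),
  avg_distance = unname(avg_distance),
  stringsAsFactors = FALSE
)

result
```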
## 7.3 - Distributing packages
1. Use `broom::tidy()` to run one `glm()` model per month
```{r}
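models <- spark_apply(
  flights_sample,
  # Fit one logistic regression per month and return its coefficient table
  function(e) {
    broom::tidy(glm(delayed ~ arrdelay, data = e, family = "binomial"))
  },
  # `columns` names the output fields, as in the previous exercises
  columns = c("term", "estimate", "std_error", "statistic", "p_value"),
  group_by = "month"
)

models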
```
2. Close the Spark connection
```{r}
spark_disconnect(sc)
```
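
The one-model-per-group pattern from step 1 of this section can also be tried without Spark. The chunk below is a sketch under clearly different assumptions: it uses the built-in `mtcars` data grouped by `cyl`, a Gaussian `glm()` instead of the logistic model, and a hypothetical helper `fit_one()` that builds the coefficient table with base R rather than `broom::tidy()`.

```{r}
# Hypothetical helper: fit a model on one group and return a tidy-style table
fit_one <- function(e) {
  fit <- glm(mpg ~ wt, data = e)   # Gaussian family by default
  coefs <- coef(summary(fit))      # matrix with one row per model term
  data.frame(
    term      = rownames(coefs),
    estimate  = coefs[, "Estimate"],
    std_error = coefs[, "Std. Error"],
    statistic = coefs[, "t value"],
    p_value   = coefs[, "Pr(>|t|)"],
    row.names = NULL,
    stringsAsFactors = FALSE
  )
}

# One data frame per group, the local analogue of group_by
groups <- split(mtcars, mtcars$cyl)

# Fit each group and prepend the group value to its coefficient table
per_group <- lapply(names(groups), function(g) {
  data.frame(cyl = g, fit_one(groups[[g]]), stringsAsFactors = FALSE)
})

# Stack the per-group tables into a single result
models_local <- do.call(rbind, per_group)

models_local
```

The function passed to `spark_apply()` has to return a data frame with the same columns for every group, which is why the helper always emits the same five fields.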