---
title: "Working with TimeSeriesRDD"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{working-with-time-series-rdd}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
```
Currently `sparklyr.flint` supports a number of commonly used summarizers (e.g., count, sum, average, etc) that are implemented in the Flint time series library. Each summarizer can be either applied to a moving time window (e.g., `in_past(5s)`) or groups of rows within a `TimeSeriesRDD` having the same timestamps (which is known as a “cycle” in Flint nomenclature).

## Summarizing moving time windows

The following is a quick example of applying the sum summarizer to a moving time window:

```r
library(sparklyr)
library(sparklyr.flint)

# Step 0: decide which Spark version to use, how to connect to Spark, etc
spark_version <- "3.0.0"
sc <- spark_connect(master = "local", version = spark_version)

example_time_series <- data.frame(
  t = c(1, 3, 4, 6, 7, 10, 15, 16, 18, 19),
  v = c(4, -2, NA, 5, NA, 1, -4, 5, NA, 3)
)

# Step 1: import example time series data into a Spark dataframe
sdf <- copy_to(sc, example_time_series, overwrite = TRUE)

# Step 2: specify how the Spark dataframe should be interpreted as a time series by Flint
ts_rdd <- fromSDF(sdf, is_sorted = TRUE, time_unit = "SECONDS", time_column = "t")

# Step 3: apply a Flint summarizer to the time series above
sum <- summarize_sum(ts_rdd, column = "v", window = in_past("3s"))

# Step 4: collect summarized result from Spark to R
res <- ts_sum %>% collect()

print(res)
```

```
## # A tibble: 10 x 3
##    time                    v v_sum
##    <dttm>              <dbl> <dbl>
##  1 1970-01-01 00:00:01     4     4
##  2 1970-01-01 00:00:03    -2     2
##  3 1970-01-01 00:00:04   NaN     2
##  4 1970-01-01 00:00:06     5     3
##  5 1970-01-01 00:00:07   NaN     5
##  6 1970-01-01 00:00:10     1     1
##  7 1970-01-01 00:00:15    -4    -4
##  8 1970-01-01 00:00:16     5     1
##  9 1970-01-01 00:00:18   NaN     1
## 10 1970-01-01 00:00:19     3     8
```

From the result above, one can see as a result of specifying `window = in_past("3s")`, for each time point `t` from `example_time_series` (i.e., `t = 1`, `t = 3`, `t = 4`, `t = 6`, and so on), Flint has created a row containing `t` and the summation of all `v` value(s) occurring within the time window of `[t - 3, t]`, and the sums are stored in a new column named `v_sum`.

## Summarizing cycles

Given a timestamp `t`, the subset of rows in a `TimeSeriesRDD` having that timestamp is known as a "cycle" in Flint.

If the `window = "<time window specification>"` argument is omitted, then the summarizer function will look at all cycles in the `TimeSeriesRDD`. In other words, it will group all rows by their timestamps and perform aggregation within each group. 

For example:

```
ts_sum <- summarize_sum(ts_rdd, column = "v")
```

will return a `TimeSeriesRDD` with a timestamp column named `time` and a summation column named `v_sum`. For each timestamp `t` present in `ts_rdd`, `ts_sum` will contain a row with timestamp `t` and `v_sum` value equal to summation of all `v` values occurring at `t`.

Because all rows from `ts_rdd` are already ordered internally by timestamps, aggregations on cycles can be performed efficiently in Flint without re-shuffling rows in the input `TimeSeriesRDD`.
