---
title: "Prototyping Distributed Computations"
author: "Balasubramanian Narasimhan"
date: '`r Sys.Date()`'
output:
  html_document:
  fig_caption: yes
  theme: cerulean
  toc: yes
  toc_depth: 2
vignette: >
  %\VignetteIndexEntry{Prototyping Distributed Computations}
  %\VignetteEngine{knitr::rmarkdown}
  \usepackage[utf8]{inputenc}
---

```{r echo=FALSE}
knitr::opts_chunk$set(
    message = FALSE,
    warning = FALSE,
    error = FALSE,
    tidy = FALSE,
    cache = FALSE
)
suppressMessages(suppressWarnings(library(distcomp)))
dcWorkspace <- tempdir()
ignore <- distcompSetup(workspace = dcWorkspace,
                        ssl_verifyhost = 0L, ssl_verifypeer = 0L)
```

## Introduction

The `distcomp` package allows one to perform computations on data that
is row-partitioned data among several sites. Implemented methods
include stratified Cox regression and singular value decomposition
(SVD).  The distributed computations rely on a REST API for R, such as
that provided by [Opencpu](https://www.opencpu.org) for example.

In order to prototype new methods for `distcomp`, it is convenient to
write code that does all computations locally, at least during the
development phase. This allows for easy debugging and code
changes. The step to using a REST call is merely one step off.  For
example, in the context of distributed multiple imputation, the
distributed SVD is a building block. So one has to be able to be able
to run the svd operation not only in a modular fashion, but also
quickly to test code.  

Previous versions of `distcomp` assumed all computations were done
using `opencpu` remote calls. This meant that the results were
time-consuming to check; for reasonably sized problems one would have
to wait for hours.

Version 1.1 of `distcomp` addresses this by allowing local
computations, in essence _faking_ the `opencpu` calls. All
calculations are done locally at normal speeds (rather than a https
`POST` request) while ensuring that the sites are only privy to their
own data. 

## An Example

```{r}
available <- availableComputations()
( computationNames <- names(available) )
```

We will perform an SVD on a 1500 by 20 matrix where the matrix is
distributed across three sites each having a 500 by 20 matrix.  We will
extract the first 10 components using this distributed algorithm.

We first define the computation.

```{r}
svdDef <- data.frame(compType = "RankKSVD",
                     rank = 20L,
                     ncol = 20L,
                     id = "SVD",
                     stringsAsFactors = FALSE)
```

We generate some synthetic data.

```{r}
set.seed(12345)
## Three sites
nSites <- 3
siteData <- lapply(seq.int(nSites), function(i) matrix(rnorm(10000), ncol=20))
```

We now create local objects to represent the sites, a list of three
items, each with a name and its specific data.

```{r}
sites <- lapply(seq.int(nSites),
                function(x) list(name = paste0("site", x),
                                 worker = makeWorker(defn = svdDef, data = siteData[[x]])
                                 ))
```

We are now ready to create a master object and add sites to it.

```{r}
master <- makeMaster(svdDef)
for (site in sites) {
  master$addSite(name = site$name, worker = site$worker)
}
```

When any of the sites specify a `worker` argument in the call to
`addSite`, it is an indication that the computation is to be performed
locally. Otherwise, a URL will have to be specified using the `url`
argument to `addSite`. (Only one of `worker` or `url` can be
specified.) 

Finally, we are ready to run the decomposition, specifying a maximum
number of iterations, so that things don't go haywire.  This takes
less than 5 seconds on my laptop. 

```{r}
system.time(result <- master$run(max.iter = 10000))
```

The result is a named list of two things, the $d$ diagonal values and the
$v$ matrix.

We can compare the results obtained here with the actual.

```{r}
full_data <- do.call(rbind, siteData)
full_svd <- svd(full_data)
```

Let's print it side-by-side.

```{r}
d_table <- data.frame(truth = full_svd$d, distcomp = result$d)
knitr::kable(d_table)
```

We can also compare the $v$ matrix, taking into account the signs may
be different.

```{r}
norm(abs(result$v) - abs(full_svd$v), "F")
```

## Stratified Cox Model

The approach for the stratified Cox model is similar. We start with a definition.

```{r}
coxDef <- data.frame(compType = "StratifiedCoxModel",
                     formula = "Surv(time, censor) ~ age + becktota + ndrugfp1 + ndrugfp2 + ivhx3 + race + treat",
                     projectName = "STCoxTest",
                     projectDesc = "STCox Project Desc",
                     stringsAsFactors = FALSE)
```

We now create two sites with appropriate data

```{r}
## Two sites
siteDataFiles <- file.path(system.file("ex", package="distcomp"), c("uis-site1.csv", "uis-site2.csv"))
siteData <- lapply(siteDataFiles, read.csv)
```

We now create local objects to represent the two sites, each with a
name and its specific data.

```{r}
sites <- lapply(seq_along(siteData),
                function(x) list(name = paste0("site", x),
                                 worker = makeWorker(defn = coxDef, data = siteData[[x]])
                                 ))
```

We are now ready to create a master object and add sites to it.

```{r}
master <- makeMaster(coxDef)
for (site in sites) {
  master$addSite(name = site$name, worker = site$worker)
}
```

Now we can run the model.

```{r, }
result <- master$run()
print(master$summary(), digits = 4)
```

The above result will be the same as what we obtain from a Cox
regression fit. 

```{r}
coxFit <- survival::coxph(formula=Surv(time, censor) ~ age + becktota + ndrugfp1 + ndrugfp2 +
                              ivhx3 + race + treat + strata(site),
                          data = rbind(siteData[[1]], siteData[[2]]))
summary(coxFit)$coefficients
```

## Session Info

```{r}
sessionInfo()
```
