## ----setup, include=FALSE-----------------------------------------------------
# Render each chunk's source and output as one collapsed block and prefix
# printed output lines with "#>".
knitr::opts_chunk$set(collapse = TRUE, comment = "#>")

# Everything below depends on bigmemory. When it is not installed, print a
# short notice and end knitting here rather than failing at library(bigmemory).
if (!requireNamespace("bigmemory", quietly = TRUE)) {
  cat("This vignette requires the 'bigmemory' package.\n")
  knitr::knit_exit()
}

library(bigKNN)
library(bigmemory)

## ----helpers, include=FALSE---------------------------------------------------
# Condense a job object into a single-row data frame for display.
#
# `job` must expose `status`, `metric`, `n_query`, `n_edge` and
# `checkpoint_path` through `$`. Only the file name of the checkpoint is
# reported, not its directory. A field that is NULL yields no column, because
# data.frame() drops NULL arguments.
job_summary <- function(job) {
  checkpoint_file <- basename(job$checkpoint_path)

  summary_row <- data.frame(
    status = job$status,
    metric = job$metric,
    queries = job$n_query,
    edges = job$n_edge,
    checkpoint = checkpoint_file,
    row.names = NULL
  )
  summary_row
}

# Tabulate selected entries of a checkpoint specification.
#
# `spec` is indexed with `[[` by each name in `fields`. The result is a data
# frame with one row per requested field: `field` holds the name and `value`
# holds the entry rendered as a single string. Multi-element entries are
# joined with ", " and absent (NULL) entries are shown as the text "NULL".
checkpoint_summary <- function(spec, fields) {
  render_field <- function(field) {
    entry <- spec[[field]]
    if (is.null(entry)) {
      return("NULL")
    }
    paste(entry, collapse = ", ")
  }

  rendered <- vapply(fields, render_field, character(1))
  data.frame(field = fields, value = rendered, row.names = NULL)
}

# Read streamed edge stores back into an ordinary data frame.
#
# `xp_from` and `xp_to` are converted with bigmemory::as.matrix(), flattened,
# and coerced to integer columns `from` and `to`. When `xp_value` is supplied
# it is read the same way and appended as a numeric `value` column.
read_graph_store <- function(xp_from, xp_to, xp_value = NULL) {
  flatten <- function(xp) as.vector(bigmemory::as.matrix(xp))

  edges <- data.frame(
    from = as.integer(flatten(xp_from)),
    to = as.integer(flatten(xp_to)),
    row.names = NULL
  )
  if (is.null(xp_value)) {
    return(edges)
  }

  edges$value <- as.numeric(flatten(xp_value))
  edges
}

# Read streamed radius-search stores back into a long data frame.
#
# Each store is converted with bigmemory::as.matrix() and flattened. The
# result has one row per stored match: `query` is the label from `query_ids`,
# `neighbor` is the stored index, and `distance` is rounded to 5 significant
# digits. `query_ids` is repeated according to the differences between
# consecutive offsets.
read_radius_store <- function(xp_index, xp_distance, xp_offset, query_ids) {
  flatten <- function(xp) as.vector(bigmemory::as.matrix(xp))

  neighbor_ids <- as.integer(flatten(xp_index))
  distances <- as.numeric(flatten(xp_distance))
  # Differences between consecutive offsets give the number of matches stored
  # for each query.
  per_query <- diff(as.integer(flatten(xp_offset)))

  data.frame(
    query = rep(query_ids, times = per_query),
    neighbor = neighbor_ids,
    distance = signif(distances, 5),
    row.names = NULL
  )
}

## ----create-workspace---------------------------------------------------------
# All backing files and checkpoints used below are written to one
# subdirectory of the session's temporary directory.
scratch_dir <- file.path(tempdir(), "bigknn-resumable-jobs")
if (!dir.exists(scratch_dir)) {
  dir.create(scratch_dir, recursive = TRUE, showWarnings = FALSE)
}

## ----graph-reference----------------------------------------------------------
# Four labelled reference points. x2 is zero for every point, so only x1
# distinguishes them.
graph_points <- data.frame(
  id = paste0("g", 1:4),
  x1 = c(0, 1, 5, 6),
  x2 = c(0, 0, 0, 0)
)

# File-backed double matrix (one row per point, two columns) whose backing and
# descriptor files are created in scratch_dir.
graph_ref <- filebacked.big.matrix(
  nrow = nrow(graph_points),
  ncol = 2,
  type = "double",
  backingfile = "graph-ref.bin",
  descriptorfile = "graph-ref.desc",
  backingpath = scratch_dir
)

# Copy only the coordinate columns; the id labels stay in the data frame.
graph_ref[,] <- as.matrix(graph_points[c("x1", "x2")])
# Print the points for reference.
graph_points

## ----graph-job----------------------------------------------------------------
# Number of neighbours requested per reference row.
k <- 1L
# The output stores are sized as rows * k; presumably one edge slot per row
# and neighbour -- confirm against knn_graph_stream_bigmatrix().
n_edge <- nrow(graph_ref) * k

# Single-column integer store passed as xpFrom below.
graph_from <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "integer",
  backingfile = "graph-from.bin",
  descriptorfile = "graph-from.desc",
  backingpath = scratch_dir
)

# Single-column integer store passed as xpTo below.
graph_to <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "integer",
  backingfile = "graph-to.bin",
  descriptorfile = "graph-to.desc",
  backingpath = scratch_dir
)

# Single-column double store passed as xpValue below.
graph_value <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "double",
  backingfile = "graph-value.bin",
  descriptorfile = "graph-value.desc",
  backingpath = scratch_dir
)

# Checkpoint file for this job; it is read back with readRDS() later in the
# script.
graph_checkpoint <- file.path(scratch_dir, "graph-job.rds")

# Run the streaming graph job over graph_ref, handing it the three output
# stores and the checkpoint location.
graph_job <- knn_graph_stream_bigmatrix(
  graph_ref,
  k = k,
  xpFrom = graph_from,
  xpTo = graph_to,
  xpValue = graph_value,
  checkpoint_path = graph_checkpoint
)

# Show the job summary and the contents of the output stores.
job_summary(graph_job)
read_graph_store(graph_from, graph_to, graph_value)

## ----radius-reference---------------------------------------------------------
# Four labelled reference points in two dimensions for the radius example.
radius_points <- data.frame(
  id = paste0("r", 1:4),
  x1 = c(1, 0, 1, 2),
  x2 = c(0, 1, 1, 1)
)

# File-backed double matrix (one row per point, two columns) whose backing and
# descriptor files are created in scratch_dir.
radius_ref <- filebacked.big.matrix(
  nrow = nrow(radius_points),
  ncol = 2,
  type = "double",
  backingfile = "radius-ref.bin",
  descriptorfile = "radius-ref.desc",
  backingpath = scratch_dir
)

# Copy only the coordinate columns; the id labels stay in the data frame.
radius_ref[,] <- as.matrix(radius_points[c("x1", "x2")])
# Print the points for reference.
radius_points

## ----radius-job---------------------------------------------------------------
# Count matches at radius 1.1 first; the summed counts set the number of rows
# in the index and distance stores allocated below.
radius_counts <- count_within_radius_bigmatrix(radius_ref, radius = 1.1)
total_matches <- sum(radius_counts)

# Single-column integer store passed as xpIndex below, one row per match.
radius_index <- filebacked.big.matrix(
  nrow = total_matches,
  ncol = 1,
  type = "integer",
  backingfile = "radius-index.bin",
  descriptorfile = "radius-index.desc",
  backingpath = scratch_dir
)

# Single-column double store passed as xpDistance below, one row per match.
radius_distance <- filebacked.big.matrix(
  nrow = total_matches,
  ncol = 1,
  type = "double",
  backingfile = "radius-distance.bin",
  descriptorfile = "radius-distance.desc",
  backingpath = scratch_dir
)

# Offset store with one more row than there are counts. It is allocated as
# double here and coerced to integer whenever it is read back in this script.
radius_offset <- filebacked.big.matrix(
  nrow = length(radius_counts) + 1L,
  ncol = 1,
  type = "double",
  backingfile = "radius-offset.bin",
  descriptorfile = "radius-offset.desc",
  backingpath = scratch_dir
)

# Checkpoint file for this job; it is read back with readRDS() later in the
# script.
radius_checkpoint <- file.path(scratch_dir, "radius-job.rds")

# Run the streaming radius job with the same radius (1.1) that was used to
# size the stores above.
radius_job <- radius_stream_job_bigmatrix(
  radius_ref,
  xpIndex = radius_index,
  xpDistance = radius_distance,
  xpOffset = radius_offset,
  radius = 1.1,
  checkpoint_path = radius_checkpoint
)

# Show the counts, the job summary, the raw offsets, and the stored matches
# labelled with the point ids.
radius_counts
job_summary(radius_job)
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
read_radius_store(radius_index, radius_distance, radius_offset, radius_points$id)

## ----resume-graph-------------------------------------------------------------
# Compute the edge list with the non-streaming function; its first two rows
# are used below to pre-fill the output stores.
graph_expected <- as.data.frame(
  knn_graph_bigmatrix(graph_ref, k = 1, format = "edge_list", symmetrize = "none")
)
# Drop the "bigknn_graph" attribute from the data frame.
attr(graph_expected, "bigknn_graph") <- NULL

# Rewrite the saved checkpoint so it describes a job that is still running.
# NOTE(review): next_row / next_edge look like 1-based positions of the next
# row and edge slot to process -- confirm against the checkpoint format.
graph_spec <- readRDS(graph_checkpoint)
graph_spec$status <- "running"
graph_spec$next_row <- 3L
graph_spec$next_edge <- 3L
saveRDS(graph_spec, graph_checkpoint)

# Zero all three stores, then restore only the first two entries so the
# stores hold two expected edges and nothing else.
graph_from[, 1] <- 0L
graph_to[, 1] <- 0L
graph_value[, 1] <- 0
graph_from[1:2, 1] <- as.integer(graph_expected$from[1:2])
graph_to[1:2, 1] <- as.integer(graph_expected$to[1:2])
graph_value[1:2, 1] <- graph_expected$distance[1:2]

# Resume from the edited checkpoint file alone.
resumed_graph_job <- resume_knn_job(graph_checkpoint)

# Show the resumed job summary and the contents of the output stores.
job_summary(resumed_graph_job)
read_graph_store(graph_from, graph_to, graph_value)

## ----resume-radius------------------------------------------------------------
# Compute the radius result with the non-streaming function; its leading
# entries are used below to pre-fill the output stores.
radius_expected <- radius_bigmatrix(radius_ref, radius = 1.1)

# Rewrite the saved checkpoint so it describes a job that is still running in
# its "collect" phase.
# NOTE(review): next_row looks like the 1-based position of the next query row
# to process -- confirm against the checkpoint format.
radius_spec <- readRDS(radius_checkpoint)
radius_spec$status <- "running"
radius_spec$phase <- "collect"
radius_spec$next_row <- 3L
saveRDS(radius_spec, radius_checkpoint)

# Zero all three stores. The offset store is left zeroed; only index and
# distance receive a prefix below.
radius_index[, 1] <- 0L
radius_distance[, 1] <- 0
radius_offset[, 1] <- 0

# Number of leading entries to restore: everything before the position given
# by the third offset (assumes offsets are 1-based -- TODO confirm).
prefix_end <- radius_expected$offset[3L] - 1L
radius_index[seq_len(prefix_end), 1] <- as.integer(radius_expected$index[seq_len(prefix_end)])
radius_distance[seq_len(prefix_end), 1] <- radius_expected$distance[seq_len(prefix_end)]

# Resume from the edited checkpoint file alone.
resumed_radius_job <- resume_knn_job(radius_checkpoint)

# Show the resumed job summary, the raw offsets, and the stored matches
# labelled with the point ids.
job_summary(resumed_radius_job)
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
read_radius_store(radius_index, radius_distance, radius_offset, radius_points$id)

## ----checkpoint-fields--------------------------------------------------------
# Re-read both checkpoint files from disk, replacing the edited copies held in
# graph_spec and radius_spec.
graph_spec <- readRDS(graph_checkpoint)
radius_spec <- readRDS(radius_checkpoint)

# Tabulate a few fields from each checkpoint; any field that is absent is
# shown as the text "NULL".
checkpoint_summary(graph_spec, c("type", "status", "k", "next_row", "next_edge"))
checkpoint_summary(radius_spec, c("type", "status", "phase", "next_row", "total_matches"))

