---
title: "Getting started"
author: "Andrea Dodet"
date: "`r Sys.Date()`"
vignette: >
  %\VignetteIndexEntry{googlePubsubR}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

# `googlePubsubR`

This vignette walks through the most common functions, which should cover the majority
of use cases.

## Authentication

In order to authenticate the client, the following environment variables will need to be set:

* `GCP_AUTH_FILE`: path to the .json file containing GCP credentials for a service account enabled to
interact with the Pub/Sub API. You can also directly pass the path as a function argument.
* `GCP_PROJECT`: your GCP project id.

Upon setting the environment variables, it is just a matter of calling:
```r
library(googlePubsubR)

pubsub_auth()
```

More authentication methods can be found in the `?pubsub_auth` documentation.
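
If you prefer to set the variables from within an R session rather than in the shell, something along these lines should work. The file path and project id below are placeholders, not real values.

```r
# Placeholder values (assumptions): replace with your own key file and project id
Sys.setenv(
  GCP_AUTH_FILE = "/path/to/service-account.json",
  GCP_PROJECT = "my-gcp-project"
)

# Confirm that both variables are visible to the current session
auth_vars <- Sys.getenv(c("GCP_AUTH_FILE", "GCP_PROJECT"))
stopifnot(all(nzchar(auth_vars)))

# pubsub_auth() can then be called as shown above
```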

## Creating and getting resources
### Topics

In their most basic form, topics can be created in the following way:

```r
# Create a topic that retains messages for a minimum of 4 hours.
# We'll attach some custom labels to make it easier to spot
topic <- topics_create(
  name = "vignette-topic",
  labels = list(type = "pkg_vignette"),
  message_retention_duration = 14400
)

# A topic object is returned
topic

# $labels
# $labels$type
# [1] "pkg_vignette"
# 
# 
# $name
# [1] "projects/<my-gcp-project>/topics/vignette-topic"
# 
# $kmsKeyName
# NULL
# 
# $satisfiesPzs
# NULL
# 
# $messageStoragePolicy
# NULL
# 
# $schemaSettings
# NULL
# 
# $messageRetentionDuration
# [1] "14400s"
# 
# attr(,"class")
# [1] "Topic" "list"
```

It is possible to interact with pre-existing topic objects. For instance, one could retrieve a `Topic`
object from an existing topic:

```r
if(topics_exists("vignette-topic")) {
    topic <- topics_get("vignette-topic")
}
```

In order to consume messages from a topic, a subscription will be needed:
```r
# You can either pass a Topic object or a topic name
sub <- subscriptions_create(
  name = "vignette-sub",
  topic = topic,
  # The subscription will expire after 1 day (86400 seconds) of inactivity
  expiration_policy = ExpirationPolicy(86400),
  # We'll retain unacked messages for 12 hours
  msg_retention_duration = 43200,
  # We'll retry message delivery with at least 1 second delay from the previous try
  retry_policy = RetryPolicy(min_backoff = 1) 
)

sub

# $deadLetterPolicy
# NULL
# 
# $messageRetentionDuration
# [1] "43200s"
# 
# $labels
# NULL
# 
# $retryPolicy
# $retryPolicy$minimumBackoff
# [1] "1s"
# 
# $retryPolicy$maximumBackoff
# [1] "600s"
# 
# 
# $pushConfig
# named list()
# 
# $ackDeadlineSeconds
# [1] 10
# 
# $expirationPolicy
# $expirationPolicy$ttl
# [1] "86400s"
# 
# 
# $filter
# NULL
# 
# $detached
# NULL
# 
# $retainAckedMessages
# NULL
# 
# $topic
# [1] "projects/<my-gcp-project>/topics/vignette-topic"
# 
# $name
# [1] "projects/<my-gcp-project>/subscriptions/vignette-sub"
# 
# $enableMessageOrdering
# NULL
# 
# $topicMessageRetentionDuration
# [1] "14400s"
# 
# attr(,"class")
# [1] "Subscription" "list"
```
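
The duration arguments in the calls above appear to be expressed in seconds (the API echoes them back as, for example, `"14400s"`). To avoid arithmetic slips, one option is to derive the values from more readable units with base R's `difftime`. The `to_seconds` helper below is purely illustrative and is not part of the package.

```r
# Hypothetical helper (not part of googlePubsubR): convert a duration to seconds
to_seconds <- function(x, units = c("mins", "hours", "days")) {
  units <- match.arg(units)
  as.numeric(as.difftime(x, units = units), units = "secs")
}

to_seconds(4, "hours")   # topic message retention
# [1] 14400
to_seconds(12, "hours")  # subscription message retention
# [1] 43200
to_seconds(1, "days")    # subscription expiration policy
# [1] 86400
```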

We can also inspect all subscriptions attached to a given topic.
```r
topics_list_subscriptions(topic = "vignette-topic")
```

### Schemas

Schemas can be used to enforce a specific format on incoming messages: messages that do not conform to the
schema are rejected at publish time. A schema object can be passed to a topic upon creation. Getting
the schema definition in the right format (Pub/Sub expects it as a string) can be quite fiddly. In this example
we'll define an AVRO schema.
```r
library(jsonlite)
library(magrittr)

avro_schema <- schemas_create(
  name = "vignette-schema", 
  type = "AVRO",
  # toJSON as the API expects a string containing the definition of the AVRO schema
  definition = toJSON(list(
    name = "cutlery",
    type = "record",
    fields = list(
      list("name" = "name", "type" = "string"),
      list("name" = "price", "type" = "int")
    )
  ), auto_unbox = TRUE)
)

# Test a message against the schema
msg <- list(
    name = "John",
    price = 123
) %>% 
  toJSON(auto_unbox = TRUE) %>%
  charToRaw() %>% 
  base64enc::base64encode() %>% 
  PubsubMessage()

schemas_validate_message(schema = "vignette-schema", message = msg, encoding = "JSON")
# [1] TRUE

# Create a new topic and attach the schema
topic <- topics_create(
  name = "vignette-topic", 
  message_retention_duration = 14400, 
  schema_settings = SchemaSettings(encoding = "JSON", schema = "vignette-schema")
)
```
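
Since the definition is sent as a plain string, it can help to build it separately and inspect it before calling `schemas_create`. The sketch below only uses `jsonlite` and does not contact the API; the `definition` variable name is our own.

```r
library(jsonlite)

# Build the AVRO definition as an R list, then serialise it to a JSON string
definition <- toJSON(
  list(
    name = "cutlery",
    type = "record",
    fields = list(
      list(name = "name", type = "string"),
      list(name = "price", type = "int")
    )
  ),
  auto_unbox = TRUE
)

# Check that the string is well-formed JSON, then print it for visual inspection
stopifnot(validate(definition))
prettify(definition)
```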

### Snapshots and `seek`

There are two ways to `seek` a subscription back in time:

* By providing a snapshot of the subscription
* By providing a timestamp in RFC3339 UTC "Zulu" format

```r
# Create a snapshot of a subscription
snapshot <- snapshots_create(name = "vignette-snap", subscription = "vignette-sub")

# 'Rewind' the subscription to the snapshot we've just created
subscriptions_seek(subscription = "vignette-sub", snapshot = snapshot)
# [1] TRUE

# Seek the subscription to a specific timestamp
subscriptions_seek("vignette-sub", time = "2021-11-08T23:55:00Z")
```
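
A timestamp in the expected format can be generated from a `POSIXct` value with base R's `format`. The sketch below rewinds by one hour; the offset is an arbitrary choice for illustration.

```r
# Build an RFC3339 UTC "Zulu" timestamp for one hour ago
one_hour_ago <- Sys.time() - 3600
seek_time <- format(one_hour_ago, "%Y-%m-%dT%H:%M:%SZ", tz = "UTC")

# The resulting string can be passed as the `time` argument of subscriptions_seek
seek_time
```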
## Messages

Pub/Sub expects message data to be encoded as a base64 string. Depending on the object you're dealing with,
the process of converting it to the right format might vary. Below we'll convert a dataframe into a format that
Pub/Sub will accept:
```r
msg <- data.frame(name = "fork", price = 999) %>%
  as.list() %>%
  toJSON(auto_unbox = TRUE) %>%
  charToRaw() %>%
  base64enc::base64encode() %>%
  PubsubMessage()
  
topics_publish(messages = msg, topic = "vignette-topic")
```
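
When a dataframe holds several rows, one reasonable approach is to encode each row as its own payload, so that every row becomes a separate message. The sketch below stops at the base64 strings and does not call the API; `encode_row` and `cutlery` are illustrative names of our own.

```r
library(jsonlite)

cutlery <- data.frame(name = c("fork", "spoon"), price = c(999, 250))

# Hypothetical helper: turn a one-row dataframe into a base64-encoded JSON string
encode_row <- function(row) {
  json <- toJSON(as.list(row), auto_unbox = TRUE)
  base64enc::base64encode(charToRaw(json))
}

# One payload per row
payloads <- vapply(
  seq_len(nrow(cutlery)),
  function(i) encode_row(cutlery[i, , drop = FALSE]),
  character(1)
)

payloads[1]
# [1] "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9"
```

Each element could then be wrapped with `PubsubMessage()` in the same way as the single message above.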

Now that we have published a message (conforming to the schema specified above), we can pull messages
from the subscription. Note that pulling does not take messages out of the queue (you can do it as many times as you want).
Messages need to be acknowledged with `subscriptions_ack` after they have been consumed in order to be
taken out of the queue.

Pulling messages returns a list containing a `receivedMessages` dataframe, which holds the ackIds and a nested dataframe storing
the message data and metadata.

```r
msgs <- subscriptions_pull("vignette-sub")

tibble::glimpse(msgs)
# List of 1
#  $ receivedMessages:'data.frame': 1 obs. of  2 variables:
#   ..$ ackId  : chr "PkVTRFAGFixdRkhRNxkIaFEOT14jPzUgKEUSC1MTUVx1A1MQaVwzdQdRDRlzejV1aQwRVAsUUHRfURsfWVxEjczJsS9QXWJxa1oQAwJHUH1YUxw"| __truncated__
#   ..$ message:'data.frame': 1 obs. of  4 variables:
#   .. ..$ data       : chr "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9"
#   .. ..$ attributes :'data.frame': 1 obs. of  2 variables:
#   .. ..$ messageId  : chr "3410320233787713"
#   .. ..$ publishTime: chr "2021-11-09T09:06:31.171Z"

tibble::glimpse(msgs$receivedMessages$message)
# Rows: 1
# Columns: 4
# $ data        <chr> "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9"
# $ attributes  <df[,2]> <data.frame[1 x 2]>
# $ messageId   <chr> "3410320233787713"
# $ publishTime <chr> "2021-11-09T09:06:31.171Z"
```

### Decoding incoming messages

To convert message data back into a usable format, we essentially need to reverse the process that was used
to encode it in the first place. Given that `subscriptions_pull` will return **all** messages in the queue, a good strategy
might be to decode them all at once using `lapply`:
```r
decoded_msg <- lapply(msgs$receivedMessages$message$data, function(msg) {
    msg %>%
        base64enc::base64decode() %>%
        rawToChar() %>%
        fromJSON(flatten = TRUE, simplifyDataFrame = TRUE) %>%
        as.data.frame()
}) %>% do.call(rbind, .)

decoded_msg
#   name price
# 1 fork   999

# ... Do something with the dataframe

# Acknowledge we have successfully used it
subscriptions_ack(ack_ids = msgs$receivedMessages$ackId, subscription = "vignette-sub")
# [1] TRUE
```

This approach is quite cumbersome; helpers to facilitate the process might be included in the library later on, once a sufficiently
general approach is found.
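
In the meantime, a small wrapper can hide most of the boilerplate. The following is only a sketch, under the assumption that every payload is a flat JSON object with the same fields; `decode_payloads` is a hypothetical helper, not a function exported by the package.

```r
library(jsonlite)

# Hypothetical helper: decode a character vector of base64-encoded JSON payloads
# into a single dataframe with one row per message
decode_payloads <- function(data) {
  rows <- lapply(data, function(payload) {
    json <- rawToChar(base64enc::base64decode(payload))
    as.data.frame(fromJSON(json), stringsAsFactors = FALSE)
  })
  do.call(rbind, rows)
}

# The payload below is the `data` value from the pull output shown earlier
decode_payloads("eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9")
#   name price
# 1 fork   999
```

With the pulled messages from above, this would be called as `decode_payloads(msgs$receivedMessages$message$data)`.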
