Introduction
This vignette showcases how to store time-series data and build a query interface similar to that of the xts package.
A new class TimeSeries1d has been created to work with
tabular daily time-series data; its data model and R6 class definition
can be found in the Appendix.
Prerequisites:
- R packages: data.table, xts and arrow
- Source the TimeSeries1d class from the appendix
Application
Create Time-Series Array
# URI path to store array
uri <- tempfile()
# Generate TimeSeries1d
tsobj <- TimeSeries1d$new(uri)
# Create schema with two symbols
tsobj$create(attr_names = c("A", "B"))
# Schema
tsobj$schema_info()
# names types status enum
# 1 index DATETIME_DAY Dim FALSE
# 2 A FLOAT64 Attr FALSE
# 3 B FLOAT64 Attr FALSE
Generate data
# Time-series data: ten consecutive daily observations.
# `seq()` is given a Date object and an explicit `by`, so the call does not
# depend on newer `seq.Date()` defaults or on character input being accepted.
dat <- data.frame(
  index = seq(as.Date("2010-01-01"), by = "day", length.out = 10),
  A = as.numeric(1:10),
  B = as.numeric(11:20),
  C = as.numeric(21:30)
)
# Add some NAs, leading and random
dat[1:2, "A"] <- NA
dat[5, "B"] <- NA
# Initial dataset
dat1 <- dat[, c("index", "A", "B")]
# New series to ingest
dat2 <- dat[, c("index", "C")]
Write and Ingest Series
# Write data
tsobj$write(dat1)
# Query all
tsobj$tiledb_array(return_as = "data.table")[]
# index A B
# <Date> <num> <num>
# 1: 2010-01-01 NA 11
# 2: 2010-01-02 NA 12
# 3: 2010-01-03 3 13
# 4: 2010-01-04 4 14
# 5: 2010-01-05 5 NA
# 6: 2010-01-06 6 16
# 7: 2010-01-07 7 17
# 8: 2010-01-08 8 18
# 9: 2010-01-09 9 19
# 10: 2010-01-10 10 20
# Subset like xts
tsobj$subset(c("2010-01-01", "2010-01-06"), cols = "A", as_xts = TRUE)
# A
# 2010-01-01 NA
# 2010-01-06 6
t0 <- Sys.time()
# Ingest new series
tsobj$ingest(dat2)
# Check symbols
tsobj$symbols()
# [1] "A" "B" "C"
# Subset like xts
tsobj$subset(c("2010-01-06/"), cols = c("A", "C"))
# index A C
# <Date> <num> <num>
# 1: 2010-01-06 6 26
# 2: 2010-01-07 7 27
# 3: 2010-01-08 8 28
# 4: 2010-01-09 9 29
# 5: 2010-01-10 10 30
Note that the schema has evolved due to the addition of the new series (a new attribute):
vfs_dir_tree(tsobj$uri)
# C:/Users/Constantine/AppData/Local/Temp/Rtmp6JkFsL/file33e4e961897
# ├── __commits
# │ ├── __1769941398404_1769941398404_4136f8ae903054d78272ab6b378ea126_22.wrt
# │ └── __1769941399444_1769941399444_662c00de4e03839addba20ab4934789b_22.wrt
# ├── __fragments
# │ ├── __1769941398404_1769941398404_4136f8ae903054d78272ab6b378ea126_22
# │ │ ├── a0.tdb
# │ │ ├── a0_validity.tdb
# │ │ ├── a1.tdb
# │ │ ├── a1_validity.tdb
# │ │ ├── d0.tdb
# │ │ └── __fragment_metadata.tdb
# │ └── __1769941399444_1769941399444_662c00de4e03839addba20ab4934789b_22
# │ ├── a0.tdb
# │ ├── a0_validity.tdb
# │ ├── a1.tdb
# │ ├── a1_validity.tdb
# │ ├── a2.tdb
# │ ├── a2_validity.tdb
# │ ├── d0.tdb
# │ └── __fragment_metadata.tdb
# ├── __fragment_meta
# ├── __labels
# ├── __meta
# └── __schema
# ├── __1769941398329_1769941398329_2f2845374bfd7b24cba73a8fa319dce4
# ├── __1769941399389_1769941399389_3bdf9ca23026edf29709cafca6e50284
# └── __enumerations
#
# ❯ directories (6) • total size (9.95 KiB)
Built-in functions
# Index domain
tsobj$domain_index()
# [1] "1970-01-01" "2200-01-01"
# Index range by series
tsobj$index_range()
# symbol st_date en_date
# <char> <Date> <Date>
# 1: A 2010-01-03 2010-01-10
# 2: B 2010-01-01 2010-01-10
# 3: C 2010-01-01 2010-01-10
# Rename A-> NEW_A
tsobj$rename_symbol("A", "NEW_A")
tsobj$symbols()
# [1] "B" "C" "NEW_A"
Appendix
TimeSeries1d Class
TimeSeries1d <- R6::R6Class(
classname = "TimeSeries1d",
inherit = TileDBArray,
cloneable = FALSE,
active = list(
#' @field index Retrieve Array's unique index. The value is computed on
#' first access, sorted when not already strictly ordered, and cached.
#'
index = function(value) {
  if (!missing(value)) {
    private$check_read_only("index")
  }
  cached <- private$.index
  if (!is.null(cached)) {
    return(cached)
  }
  # Query with `attrs = NA_character_` and keep only the `index` column.
  tbl <- self$tiledb_array(attrs = NA_character_, return_as = "arrow")[]
  stamps <- arrow::call_function("unique", tbl[["index"]])$as_vector()
  # Re-order only when the unique values are not strictly increasing.
  if (!xts::isOrdered(stamps, strictly = TRUE)) {
    stamps <- stamps[order(stamps, method = "radix")]
  }
  # Tag the vector with the array's time class and time zone.
  attr(stamps, "tclass") <- self$tclass
  attr(stamps, "tzone") <- self$tzone
  private$.index <- stamps
  private$.index
},
#' @field tclass Retrieve index class, one of:
#' [Date()] or [POSIXct()].
#'
#' The class is read from the `tclass` metadata key when it holds a
#' recognised value (`"Date"` or `"POSIXct"`); otherwise it is inferred
#' from the datatype of the `index` dimension. The result is cached.
#'
tclass = function(value) {
  if (!missing(value)) {
    private$check_read_only("tclass")
  }
  if (is.null(private$.tclass)) {
    resolved <- NULL
    stored <- self$get_metadata("tclass")
    # Only the first element is inspected so that a stored class vector
    # such as c("POSIXct", "POSIXt") does not break `switch()`.
    if (is.character(stored) && length(stored) > 0 && !is.na(stored[1])) {
      resolved <- switch(stored[1],
        Date = "Date",
        POSIXct = c("POSIXct", "POSIXt")
      )
    }
    # Missing or unrecognised metadata: fall back to the dimension datatype
    # instead of caching NULL, which would break later `tc == "Date"` tests.
    if (is.null(resolved)) {
      dim_type <- tiledb::datatype(self$dimensions()[["index"]])
      if (dim_type == "DATETIME_DAY") { # Either "DATETIME_DAY" or "DATETIME_SEC/MS"
        resolved <- "Date"
      } else {
        resolved <- c("POSIXct", "POSIXt")
      }
    }
    private$.tclass <- resolved
  }
  private$.tclass
},
#' @field tzone Retrieve Array timezone. This represents
#' the timezone where the time series have been created.
#'
#' Read from the `tzone` metadata key and cached; when the key is
#' absent (`NULL`) the value defaults to `UTC`.
#'
tzone = function(value) {
  if (!missing(value)) {
    private$check_read_only("tzone")
  }
  if (is.null(private$.tzone)) {
    stored <- self$get_metadata("tzone")
    private$.tzone <- if (is.null(stored)) "UTC" else stored
  }
  private$.tzone
}
),
public = list(
#' @description Create a schema only for time series `TileDB` array.
#'
#' The array is created at `self$uri` and then opened in `WRITE` mode.
#'
#' @param schema_name Select a time series schema, either
#' `"generic_date"` (default) or `"generic_datetime"`.
#' @param attr_names A character vector with attributes names to be created
#' by generic schema.
#' @param meta A named list with key-value pairs of metadata. When not
#' `NULL` it is passed to `set_metadata()` after the array is created.
#'
#' @return The object, invisibly.
#'
create = function(schema_name = c("generic_date",
                                  "generic_datetime"),
                  attr_names = "close",
                  meta = NULL) {
  schema_name <- match.arg(schema_name)
  mode <- "WRITE"
  private$log_debug0("create", "Define time-series schema '{}'", schema_name)
  # Both schemas come from the same generator and differ only in the
  # `type_index` passed to it.
  type_index <- switch(
    schema_name,
    generic_date = 0,
    generic_datetime = 1,
    cli::cli_abort("Unknown schema", call = NULL)
  )
  sch <- .gen_ts1dim_schema(attr = attr_names, type_index = type_index)
  # Add schema only
  tiledb::tiledb_array_create(self$uri, schema = sch)
  private$.tiledb_array <- tiledb::tiledb_array(self$uri,
                                                ctx = self$ctx,
                                                query_type = mode,
                                                query_layout = "UNORDERED",
                                                keep_open = TRUE)
  private$.mode <- mode
  private$.object_type <- "ARRAY"
  if (!is.null(meta)) {
    self$set_metadata(meta)
  }
  invisible(self)
},
#' @description Write time-series data to array.
#'
#' The object must exist and be open for writing; both conditions are
#' checked before the data is assigned into the array.
#'
#' @param tsdata An object of class `data.frame`.
#'
#' @return The object, invisibly.
#'
write = function(tsdata) {
  private$check_object_exists()
  private$check_open_for_write()
  private$log_debug("write", "Writing time-series data")
  # Assigning through `[<-` on the array handle performs the write.
  target <- self$object
  target[] <- tsdata
  invisible(self)
},
#' @description Ingest new time-series data to array.
#'
#' Existing rows from the first incoming index value onwards are read
#' back, the new columns are appended to them positionally, the schema is
#' evolved with the new symbols and the combined table is written again.
#'
#' @param tsdata A `data.frame`, [arrow::Table()] or [arrow::RecordBatch()] with new symbols
#' including index column. Its rows must line up one-to-one with the
#' rows already stored from its first index value onwards.
#'
#' @return The object, invisibly.
#'
ingest = function(tsdata) {
  private$check_object_exists()
  private$check_open_for_write()
  stopifnot(is.data.frame(tsdata) || is_arrow_record_batch(tsdata) || is_arrow_table(tsdata))
  # Normalise every accepted input to an Arrow Table so that the Table API
  # used below (`$ColumnNames()`) is available for RecordBatch input too.
  tsdata <- arrow::as_arrow_table(tsdata)
  cols <- tsdata$ColumnNames()
  # Check for the index first: without it every column would be treated as
  # a new symbol and the symbol check could report a misleading error.
  if (!("index" %in% cols)) {
    cli::cli_abort("'index' is missing from time-series data.", call = NULL)
  }
  new_columns <- cols[cols != "index"]
  if (any(new_columns %in% self$symbols())) {
    cli::cli_abort("Found symbols that are already present in database.", call = NULL)
  }
  # Open-ended range starting at the first incoming index value.
  # NOTE(review): assumes `tsdata` is ordered by index - confirm with callers.
  first_stamp <- as.character(tsdata[["index"]][1]$as_vector())
  start_index <- paste0(first_stamp, "/")
  qry <- private$index_query(start_index, self$tclass[1], tz = self$tzone)
  arrowx <- self$tiledb_array(return_as = "arrow",
                              selected_ranges = list(index = qry$index))[]
  # Columns are appended by position, so the stored slice and the incoming
  # data must have the same number of rows. Index values themselves are not
  # compared here.
  n_stored <- arrowx$num_rows
  n_incoming <- tsdata$num_rows
  if (n_stored != n_incoming) {
    cli::cli_abort(
      "Time-series data has {n_incoming} rows but the array holds {n_stored} rows from {first_stamp} onwards.",
      call = NULL
    )
  }
  # Append each new series after the existing columns.
  pos <- arrowx$num_columns
  for (sym in new_columns) {
    arrowx <- arrowx$AddColumn(pos, arrow::field(sym, arrow::float64()), tsdata[[sym]])
    pos <- pos + 1
  }
  # evolve schema with new columns
  self$add_symbols(new_columns)
  self$reopen("WRITE")
  # Next write new data
  # TODO: zero copy?
  # Not yet, see tiledb-r issue #847
  arr <- self$object
  arr[] <- arrowx
  invisible(self)
},
#' @description Get subset of time series array.
#'
#' @param i Extract time index rows. One of the following:
#'
#' - An `ISO-8601` style range string that represents the rows to extract.
#' - A numeric vector with elements within index range (Not supported).
#' - A logical vector of the same length as index (Not supported).
#' - A time-based vector (Not supported).
#'
#' Time-of-day patterns such as `"T09:30/T16:10"` are rejected.
#'
#' @param cols A character vector of column names (`TileDB` attributes).
#' @param as_xts Should the output be returned as `xts`? Default is `FALSE`.
#'
#' @return An object of class `data.table`, or an `xts` object when
#' `as_xts` is `TRUE`.
#'
subset = function(i = "1970-01-01/", cols = character(), as_xts = FALSE) {
  # Only character (date string) selectors are implemented.
  if (!is.character(i)) {
    cli::cli_abort("Not supported")
  }
  # Time of day recurrent selection, e.g, ["T09:30:30/T16:10"] (not supported)
  tod <- "(^/T)|(^T.*?/T)|(^T.*/$)"
  if (length(i) == 1 && grepl(tod, i[1])) {
    cli::cli_abort("Time of day pattern is not supported")
  }
  ranges <- private$index_query(i, self$tclass[1], tz = self$tzone)
  tbl <- self$tiledb_array(attrs = cols,
                           return_as = "arrow",
                           selected_ranges = list(index = ranges$index))[]
  # Carry the query status/stats attributes across the conversion.
  status <- attr(tbl, "query_status")
  stats <- attr(tbl, "query_statistics")
  out <- data.table::as.data.table(tbl)
  data.table::setattr(out, "query_status", value = status)
  data.table::setattr(out, "query_statistics", value = stats)
  # set tzone attr to index
  data.table::setattr(out$index, "tzone", value = self$tzone)
  if (as_xts) {
    xts::as.xts(out[])
  } else {
    out[]
  }
},
#' @description Get the index domain.
#'
#' @return A vector of length two with min, max values of index domain,
#' as `Date` or as `POSIXct` (in `UTC`) depending on the index class.
#'
domain_index = function() {
  bounds <- tiledb::domain(self$dimensions()[["index"]])
  switch(
    self$tclass[1],
    Date = as.Date(bounds),
    # NOTE(review): dividing by 1000 presumes a millisecond-resolution
    # dimension; confirm this also holds for DATETIME_SEC schemas.
    POSIXct = as.POSIXct(as.numeric(bounds) / 1000, tz = "UTC"),
    cli::cli_abort("Unsupported time-series index class")
  )
},
#' @description Get the index range of a series.
#'
#' Missing values are dropped per series before the range is computed,
#' so each range spans the first and last non-missing observation.
#'
#' @param series A character vector of symbol names. Defaults to `NULL`
#' to return all series.
#'
#' @return An object of class `data.table` with columns `symbol`,
#' `st_date` and `en_date`.
#'
index_range = function(series = NULL) {
  if (is.null(series)) {
    series <- character()
  }
  wide <- data.table::as.data.table(
    self$tiledb_array(attrs = series, return_as = "arrow")[]
  )
  # Long format: one row per (index, symbol) with NA values removed.
  long <- data.table::melt(wide, id.vars = "index",
                           variable.name = "symbol",
                           variable.factor = FALSE,
                           value.name = "value",
                           na.rm = TRUE)
  # Dummy bindings for names used in data.table expressions.
  index <- NULL
  en_date <- i.index <- NULL
  # Row numbers of the earliest and latest observation of each symbol.
  first_rows <- long[, .I[which.min(index)], by = "symbol"]$V1
  last_rows <- long[, .I[which.max(index)], by = "symbol"]$V1
  starts <- long[first_rows, c("symbol", "index")]
  ends <- long[last_rows, c("symbol", "index")]
  # Update join: attach each symbol's last index as `en_date`.
  starts[ends, on = "symbol", en_date := i.index]
  data.table::setnames(starts, old = "index", new = "st_date")
  starts[]
},
#' @description Get the start index of a series.
#'
#' @param series A character vector of symbol names. Defaults to `NULL`
#' to return all series.
#'
#' @return An object of class `data.table` with columns `symbol` and `index`.
#'
index_start = function(series = NULL) {
  # The first observation per symbol, reduced to its key columns.
  head_obs <- self$first(n = 1, series = series)
  head_obs[, c("symbol", "index")]
},
#' @description Get the last index of a series.
#'
#' @param series A character vector of symbol names. Defaults to `NULL`
#' to return all series.
#'
#' @return An object of class `data.table` with columns `symbol` and `index`.
#'
index_last = function(series = NULL) {
  # The last observation per symbol, reduced to its key columns.
  tail_obs <- self$last(n = 1, series = series)
  tail_obs[, c("symbol", "index")]
},
#' @description Get the N first values of a series.
#'
#' Missing values are removed before the first `n` rows of each symbol
#' are selected.
#'
#' @param n A number of observations to return.
#' @param series A character vector of symbol names. `NULL` or an empty
#' vector is passed on as an empty attribute selection.
#'
#' @return An object of class `data.table` in long format with columns
#' `index`, `symbol` and `value`.
#'
first = function(n = 1, series = character()) {
  if (is.null(series)) {
    series <- character()
  }
  wide <- data.table::as.data.table(
    self$tiledb_array(return_as = "arrow", attrs = series)[]
  )
  # set tzone attr to index
  data.table::setattr(wide$index, "tzone", value = self$tzone)
  long <- data.table::melt(wide, id.vars = "index",
                           variable.name = "symbol",
                           variable.factor = FALSE,
                           value.name = "value",
                           na.rm = TRUE)
  # Ascending by index, then keep the leading `n` rows of every symbol.
  data.table::setorderv(long, c("index"), order = 1)
  keep <- long[, .I[seq_len(.N) <= n], by = "symbol"]$V1
  long[keep, ]
},
#' @description Get the N last values of a series.
#'
#' Missing values are removed before the last `n` rows of each symbol
#' are selected; the result is returned in ascending index order.
#'
#' @param n A number of observations to return.
#' @param series A character vector of symbol names. `NULL` or an empty
#' vector is passed on as an empty attribute selection.
#'
#' @return An object of class `data.table` in long format with columns
#' `index`, `symbol` and `value`.
#'
last = function(n = 1, series = character()) {
  if (is.null(series)) {
    series <- character()
  }
  wide <- data.table::as.data.table(
    self$tiledb_array(return_as = "arrow", attrs = series)[]
  )
  # set tzone attr to index
  data.table::setattr(wide$index, "tzone", value = self$tzone)
  long <- data.table::melt(wide, id.vars = "index",
                           variable.name = "symbol",
                           variable.factor = FALSE,
                           value.name = "value",
                           na.rm = TRUE)
  # Descending by index so the leading rows per symbol are the latest ones.
  data.table::setorderv(long, c("index"), order = -1)
  keep <- long[, .I[seq_len(.N) <= n], by = "symbol"]$V1
  tail_obs <- long[keep, ]
  # Restore ascending order for the returned rows.
  data.table::setorderv(tail_obs, c("index"), order = 1)
  tail_obs[]
},
#- ---- ----- - -
#' @description Retrieve data from Array using symbol names.
#'
#' @param syms A character vector with symbol names.
#'
#' @return An object of class `data.table`.
#'
query_symbols = function(syms) {
  # Read the selected attributes as Arrow, then convert to data.table.
  arrow_tbl <- self$tiledb_array(return_as = "arrow", attrs = syms)[]
  data.table::as.data.table(arrow_tbl)
},
#' @description Remove symbols from a Chronos Array.
#'
#' Each symbol is dropped through its own schema evolution applied to
#' the array at `self$uri`.
#'
#' @param syms A character vector with symbol names for deletion.
#'
#' @return The object, invisibly.
#'
remove_symbols = function(syms) {
  for (sym in syms) {
    evolution <- tiledb::tiledb_array_schema_evolution()
    evolution <- tiledb::tiledb_array_schema_evolution_drop_attribute(evolution, sym)
    tiledb::tiledb_array_schema_evolution_array_evolve(evolution, self$uri)
  }
  invisible(self)
},
#' @description Rename a symbol.
#'
#' The series is read, ingested again under the new name and the old
#' attribute is then removed.
#'
#' @param x A character with symbol name to be renamed.
#' @param newname A character with new symbol name. It must not be
#' `"index"`, which names the index column.
#'
#' @return The object, invisibly.
#'
rename_symbol = function(x, newname) {
  private$check_scalar_character(newname)
  # "index" is the key column: using it as the new name would make
  # `ingest()` see no new series, and the old one would still be removed.
  if (identical(newname, "index")) {
    cli::cli_abort("{.val index} cannot be used as a symbol name.", call = NULL)
  }
  if (!self$symbol_exists(x)) {
    # Report the symbol value rather than the caller's argument expression.
    cli::cli_abort("Series {.val {x}} not found.", call = NULL)
  }
  obs <- self$query_symbols(x)
  # Rename by name so the result does not depend on column order.
  data.table::setnames(obs, old = x, new = newname)
  self$ingest(obs)
  self$remove_symbols(x)
  invisible(self)
},
#' @description Add symbols to Chronos Array.
#'
#' It is used by ingest method to evolve schema with new columns
#' of numeric type. Every symbol becomes a nullable `FLOAT64` attribute
#' added through its own schema evolution.
#'
#' @param syms A character vector with symbol names to be added as columns.
#'
#' @return The object, invisibly.
#'
add_symbols = function(syms) {
  # Reject names that are empty once surrounding whitespace is trimmed.
  has_blank <- any(!nzchar(trimws(syms)))
  if (has_blank) {
    cli::cli_abort("Symbol names should not be an empty string")
  }
  for (sym in syms) {
    new_attr <- tiledb::tiledb_attr(
      name = sym,
      type = "FLOAT64",
      ncells = 1,
      nullable = TRUE,
      filter_list = .tiledb_flist(level = -1, name = "ZSTD"))
    evolution <- tiledb::tiledb_array_schema_evolution()
    evolution <- tiledb::tiledb_array_schema_evolution_add_attribute(evolution, new_attr)
    tiledb::tiledb_array_schema_evolution_array_evolve(evolution, self$uri)
  }
  invisible(self)
},
#' @description Get all symbols from a Chronos Array.
#'
#' @return A character vector with symbol names, as reported by
#' `attrnames()`.
#'
symbols = function() {
  attr_names <- self$attrnames()
  attr_names
},
#' @description Check a symbol exists in a Chronos Array.
#'
#' @param syms A string with symbol name.
#'
#' @return A boolean.
#'
symbol_exists = function(syms) {
  private$check_scalar_character(syms)
  known <- self$symbols()
  match(syms, known, nomatch = 0L) > 0L
},
#' @description Count unique symbols in a Chronos Array.
#'
#'
#' @return A numeric with unique number of symbols
#' in the Array, i.e. the length of `attrnames()`.
#'
count_symbols = function() {
  attr_names <- self$attrnames()
  length(attr_names)
}
), # End public
private = list(
# TODO: what if it is really big? option to decide at init?
.index = NULL,
.tclass = NULL,
.tzone = NULL,
# Build index ranges for a `selected_ranges` query.
#
# i ISO8601 range string(s); a vector of length > 1 yields one range per element
# tc tclass ("Date" selects day-number ranges, anything else keeps POSIXct)
# tz time-zone used both for parsing and for the POSIXct -> Date conversion
#
# Returns a list with one element, `index`: a matrix with one row per range
# and two columns (start, end).
#
# NOTE(review): a range without an end (e.g. "2010-01-06/") is closed at the
# current date, so observations dated after today are not selected - confirm
# this is intended.
index_query = function(i, tc, tz) {
  # Parse one ISO8601 string; an open end defaults to midnight of today in `tz`.
  parse_range <- function(.x) {
    rng <- xts::.parseISO8601(.x, tz = tz)
    if (is.na(rng$last.time)) {
      # Built from a date string so the fallback is midnight in `tz` and
      # converts back to today's date under the same `tz`.
      rng$last.time <- as.POSIXct(format(Sys.Date()), tz = tz)
    }
    rng
  }
  # Convert both bounds to day numbers. `tz` is always passed so that single
  # and multiple ranges agree for non-UTC time zones.
  as_day_numbers <- function(rng) {
    vapply(rng, function(.t) as.numeric(as.Date(.t, tz = tz)), numeric(1))
  }
  # Sub-setting by date style strings
  if (length(i) == 1) {
    rng <- parse_range(i)
    if (tc == "Date") {
      days <- as_day_numbers(rng)
      out <- list(index = cbind(days[[1]], days[[2]]))
    } else {
      out <- list(index = cbind(rng[1], rng[2]))
    }
  } else {
    # Ranges e.g c("2016", "2017") or c("2016-09-21","2016-09-26/")
    # Handle a vector of ISO8601 and return a matrix with ranges
    if (tc == "Date") {
      bounds <- lapply(i, function(.x) as_day_numbers(parse_range(.x)))
    } else {
      bounds <- sapply(i, parse_range, simplify = FALSE)
    }
    out <- list(index = do.call(rbind, bounds))
  } # multiple ranges as matrix
  out
}
)
)