Ingest Transformation
An ingest transformation lets you run a SQL query over the data from your source and only persists the output of that query to the collection. This gives you the power of SQL to drop, rename, or combine fields, filter out incoming rows, and even aggregate incoming documents in real-time with rollups.
Rockset's ingestion platform applies this transformation both during the initial load of a new collection's data and on an ongoing basis to new documents coming from your source, effectively giving you a real-time materialized view of your data.
Here we'll cover:
- The Structure of an Ingest Transformation Query
- Rollups
- Use Cases For Ingest Transformation
- Example Ingest Transformation Queries
Ingest Transformation Query
You configure an ingest transformation by specifying an ingest transformation query at collection
creation time, either in the Rockset Console or through the REST API's
create collection endpoint using the field field_mapping_query.
To update an ingest transformation after collection creation, see below.
The structure of an ingest transformation query is
SELECT
expression [, ...]
FROM
_input
[ WHERE condition ]
[ GROUP BY field [, ...] ]
[ HAVING condition ]
[ CLUSTER BY field [, ...] ]
Each component above is similar to its normal query counterpart. Here's a brief overview of each clause and how/why you might use it.
SELECT Clause
The SELECT clause is where you define:
- Which input fields to include/exclude, e.g. SELECT * or SELECT * EXCEPT (a, b) or SELECT a, b
- Field projections (renaming), e.g. SELECT a AS b
- New expressions to evaluate and add to the final document, e.g. SELECT a + 1 AS b
- Whether object subfields are extracted to top-level fields, e.g. SELECT a, b.*. As with SELECT *, you can use the EXCEPT clause to exclude some subfields, e.g. SELECT a, b.* EXCEPT(c)
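Putting these together, here is a minimal sketch of a SELECT clause that excludes a field, renames another, computes a new expression, and extracts subfields (the field names debug_blob, user_name, price, quantity, and address are hypothetical):
SELECT
-- keep everything except an unused field
* EXCEPT (debug_blob),
-- rename a field
user_name AS name,
-- add a computed field
price * quantity AS order_total,
-- promote the subfields of the address object to top-level fields
address.*
FROM
_input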
Note: Explicit projections take precedence over fields from the input document of the same name. For example, the input document {"a": 1, "b": 2} with ingest transformation SELECT *, a AS b FROM _input will result in the document {"a": 1, "b": 1}, and the original value of b will be lost.
FROM Clause
Unlike regular queries, ingest transformation queries may only reference the pseudo-collection _input, which is the stream of input documents coming from your source. You may not have any other collection/view/alias in your FROM clause. However, you may use the WITH clause to construct a CTE that can be referenced in the FROM clause.
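For instance, here is a sketch of a transformation that cleans fields in a CTE and then maps _event_time from the result (the field names email and signup_ts are hypothetical, as is the assumption that signup_ts holds milliseconds since the epoch):
WITH cleaned AS (
SELECT
-- normalize the email field
LOWER(email) AS email,
signup_ts
FROM
_input
)
SELECT
email,
-- signup_ts is assumed to be milliseconds since the epoch
TIMESTAMP_MILLIS(signup_ts) AS _event_time
FROM
cleaned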
WHERE Clause
The WHERE
clause filters out input documents from your source based on some condition, similar to
a normal query.
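For example, a sketch that only ingests completed orders with a positive amount (status and amount are hypothetical source fields):
SELECT
*
FROM
_input
-- drop any input document that is not a completed order with a positive amount
WHERE
status = 'completed'
AND amount > 0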
GROUP BY Clause
The GROUP BY
clause is used to construct a rollup collection in which source documents
are aggregated at ingestion time and only the resulting summarized aggregates are ingested into your
collection. Much like with the GROUP BY
clause of a regular SQL query, all input rows with the
same values for the fields in the grouping set will be aggregated together to generate one output
row.
With rollups you will often want to aggregate along a time dimension, for example saving
hourly summaries of sensor readings. In those cases it's best to create a mapping for
_event_time
and use that as one of the fields in your GROUP BY
clause. See the
rollups examples below for sample usage.
There are a few key differences, however, from the GROUP BY clause of a regular query:
- Fields referenced in the GROUP BY clause of an ingest transformation must be explicit fields from the SELECT clause, such as SELECT a FROM _input GROUP BY a, or in ordinal form, SELECT a FROM _input GROUP BY 1. You cannot do SELECT * FROM _input GROUP BY a, even though you may do this in a regular query.
- A rollup query is only allowed to contain one grouping set. This means that you can have GROUP BY a, b, c but not GROUP BY GROUPING SETS ((a), (a,b)) as you might do in a regular query.
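To illustrate these rules, here is a minimal rollup sketch that maps _event_time to an hourly bucket (as suggested above) and aggregates readings per device (ts, device_id, and temperature are hypothetical source fields):
SELECT
-- truncate the source timestamp to the hour and use it as the document timestamp
DATE_TRUNC('HOUR', PARSE_TIMESTAMP_ISO8601(ts)) AS _event_time,
device_id,
COUNT(*) AS reading_count,
AVG(temperature) AS avg_temperature
FROM
_input
-- every grouped field is an explicit field from the SELECT clause, and there is a single grouping set
GROUP BY
_event_time,
device_id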
HAVING Clause
The HAVING
clause is used in conjunction with the GROUP BY
clause in a rollup query
to filter on the aggregated rows generated by the GROUP BY
clause and aggregations. Unlike the
WHERE
clause, which filters out input rows, the HAVING
clause only filters out rows after the
GROUP BY
clause and aggregations have been applied.
As with a normal SQL query, the HAVING
clause can refer to the fields in the GROUP BY
clause or
aggregate functions. For example, if your rollup query groups by fields a and b, then including HAVING a IS NOT NULL in your rollup query will drop the aggregated rows where a is null. If you change the rollup query to contain HAVING COUNT(*) > 1 instead, this keeps the groups where at least two rows were used to form the aggregated row.
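Continuing the hypothetical device rollup sketched earlier, a HAVING clause that keeps only device-hours with a non-null device id and at least two readings might look like this:
SELECT
DATE_TRUNC('HOUR', PARSE_TIMESTAMP_ISO8601(ts)) AS _event_time,
device_id,
COUNT(*) AS reading_count
FROM
_input
GROUP BY
_event_time,
device_id
-- HAVING filters the aggregated rows, not the raw input rows
HAVING
device_id IS NOT NULL
AND COUNT(*) > 1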
CLUSTER BY Clause
The CLUSTER BY
clause is used to configure data clustering
on a collection. Unlike the other clauses covered above, CLUSTER BY is an ingest transformation-only concept and has no analog in regular query execution.
The fields referenced in the CLUSTER BY
clause must be explicit fields in the SELECT
clause and
can be referenced by their name, e.g. SELECT x FROM _input CLUSTER BY x, or by their ordinal, e.g. SELECT x FROM _input CLUSTER BY 1.
Rollups
Overview
Rollups are a class of ingest transformation that enable you to aggregate data as it is ingested, combining multiple documents into one.
As new data comes in, Rockset will transform and aggregate it before storing it in your rollup collection. For time-series data, even out-of-order data arrivals that come in after the fact will be properly aggregated automatically.
Using rollups provides two main benefits:
- Reduced storage size because only the aggregated data is stored and indexed.
- Improved query performance because the metrics you care about have been precomputed at ingestion time.
To use rollups, all you need is a GROUP BY clause in your ingest transformation SQL. See the examples below for sample usage.
Rollups are currently available for the following streaming data sources:
- Apache Kafka
- Confluent Cloud
- Confluent Platform (self-managed)
- Amazon Kinesis
- Amazon MSK
- Azure Service Bus
- Azure Event Hubs
Rollups are also available for the following Data Lakes and Warehouses:
- Amazon S3
- Google Cloud Storage
- Azure Blob Storage
Real-time Rollups
Rollups in Rockset are unique and preserve the real-time nature of your data analytics.
Traditionally, rollups are implemented with ahead-of-time aggregation, which means you don’t have access to your data until the end of the aggregation interval. In this approach, aggregation groups are considered immutable, so all data for an aggregation interval must be present for the aggregation to happen.
In contrast, rollups in Rockset allow updates to aggregation groups, so your data remains real-time. You can query your Rockset rollup collection and get up-to-date results as incoming data continues to be aggregated. In addition, because updates to aggregation groups are allowed, out-of-order arrivals "just work" in Rockset.
Ingest Transformation Use Cases
Type Coercion
You can cast fields to enforce type consistency, detect bad records, or extract advanced types from string representations.
Example:
SELECT
TRY_CAST(last_updated_at AS DATETIME) AS updated_at
FROM
_input
PII/PHI Masking
If your input dataset has PII (personally identifiable information) or PHI (protected health information) fields, you can use a one-way crypto hash function so that Rockset only stores the hashed value and not the original PII/PHI field.
Example:
SELECT
TO_HEX(SHA256(email_address)) AS email_hash
FROM
_input
Precomputing Fields
If your application queries involve complex SQL functions which result in slow query processing, you can pre-compute the result of the expensive SQL expression at ingestion time. Queries on these computed output fields are much faster than executing the SQL expression at query time.
Example:
-- extract the domain from the input email address
SELECT
REGEXP_EXTRACT_ALL(
email_address,
'^([a-zA-Z0-9_.+-]+)@([a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)',
2
) AS domain
FROM
_input
Dropping Fields
If you have unused fields in your input dataset, you can drop them upon data ingestion to save storage space.
Example:
SELECT
*
EXCEPT
(large_field_1, large_field_2)
FROM
_input
Name Standardization
Sometimes the data coming into Rockset can be messy, and you can use the ingest transformation to clean it up.
Example:
SELECT
mnfct AS manufacturer,
ser_new_x AS serial_number
FROM
_input
WHERE
ser_new_x IS NOT NULL
Special Fields
You can use the ingest transformation to specify the values of the special fields _id, _event_time, and/or _op.
_id
The _id field is the unique identifier of a document in Rockset.
If not specified, Rockset will create a new, random id for the document at ingestion time. But
if your source data already has a unique identifier, it's best to map it to _id
so future updates
to the document from the source are properly reflected in Rockset.
Example:
SELECT
user_id AS _id,
*
FROM
_input
Note: If you map a custom value for _id, the value must be non-null and of string type; otherwise the ingestion of the corresponding document will fail.
_event_time
The _event_time field is the document timestamp in Rockset and is used to determine the cutoff for retention.
Example:
SELECT
TIMESTAMP_MILLIS(created_at_ts) AS _event_time,
*
FROM
_input
Note: If you map a custom value for _event_time, the value must be non-null and of either int (microseconds since epoch) or timestamp type; otherwise the ingestion of the corresponding document will fail.
_op
The _op field is used to properly handle CDC operations from your source. The supported operation types (case insensitive) are: INSERT, UPDATE, UPSERT (default), DELETE, REPLACE, and REPSERT. For more on their exact semantics, refer to the documentation for _op.
Example:
SELECT
CASE
WHEN _delete = true THEN 'DELETE'
ELSE 'UPSERT'
END as _op,
*
FROM
_input
Search Tokenization
Text fields in the input can be tokenized at data ingestion time so you can more efficiently run text search queries later.
Example:
SELECT
TOKENIZE(tweet_text, 'en_US') AS tweet_tokens
FROM
_input
Importing Geographic Data
You can convert latitude/longitude points or well-known text (WKT) strings to Rockset's native geography type.
Example:
SELECT
ST_GEOGPOINT(longitude, latitude) AS geo
FROM
_input
Examples
Let’s look at some more in-depth example ingest transformation queries.
Rollups Examples
Example:
SELECT
DATE_TRUNC('HOUR', PARSE_TIMESTAMP_ISO8601(ts)) AS _event_time,
location,
SUM(cat_count) AS cat_count,
MAX(cat_count + dog_count) AS peak_animals_seen,
SQRT(COUNT(*) + AVG(cat_count)) * APPROX_DISTINCT(dog_count) AS my_crazy_metric
FROM
_input
GROUP BY
_event_time,
location
HAVING
SUM(cat_count) > 5
Imagine the data source is a network of cameras that detect cats and dogs. Every time a cat and/or
dog is found, an event is recorded in the form:
{"ts": "2021-04-20T16:20:42+0000", "location": "San Francisco", "cat_count": 1, "dog_count": 2}
.
There is a requirement to count the total number of cats per location, per hour. There is also a
requirement to track the locations and times with the highest numbers of animals seen (perhaps to
detect if cameras need faster processors that can count more animals). In addition, the
my_crazy_metric
expression is included here as an example to show just how complex a rollup query
expression can get. The HAVING
clause keeps the aggregated rows where the total number of cats
seen is greater than five.
Example:
SELECT
DATE_TRUNC(
'DAY',
PARSE_TIMESTAMP('%Y/%m/%d %H:%M', datetimeStr)
) AS _event_time,
region,
SUM(purchasePrice) AS revenue,
AVG(purchasePrice) AS avgPrice,
MAX(purchasePrice) AS largestSale,
SUM(CAST(purchasePrice > 3 AS int)) AS largeSales,
SUM(purchasePrice * 2) + 1 AS allTheTransforms,
COUNT(*) AS counts
FROM
_input
GROUP BY
_event_time,
region
In this case, there is raw data for a business’s sales in different regions. Each time a purchase is
made, the timestamp, the location of the sale, and the price of the item purchased are recorded. To
best understand the business’s financial health, a data-driven approach is taken to aggregate the
purchase price data in several different ways. The GROUP BY
clause groups data by date and region.
Anonymization and Casting
SELECT
*
EXCEPT
-- drop some fields
(extra_data, more_extra_data, email_address),
-- type coercion
TRY_CAST(last_updated_at AS DATETIME) AS last_updated_at,
-- PII/PHI masking
TO_HEX(SHA256(email_address)) AS email_address_hash,
FROM
_input
WHERE
-- filter input documents
email_address IS NOT NULL
This query keeps all input fields by default, except for extra_data, more_extra_data, and email_address. It adds two new fields to the final document: last_updated_at and email_address_hash. If there are any input fields named last_updated_at or email_address_hash in the input document, their values in the final document will be the values of the specified expressions, not their values in the input document. It also filters out all input documents where email_address is null.
Data Clustering
SELECT
cntry AS country,
cont AS continent,
capt AS capital,
gdp_usd as gross_domestic_product,
ROUND(population / 1e6) AS population_millions
FROM
_input
WHERE
population_millions > 0
CLUSTER BY
continent,
country
This query drops all input fields by default since there is no * in the SELECT clause, and instead it explicitly renames several input fields to their cleaner final names country, continent, capital, and gross_domestic_product, and computes population_millions from the input field population. The WHERE clause filters out any documents corresponding to countries with a population less than 1M. And data clustering is configured on the collection with clustering key (continent, country), which will significantly speed up queries that filter on continent, or on continent and country.
CDC Control Records and _op
SELECT
user_id AS _id,
IF(header.delete = true, 'DELETE', 'UPSERT') AS _op,
IF(
header.delete = true,
null,
PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%E*S', created_at_ts)
) AS _event_time,
*
EXCEPT
(user_id, header, created_at_ts)
FROM
_input
This query maps user_id from the source documents to _id in Rockset and interprets _op based on whether a field is set in the header. In the case of a new record coming in, header.delete will be false (or null) and both IF statements will execute their second branch, which will set _op = 'UPSERT' and will parse _event_time from created_at_ts. All fields except the mapped ones and the header will then be inserted into Rockset. When a delete comes through and header.delete is true, the first branch of the IF statements will execute, which will set _op = 'DELETE' and leave _event_time as null. This will issue a delete for the document corresponding to _id.
Precomputing Fields
SELECT
twitter_handle,
follower_count,
-- search tokenization
TOKENIZE(tweet_text, 'en_US') AS tweet_text_tokens,
-- functional index
REGEXP_EXTRACT_ALL(
email_address,
'^([a-zA-Z0-9_.+-]+)@([a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)',
2
) AS email_address_domain,
-- native geography type
ST_GEOGPOINT(longitude, latitude) AS geo,
FROM
_input
This query drops all input fields by default and only keeps the input fields twitter_handle and follower_count. It also adds three new fields to our final document: tweet_text_tokens, email_address_domain, and geo, each of which can be used to speed up queries that would otherwise have to run expensive operations at query time on the raw input fields.
Using UNDEFINED
SELECT
*,
IF(
user IS NOT NULL,
CONCAT(user, '@rockset.com'),
UNDEFINED
) AS email_address
FROM
_input
This query keeps all input fields by default and adds one new field to the final document: email_address.
Note: If user is null, the email_address field will be set to undefined in the final document. Setting a field's value to undefined in an ingest transformation means that the field will not be present in the final document. Thus, email_address will only be present in the final document if the input document's user field is not null. See documentation on undefined for more details.
Updating An Ingest Transformation
You can update the ingest transformation for a collection at any time using the REST API's update collection endpoint. Currently this functionality is only available through the API, and ingest transformation updates cannot be done through the Rockset console.
When you update the transformation, you may see a brief pause in ingestion for your sources. It usually only takes a few seconds for the updated transformation to take effect, but with some sources it can take up to a few minutes.
When you update an ingest transformation, only new documents being ingested from your source will have the new ingest transformation applied. The updated ingest transformation will not affect previously ingested documents. There is also no built-in versioning with the ingest transformation to track which documents of a collection had which transformation applied at ingest time. However, you can add this explicitly as part of your transformation, for example:
SELECT
*,
1 AS version
FROM
_input
There are some restrictions when updating a transformation:
- You cannot update ingest transformations with rollups configured.
- You cannot add rollups to an existing ingest transformation.
- You cannot modify the clustering of an ingest transformation.
Otherwise, you are free to add/remove/change field projections and names, add/remove WHERE clause predicates, etc.
To remove the ingest transformation for a collection altogether, update the transformation to the default SELECT * FROM _input.
CSV File Ingestion
When ingesting data from a CSV or TSV file, you can use ingest transformations to fully control how the data is ingested into your collection, just like with any other data source. If you don't need to ingest every column from your CSV file, you can specify only the columns you need inside your SELECT clause. To specify the typing of the fields ingested from your file, use type casting expressions.
As an example, assume you had a CSV file with three fields ('name', 'sign_up_date', and 'age'). An ingest transformation that drops the age field and converts sign_up_date to a datetime type while also renaming the column could be written in the following way:
SELECT
name,
TRY_CAST(sign_up_date AS DATETIME) AS signUpDate
FROM
_input;
For more examples covering further use cases of ingest transformations, review the rest of this document.
Limitations
There are some limitations related to ingest transformation queries that should be kept in mind.
- JOINs are not allowed in the ingest transformation query. When bringing in data from multiple sources into the same collection, the sources are automatically unioned and the result is available via the pseudo-collection _input.
- Some SQL clauses, including LIMIT, OFFSET, and ORDER BY, are unsupported in the ingest transformation.
Source-specific limitations:
- MongoDB – Rockset's MongoDB connector only receives deltas from your MongoDB collection instead of the full document on each update, in order to reduce load on your MongoDB cluster. For this reason, with MongoDB any expression in the ingest transformation query can only involve one field. So SELECT a + 1 AS b is allowed, but not SELECT a + b AS c.
- DynamoDB – For collections sourced from DynamoDB, you cannot specify a custom mapping for _id. So SELECT user_id AS _id is invalid. This is so we can properly map updates coming from DynamoDB to the corresponding Rockset document to keep your collection in-sync.