Ingest Transformation

Ingest Transformations enable you to run SQL queries over data from your sources and persist only the output of those queries to your collection. This gives you the power of SQL to drop, rename, or combine fields, filter out incoming rows, and even aggregate incoming documents in real time with Rollups.

Rockset's ingestion platform applies these transformations both during the initial load of a new collection's data and on an ongoing basis to new documents arriving from your source, effectively giving you a real-time materialized view of your data.

An ingest transformation can be applied to the whole collection or to an individual source. A source's ingest transformation runs first, and the collection's ingest transformation is then applied to its output.

Ingest transformations can be configured at collection creation time, either in the Rockset Console or through the Rockset API, using the field_mapping_query field for the collection or the ingest_transformation field for the source.

Ingest Transformations

You configure ingest transformations at collection creation time, either in the Rockset Console or through the Rockset API, using the field_mapping_query field for collection ingest transformations and the ingest_transformation field for source ingest transformations. To update an ingest transformation after collection creation, see Updating Collections.

Ingest transformations have the following structure:

SELECT expression [, ...]
FROM _input
[ WHERE condition ]
[ GROUP BY field [, ...] ]
[ HAVING condition ]
[ CLUSTER BY field [, ...] ]

Each component above is similar to its normal query counterpart. Here's a brief overview of each clause and how/why you might use them.

SELECT Clause

The SELECT clause is where you define:

  • Which input fields to include or exclude (e.g. SELECT * or SELECT * EXCEPT (a, b) or
    SELECT a, b).
  • Field projections and renaming (e.g. SELECT a AS b).
  • New expressions to evaluate and add to the final document (e.g. SELECT a + 1 AS b).
  • Whether object subfields are extracted to top-level fields (e.g. SELECT a, b.*). As with
    SELECT * you can use the EXCEPT clause to exclude some subfields (e.g. SELECT a, b.* EXCEPT(c)).
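
For instance, here is a sketch combining several of these in one transformation (the field names raw_payload, user_name, amount_cents, and address are hypothetical):

SELECT
  * EXCEPT (raw_payload),
  user_name AS name,
  amount_cents / 100.0 AS amount_dollars,
  address.*
FROM _input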

🚧

Explicit projections take precedence over fields from the input document of the same name.

For example, given the input document {"a": 1, "b": 2}, the ingest transformation SELECT *, a AS b FROM _input will result in the document {"a": 1, "b": 1}, and the original value of b will be lost.

FROM Clause

Unlike regular queries, ingest transformations may only reference the pseudo-collection _input, which is the stream of input documents coming from your source. You may not have any other collection/view/alias in your FROM clause.

However, you may use a WITH clause to construct a CTE that can be referenced in the FROM clause.
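
For example, a sketch that stages an intermediate result in a CTE (the email field is hypothetical):

WITH cleaned AS (
  SELECT LOWER(email) AS email, * EXCEPT (email)
  FROM _input
)
SELECT * FROM cleaned
WHERE email IS NOT NULL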

WHERE Clause

The WHERE clause filters out input documents from your source based on some condition, similar to a normal query.
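
For example, to keep only production records at ingest time (assuming a hypothetical environment field):

SELECT * FROM _input WHERE environment = 'production'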

GROUP BY Clause

The GROUP BY clause is used to construct a rollup collection, in which source documents are aggregated at ingestion time and only the resulting summarized aggregates are ingested into your collection. Much like with the GROUP BY clause of a regular SQL query, all input rows with the same values for the fields in the grouping set will be aggregated together to generate one output row.

With rollups you will often want to aggregate along a time dimension, for example saving hourly summaries of sensor readings. In those cases, it's best to create a mapping for _event_time and use that as one of the fields in your GROUP BY clause. See the rollups examples for sample usage.
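
For instance, a sketch of an hourly rollup of sensor readings (the sensor_id, reading_ts, and temperature fields are hypothetical):

SELECT
  sensor_id,
  DATE_TRUNC('HOUR', TIMESTAMP_MILLIS(reading_ts)) AS _event_time,
  AVG(temperature) AS avg_temperature,
  COUNT(*) AS reading_count
FROM _input
GROUP BY sensor_id, _event_time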

There are a few key differences however from the GROUP BY clause of a regular query:

  1. Fields referenced in the GROUP BY clause of an ingest transformation must be explicit fields from the SELECT clause, such as SELECT a FROM _input GROUP BY a, or in ordinal form, SELECT a FROM _input GROUP BY 1. You cannot do SELECT * FROM _input GROUP BY a, even though you may do this in a regular query.
  2. A rollup query is only allowed to contain one grouping set. This means that you can have GROUP BY a, b, c but not GROUP BY GROUPING SETS ((a), (a,b)) as you might do in a regular query.

HAVING Clause

The HAVING clause is used in conjunction with the GROUP BY clause in a rollup to filter on the aggregated rows generated by the GROUP BY clause and aggregations. Unlike the WHERE clause, which filters out input rows, the HAVING clause only filters out rows after the GROUP BY clause and aggregations have been applied.

As with a normal SQL query, the HAVING clause can refer to the fields in the GROUP BY clause or aggregate functions. For example, if your rollup query groups by fields a and b, then including HAVING a IS NOT NULL in your rollup query will drop the aggregated rows where a is null. If you change the rollup query to contain HAVING COUNT(*) > 1 instead, this keeps the groups where at least two rows were used to form the aggregated row.
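
Putting both together, a sketch that keeps only groups formed from at least two input rows (the device_id field is hypothetical):

SELECT device_id, COUNT(*) AS event_count
FROM _input
GROUP BY device_id
HAVING COUNT(*) > 1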

CLUSTER BY Clause

The CLUSTER BY clause is used to configure data clustering on a collection. Unlike the other clauses covered above, CLUSTER BY exists only in ingest transformations and has no analog in regular query execution.

The fields referenced in the CLUSTER BY clause must be explicit fields in the SELECT clause and can be referenced by their name, e.g. SELECT x FROM _input CLUSTER BY x or by their ordinal, e.g. SELECT x FROM _input CLUSTER BY 1.
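
For instance, a sketch that clusters a collection on a hypothetical customer_id field so queries filtering on it scan less data:

SELECT customer_id, order_ts, amount
FROM _input
CLUSTER BY customer_id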

Source Ingest Transformations

πŸ› οΈ

Source ingest transformations are currently in Beta.

You can specify an ingest transformation for each source, which will be applied before the collection's ingest transformation. To do so, set the ingest_transformation field on the source object in the create collection request.

πŸ’‘

A source ingest transformation is different from the collection ingest transformation, which currently uses the field_mapping_query field.

See the Limitations section for more information.

An example collection creation request might look like:

{
  "name": "trip-costs",
  // collection ingest transformation
  "field_mapping_query": {
    "sql": "SELECT TRY_CAST(fuel_used_liters AS FLOAT) * fuel_price_liter AS fuel_cost_dollars FROM _input"
  },
  "sources": [
    // this source uses both the source and collection transformations
    {
      "ingest_transformation": {
        "sql": "SELECT TRY_CAST(fuel_used_gallons AS FLOAT) * 3.785 AS fuel_used_liters FROM _input"
      },
      "s3": {
        "bucket": "road-trip",
        "pattern": "usa/"
      }
    },
    // this source uses only the collection transformation
    {
      "s3": {
        "bucket": "road-trip",
        "pattern": "spain/"
      }
    }
  ],
  ...
}

In the example above, we are aggregating data from two sources that have different schemas: one source records the amount of fuel used on a road trip in gallons, while the other uses liters. To reconcile them, the source ingest transformation converts gallons to liters for the first source. Both sources then output liters, and the collection transformation can be applied uniformly to both.

Instead of writing one complex collection transformation that handles every source, or creating multiple collections with different ingest transformations, source ingest transformations let you decide how to transform each data source individually.

Ingest Transformation Use Cases

Type Coercion

You can cast fields to enforce type consistency, detect bad records, or extract advanced types from string representations.

Example:

SELECT TRY_CAST(last_updated_at AS DATETIME) AS updated_at FROM _input

PII/PHI Masking

If your input dataset has PII (personally identifiable information) or PHI (protected health information) fields, you can use a one-way crypto hash function so that Rockset only stores the hashed value and not the original PII/PHI field.

Example:

SELECT TO_HEX(SHA256(email_address)) AS email_hash FROM _input

Precomputing Fields

If your application queries involve complex SQL functions which result in slow query processing, you can pre-compute the result of the expensive SQL expression at ingestion time. Queries on these computed output fields are much faster than executing the SQL expression at query time.

Example:

-- extract the domain from the input email address
SELECT REGEXP_EXTRACT_ALL(email_address, '^([a-zA-Z0-9_.+-]+)@([a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)$', 2) AS domain FROM _input

Dropping Fields

If you have unused fields in your input dataset, you can drop them upon data ingestion to save storage space.

Example:

SELECT * EXCEPT (large_field_1, large_field_2) FROM _input

Name Standardization

Sometimes the data coming into Rockset can be messy, and you can use the ingest transformation to clean it up.

Example:

SELECT mnfct AS manufacturer, ser_new_x AS serial_number
FROM _input
WHERE ser_new_x IS NOT NULL

Special Fields

You can use the ingest transformation to specify the values of the special fields _id, _event_time, and/or _op.

_id

The _id field is the unique identifier of a document in Rockset. If not specified, Rockset creates a new, random id for the document at ingestion time. If your source data already has a unique identifier, it's best to map it to _id so future updates to the document from the source are properly reflected in Rockset.

Example:

SELECT user_id AS _id, * FROM _input

🚧

Custom Value Note

If you map a custom value for _id, the value must be non-null and of string type, otherwise the ingestion of the corresponding document will fail.

_event_time

The _event_time field is the document timestamp in Rockset and is used to determine the cutoff for retention.

Example:

SELECT TIMESTAMP_MILLIS(created_at_ts) AS _event_time, * FROM _input

🚧

Custom Value Note

If you map a custom value for _event_time, the value must be non-null and of either INT (microseconds since epoch), TIMESTAMP or ISO8601 STRING type (with valid UTC timezone), otherwise the ingestion of the corresponding document will fail.

_stored

The _stored field can be used in the SELECT clause of your ingest transformation to reduce hot storage size by excluding data from the inverted and range indexes. We recommend using _stored as an object so you can consistently store and reference multiple fields, though you can still use it as a scalar or array. Read more about this special field here.

Example:

SELECT * EXCEPT(user_id), {'user_id': user_id} AS _stored FROM _input

_op

The _op field is used to properly handle CDC operations from your source. The supported operation types (case insensitive) are: INSERT, UPDATE, UPSERT (default), DELETE, REPLACE, and REPSERT. For more on their exact semantics, refer to the documentation for _op.

Example:

SELECT CASE WHEN _delete = true THEN 'DELETE' ELSE 'UPSERT' END AS _op, * FROM _input

Search Tokenization

Text fields in the input can be tokenized at data ingestion time so you can more efficiently run text search queries later.

Example:

SELECT TOKENIZE(tweet_text, 'en_US') AS tweet_tokens FROM _input

Vector Search

Enable storage and performance optimizations for Vector Embeddings and ensure compatibility between vectors during query execution. For more implementation details, refer to the documentation for VECTOR_ENFORCE.

Example:

SELECT VECTOR_ENFORCE(embeddings, 1536, 'float') as embeddings FROM _input

Importing Geographic Data

You can convert latitude/longitude points or well-known text (WKT) strings to Rockset's native geography type.

Example:

SELECT ST_GEOGPOINT(longitude, latitude) AS geo FROM _input

To validate your geographic data at ingest time, the ST_VALIDATE_GEOGRAPHY function can be useful.

Example:

SELECT *  
FROM _input
WHERE ST_VALIDATE_GEOGRAPHY(_input.geo_polygon) IS NULL

CSV File Ingestion

When ingesting data from a CSV or TSV file, you can use ingest transformations to fully control how the data is ingested into your collection, just like with any other data source. If you don't need to ingest every column from your CSV file, you can specify only the columns you need in your SELECT clause. To control the types of the fields ingested from your file, use casts.

As an example, assume you have a CSV file with three fields ('name', 'sign_up_date', and 'age'). An ingest transformation that drops the age field and converts sign_up_date to a DATETIME while also renaming the column could be written as follows:

SELECT name, TRY_CAST(sign_up_date AS DATETIME) AS signUpDate FROM _input;

For more examples covering further use cases of ingest transformations, review the rest of this document.

Limitations

Limitations related to ingest transformation queries:

  • JOINs are not allowed in the ingest transformation query. When bringing data from multiple sources into the same collection, the sources are automatically unioned and the result is made available via the pseudo-collection _input.
  • Some SQL clauses including LIMIT, OFFSET, and ORDER BY are unsupported in the ingest transformation.

Limitations for Source Ingest Transformations:

  • The capabilities associated with specifying source ingest transformations are only available through the Rockset REST API. These actions cannot currently be done through the Rockset console.
  • Source ingest transformations are not supported for sources ingesting via Kafka Connect or the Write API; those sources can only take advantage of the collection ingest transformation.
  • CLUSTER BY and GROUP BY clauses cannot be specified on the source ingest transformation. Thus, source ingest transformations do not support Real-Time Aggregations.

Limitations for specific sources:

  • DynamoDB: For collections sourced from DynamoDB, you cannot specify a custom mapping for _id, so SELECT user_id AS _id is invalid. This restriction lets Rockset properly map updates coming from DynamoDB to the corresponding Rockset document and keep your collection in sync.