Source code for rockset.source

"""
Introduction
------------
Source objects represent various data sources that could be used to create
collections.

Example usage
-------------
::

    from rockset import Client, Q, F
    import os

    rs = Client()

    # create a collection from an AWS S3 bucket
    integration = rs.Integration.retrieve('aws-rockset-read-only')
    s3 = rs.Source.s3(bucket='my-s3-bucket',
        integration=integration)
    newcoll = rs.Collection.create(name='newcoll', sources=[s3])

Create AWS S3 source for a collection
-------------------------------------
AWS S3 buckets can be used as a data source for collections::

    from rockset import Client, Q, F
    import os

    rs = Client()

    # create a collection from an AWS S3 bucket
    integration = rs.Integration.retrieve('aws-rockset-read-only')
    s3 = rs.Source.s3(bucket='my-s3-bucket',
        integration=integration)
    newcoll = rs.Collection.create(name='newcoll', sources=[s3])

.. automethod :: rockset.Source.s3

"""

from rockset.swagger_client.models import (
    FormatParams,
    Integration,
    CsvParams,
    XmlParams,
)


[docs]class Source(object): def __init__(self, integration, format_params): if isinstance(integration, Integration) or isinstance(integration, dict): self.integration_name = integration.get("name") elif integration is not None: ret = "TypeError: invalid object type {} for integration".format( type(integration) ) raise TypeError(ret) if format_params is not None: self.format_params = format_params def __str__(self): return str(vars(self)) def __iter__(self): for k, v in vars(self).items(): yield (k, v)
[docs] @classmethod def s3( cls, bucket, prefixes=None, prefix=None, pattern=None, integration=None, format_params=None, ): """Creates a source object to represent an AWS S3 bucket as a data source for a collection. Args: bucket (str): Name of the S3 bucket prefix: Path prefix to only source S3 objects that are recursively within the given path. (optional) pattern: Path pattern to only source S3 objects that match the given pattern. (optional) integration (rockset.Integration): An Integration object (optional) format_params (FormatParams): the specifications of the format, CsvParams or XmlParams """ return SourceS3( bucket=bucket, prefixes=prefixes, prefix=prefix, pattern=pattern, integration=integration, format_params=format_params, )
[docs] @classmethod def dynamo( cls, table_name, integration=None, aws_region="us-west-2", use_scan_api=True ): """Creates a source object to represent an AWS DynamoDB table as a data source for a collection. Args: table_name (str): Name of the DynamoDB table integration (rockset.Integration): An Integration object (optional) aws_region: AWS region of the DynamoDB table use_scan_api: Whether the initial table scan should use the DynamoDB scan API. """ return SourceDynamo( table_name=table_name, integration=integration, aws_region=aws_region, use_scan_api=use_scan_api, )
[docs] @classmethod def mongo( cls, database_name, collection_name, integration, ): """Creates a source object to represent a MongoDB collection as a data source for a Rockset collection. Args: database_name (str): Name of the MongoDB database collection_name (str): Name of the MongoDB collection integration (rockset.Integration): An Integration object """ return SourceMongo( database_name=database_name, collection_name=collection_name, integration=integration, )
[docs] @classmethod def kinesis( cls, stream_name, integration, dms_primary_key=None, format_params=None, aws_region="us-west-2", ): """Creates a source object to represent a Kinesis Stream as a data source for a collection Args: stream_name (str): Name of the Kinesis Stream integration (rockset.Integration): An Integration object (optional) dms_primary_key (list of strings): DMS primary key (optional) format_params (FormatParams): the specifications of the format (optional) aws_region (str): AWS region the stream is created in (defaults to us-west-2) """ return SourceKinesis( stream_name=stream_name, integration=integration, dms_primary_key=dms_primary_key, format_params=format_params, aws_region=aws_region, )
[docs] @classmethod def redshift(cls, database, schema, table_name, integration): """Creates a source object to represent an AWS Redshift table as a data source for a collection. Args: database (str): Name of the Redshift database schema (str): Name of the Redshift schema table_name (str): Name of the Redshift table integration (rockset.Integration): An Integration object (optional) """ return SourceRedshift( database=database, schema=schema, table_name=table_name, integration=integration, )
[docs] @classmethod def csv_params( cls, separator=None, encoding=None, first_line_as_column_names=None, column_names=None, column_types=None, ): """Creates a object to represent options needed to parse a CSV file Args: separator (str): The separator between column values in a line encoding (str): The encoding format of data, one of "UTF-8", "UTF-16" "US_ASCII" [default: "US-ASCII"] first_line_as_column_names (boolean): Set to true if the first line of a data object has the names of columns to be used. If this is set to false, the the column names are auto generated. [default: False] column_names (list of strings): The names of columns column_types (list of strings): The types of columns """ csv = CsvParams( separator=separator, encoding=encoding, first_line_as_column_names=first_line_as_column_names, column_names=column_names, column_types=column_types, ) return FormatParams(csv=csv)
[docs] @classmethod def xml_params( cls, root_tag=None, doc_tag=None, encoding=None, value_tag=None, attribute_prefix=None, ): """Creates a object to represent options needed to parse a XML file Args: root_tag (str): Outermost tag within an XML file to be treated as the root. Any content outside the root tag is ignored. doc_tag (str): Every rockset document is contained between <doc_tag> and a </doc_tag> encoding (str): The encoding format of data. [default: 'UTF-8'] value_tag (str): Tag used for the value when there are attributes in the element having no child. [default: 'value'] attribute_prefix (str): Attributes are transformed into key-value pairs in a Rockset document This prefix is used to tell attributes apart from nested tags in a Rockset document. """ xml = XmlParams( root_tag=root_tag, doc_tag=doc_tag, encoding=encoding, value_tag=value_tag, attribute_prefix=attribute_prefix, ) return FormatParams(xml=xml)
[docs] @classmethod def gcs(cls, bucket=None, prefix=None, integration=None, format_params=None): """Creates a source object to represent an Google Cloud Storage(GCS) bucket as a data source for a collection. Args: bucket (str): Name of the GCS bucket prefix: selects objects whose path matches the specified prefix within the bucket integration (rockset.Integration): An Integration object (optional) format_params (FormatParams): the specifications of the format, CsvParams or XmlParams """ return SourceGcs( bucket=bucket, prefix=prefix, integration=integration, format_params=format_params, )
[docs] @classmethod def kafka( cls, kafka_topic_name, integration, use_v3=None, offset_reset_policy="EARLIEST", ): """Creates a source object to represent Apache Kafka as a data source for a collection. Args: kafka_topic_name (str): Kafka topic to be tailed use_v3 (boolean): whether to use v3 offset_reset_policy (str, LATEST/EARLIEST): the offset reset policy integration (rockset.Integration): An Integration object """ if use_v3: return SourceKafkaV3( kafka_topic_name=kafka_topic_name, offset_reset_policy=offset_reset_policy, integration=integration, ) return SourceKafka(kafka_topic_name=kafka_topic_name, integration=integration)
class SourceS3(Source): def __init__( self, bucket=None, prefixes=None, prefix=None, pattern=None, integration=None, format_params=None, ): super().__init__(integration, format_params) self.s3 = { "bucket": bucket, } if prefixes is not None: if len(prefixes) > 0: self.s3["prefix"] = prefixes[0] else: self.s3["prefix"] = "" if prefix is not None: self.s3["prefix"] = prefix elif pattern is not None: self.s3["pattern"] = pattern class SourceDynamo(Source): def __init__( self, table_name=None, integration=None, format_params=None, aws_region=None, use_scan_api=True, ): super().__init__(integration, format_params) self.dynamodb = { "table_name": table_name, "aws_region": aws_region, "use_scan_api": use_scan_api, } class SourceMongo(Source): def __init__( self, database_name=None, collection_name=None, integration=None, format_params=None, ): super().__init__(integration, format_params) self.mongodb = { "database_name": database_name, "collection_name": collection_name, } class SourceKinesis(Source): def __init__( self, stream_name=None, dms_primary_key=None, integration=None, format_params=None, aws_region=None, ): super().__init__(integration, format_params) self.kinesis = {"stream_name": stream_name} if dms_primary_key: self.kinesis["dms_primary_key"] = dms_primary_key self.kinesis["aws_region"] = aws_region class SourceGcs(Source): def __init__(self, bucket=None, prefix=None, integration=None, format_params=None): super().__init__(integration, format_params) self.gcs = {"bucket": bucket, "prefix": prefix} class SourceRedshift(Source): def __init__( self, database=None, schema=None, table_name=None, integration=None, format_params=None, ): super().__init__(integration, format_params) self.redshift = { "database": database, "schema": schema, "table_name": table_name, } class SourceKafka(Source): def __init__(self, kafka_topic_name, integration=None, format_params=None): super().__init__(integration, format_params) self.kafka = {"kafka_topic_name": kafka_topic_name} class SourceKafkaV3(Source): def __init__( self, kafka_topic_name, offset_reset_policy, integration=None, format_params=None, ): super().__init__(integration, format_params) self.kafka = { "kafka_topic_name": kafka_topic_name, "offset_reset_policy": offset_reset_policy, "use_v3": "true", }