S3 / Local Files
This connector ingests AWS S3 datasets into DataHub. It allows mapping an individual file or a folder of files to a dataset in DataHub. Refer to the Path Specs section below for more details.
This connector can also be used to ingest local files: simply replace `s3://` in your `path_specs` with an absolute path to files on the machine running ingestion.
Supported file types
Supported file types are as follows:
- CSV (*.csv)
- TSV (*.tsv)
- JSONL (*.jsonl)
- JSON (*.json)
- Parquet (*.parquet)
- Apache Avro (*.avro)
Schemas for Parquet and Avro files are extracted as provided.
Schemas for schemaless formats (CSV, TSV, JSONL, JSON) are inferred. For CSV, TSV, and JSONL files, the first 100 rows are considered by default; this can be controlled via the `max_rows` recipe parameter (see below).
JSON file schemas are inferred from the entire file (given the difficulty of extracting only the first few objects of the file), which may impact performance.
We are working on using iterator-based JSON parsers to avoid reading the entire JSON object into memory.
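For example, a minimal sketch (the bucket and path below are hypothetical) of raising the row limit used for CSV/TSV/JSONL schema inference:
# Sketch: use more rows for CSV/TSV/JSONL schema inference (hypothetical bucket/path)
source:
  type: s3
  config:
    path_specs:
      - include: "s3://my-bucket/data/{table}/*.csv"
    max_rows: 500  # default is 100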
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
Source Concept | DataHub Concept | Notes |
---|---|---|
"s3" | Data Platform | |
s3 object / Folder containing s3 objects | Dataset | |
s3 bucket | Container | Subtype S3 bucket |
s3 folder | Container | Subtype Folder |
Profiling
This plugin extracts:
- Row and column counts for each dataset
- For each column, if profiling is enabled:
- null counts and proportions
- distinct counts and proportions
- minimum, maximum, mean, median, standard deviation, some quantile values
- histograms or frequencies of unique values
Note that because profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see Compatibility for more details). If profiling is enabled, make sure that permissions for `s3a://` access are set, because Spark and Hadoop use the `s3a://` protocol to interface with AWS (schema inference outside of profiling requires `s3://` access).
Enabling profiling will slow down ingestion runs.
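As a hedged sketch (field names come from the configuration reference below; the bucket and patterns are hypothetical), profiling can be enabled and scoped like this:
# Sketch: enable profiling for a subset of datasets (hypothetical bucket/patterns)
source:
  type: s3
  config:
    path_specs:
      - include: "s3://my-bucket/warehouse/{table}/*.parquet"
    profiling:
      enabled: true
      profile_table_level_only: false   # also collect column-level stats
      max_number_of_fields_to_profile: 50
    profile_patterns:
      allow:
        - ".*orders.*"                  # only profile matching tables
    spark_driver_memory: "8g"           # give Spark more memory for profiling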
Important Capabilities
Capability | Status | Notes |
---|---|---|
Asset Containers | ✅ | Enabled by default. Supported for types - Folder, S3 bucket. |
Data Profiling | ✅ | Optionally enabled via configuration. |
Detect Deleted Entities | ✅ | Enabled by default via stateful ingestion. |
Extract Tags | ✅ | Can extract S3 object/bucket tags if enabled. |
Schema Metadata | ✅ | Can infer schema from supported file types. |
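For example, a sketch combining the tag-extraction and stale-entity-detection capabilities listed above (the bucket and pipeline name are hypothetical):
# Sketch: extract S3 tags and detect deleted entities (hypothetical names)
pipeline_name: s3_ingestion_prod        # a stable pipeline_name is needed for stateful ingestion
source:
  type: s3
  config:
    path_specs:
      - include: "s3://my-bucket/data/{table}/*.parquet"
    use_s3_bucket_tags: true            # Extract Tags capability
    use_s3_object_tags: true
    stateful_ingestion:
      enabled: true                     # Detect Deleted Entities capability
      remove_stale_metadata: true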
Prerequisites
To allow the S3 ingestion source to ingest metadata from an S3 bucket, you need to grant the necessary permissions to an IAM user or role. Follow these steps:
- Create an IAM Policy: Create a policy that grants read access to the S3 bucket.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": ["s3:ListBucket", "s3:GetBucketLocation", "s3:GetObject"],
"Resource": [
"arn:aws:s3:::your-bucket-name",
"arn:aws:s3:::your-bucket-name/*"
]
}
]
}
Permissions Explanation:
- `s3:ListBucket`: Allows listing the objects in the bucket. This permission is necessary for the S3 ingestion source to know which objects are available to read.
- `s3:GetBucketLocation`: Allows retrieving the location of the bucket.
- `s3:GetObject`: Allows reading the actual content of the objects in the bucket. This is needed to infer schema from sample files.
- Attach the Policy to an IAM User or Role: Attach the created policy to the IAM user or role that the S3 ingestion source will use.
- Configure the S3 Ingestion Source: Configure the ingestion recipe to authenticate as the IAM user or role to which you attached the policy above.
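As a sketch of this last step (the role ARN, profile name, and bucket are hypothetical), credentials can be supplied through `aws_config`, either via a named profile or by assuming the role to which the policy is attached:
# Sketch: authenticate using the IAM user/role created above (hypothetical values)
source:
  type: s3
  config:
    path_specs:
      - include: "s3://your-bucket-name/{table}/*.parquet"
    aws_config:
      aws_region: us-east-1
      # Option 1: use a named profile (or rely on boto3 credential auto-detection)
      aws_profile: datahub-ingestion
      # Option 2: assume a role instead
      # aws_role:
      #   - RoleArn: arn:aws:iam::123456789012:role/datahub-s3-ingestion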
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
# Ingest data from S3
source:
type: s3
config:
path_specs:
- include: "s3://covid19-lake/covid_knowledge_graph/csv/nodes/*.*"
aws_config:
aws_access_key_id: *****
aws_secret_access_key: *****
aws_region: us-east-2
env: "PROD"
profiling:
enabled: false
# Ingest data from local filesystem
source:
type: s3
config:
path_specs:
- include: "/absolute/path/*.csv"
Config Details
Note that a `.` is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
path_specs ✅ array | List of PathSpec. See below the details about PathSpec |
path_specs.PathSpec PathSpec | |
path_specs.PathSpec.include ❓ string | Path to table. Name variable {table} is used to mark the folder with dataset. In absence of {table} , file level dataset will be created. Check below examples for more details. |
path_specs.PathSpec.allow_double_stars boolean | Allow double stars in the include path. This can affect performance significantly if enabled Default: False |
path_specs.PathSpec.autodetect_partitions boolean | Autodetect partition(s) from the path. If set to true, it will autodetect partition key/value if the folder format is {partition_key}={partition_value} for example year=2024 Default: True |
path_specs.PathSpec.default_extension One of string, null | For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped. Default: None |
path_specs.PathSpec.enable_compression boolean | Enable or disable processing compressed files. Currently .gz and .bz files are supported. Default: True |
path_specs.PathSpec.include_hidden_folders boolean | Include hidden folders in the traversal (folders starting with . or _). Default: False |
path_specs.PathSpec.sample_files boolean | Instead of listing all files, only take a handful of sample files to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled. Default: True |
path_specs.PathSpec.table_name One of string, null | Display name of the dataset. Combination of named variables from include path and strings. Default: None |
path_specs.PathSpec.traversal_method Enum | One of: "ALL", "MIN_MAX", "MAX" |
path_specs.PathSpec.exclude One of array, null | list of paths in glob pattern which will be excluded while scanning for the datasets Default: [] |
path_specs.PathSpec.exclude.string string | |
path_specs.PathSpec.file_types array | Only files with the extensions specified here (a subset of the default value) will be scanned to create datasets. Other files will be omitted. Default: ['csv', 'tsv', 'json', 'parquet', 'avro'] |
path_specs.PathSpec.file_types.string string | |
path_specs.PathSpec.tables_filter_pattern AllowDenyPattern | A class to store allow deny regexes |
path_specs.PathSpec.tables_filter_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
add_partition_columns_to_schema boolean | Whether to add partition fields to the schema. Default: False |
generate_partition_aspects boolean | Whether to generate partition aspects for partitioned tables. On older servers for backward compatibility, this should be set to False. This flag will be removed in future versions. Default: True |
max_rows integer | Maximum number of rows to use when inferring schemas for TSV and CSV files. Default: 100 |
number_of_files_to_sample integer | Number of files to list to sample for schema inference. This will be ignored if sample_files is set to False in the pathspec. Default: 100 |
platform string | The platform that this source connects to (either 's3' or 'file'). If not specified, the platform will be inferred from the path_specs. Default: |
platform_instance One of string, null | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details. Default: None |
sort_schema_fields boolean | Whether to sort schema fields by fieldPath when inferring schemas. Default: False |
spark_config object | Spark configuration properties to set on the SparkSession. Put config property names into quotes. For example: '"spark.executor.memory": "2g"' Default: {} |
spark_driver_memory string | Max amount of memory to grant Spark. Default: 4g |
use_s3_bucket_tags One of boolean, null | Whether or not to create tags in datahub from the s3 bucket Default: None |
use_s3_content_type boolean | If enabled, use S3 Object metadata to determine content type over file extension, if set. Warning: this requires a separate query to S3 for each object, which can be slow for large datasets. Default: False |
use_s3_object_tags One of boolean, null | Whether or not to create tags in datahub from the s3 object Default: None |
verify_ssl One of boolean, string | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Default: True |
env string | The environment that all assets produced by this connector belong to Default: PROD |
aws_config One of AwsConnectionConfig, null | AWS configuration Default: None |
aws_config.aws_access_key_id One of string, null | AWS access key ID. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
aws_config.aws_advanced_config object | Advanced AWS configuration options. These are passed directly to botocore.config.Config. |
aws_config.aws_endpoint_url One of string, null | The AWS service endpoint. This is normally constructed automatically, but can be overridden here. Default: None |
aws_config.aws_profile One of string, null | The named profile to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config. Default: None |
aws_config.aws_proxy One of string, null | A set of proxy configs to use with AWS. See the botocore.config docs for details. Default: None |
aws_config.aws_region One of string, null | AWS region code. Default: None |
aws_config.aws_retry_mode Enum | One of: "legacy", "standard", "adaptive" Default: standard |
aws_config.aws_retry_num integer | Number of times to retry failed AWS requests. See the botocore.retry docs for details. Default: 5 |
aws_config.aws_secret_access_key One of string, null | AWS secret access key. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
aws_config.aws_session_token One of string, null | AWS session token. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
aws_config.read_timeout number | The timeout for reading from the connection (in seconds). Default: 60 |
aws_config.aws_role One of string, array, null | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as boto3's STS.Client.assume_role. Default: None |
aws_config.aws_role.union One of string, AwsAssumeRoleConfig | |
aws_config.aws_role.union.RoleArn ❓ string | ARN of the role to assume. |
aws_config.aws_role.union.ExternalId One of string, null | External ID to use when assuming the role. Default: None |
profile_patterns AllowDenyPattern | A class to store allow deny regexes |
profile_patterns.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
profile_patterns.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
profile_patterns.allow.string string | |
profile_patterns.deny array | List of regex patterns to exclude from ingestion. Default: [] |
profile_patterns.deny.string string | |
profiling DataLakeProfilerConfig | |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.include_field_distinct_value_frequencies boolean | Whether to profile for distinct value frequencies. Default: True |
profiling.include_field_histogram boolean | Whether to profile for the histogram for numeric fields. Default: True |
profiling.include_field_max_value boolean | Whether to profile for the max value of numeric columns. Default: True |
profiling.include_field_mean_value boolean | Whether to profile for the mean value of numeric columns. Default: True |
profiling.include_field_median_value boolean | Whether to profile for the median value of numeric columns. Default: True |
profiling.include_field_min_value boolean | Whether to profile for the min value of numeric columns. Default: True |
profiling.include_field_null_count boolean | Whether to profile for the number of nulls for each column. Default: True |
profiling.include_field_quantiles boolean | Whether to profile for the quantiles of numeric columns. Default: True |
profiling.include_field_sample_values boolean | Whether to profile for the sample values for all columns. Default: True |
profiling.include_field_stddev_value boolean | Whether to profile for the standard deviation of numeric columns. Default: True |
profiling.max_number_of_fields_to_profile One of integer, null | A positive integer that specifies the maximum number of columns to profile for any table. None implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. Default: None |
profiling.profile_table_level_only boolean | Whether to perform profiling at table-level only or include column-level profiling as well. Default: False |
profiling.operation_config OperationConfig | |
profiling.operation_config.lower_freq_profile_enabled boolean | Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling. Default: False |
profiling.operation_config.profile_date_of_month One of integer, null | Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take effect. Default: None |
profiling.operation_config.profile_day_of_week One of integer, null | Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take effect. Default: None |
stateful_ingestion One of StatefulStaleMetadataRemovalConfig, null | Default: None |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False Default: False |
stateful_ingestion.fail_safe_threshold number | Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'. Default: 75.0 |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
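To illustrate how the dotted field names in the table map onto nested YAML keys in a recipe, here is a sketch with hypothetical values:
# Sketch: dotted fields such as path_specs.PathSpec.exclude or aws_config.aws_region
# become nested YAML keys (all values below are hypothetical)
source:
  type: s3
  config:
    path_specs:
      - include: "s3://my-bucket/{table}/*.parquet"
        exclude:
          - "**/temp/**"
        file_types:
          - parquet
        traversal_method: MIN_MAX
    aws_config:
      aws_region: eu-west-1
    profiling:
      enabled: false
    stateful_ingestion:
      enabled: true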
The JSONSchema for this configuration is inlined below.
{
"$defs": {
"AllowDenyPattern": {
"additionalProperties": false,
"description": "A class to store allow deny regexes",
"properties": {
"allow": {
"default": [
".*"
],
"description": "List of regex patterns to include in ingestion",
"items": {
"type": "string"
},
"title": "Allow",
"type": "array"
},
"deny": {
"default": [],
"description": "List of regex patterns to exclude from ingestion.",
"items": {
"type": "string"
},
"title": "Deny",
"type": "array"
},
"ignoreCase": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": true,
"description": "Whether to ignore case sensitivity during pattern matching.",
"title": "Ignorecase"
}
},
"title": "AllowDenyPattern",
"type": "object"
},
"AwsAssumeRoleConfig": {
"additionalProperties": true,
"properties": {
"RoleArn": {
"description": "ARN of the role to assume.",
"title": "Rolearn",
"type": "string"
},
"ExternalId": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "External ID to use when assuming the role.",
"title": "Externalid"
}
},
"required": [
"RoleArn"
],
"title": "AwsAssumeRoleConfig",
"type": "object"
},
"AwsConnectionConfig": {
"additionalProperties": false,
"description": "Common AWS credentials config.\n\nCurrently used by:\n - Glue source\n - SageMaker source\n - dbt source",
"properties": {
"aws_access_key_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS access key ID. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Access Key Id"
},
"aws_secret_access_key": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS secret access key. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Secret Access Key"
},
"aws_session_token": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS session token. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Session Token"
},
"aws_role": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"anyOf": [
{
"type": "string"
},
{
"$ref": "#/$defs/AwsAssumeRoleConfig"
}
]
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as [boto3's STS.Client.assume_role](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role).",
"title": "Aws Role"
},
"aws_profile": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The [named profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config.",
"title": "Aws Profile"
},
"aws_region": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS region code.",
"title": "Aws Region"
},
"aws_endpoint_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The AWS service endpoint. This is normally [constructed automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html), but can be overridden here.",
"title": "Aws Endpoint Url"
},
"aws_proxy": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.",
"title": "Aws Proxy"
},
"aws_retry_num": {
"default": 5,
"description": "Number of times to retry failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
"title": "Aws Retry Num",
"type": "integer"
},
"aws_retry_mode": {
"default": "standard",
"description": "Retry mode to use for failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
"enum": [
"legacy",
"standard",
"adaptive"
],
"title": "Aws Retry Mode",
"type": "string"
},
"read_timeout": {
"default": 60,
"description": "The timeout for reading from the connection (in seconds).",
"title": "Read Timeout",
"type": "number"
},
"aws_advanced_config": {
"additionalProperties": true,
"description": "Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).",
"title": "Aws Advanced Config",
"type": "object"
}
},
"title": "AwsConnectionConfig",
"type": "object"
},
"DataLakeProfilerConfig": {
"additionalProperties": false,
"properties": {
"enabled": {
"default": false,
"description": "Whether profiling should be done.",
"title": "Enabled",
"type": "boolean"
},
"operation_config": {
"$ref": "#/$defs/OperationConfig",
"description": "Experimental feature. To specify operation configs."
},
"profile_table_level_only": {
"default": false,
"description": "Whether to perform profiling at table-level only or include column-level profiling as well.",
"title": "Profile Table Level Only",
"type": "boolean"
},
"max_number_of_fields_to_profile": {
"anyOf": [
{
"exclusiveMinimum": 0,
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
"title": "Max Number Of Fields To Profile"
},
"include_field_null_count": {
"default": true,
"description": "Whether to profile for the number of nulls for each column.",
"title": "Include Field Null Count",
"type": "boolean"
},
"include_field_min_value": {
"default": true,
"description": "Whether to profile for the min value of numeric columns.",
"title": "Include Field Min Value",
"type": "boolean"
},
"include_field_max_value": {
"default": true,
"description": "Whether to profile for the max value of numeric columns.",
"title": "Include Field Max Value",
"type": "boolean"
},
"include_field_mean_value": {
"default": true,
"description": "Whether to profile for the mean value of numeric columns.",
"title": "Include Field Mean Value",
"type": "boolean"
},
"include_field_median_value": {
"default": true,
"description": "Whether to profile for the median value of numeric columns.",
"title": "Include Field Median Value",
"type": "boolean"
},
"include_field_stddev_value": {
"default": true,
"description": "Whether to profile for the standard deviation of numeric columns.",
"title": "Include Field Stddev Value",
"type": "boolean"
},
"include_field_quantiles": {
"default": true,
"description": "Whether to profile for the quantiles of numeric columns.",
"title": "Include Field Quantiles",
"type": "boolean"
},
"include_field_distinct_value_frequencies": {
"default": true,
"description": "Whether to profile for distinct value frequencies.",
"title": "Include Field Distinct Value Frequencies",
"type": "boolean"
},
"include_field_histogram": {
"default": true,
"description": "Whether to profile for the histogram for numeric fields.",
"title": "Include Field Histogram",
"type": "boolean"
},
"include_field_sample_values": {
"default": true,
"description": "Whether to profile for the sample values for all columns.",
"title": "Include Field Sample Values",
"type": "boolean"
}
},
"title": "DataLakeProfilerConfig",
"type": "object"
},
"FolderTraversalMethod": {
"enum": [
"ALL",
"MIN_MAX",
"MAX"
],
"title": "FolderTraversalMethod",
"type": "string"
},
"OperationConfig": {
"additionalProperties": false,
"properties": {
"lower_freq_profile_enabled": {
"default": false,
"description": "Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.",
"title": "Lower Freq Profile Enabled",
"type": "boolean"
},
"profile_day_of_week": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.",
"title": "Profile Day Of Week"
},
"profile_date_of_month": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.",
"title": "Profile Date Of Month"
}
},
"title": "OperationConfig",
"type": "object"
},
"PathSpec": {
"additionalProperties": false,
"properties": {
"include": {
"description": "Path to table. Name variable `{table}` is used to mark the folder with dataset. In absence of `{table}`, file level dataset will be created. Check below examples for more details.",
"title": "Include",
"type": "string"
},
"exclude": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": [],
"description": "list of paths in glob pattern which will be excluded while scanning for the datasets",
"title": "Exclude"
},
"file_types": {
"default": [
"csv",
"tsv",
"json",
"parquet",
"avro"
],
"description": "Files with extenstions specified here (subset of default value) only will be scanned to create dataset. Other files will be omitted.",
"items": {
"type": "string"
},
"title": "File Types",
"type": "array"
},
"default_extension": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped.",
"title": "Default Extension"
},
"table_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Display name of the dataset.Combination of named variables from include path and strings",
"title": "Table Name"
},
"enable_compression": {
"default": true,
"description": "Enable or disable processing compressed files. Currently .gz and .bz files are supported.",
"title": "Enable Compression",
"type": "boolean"
},
"sample_files": {
"default": true,
"description": "Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled",
"title": "Sample Files",
"type": "boolean"
},
"allow_double_stars": {
"default": false,
"description": "Allow double stars in the include path. This can affect performance significantly if enabled",
"title": "Allow Double Stars",
"type": "boolean"
},
"autodetect_partitions": {
"default": true,
"description": "Autodetect partition(s) from the path. If set to true, it will autodetect partition key/value if the folder format is {partition_key}={partition_value} for example `year=2024`",
"title": "Autodetect Partitions",
"type": "boolean"
},
"traversal_method": {
"$ref": "#/$defs/FolderTraversalMethod",
"default": "MAX",
"description": "Method to traverse the folder. ALL: Traverse all the folders, MIN_MAX: Traverse the folders by finding min and max value, MAX: Traverse the folder with max value"
},
"include_hidden_folders": {
"default": false,
"description": "Include hidden folders in the traversal (folders starting with . or _",
"title": "Include Hidden Folders",
"type": "boolean"
},
"tables_filter_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "The tables_filter_pattern configuration field uses regular expressions to filter the tables part of the Pathspec for ingestion, allowing fine-grained control over which tables are included or excluded based on specified patterns. The default setting allows all tables."
}
},
"required": [
"include"
],
"title": "PathSpec",
"type": "object"
},
"StatefulStaleMetadataRemovalConfig": {
"additionalProperties": false,
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"properties": {
"enabled": {
"default": false,
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"title": "Enabled",
"type": "boolean"
},
"remove_stale_metadata": {
"default": true,
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"title": "Remove Stale Metadata",
"type": "boolean"
},
"fail_safe_threshold": {
"default": 75.0,
"description": "Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
"maximum": 100.0,
"minimum": 0.0,
"title": "Fail Safe Threshold",
"type": "number"
}
},
"title": "StatefulStaleMetadataRemovalConfig",
"type": "object"
}
},
"additionalProperties": false,
"properties": {
"path_specs": {
"description": "List of PathSpec. See [below](#path-spec) the details about PathSpec",
"items": {
"$ref": "#/$defs/PathSpec"
},
"title": "Path Specs",
"type": "array"
},
"env": {
"default": "PROD",
"description": "The environment that all assets produced by this connector belong to",
"title": "Env",
"type": "string"
},
"platform_instance": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.",
"title": "Platform Instance"
},
"stateful_ingestion": {
"anyOf": [
{
"$ref": "#/$defs/StatefulStaleMetadataRemovalConfig"
},
{
"type": "null"
}
],
"default": null
},
"platform": {
"default": "",
"description": "The platform that this source connects to (either 's3' or 'file'). If not specified, the platform will be inferred from the path_specs.",
"title": "Platform",
"type": "string"
},
"aws_config": {
"anyOf": [
{
"$ref": "#/$defs/AwsConnectionConfig"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS configuration"
},
"use_s3_bucket_tags": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether or not to create tags in datahub from the s3 bucket",
"title": "Use S3 Bucket Tags"
},
"use_s3_object_tags": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether or not to create tags in datahub from the s3 object",
"title": "Use S3 Object Tags"
},
"use_s3_content_type": {
"default": false,
"description": "If enabled, use S3 Object metadata to determine content type over file extension, if set. Warning: this requires a separate query to S3 for each object, which can be slow for large datasets.",
"title": "Use S3 Content Type",
"type": "boolean"
},
"profile_patterns": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "regex patterns for tables to profile "
},
"profiling": {
"$ref": "#/$defs/DataLakeProfilerConfig",
"default": {
"enabled": false,
"operation_config": {
"lower_freq_profile_enabled": false,
"profile_date_of_month": null,
"profile_day_of_week": null
},
"profile_table_level_only": false,
"max_number_of_fields_to_profile": null,
"include_field_null_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"include_field_mean_value": true,
"include_field_median_value": true,
"include_field_stddev_value": true,
"include_field_quantiles": true,
"include_field_distinct_value_frequencies": true,
"include_field_histogram": true,
"include_field_sample_values": true
},
"description": "Data profiling configuration"
},
"spark_driver_memory": {
"default": "4g",
"description": "Max amount of memory to grant Spark.",
"title": "Spark Driver Memory",
"type": "string"
},
"spark_config": {
"additionalProperties": true,
"default": {},
"description": "Spark configuration properties to set on the SparkSession. Put config property names into quotes. For example: '\"spark.executor.memory\": \"2g\"'",
"title": "Spark Config",
"type": "object"
},
"max_rows": {
"default": 100,
"description": "Maximum number of rows to use when inferring schemas for TSV and CSV files.",
"title": "Max Rows",
"type": "integer"
},
"add_partition_columns_to_schema": {
"default": false,
"description": "Whether to add partition fields to the schema.",
"title": "Add Partition Columns To Schema",
"type": "boolean"
},
"verify_ssl": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "string"
}
],
"default": true,
"description": "Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.",
"title": "Verify Ssl"
},
"number_of_files_to_sample": {
"default": 100,
"description": "Number of files to list to sample for schema inference. This will be ignored if sample_files is set to False in the pathspec.",
"title": "Number Of Files To Sample",
"type": "integer"
},
"sort_schema_fields": {
"default": false,
"description": "Whether to sort schema fields by fieldPath when inferring schemas.",
"title": "Sort Schema Fields",
"type": "boolean"
},
"generate_partition_aspects": {
"default": true,
"description": "Whether to generate partition aspects for partitioned tables. On older servers for backward compatibility, this should be set to False. This flag will be removed in future versions.",
"title": "Generate Partition Aspects",
"type": "boolean"
}
},
"required": [
"path_specs"
],
"title": "DataLakeSourceConfig",
"type": "object"
}
Path Specs
Path Specs (`path_specs`) is a list of Path Spec (`path_spec`) objects, where each individual `path_spec` represents one or more datasets. The include path (`path_spec.include`) represents the formatted path to the dataset. This path must end with `*.*` or `*.[ext]` to represent the leaf level. If `*.[ext]` is provided, then only files with the specified extension type will be scanned. `.[ext]` can be any of the supported file types. Refer to example 1 below for more details.
All folder levels need to be specified in the include path. You can use `/*/` to represent a folder level and avoid specifying the exact folder name. To map a folder as a dataset, use the `{table}` placeholder to represent the folder level for which the dataset is to be created. For a partitioned dataset, you can use the placeholder `{partition_key[i]}` to represent the name of the i-th partition and `{partition_value[i]}` to represent its value. During ingestion, `i` will be used to match the partition_key to the partition. Refer to examples 2 and 3 below for more details.
Exclude paths (`path_spec.exclude`) can be used to ignore paths that are not relevant to the current `path_spec`. These paths cannot contain named variables (`{}`). Exclude paths can use `**` to represent multiple folder levels. Refer to example 4 below for more details.
Refer to example 5 if your bucket has a more complex dataset representation.
Additional points to note
- Folder names should not contain {, }, *, / in their names.
- Named variable {folder} is reserved for internal working. please do not use in named variables.
Partitioned Dataset support
If your dataset is partitioned in the `partition_key=partition_value` format, the partition values are auto-detected.
Otherwise, you can specify partitions in the path_spec in the following ways (see the sketch after this list):
- Specify the partition key and value in the path, e.g. `{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/{partition_key[2]}={partition_value[2]}`
- Specify the partition keys using named variables in the path_spec, e.g. `year={year}/month={month}/day={day}`
- If the path is in the form of `/value1/value2/value3`, the source infers the partition values from the path and assigns them as partition_0, partition_1, partition_2, etc.
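A minimal sketch of the second option, declaring partition keys via named variables (the bucket is hypothetical):
# Sketch: named partition variables in the include path (hypothetical bucket)
path_specs:
  - include: "s3://my-bucket/events/{table}/year={year}/month={month}/day={day}/*.parquet"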
The dataset creation time is determined by the creation time of the earliest created file in the lowest partition, while the last updated time is determined by the last updated time of the latest updated file in the highest partition.
How the source determines the highest/lowest partition is based on the traversal method set in the path_spec (a configuration sketch follows this list):
- If the traversal method is set to `MAX`, the source will try to find the latest partition by ordering the partitions at each level and picking the latest one. This traversal method won't look for the earliest partition/creation time, but it is the fastest.
- If the traversal method is set to `MIN_MAX`, the source will try to find the latest and earliest partitions by ordering the partitions at each level. This traversal sorts folders purely by name, therefore it is fast, but it doesn't guarantee that the latest partition contains the latest created file.
- If the traversal method is set to `ALL`, the source will try to find the latest and earliest partitions by listing all the files in all the partitions and determining the creation/last-modification time from the files themselves. This is the slowest option, but for datasets that are not time-partitioned it is the only way to find the latest/earliest partition.
Path Specs - Examples
Example 1 - Individual file as Dataset
Bucket structure:
test-bucket
├── employees.csv
├── departments.json
└── food_items.csv
Path specs config to ingest `employees.csv` and `food_items.csv` as datasets:
path_specs:
- include: s3://test-bucket/*.csv
This will automatically ignore the `departments.json` file. To include it, use `*.*` instead of `*.csv`, as in the variant below.
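For instance, the variant that also picks up `departments.json`:
path_specs:
  - include: s3://test-bucket/*.*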
Example 2 - Folder of files as Dataset (without Partitions)
Bucket structure:
test-bucket
└── offers
├── 1.avro
└── 2.avro
Path specs config to ingest folder `offers` as a dataset:
path_specs:
- include: s3://test-bucket/{table}/*.avro
`{table}` represents the folder for which the dataset will be created.
Example 3 - Folder of files as Dataset (with Partitions)
Bucket structure:
test-bucket
├── orders
│ └── year=2022
│ └── month=2
│ ├── 1.parquet
│ └── 2.parquet
└── returns
└── year=2021
└── month=2
└── 1.parquet
Path specs config to ingest folders `orders` and `returns` as datasets:
path_specs:
- include: s3://test-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/*.parquet
or with partition auto-detection:
path_specs:
- include: s3://test-bucket/{table}/
One can also use `include: s3://test-bucket/{table}/*/*/*.parquet` here; however, the above format is preferred as it allows declaring partitions explicitly.
Example 4 - Folder of files as Dataset (with Partitions), and Exclude Filter
Bucket structure:
test-bucket
├── orders
│ └── year=2022
│ └── month=2
│ ├── 1.parquet
│ └── 2.parquet
└── tmp_orders
└── year=2021
└── month=2
└── 1.parquet
Path specs config to ingest folder `orders` as a dataset but not folder `tmp_orders`:
path_specs:
- include: s3://test-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/*.parquet
exclude:
- **/tmp_orders/**
or with partition auto-detection:
path_specs:
- include: s3://test-bucket/{table}/
Example 5 - Advanced - Either Individual file OR Folder of files as Dataset
Bucket structure:
test-bucket
├── customers
│ ├── part1.json
│ ├── part2.json
│ ├── part3.json
│ └── part4.json
├── employees.csv
├── food_items.csv
├── tmp_10101000.csv
└── orders
└── year=2022
└── month=2
├── 1.parquet
├── 2.parquet
└── 3.parquet
Path specs config:
path_specs:
- include: s3://test-bucket/*.csv
exclude:
- **/tmp_10101000.csv
- include: s3://test-bucket/{table}/*.json
- include: s3://test-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
The above config has 3 path_specs and will ingest the following datasets:
- `employees.csv` - Single File as Dataset
- `food_items.csv` - Single File as Dataset
- `customers` - Folder as Dataset
- `orders` - Folder as Dataset
and will ignore the file `tmp_10101000.csv`.
Valid path_specs.include
s3://my-bucket/foo/tests/bar.avro # single file table
s3://my-bucket/foo/tests/*.* # multiple file level tables
s3://my-bucket/foo/tests/{table}/*.avro #table without partition
s3://my-bucket/foo/tests/{table}/ #table with partition autodetection. Partition only can be detected if it is in the format of key=value
s3://my-bucket/foo/tests/{table}/*/*.avro #table where partitions are not specified
s3://my-bucket/foo/tests/{table}/*.* # table where no partitions as well as data type specified
s3://my-bucket/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name
s3://my-bucket/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro # specify partition key and value format
s3://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro # specify partition value only format
s3://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # for all extensions
s3://my-bucket/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 2 levels down in bucket
s3://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 3 levels down in bucket
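To turn the `{dept}` keyword above into part of the dataset display name, here is a sketch using the `table_name` field (the bucket is hypothetical):
path_specs:
  - include: "s3://my-bucket/{dept}/tests/{table}/*.avro"
    table_name: "{dept}.{table}"   # combine named variables from the include path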
Valid path_specs.exclude
- **/tests/**
- s3://my-bucket/hr/**
- **/tests/*.csv
- s3://my-bucket/foo/*/my_table/**
If you would like to write a more complicated function for resolving file names, then a {transformer} would be a good fit.
Specify as long a fixed prefix (without `/*/`) as possible in `path_specs.include`. This will reduce scanning time and cost, specifically on AWS S3.
Running profiling against many tables or over many rows can run up significant costs. While we've done our best to limit the expensiveness of the queries the profiler runs, you should be prudent about the set of tables profiling is enabled on or the frequency of the profiling runs.
If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.
Compatibility
Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` and `SPARK_VERSION` environment variables to be set. The Spark+Hadoop binary can be downloaded here.
For an example guide on setting up PyDeequ on AWS, see this guide.
From Spark 3.2.0+, the Avro reader fails on column names that don't start with a letter or that contain characters other than letters, numbers, and underscores (https://github.com/apache/spark/blob/72c62b6596d21e975c5597f8fff84b1a9d070a02/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala#L158). Avro files that contain such columns won't be profiled.
Code Coordinates
- Class Name: datahub.ingestion.source.s3.source.S3Source
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for S3 / Local Files, feel free to ping us on our Slack.