DynamoDB
Important Capabilities
Capability | Status | Notes |
---|---|---|
Classification | ✅ | Optionally enabled via classification.enabled . |
Detect Deleted Entities | ✅ | Enabled by default via stateful ingestion. |
Platform Instance | ✅ | By default, platform_instance will use the AWS account id. |
This plugin extracts the following:
AWS DynamoDB table names with their region, and infer schema of attribute names and types by scanning the table
Prerequisities
Notice of breaking change: Starting v0.13.3, aws_region
is now a required configuration for DynamoDB Connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.
In order to execute this source, you need to attach the AmazonDynamoDBReadOnlyAccess
policy to a user in your AWS account. Then create an API access key and secret for the user. This future proofs it in case we need to make further changes. But you can use these privileges to run this source for now
dynamodb:ListTables
dynamodb:DescribeTable
dynamodb:Scan
We need dynamodb:Scan
because Dynamodb does not return the schema in dynamodb:DescribeTable
and thus we sample few values to understand the schema.
Concept Mapping
Source Concept | DataHub Concept | Notes |
---|---|---|
"dynamodb" | Data Platform | |
DynamoDB Table | Dataset |
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: dynamodb
config:
aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
aws_region: "${AWS_REGION}"
# If there are items that have most representative fields of the table, users could use the
# `include_table_item` option to provide a list of primary keys of the table in dynamodb format.
# For each `region.table`, the list of primary keys can be at most 100.
# We include these items in addition to the first 100 items in the table when we scan it.
#
# include_table_item:
# region.table_name:
# [
# {
# "partition_key_name": { "attribute_type": "attribute_value" },
# "sort_key_name": { "attribute_type": "attribute_value" },
# },
# ]
sink:
# sink configs
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
aws_access_key_id One of string, null | AWS access key ID. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
aws_advanced_config object | Advanced AWS configuration options. These are passed directly to botocore.config.Config. |
aws_endpoint_url One of string, null | The AWS service endpoint. This is normally constructed automatically, but can be overridden here. Default: None |
aws_profile One of string, null | The named profile to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config. Default: None |
aws_proxy One of string, null | A set of proxy configs to use with AWS. See the botocore.config docs for details. Default: None |
aws_region One of string, null | AWS region code. Default: None |
aws_retry_mode Enum | One of: "legacy", "standard", "adaptive" Default: standard |
aws_retry_num integer | Number of times to retry failed AWS requests. See the botocore.retry docs for details. Default: 5 |
aws_secret_access_key One of string, null | AWS secret access key. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
aws_session_token One of string, null | AWS session token. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
max_schema_size integer | Maximum number of fields to include in the schema. Default: 300 |
platform_instance One of string, null | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details. Default: None |
read_timeout number | The timeout for reading from the connection (in seconds). Default: 60 |
env string | The environment that all assets produced by this connector belong to Default: PROD |
aws_role One of string, array, null | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as boto3's STS.Client.assume_role. Default: None |
aws_role.union One of string, AwsAssumeRoleConfig | |
aws_role.union.RoleArn ❓ string | ARN of the role to assume. |
aws_role.union.ExternalId One of string, null | External ID to use when assuming the role. Default: None |
database_pattern AllowDenyPattern | A class to store allow deny regexes |
database_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
domain map(str,AllowDenyPattern) | A class to store allow deny regexes |
domain. key .allowarray | List of regex patterns to include in ingestion Default: ['.*'] |
domain. key .allow.stringstring | |
domain. key .ignoreCaseOne of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
domain. key .denyarray | List of regex patterns to exclude from ingestion. Default: [] |
domain. key .deny.stringstring | |
include_table_item One of array, null | [Advanced] The primary keys of items of a table in dynamodb format the user would like to include in schema. Refer "Advanced Configurations" section for more details Default: None |
include_table_item. key .objectobject | |
table_pattern AllowDenyPattern | A class to store allow deny regexes |
table_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
classification ClassificationConfig | |
classification.enabled boolean | Whether classification should be used to auto-detect glossary terms Default: False |
classification.info_type_to_term map(str,string) | |
classification.max_workers integer | Number of worker processes to use for classification. Set to 1 to disable. Default: 4 |
classification.sample_size integer | Number of sample values used for classification. Default: 100 |
classification.classifiers array | Classifiers to use to auto-detect glossary terms. If more than one classifier, infotype predictions from the classifier defined later in sequence take precedance. Default: [{'type': 'datahub', 'config': None}] |
classification.classifiers.DynamicTypedClassifierConfig DynamicTypedClassifierConfig | |
classification.classifiers.DynamicTypedClassifierConfig.type ❓ string | The type of the classifier to use. For DataHub, use datahub |
classification.classifiers.DynamicTypedClassifierConfig.config One of object, null | The configuration required for initializing the classifier. If not specified, uses defaults for classifer type. Default: None |
classification.column_pattern AllowDenyPattern | A class to store allow deny regexes |
classification.column_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
classification.table_pattern AllowDenyPattern | A class to store allow deny regexes |
classification.table_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
stateful_ingestion One of StatefulStaleMetadataRemovalConfig, null | Default: None |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False Default: False |
stateful_ingestion.fail_safe_threshold number | Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'. Default: 75.0 |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
The JSONSchema for this configuration is inlined below.
{
"$defs": {
"AllowDenyPattern": {
"additionalProperties": false,
"description": "A class to store allow deny regexes",
"properties": {
"allow": {
"default": [
".*"
],
"description": "List of regex patterns to include in ingestion",
"items": {
"type": "string"
},
"title": "Allow",
"type": "array"
},
"deny": {
"default": [],
"description": "List of regex patterns to exclude from ingestion.",
"items": {
"type": "string"
},
"title": "Deny",
"type": "array"
},
"ignoreCase": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": true,
"description": "Whether to ignore case sensitivity during pattern matching.",
"title": "Ignorecase"
}
},
"title": "AllowDenyPattern",
"type": "object"
},
"AwsAssumeRoleConfig": {
"additionalProperties": true,
"properties": {
"RoleArn": {
"description": "ARN of the role to assume.",
"title": "Rolearn",
"type": "string"
},
"ExternalId": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "External ID to use when assuming the role.",
"title": "Externalid"
}
},
"required": [
"RoleArn"
],
"title": "AwsAssumeRoleConfig",
"type": "object"
},
"ClassificationConfig": {
"additionalProperties": false,
"properties": {
"enabled": {
"default": false,
"description": "Whether classification should be used to auto-detect glossary terms",
"title": "Enabled",
"type": "boolean"
},
"sample_size": {
"default": 100,
"description": "Number of sample values used for classification.",
"title": "Sample Size",
"type": "integer"
},
"max_workers": {
"default": 4,
"description": "Number of worker processes to use for classification. Set to 1 to disable.",
"title": "Max Workers",
"type": "integer"
},
"table_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "Regex patterns to filter tables for classification. This is used in combination with other patterns in parent config. Specify regex to match the entire table name in `database.schema.table` format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'"
},
"column_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "Regex patterns to filter columns for classification. This is used in combination with other patterns in parent config. Specify regex to match the column name in `database.schema.table.column` format."
},
"info_type_to_term": {
"additionalProperties": {
"type": "string"
},
"default": {},
"description": "Optional mapping to provide glossary term identifier for info type",
"title": "Info Type To Term",
"type": "object"
},
"classifiers": {
"default": [
{
"type": "datahub",
"config": null
}
],
"description": "Classifiers to use to auto-detect glossary terms. If more than one classifier, infotype predictions from the classifier defined later in sequence take precedance.",
"items": {
"$ref": "#/$defs/DynamicTypedClassifierConfig"
},
"title": "Classifiers",
"type": "array"
}
},
"title": "ClassificationConfig",
"type": "object"
},
"DynamicTypedClassifierConfig": {
"additionalProperties": false,
"properties": {
"type": {
"description": "The type of the classifier to use. For DataHub, use `datahub`",
"title": "Type",
"type": "string"
},
"config": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"description": "The configuration required for initializing the classifier. If not specified, uses defaults for classifer type.",
"title": "Config"
}
},
"required": [
"type"
],
"title": "DynamicTypedClassifierConfig",
"type": "object"
},
"StatefulStaleMetadataRemovalConfig": {
"additionalProperties": false,
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"properties": {
"enabled": {
"default": false,
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"title": "Enabled",
"type": "boolean"
},
"remove_stale_metadata": {
"default": true,
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"title": "Remove Stale Metadata",
"type": "boolean"
},
"fail_safe_threshold": {
"default": 75.0,
"description": "Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
"maximum": 100.0,
"minimum": 0.0,
"title": "Fail Safe Threshold",
"type": "number"
}
},
"title": "StatefulStaleMetadataRemovalConfig",
"type": "object"
}
},
"additionalProperties": false,
"properties": {
"aws_access_key_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS access key ID. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Access Key Id"
},
"aws_secret_access_key": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS secret access key. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Secret Access Key"
},
"aws_session_token": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS session token. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Session Token"
},
"aws_role": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"anyOf": [
{
"type": "string"
},
{
"$ref": "#/$defs/AwsAssumeRoleConfig"
}
]
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as [boto3's STS.Client.assume_role](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role).",
"title": "Aws Role"
},
"aws_profile": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The [named profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config.",
"title": "Aws Profile"
},
"aws_region": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS region code.",
"title": "Aws Region"
},
"aws_endpoint_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The AWS service endpoint. This is normally [constructed automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html), but can be overridden here.",
"title": "Aws Endpoint Url"
},
"aws_proxy": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.",
"title": "Aws Proxy"
},
"aws_retry_num": {
"default": 5,
"description": "Number of times to retry failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
"title": "Aws Retry Num",
"type": "integer"
},
"aws_retry_mode": {
"default": "standard",
"description": "Retry mode to use for failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
"enum": [
"legacy",
"standard",
"adaptive"
],
"title": "Aws Retry Mode",
"type": "string"
},
"read_timeout": {
"default": 60,
"description": "The timeout for reading from the connection (in seconds).",
"title": "Read Timeout",
"type": "number"
},
"aws_advanced_config": {
"additionalProperties": true,
"description": "Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).",
"title": "Aws Advanced Config",
"type": "object"
},
"env": {
"default": "PROD",
"description": "The environment that all assets produced by this connector belong to",
"title": "Env",
"type": "string"
},
"database_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "regex patterns for databases to filter in ingestion."
},
"table_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "Regex patterns for tables to filter in ingestion. The table name format is 'region.table'"
},
"classification": {
"$ref": "#/$defs/ClassificationConfig",
"default": {
"enabled": false,
"sample_size": 100,
"max_workers": 4,
"table_pattern": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"column_pattern": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"info_type_to_term": {},
"classifiers": [
{
"config": null,
"type": "datahub"
}
]
},
"description": "For details, refer to [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md)."
},
"stateful_ingestion": {
"anyOf": [
{
"$ref": "#/$defs/StatefulStaleMetadataRemovalConfig"
},
{
"type": "null"
}
],
"default": null
},
"platform_instance": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.",
"title": "Platform Instance"
},
"domain": {
"additionalProperties": {
"$ref": "#/$defs/AllowDenyPattern"
},
"default": {},
"description": "regex patterns for tables to filter to assign domain_key. ",
"title": "Domain",
"type": "object"
},
"include_table_item": {
"anyOf": [
{
"additionalProperties": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "[Advanced] The primary keys of items of a table in dynamodb format the user would like to include in schema. Refer \"Advanced Configurations\" section for more details",
"title": "Include Table Item"
},
"max_schema_size": {
"default": 300,
"description": "Maximum number of fields to include in the schema.",
"exclusiveMinimum": 0,
"title": "Max Schema Size",
"type": "integer"
}
},
"title": "DynamoDBConfig",
"type": "object"
}
Advanced Configurations
Using include_table_item
config
If there are items that have most representative fields of the table, users could use the include_table_item
option to provide a list of primary keys of the table in dynamodb format. We include these items in addition to the first 100 items in the table when we scan it.
Take AWS DynamoDB Developer Guide Example tables and data as an example, if a account has a table Reply
in the us-west-2
region with composite primary key Id
and ReplyDateTime
, users can use include_table_item
to include 2 items as following:
Example:
# The table name should be in the format of region.table_name
# The primary keys should be in the DynamoDB format
include_table_item:
us-west-2.Reply:
[
{
"ReplyDateTime": { "S": "2015-09-22T19:58:22.947Z" },
"Id": { "S": "Amazon DynamoDB#DynamoDB Thread 1" },
},
{
"ReplyDateTime": { "S": "2015-10-05T19:58:22.947Z" },
"Id": { "S": "Amazon DynamoDB#DynamoDB Thread 2" },
},
]
Code Coordinates
- Class Name:
datahub.ingestion.source.dynamodb.dynamodb.DynamoDBSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for DynamoDB, feel free to ping us on our Slack.