Cassandra
Important Capabilities
Capability | Status | Notes |
---|---|---|
Asset Containers | ✅ | Enabled by default. |
Detect Deleted Entities | ✅ | Enabled by default via stateful ingestion. |
Platform Instance | ✅ | Enabled by default. |
Schema Metadata | ✅ | Enabled by default. |
This plugin extracts the following:
- Metadata for tables
- Column types associated with each table column
- The keyspace each table belongs to
Setup
This integration pulls metadata directly from Cassandra databases, including both DataStax Astra DB and Cassandra Enterprise Edition (EE).
You’ll need to have a Cassandra instance or an Astra DB setup with appropriate access permissions.
Steps to Get the Required Information
Set Up User Credentials:
- For Astra DB:
  - Log in to your Astra DB Console.
  - Navigate to Organization Settings > Token Management.
  - Generate an Application Token with the required permissions for read access.
  - Download the Secure Connect Bundle from the Astra DB Console.
- For Cassandra EE:
  - Ensure you have a username and password with read access to the necessary keyspaces.
Permissions:
- The user or token must have SELECT permissions that allow it to:
  - Access metadata in system keyspaces (e.g., system_schema) to retrieve information about keyspaces, tables, columns, and views.
  - Perform SELECT operations on the data tables if data profiling is enabled.
Verify Database Access:
- For Astra DB: Ensure the Secure Connect Bundle is used and configured correctly.
- For open-source Cassandra: Ensure the contact point and port are accessible.
When enabling profiling, make sure to set a limit on the number of rows to sample. Profiling large tables without a limit may lead to excessive resource consumption and slow performance.
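As a minimal sketch of that advice, the fragment below enables profiling with a row limit. It uses only fields listed under Config Details (profiling.enabled, profiling.limit, profiling.max_workers). The values 10000 and 4 are arbitrary illustrations, not recommendations.

```yaml
source:
  type: "cassandra"
  config:
    contact_point: "localhost"
    port: 9042
    profiling:
      enabled: true
      # Cap how much of each table is profiled; 10000 is an illustrative value.
      limit: 10000
      # Optional: use fewer worker threads than the default of 20.
      max_workers: 4
```

Leaving limit unset keeps the default of None, in which case the whole table is profiled.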
For cloud configuration with Astra DB, you must specify the path to the Secure Connect Bundle in the configuration. Because the bundle is referenced by file path, use the CLI to ingest metadata into DataHub.
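A recipe for Astra DB might look like the sketch below. The bundle path and token are placeholders. The cloud_config field names and the 600-second timeout defaults come from the Config Details table.

```yaml
source:
  type: "cassandra"
  config:
    cloud_config:
      # Placeholder path: point this at the bundle downloaded from the Astra DB Console.
      secure_connect_bundle: "/path/to/secure-connect-bundle.zip"
      # Placeholder: the Application Token generated under Token Management.
      token: "<application-token>"
      # Optional timeouts in seconds; both default to 600.
      connect_timeout: 600
      request_timeout: 600
sink:
  # config sinks
```

Assuming the recipe is saved as recipe.yaml (a hypothetical filename), it can be run with `datahub ingest -c recipe.yaml` from a machine that can read the bundle file.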
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: "cassandra"
  config:
    # Credentials for on prem cassandra
    contact_point: "localhost"
    port: 9042
    username: "admin"
    password: "password"

    # SSL Configuration (optional)
    #ssl_ca_certs: "/path/to/ca-certificate.pem"
    #ssl_certfile: "/path/to/client-certificate.pem"
    #ssl_keyfile: "/path/to/client-private-key.pem"
    #ssl_version: "TLS_CLIENT" # Options: TLS_CLIENT, TLSv1, TLSv1_1, TLSv1_2, TLSv1_3

    # Or
    # Credentials Astra Cloud
    #cloud_config:
    #  secure_connect_bundle: "Path to Secure Connect Bundle (.zip)"
    #  token: "Application Token"

    # Optional Allow / Deny extraction of particular keyspaces.
    keyspace_pattern:
      allow: [".*"]

    # Optional Allow / Deny extraction of particular tables.
    table_pattern:
      allow: [".*"]

    # Optional
    profiling:
      enabled: true
      profile_table_level_only: true

sink:
  # config sinks
Config Details
Note that a . is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
contact_point string | Domain or IP address of the Cassandra instance (excluding port). Default: localhost |
password One of string, null | Password credential associated with the specified username. Default: None |
platform_instance One of string, null | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details. Default: None |
port integer | Port number to connect to the Cassandra instance. Default: 9042 |
ssl_ca_certs One of string, null | Path to the CA certificate file for SSL connections. Default: None |
ssl_certfile One of string, null | Path to the SSL certificate file for SSL connections. Default: None |
ssl_keyfile One of string, null | Path to the SSL key file for SSL connections. Default: None |
ssl_version One of string, null | SSL protocol version to use for connections. Options: TLS_CLIENT, TLSv1, TLSv1_1, TLSv1_2, TLSv1_3. Default: TLS_CLIENT |
username One of string, null | Username credential with read access to the system_schema keyspace. Default: None |
env string | The environment that all assets produced by this connector belong to. Default: PROD |
cloud_config One of CassandraCloudConfig, null | Configuration for cloud-based Cassandra, such as DataStax Astra DB. Default: None |
cloud_config.secure_connect_bundle ❓ string | File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB. |
cloud_config.token ❓ string | The Astra DB application token used for authentication. |
cloud_config.connect_timeout integer | Timeout in seconds for establishing new connections to Cassandra. Default: 600 |
cloud_config.request_timeout integer | Timeout in seconds for individual Cassandra requests. Default: 600 |
keyspace_pattern AllowDenyPattern | Regex patterns to filter keyspaces for ingestion. |
keyspace_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
profile_pattern AllowDenyPattern | Regex patterns for tables to profile. |
profile_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
table_pattern AllowDenyPattern | Regex patterns to filter keyspaces.tables for ingestion. |
table_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
profiling GEProfilingBaseConfig | Configuration for profiling. |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.include_field_distinct_count boolean | Whether to profile for the number of distinct values for each column. Default: True |
profiling.include_field_distinct_value_frequencies boolean | Whether to profile for distinct value frequencies. Default: False |
profiling.include_field_histogram boolean | Whether to profile for the histogram for numeric fields. Default: False |
profiling.include_field_max_value boolean | Whether to profile for the max value of numeric columns. Default: True |
profiling.include_field_mean_value boolean | Whether to profile for the mean value of numeric columns. Default: True |
profiling.include_field_median_value boolean | Whether to profile for the median value of numeric columns. Default: True |
profiling.include_field_min_value boolean | Whether to profile for the min value of numeric columns. Default: True |
profiling.include_field_null_count boolean | Whether to profile for the number of nulls for each column. Default: True |
profiling.include_field_quantiles boolean | Whether to profile for the quantiles of numeric columns. Default: False |
profiling.include_field_sample_values boolean | Whether to profile for the sample values for all columns. Default: True |
profiling.include_field_stddev_value boolean | Whether to profile for the standard deviation of numeric columns. Default: True |
profiling.limit One of integer, null | Max number of documents to profile. By default, profiles all documents. Default: None |
profiling.max_workers integer | Number of worker threads to use for profiling. Set to 1 to disable. Default: 20 |
profiling.offset One of integer, null | Offset in documents to profile. By default, uses no offset. Default: None |
profiling.profile_table_level_only boolean | Whether to perform profiling at table-level only, or include column-level profiling as well. Default: False |
profiling.operation_config OperationConfig | Experimental feature. To specify operation configs. |
profiling.operation_config.lower_freq_profile_enabled boolean | Whether to run profiling at a lower frequency. This does not schedule anything; it only adds checks that determine when profiling is skipped. Default: False |
profiling.operation_config.profile_date_of_month One of integer, null | Number from 1 to 31 (both inclusive) for the date of the month. If not specified, this field has no effect. Default: None |
profiling.operation_config.profile_day_of_week One of integer, null | Number from 0 to 6 (both inclusive) for the day of the week. 0 is Monday and 6 is Sunday. If not specified, this field has no effect. Default: None |
stateful_ingestion One of StatefulStaleMetadataRemovalConfig, null | Configuration for stateful ingestion and stale metadata removal. Default: None |
stateful_ingestion.enabled boolean | Whether to enable stateful ingestion. The effective default is True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, and False otherwise. Default: False |
stateful_ingestion.fail_safe_threshold number | Guards against accidental changes to the source configuration: if the relative change in entities compared to the previous state exceeds fail_safe_threshold (a percentage), the soft deletes are skipped and the state is not committed. Default: 75.0 |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
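To illustrate how the dotted field names in the table map onto nested YAML, here is a hedged sketch of a fuller recipe. The pipeline name, keyspace prefix, row limit, and server URL are hypothetical values chosen for illustration. Every config key under source comes from the table above, and the sink follows the datahub-rest sink mentioned in the stateful_ingestion.enabled description.

```yaml
# Hypothetical pipeline name; stateful ingestion defaults to enabled only when one is set.
pipeline_name: "cassandra_metadata"
source:
  type: "cassandra"
  config:
    contact_point: "localhost"
    port: 9042
    # keyspace_pattern.ignoreCase -> nested one level under keyspace_pattern
    keyspace_pattern:
      allow: ["analytics.*"]   # hypothetical keyspace prefix
      ignoreCase: true
    profiling:
      enabled: true
      limit: 10000             # illustrative row cap
      # profiling.operation_config.* -> nested two levels deep
      operation_config:
        lower_freq_profile_enabled: true
        profile_day_of_week: 6 # 6 is Sunday
    # stateful_ingestion.* -> nested under stateful_ingestion
    stateful_ingestion:
      enabled: true
      remove_stale_metadata: true
      fail_safe_threshold: 75.0
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"   # assumed local DataHub endpoint
```

Each dot in a field name corresponds to one additional level of indentation in the recipe.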
The JSONSchema for this configuration is inlined below.
{
"$defs": {
"AllowDenyPattern": {
"additionalProperties": false,
"description": "A class to store allow deny regexes",
"properties": {
"allow": {
"default": [
".*"
],
"description": "List of regex patterns to include in ingestion",
"items": {
"type": "string"
},
"title": "Allow",
"type": "array"
},
"deny": {
"default": [],
"description": "List of regex patterns to exclude from ingestion.",
"items": {
"type": "string"
},
"title": "Deny",
"type": "array"
},
"ignoreCase": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": true,
"description": "Whether to ignore case sensitivity during pattern matching.",
"title": "Ignorecase"
}
},
"title": "AllowDenyPattern",
"type": "object"
},
"CassandraCloudConfig": {
"additionalProperties": false,
"description": "Configuration for connecting to DataStax Astra DB in the cloud.",
"properties": {
"token": {
"description": "The Astra DB application token used for authentication.",
"title": "Token",
"type": "string"
},
"secure_connect_bundle": {
"description": "File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB.",
"title": "Secure Connect Bundle",
"type": "string"
},
"connect_timeout": {
"default": 600,
"description": "Timeout in seconds for establishing new connections to Cassandra.",
"title": "Connect Timeout",
"type": "integer"
},
"request_timeout": {
"default": 600,
"description": "Timeout in seconds for individual Cassandra requests.",
"title": "Request Timeout",
"type": "integer"
}
},
"required": [
"token",
"secure_connect_bundle"
],
"title": "CassandraCloudConfig",
"type": "object"
},
"GEProfilingBaseConfig": {
"additionalProperties": false,
"properties": {
"enabled": {
"default": false,
"description": "Whether profiling should be done.",
"title": "Enabled",
"type": "boolean"
},
"operation_config": {
"$ref": "#/$defs/OperationConfig",
"description": "Experimental feature. To specify operation configs."
},
"limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Max number of documents to profile. By default, profiles all documents.",
"title": "Limit"
},
"offset": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Offset in documents to profile. By default, uses no offset.",
"title": "Offset"
},
"profile_table_level_only": {
"default": false,
"description": "Whether to perform profiling at table-level only, or include column-level profiling as well.",
"title": "Profile Table Level Only",
"type": "boolean"
},
"include_field_null_count": {
"default": true,
"description": "Whether to profile for the number of nulls for each column.",
"title": "Include Field Null Count",
"type": "boolean"
},
"include_field_distinct_count": {
"default": true,
"description": "Whether to profile for the number of distinct values for each column.",
"title": "Include Field Distinct Count",
"type": "boolean"
},
"include_field_min_value": {
"default": true,
"description": "Whether to profile for the min value of numeric columns.",
"title": "Include Field Min Value",
"type": "boolean"
},
"include_field_max_value": {
"default": true,
"description": "Whether to profile for the max value of numeric columns.",
"title": "Include Field Max Value",
"type": "boolean"
},
"include_field_mean_value": {
"default": true,
"description": "Whether to profile for the mean value of numeric columns.",
"title": "Include Field Mean Value",
"type": "boolean"
},
"include_field_median_value": {
"default": true,
"description": "Whether to profile for the median value of numeric columns.",
"title": "Include Field Median Value",
"type": "boolean"
},
"include_field_stddev_value": {
"default": true,
"description": "Whether to profile for the standard deviation of numeric columns.",
"title": "Include Field Stddev Value",
"type": "boolean"
},
"include_field_quantiles": {
"default": false,
"description": "Whether to profile for the quantiles of numeric columns.",
"title": "Include Field Quantiles",
"type": "boolean"
},
"include_field_distinct_value_frequencies": {
"default": false,
"description": "Whether to profile for distinct value frequencies.",
"title": "Include Field Distinct Value Frequencies",
"type": "boolean"
},
"include_field_histogram": {
"default": false,
"description": "Whether to profile for the histogram for numeric fields.",
"title": "Include Field Histogram",
"type": "boolean"
},
"include_field_sample_values": {
"default": true,
"description": "Whether to profile for the sample values for all columns.",
"title": "Include Field Sample Values",
"type": "boolean"
},
"max_workers": {
"default": 20,
"description": "Number of worker threads to use for profiling. Set to 1 to disable.",
"title": "Max Workers",
"type": "integer"
}
},
"title": "GEProfilingBaseConfig",
"type": "object"
},
"OperationConfig": {
"additionalProperties": false,
"properties": {
"lower_freq_profile_enabled": {
"default": false,
"description": "Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.",
"title": "Lower Freq Profile Enabled",
"type": "boolean"
},
"profile_day_of_week": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.",
"title": "Profile Day Of Week"
},
"profile_date_of_month": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.",
"title": "Profile Date Of Month"
}
},
"title": "OperationConfig",
"type": "object"
},
"StatefulStaleMetadataRemovalConfig": {
"additionalProperties": false,
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"properties": {
"enabled": {
"default": false,
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"title": "Enabled",
"type": "boolean"
},
"remove_stale_metadata": {
"default": true,
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"title": "Remove Stale Metadata",
"type": "boolean"
},
"fail_safe_threshold": {
"default": 75.0,
"description": "Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
"maximum": 100.0,
"minimum": 0.0,
"title": "Fail Safe Threshold",
"type": "number"
}
},
"title": "StatefulStaleMetadataRemovalConfig",
"type": "object"
}
},
"additionalProperties": false,
"description": "Configuration for connecting to a Cassandra or DataStax Astra DB source.",
"properties": {
"env": {
"default": "PROD",
"description": "The environment that all assets produced by this connector belong to",
"title": "Env",
"type": "string"
},
"stateful_ingestion": {
"anyOf": [
{
"$ref": "#/$defs/StatefulStaleMetadataRemovalConfig"
},
{
"type": "null"
}
],
"default": null,
"description": "Configuration for stateful ingestion and stale metadata removal."
},
"platform_instance": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.",
"title": "Platform Instance"
},
"contact_point": {
"default": "localhost",
"description": "Domain or IP address of the Cassandra instance (excluding port).",
"title": "Contact Point",
"type": "string"
},
"port": {
"default": 9042,
"description": "Port number to connect to the Cassandra instance.",
"title": "Port",
"type": "integer"
},
"username": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Username credential with read access to the system_schema keyspace.",
"title": "Username"
},
"password": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Password credential associated with the specified username.",
"title": "Password"
},
"cloud_config": {
"anyOf": [
{
"$ref": "#/$defs/CassandraCloudConfig"
},
{
"type": "null"
}
],
"default": null,
"description": "Configuration for cloud-based Cassandra, such as DataStax Astra DB."
},
"ssl_ca_certs": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to the CA certificate file for SSL connections.",
"title": "Ssl Ca Certs"
},
"ssl_certfile": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to the SSL certificate file for SSL connections.",
"title": "Ssl Certfile"
},
"ssl_keyfile": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Path to the SSL key file for SSL connections.",
"title": "Ssl Keyfile"
},
"ssl_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": "TLS_CLIENT",
"description": "SSL protocol version to use for connections. Options: TLS_CLIENT, TLSv1, TLSv1_1, TLSv1_2, TLSv1_3. Defaults to TLS_CLIENT.",
"title": "Ssl Version"
},
"keyspace_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "Regex patterns to filter keyspaces for ingestion."
},
"table_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "Regex patterns to filter keyspaces.tables for ingestion."
},
"profile_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "Regex patterns for tables to profile"
},
"profiling": {
"$ref": "#/$defs/GEProfilingBaseConfig",
"default": {
"enabled": false,
"operation_config": {
"lower_freq_profile_enabled": false,
"profile_date_of_month": null,
"profile_day_of_week": null
},
"limit": null,
"offset": null,
"profile_table_level_only": false,
"include_field_null_count": true,
"include_field_distinct_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"include_field_mean_value": true,
"include_field_median_value": true,
"include_field_stddev_value": true,
"include_field_quantiles": false,
"include_field_distinct_value_frequencies": false,
"include_field_histogram": false,
"include_field_sample_values": true,
"max_workers": 20
},
"description": "Configuration for profiling"
}
},
"title": "CassandraSourceConfig",
"type": "object"
}
Code Coordinates
- Class Name: datahub.ingestion.source.cassandra.cassandra.CassandraSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Cassandra, feel free to ping us on our Slack.