SQL Queries
Important Capabilities
Capability | Status | Notes |
---|---|---|
Column-level Lineage | ✅ | Parsed from SQL queries. |
Table-Level Lineage | ✅ | Parsed from SQL queries. |
This source reads a newline-delimited JSON file containing SQL queries and parses them to generate lineage.
Query File Format
This file should contain one JSON object per line, with the following fields:
- query: string - The SQL query to parse.
- timestamp (optional): number - The timestamp of the query, in seconds since the epoch.
- user (optional): string - The user who ran the query. This user value will be directly converted into a DataHub user urn.
- operation_type (optional): string - Platform-specific operation type, used if the operation type can't be parsed.
- session_id (optional): string - Session identifier for temporary table resolution across queries.
- downstream_tables (optional): string[] - Fallback list of tables that the query writes to, used if the query can't be parsed.
- upstream_tables (optional): string[] - Fallback list of tables the query reads from, used if the query can't be parsed.
Incremental Lineage
When incremental_lineage
is enabled, this source will emit lineage as patches rather than full overwrites.
This allows you to add lineage edges without removing existing ones, which is useful for:
- Gradually building up lineage from multiple sources
- Preserving manually curated lineage
- Avoiding conflicts when multiple ingestion processes target the same datasets
Note: Incremental lineage only applies to UpstreamLineage aspects. Other aspects like queries and usage statistics will still be emitted normally.
Example Queries File
{"query": "SELECT x FROM my_table", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": [], "upstream_tables": ["my_database.my_schema.my_table"]}
{"query": "INSERT INTO my_table VALUES (1, 'a')", "timestamp": 1689232737.669, "user": "user_b", "downstream_tables": ["my_database.my_schema.my_table"], "upstream_tables": []}
Note that this file does not represent a single JSON object, but instead newline-delimited JSON, in which each line is a separate JSON object.
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
datahub_api: # Only necessary if using a non-DataHub sink, e.g. the file sink
server: http://localhost:8080
timeout_sec: 60
source:
type: sql-queries
config:
platform: "snowflake"
default_db: "SNOWFLAKE"
query_file: "./queries.json"
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
platform ✅ string | The platform for which to generate data, e.g. snowflake |
query_file ✅ string | Path to file to ingest |
default_db One of string, null | The default database to use for unqualified table names Default: None |
default_schema One of string, null | The default schema to use for unqualified table names Default: None |
incremental_lineage boolean | When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. Default: False |
override_dialect One of string, null | The SQL dialect to use when parsing queries. Overrides automatic dialect detection. Default: None |
platform_instance One of string, null | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details. Default: None |
env string | The environment that all assets produced by this connector belong to Default: PROD |
usage BaseUsageConfig | |
usage.bucket_duration Enum | One of: "DAY", "HOUR" |
usage.end_time string(date-time) | Latest date of lineage/usage to consider. Default: Current time in UTC |
usage.format_sql_queries boolean | Whether to format sql queries Default: False |
usage.include_operational_stats boolean | Whether to display operational stats. Default: True |
usage.include_read_operational_stats boolean | Whether to report read operational stats. Experimental. Default: False |
usage.include_top_n_queries boolean | Whether to ingest the top_n_queries. Default: True |
usage.start_time string(date-time) | Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration ). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'. Default: None |
usage.top_n_queries integer | Number of top queries to save to each table. Default: 10 |
usage.user_email_pattern AllowDenyPattern | A class to store allow deny regexes |
usage.user_email_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
The JSONSchema for this configuration is inlined below.
{
"$defs": {
"AllowDenyPattern": {
"additionalProperties": false,
"description": "A class to store allow deny regexes",
"properties": {
"allow": {
"default": [
".*"
],
"description": "List of regex patterns to include in ingestion",
"items": {
"type": "string"
},
"title": "Allow",
"type": "array"
},
"deny": {
"default": [],
"description": "List of regex patterns to exclude from ingestion.",
"items": {
"type": "string"
},
"title": "Deny",
"type": "array"
},
"ignoreCase": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": true,
"description": "Whether to ignore case sensitivity during pattern matching.",
"title": "Ignorecase"
}
},
"title": "AllowDenyPattern",
"type": "object"
},
"BaseUsageConfig": {
"additionalProperties": false,
"properties": {
"bucket_duration": {
"$ref": "#/$defs/BucketDuration",
"default": "DAY",
"description": "Size of the time window to aggregate usage stats."
},
"end_time": {
"description": "Latest date of lineage/usage to consider. Default: Current time in UTC",
"format": "date-time",
"title": "End Time",
"type": "string"
},
"start_time": {
"default": null,
"description": "Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.",
"format": "date-time",
"title": "Start Time",
"type": "string"
},
"top_n_queries": {
"default": 10,
"description": "Number of top queries to save to each table.",
"exclusiveMinimum": 0,
"title": "Top N Queries",
"type": "integer"
},
"user_email_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "regex patterns for user emails to filter in usage."
},
"include_operational_stats": {
"default": true,
"description": "Whether to display operational stats.",
"title": "Include Operational Stats",
"type": "boolean"
},
"include_read_operational_stats": {
"default": false,
"description": "Whether to report read operational stats. Experimental.",
"title": "Include Read Operational Stats",
"type": "boolean"
},
"format_sql_queries": {
"default": false,
"description": "Whether to format sql queries",
"title": "Format Sql Queries",
"type": "boolean"
},
"include_top_n_queries": {
"default": true,
"description": "Whether to ingest the top_n_queries.",
"title": "Include Top N Queries",
"type": "boolean"
}
},
"title": "BaseUsageConfig",
"type": "object"
},
"BucketDuration": {
"enum": [
"DAY",
"HOUR"
],
"title": "BucketDuration",
"type": "string"
}
},
"additionalProperties": false,
"properties": {
"incremental_lineage": {
"default": false,
"description": "When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
"title": "Incremental Lineage",
"type": "boolean"
},
"env": {
"default": "PROD",
"description": "The environment that all assets produced by this connector belong to",
"title": "Env",
"type": "string"
},
"platform_instance": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.",
"title": "Platform Instance"
},
"query_file": {
"description": "Path to file to ingest",
"title": "Query File",
"type": "string"
},
"platform": {
"description": "The platform for which to generate data, e.g. snowflake",
"title": "Platform",
"type": "string"
},
"usage": {
"$ref": "#/$defs/BaseUsageConfig",
"default": {
"bucket_duration": "DAY",
"end_time": "2025-10-08T10:35:06.695291Z",
"start_time": "2025-10-07T00:00:00Z",
"queries_character_limit": 24000,
"top_n_queries": 10,
"user_email_pattern": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"include_operational_stats": true,
"include_read_operational_stats": false,
"format_sql_queries": false,
"include_top_n_queries": true
},
"description": "The usage config to use when generating usage statistics"
},
"default_db": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The default database to use for unqualified table names",
"title": "Default Db"
},
"default_schema": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The default schema to use for unqualified table names",
"title": "Default Schema"
},
"override_dialect": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The SQL dialect to use when parsing queries. Overrides automatic dialect detection.",
"title": "Override Dialect"
}
},
"required": [
"query_file",
"platform"
],
"title": "SqlQueriesSourceConfig",
"type": "object"
}
Code Coordinates
- Class Name:
datahub.ingestion.source.sql_queries.SqlQueriesSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for SQL Queries, feel free to ping us on our Slack.