Unlocking Advanced Metadata Extraction with the New DBT API in Dagster

Putting Dagster's recent DBT API improvements into action

overview

For data engineering professionals, staying at the forefront of innovative tools and techniques is crucial for optimizing workflows. Recently, Dagster, the powerful data orchestration framework, introduced an enhanced DBT integration, bringing a wealth of opportunities for seamless collaboration and advanced metadata extraction. In this blog post, we will explore the capabilities of the new DBT API in Dagster and demonstrate its usage with a concrete example: extracting metadata and integrating it into the data catalog Open Metadata.

the new API

With the recent upgrades, Dagster makes its DBT API more composable for the end user. However, a couple of changes are worth noting.

DBT target

Dagster is unifying the resource handling of DBT and Dagster (removing the old Dagster-specific DBT resource) so that all DBT-related parts are configured with DBT-native means.

For example, you could retrieve the desired target from an environment variable and then feed it to the DBT CLI to support e.g. branch deployments in Dagster.

import os

from dagster_dbt import DbtCliResource

# Resolve the desired DBT target from an environment variable.
target = os.environ.get("DBT_TARGET", "dev")

# In the definitions: configure the DBT CLI resource with DBT-native means.
dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    global_config_flags=["--no-use-colors"],
    profile="jaffle_shop",
    target=target,
)

# Inside the asset/op: invoke the DBT CLI with the configured resource.
context.log.debug(f"Running dbt with target {target}")
dbt_cli_task = dbt.cli(
    ["run", "--profiles-dir", DBT_PROFILES_DIR],
    manifest=manifest,
    context=context,
)
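
To make the branch deployment idea concrete, here is a minimal sketch of target resolution. The DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT environment variable and the "staging" target name are assumptions for illustration, not part of the original example:

# Sketch only: assumes a Dagster Cloud-style environment variable and a
# "staging" target defined in profiles.yml.
import os

def resolve_dbt_target() -> str:
    # Branch deployments get a dedicated (hypothetical) staging target.
    if os.environ.get("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT") == "1":
        return "staging"
    return os.environ.get("DBT_TARGET", "dev")

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    profile="jaffle_shop",
    target=resolve_dbt_target(),
)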

retrieving metadata from DBT

Since DBT 1.4, Dagster's DBT integration is no longer blocking. However, if you need to retrieve advanced metadata from DBT's output files, e.g. the record count of the affected rows, then a blocking execution of DBT is required:

dbt_cli_task.wait()

Then the artifacts can be retrieved:

run_results = dbt_cli_task.get_artifact("run_results.json")
executed_manifest = dbt_cli_task.get_artifact("manifest.json")
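
For orientation, run_results.json contains one entry per executed node, each carrying the DBT unique_id and status; a small sketch (illustration only) of inspecting it:

# Illustration only: log the status of every executed dbt node.
for result in run_results["results"]:
    context.log.debug(f"{result['unique_id']}: {result['status']}")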

Finally, the relevant meta information can be extracted from them:

from typing import Optional

from dagster import MetadataValue, Output

# Then, we can use the run results to add metadata to the outputs.
for event in dbt_cli_task.stream_raw_events():
    for dagster_event in event.to_default_asset_events(manifest=manifest):
        if isinstance(dagster_event, Output):
            output_name = dagster_event.output_name

            # Look up the run result via the lookup tables built below.
            result = results_by_output_name[output_name]
            rows_affected: Optional[int] = result["adapter_response"].get(
                "rows_affected"
            )
            rows_affected_metadata = (
                {"rows_affected": rows_affected} if rows_affected else {}
            )

            # Look up the manifest node to attach the compiled SQL.
            node = manifest_by_output_name[output_name]
            compiled_sql: Optional[str] = node.get("compiled_code")
            compiled_sql_metadata = (
                {"compiled_sql": MetadataValue.md(compiled_sql)}
                if compiled_sql
                else {}
            )

            context.add_output_metadata(
                metadata={
                    **rows_affected_metadata,
                    **compiled_sql_metadata,
                },
                output_name=output_name,
            )

        yield dagster_event

Notice that Dagster and DBT use different primary keys (asset key vs. unique_id), so a lookup table must be created before you can access the metadata:

from dagster_dbt.utils import output_name_fn

# Map each dbt unique_id to its Dagster output name.
results_by_output_name = {
    output_name_fn({"unique_id": result["unique_id"]}): result
    for result in run_results["results"]
}
manifest_by_output_name = {
    output_name_fn({"unique_id": unique_id}): node
    for unique_id, node in executed_manifest["nodes"].items()
}
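
As a quick sanity check (illustration only), every output produced by the run should be resolvable in both lookup tables:

# Illustration only: verify the two lookup tables line up.
for output_name in results_by_output_name:
    assert output_name in manifest_by_output_name, output_name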

The extracted metrics can then directly be visualized in Dagster:

Asset plots

When the asset is selected in Dagit, the UI directly shows all the metadata and plots at a glance:

Global Asset Lineage Overview

Open Metadata DBT integration with Dagster

For more advanced data governance and data management use cases, a dedicated data catalog is required. One such tool is Open Metadata, a data catalog with a great DBT integration. There is no first-class support for a Dagster ingestion process yet (see https://github.com/open-metadata/OpenMetadata/issues/7762), but the DBT CLI of Open Metadata can easily be accessed from Dagster.

You need to configure a file similar to:

source:
  type: dbt
  serviceName: "my_db${DEPLOYMENT_NAME}"
  sourceConfig:
    config:
      type: DBT
      dbtConfigSource:
        dbtCatalogFilePath: /path/to/dbt/target/catalog.json
        dbtManifestFilePath: /path/to/dbt/target/manifest.json
        dbtRunResultsFilePath: /path/to/dbt/target/run_results.json
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: $OPEN_METADATA_URL
    authProvider: openmetadata
    securityConfig:
      jwtToken: $OPEN_METADATA_INGESTION_TOKEN

It can then be ingested either using the bare CLI:

metadata ingest -c /path/to/recipe_dbt.yml

or, most likely better, using the native Python workflow:

# NOTICE: partial pseudocode only
from metadata.ingestion.api.workflow import Workflow

# workflow_config is the recipe YAML from above, parsed into a dict.
workflow = Workflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
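
To schedule this from Dagster, the workflow can be wrapped in an op; a minimal sketch, in which the op name and recipe path are illustrative:

# Sketch only: wrap the Open Metadata ingestion in a Dagster op so it can be
# scheduled alongside the DBT assets.
import yaml
from dagster import op

from metadata.ingestion.api.workflow import Workflow

@op
def ingest_dbt_to_open_metadata(context) -> None:
    # Parse the recipe YAML into the config dict expected by Workflow.create.
    with open("/path/to/recipe_dbt.yml") as f:
        workflow_config = yaml.safe_load(f)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()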

Before you can schedule the DBT ingest to Open Metadata, you have to retrieve the DBT artifacts. As discussed before, the artifacts, e.g. catalog, tests, …, have to be created first. Then they can be passed along.

However, Dagster will create a dynamic path to support running multiple instances of the pipeline in parallel. The OM process needs to be fed a suitable path to a JSON file in order to execute successfully:

# NOTICE: partial pseudocode only
# Block until the run has finished, then collect the artifacts.
dbt_cli_task.wait()
executed_manifest = dbt_cli_task.get_artifact("manifest.json")
executed_catalog = dbt_cli_task.get_artifact("catalog.json")

# Persist the artifacts to a stable path for the Open Metadata ingestion.
write_json_to_file(executed_manifest, "om_dbt_this_dynamic_path/target/manifest.json")
write_json_to_file(executed_catalog, "om_dbt_this_dynamic_path/target/catalog.json")

# Additionally execute the DBT tests to obtain their results.
dbt_cli_task = dbt.cli(
    ["test", "--profiles-dir", DBT_PROFILES_DIR],
    manifest=manifest,
)
# Run the task, but don't yield events (and block execution).
_ = list(dbt_cli_task.stream_raw_events())

run_results = dbt_cli_task.get_artifact("run_results.json")
write_json_to_file(run_results, "om_dbt_this_dynamic_path/target/run_results.json")
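
The write_json_to_file helper used above is not part of Dagster; a minimal sketch of one possible implementation:

# Sketch of the hypothetical write_json_to_file helper.
import json
from pathlib import Path

def write_json_to_file(payload: dict, path: str) -> None:
    # Create the (dynamic) target directory before writing the artifact.
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(payload))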

summary

The new DBT API integration in Dagster offers data engineering professionals an exciting opportunity to unlock advanced metadata extraction and integration capabilities. By harnessing the power of the DBT API, we can enhance data governance, improve collaboration, and streamline data cataloging processes. Start exploring the new DBT API in Dagster today and elevate your data engineering workflows to new heights.

The full E2E code example for the new API of DBT can be found on github: https://gist.github.com/geoHeil/a7ee774e65b597e9ac534e678cf5ecfc.

The image was generated using Firefly by Adobe. Parts of the text were generated by ChatGPT.

See also https://github.com/dagster-io/dagster/discussions/14477 for more details.

Georg Heiler
Researcher & data scientist

My research interests include large geo-spatial time and network data analytics.