Interactive dagster debugging

dagster assets experimentation

Coming from a data science background, I have found that interactive environments such as Jupyter notebooks make it much easier to debug complicated data projects. They also help when learning a new API. For me, that API is currently dagster.

Using the metadata

I quite like the idea of having a central data catalog. The assets view of dagster might be a first step towards this (at least for pipelines managed in or from dagster). However, unlike, e.g., the Hive metastore, it is not a database.

Being new to dagster, I wanted to figure out how to easily access the key/value metadata from within a pipeline, e.g., to obtain the file path of an asset. I imagine not having to pass around details about an asset, only its asset key. Any upstream or downstream pipeline should then retrieve the necessary information about the asset (like its storage location) itself when it reads the asset.

In the excellent dagster Slack forum, I quickly got help and the suggestion to use:

context.solid_def.input_defs[0].asset_key

to get the asset key of an input, which can then be used to look up the metadata of the asset materialization.

However, it was initially unclear to me how to interact with the running system. Adding debugging print statements all over the place is one way to go about it. But it is much more efficient to use an interactive notebook environment, connect to a running instance, and experiment directly with the live API.

Example

You can easily do so by first starting the dagit process in a shell. Then, in a running notebook, you can connect to the same instance using:

from dagster import DagsterInstance
instance = DagsterInstance.get() # needs your DAGSTER_HOME to be set

NOTICE: the DAGSTER_HOME variable needs to be set up correctly for this step to work.
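A small pre-flight check in the notebook can make a misconfigured environment easier to spot. The following is only a sketch: the helper name check_dagster_home is hypothetical, and it assumes that dagster expects DAGSTER_HOME to be an absolute path to an existing directory.

```python
import os
from pathlib import Path


def check_dagster_home(env=None):
    """Return the DAGSTER_HOME directory as a Path, or raise with a readable hint."""
    env = os.environ if env is None else env
    raw = env.get("DAGSTER_HOME")
    if not raw:
        raise RuntimeError("DAGSTER_HOME is not set; export it before starting the notebook")
    home = Path(raw)
    # Assumption: dagster expects an absolute path to an existing directory.
    if not home.is_absolute():
        raise RuntimeError(f"DAGSTER_HOME must be an absolute path, got {raw!r}")
    if not home.is_dir():
        raise RuntimeError(f"DAGSTER_HOME does not point to an existing directory: {raw!r}")
    return home
```

Calling check_dagster_home() in the first notebook cell, before DagsterInstance.get(), should surface the problem with a clear message instead of a failure deeper inside dagster.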

For example:

from dagster import PipelineRunStatus
from dagster.core.storage.pipeline_run import PipelineRunsFilter

print(len(instance.get_runs()))

queued_runs = instance.get_runs(filters=PipelineRunsFilter(statuses=[PipelineRunStatus.QUEUED]))
#for run in queued_runs:
#    instance.report_run_canceled(run)

Uncommenting the loop at the end would cancel all queued runs programmatically. This can be useful when a custom sensor under development has (by accident) generated thousands and thousands of run requests.
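Because canceling runs is destructive, it may be worth wrapping the loop in a helper with a dry-run switch. This is a minimal sketch: cancel_runs is a hypothetical name, and the only dagster calls it relies on are instance.report_run_canceled(run) from the snippet above and the run_id attribute of a run.

```python
def cancel_runs(instance, runs, dry_run=True):
    """Cancel the given runs; with dry_run=True only report what would be canceled."""
    affected_ids = []
    for run in runs:
        if not dry_run:
            # Same call as in the commented-out loop above.
            instance.report_run_canceled(run)
        affected_ids.append(run.run_id)
    return affected_ids
```

With the objects from the snippet above, cancel_runs(instance, queued_runs) only returns the affected run ids, and cancel_runs(instance, queued_runs, dry_run=False) actually cancels them.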

The API makes it easy to interact with the database backing dagster. For example, the following statement fetches all recorded runs:

all_runs = instance.get_runs()

Most likely, however, this is not a very sensible query. You are probably interested in details about a specific run or perhaps even a specific asset.

First, it might be helpful to note that all available asset keys can quickly be listed:

instance.get_asset_keys()

For any specific asset key of your choice, you can query the event log:

from dagster.core.storage.event_log import EventRecordsFilter
from dagster import AssetKey, DagsterEventType

a = instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        # asset_key=AssetKey(["daily_temperature_highs"]),
        asset_key=AssetKey(["hottest_dates"]),
        # after_cursor=comments_cursor,
    ),
    ascending=False,  # newest record first
    limit=1,  # only the latest materialization
)
a = a[0]
a

This way, you can obtain specific types of events, such as only ASSET_MATERIALIZATIONs. The daily_temperature_highs asset, which appears here as the upstream of hottest_dates, stems from the included dagster example projects. All the data is recorded in a structured format:

EventLogRecord(storage_id=38, event_log_entry=EventLogEntry(error_info=None, message='cybersio_poc - ebe996df-3041-4301-9e40-f2270a5a5b8e - 20467 - hottest_dates - ASSET_MATERIALIZATION - Materialized value hottest_dates.', level=10, user_message='Materialized value hottest_dates.', run_id='ebe996df-3041-4301-9e40-f2270a5a5b8e', timestamp=1643832923.3627, step_key='hottest_dates', pipeline_name='cybersio_poc', dagster_event=DagsterEvent(event_type_value='ASSET_MATERIALIZATION', pipeline_name='cybersio_poc', step_handle=StepHandle(solid_handle=NodeHandle(name='hottest_dates', parent=None), key='hottest_dates'), solid_handle=NodeHandle(name='hottest_dates', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline_name': 'cybersio_poc', 'pipeline_tags': '{\'.dagster/grpc_info\': \'{"host": "localhost", "socket": "/var/folders/cx/tptc40fs3492df12b62dx_kr0000gn/T/tmpfsq9_xan"}\'}', 'resource_fn_name': 'None', 'resource_name': 'None', 'run_id': 'ebe996df-3041-4301-9e40-f2270a5a5b8e', 'solid_name': 'hottest_dates', 'step_key': 'hottest_dates'}, event_specific_data=StepMaterializationData(materialization=AssetMaterialization(asset_key=AssetKey(['hottest_dates']), description=None, metadata_entries=[], partition=None, tags={}), asset_lineage=[AssetLineageInfo(asset_key=AssetKey(['daily_temperature_highs']), partitions=set())]), message='Materialized value hottest_dates.', pid=20467, step_key='hottest_dates')))

Even the lineage can be accessed programmatically:

a.event_log_entry.dagster_event.step_materialization_data.asset_lineage

# output:
# [AssetLineageInfo(asset_key=AssetKey(['daily_temperature_highs']), partitions=set())]

I personally was most interested in figuring out how to handle the metadata:

a.event_log_entry.dagster_event.step_materialization_data.materialization.metadata_entries

In my example, the output is:

[EventMetadataEntry(label='path', description=None, entry_data=PathMetadataEntryData(path='/path/to/file')),
 EventMetadataEntry(label='value_counts', description=None, entry_data=IntMetadataEntryData(value=10))]
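
To get from these entries to something like the file path of an asset, the list can be flattened into a plain dictionary keyed by label. The sketch below is an illustration only: metadata_to_dict is a hypothetical helper, and the namedtuples are stand-ins that mimic the repr shown above so that the example runs without dagster. It assumes each entry exposes label and entry_data, with the payload in a path or value field as in the output above.

```python
from collections import namedtuple

# Stand-ins that mimic the repr shown above, so the sketch runs without dagster.
EventMetadataEntry = namedtuple("EventMetadataEntry", "label description entry_data")
PathMetadataEntryData = namedtuple("PathMetadataEntryData", "path")
IntMetadataEntryData = namedtuple("IntMetadataEntryData", "value")


def metadata_to_dict(metadata_entries):
    """Map each entry label to its raw payload (the `path` or `value` field)."""
    result = {}
    for entry in metadata_entries:
        data = entry.entry_data
        # Path entries expose `.path`; the int entry shown above exposes `.value`.
        result[entry.label] = data.path if hasattr(data, "path") else data.value
    return result


entries = [
    EventMetadataEntry("path", None, PathMetadataEntryData("/path/to/file")),
    EventMetadataEntry("value_counts", None, IntMetadataEntryData(10)),
]
metadata = metadata_to_dict(entries)
print(metadata["path"])  # /path/to/file
```

In the notebook, the same helper could be applied to the metadata_entries list of the real materialization, so a downstream step only needs the asset key to find the storage location. Other metadata types may use different field names and would need their own branch.
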
Georg Heiler
Researcher & data scientist

My research interests include large geo-spatial time and network data analytics.