-
-
Save yuhan/38630b6a4f88b7ec9fd725418e1e5eed to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Sequence | |
from dagster import ( | |
DagsterEventType, | |
Definitions, | |
EventLogRecord, | |
RunFailureSensorContext, | |
SkipReason, | |
asset, | |
run_failure_sensor, | |
) | |
@asset
def asset_to_fail():
    """Always raise, so that materializing this asset fails.

    Exists only as a fixture for exercising the failure sensor defined in
    this file.

    Raises:
        Exception: unconditionally.
    """
    failure_message = "This asset is designed to fail"
    raise Exception(failure_message)
# This sensor monitors all runs in the current code location.
@run_failure_sensor(minimum_interval_seconds=5)
def on_asset_failure(context: RunFailureSensorContext):
    """Report which planned assets of a failed run were never materialized.

    Reads the failed run's ASSET_MATERIALIZATION_PLANNED and
    ASSET_MATERIALIZATION events from the instance event log and treats every
    asset key that was planned but has no materialization event as failed.

    Args:
        context: Sensor context for the failed run; supplies the run
            (``context.dagster_run``) and the instance used to query events.

    Returns:
        A ``SkipReason`` when every planned asset was materialized (the run
        failed for a reason unrelated to asset materialization); otherwise
        ``None`` after printing the failed asset keys.
    """
    failed_run_id = context.dagster_run.run_id
    records: Sequence[EventLogRecord] = context.instance.get_records_for_run(
        failed_run_id,
        of_type={
            DagsterEventType.ASSET_MATERIALIZATION,
            DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
        },
    ).records

    # The collected values can be extended to carry more info, such as the
    # failure message.
    planned_asset_keys = set()
    successful_asset_keys = set()
    for record in records:
        dagster_event = record.event_log_entry.dagster_event
        if dagster_event is None:
            # Not expected with the of_type filter above, but the attribute is
            # optional, so skip rather than crash on a non-Dagster log entry.
            continue
        event_type = dagster_event.event_type_value
        if event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED:
            planned_asset_keys.add(dagster_event.asset_key)
        elif event_type == DagsterEventType.ASSET_MATERIALIZATION:
            successful_asset_keys.add(dagster_event.asset_key)

    # Find asset materialization failures. Compare by set difference rather
    # than by set size: an unplanned materialization in the same run could
    # make the sizes match even though a planned asset failed (or make them
    # differ even though nothing failed).
    failed_asset_keys = planned_asset_keys - successful_asset_keys

    # Skip if all planned assets were successfully materialized; the run
    # failure sensor was then triggered by a non-asset-related failure.
    if not failed_asset_keys:
        return SkipReason("All assets were successfully materialized")

    print("failed_asset_keys", failed_asset_keys)
    # Do something with the failed asset keys
# Register the always-failing asset together with the failure sensor in one
# Definitions object.
_example_assets = [asset_to_fail]
_example_sensors = [on_asset_failure]
defs = Definitions(assets=_example_assets, sensors=_example_sensors)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment