Skip to content

Instantly share code, notes, and snippets.

@yuhan
Last active February 2, 2024 00:36
Show Gist options
  • Save yuhan/38630b6a4f88b7ec9fd725418e1e5eed to your computer and use it in GitHub Desktop.
Save yuhan/38630b6a4f88b7ec9fd725418e1e5eed to your computer and use it in GitHub Desktop.
from typing import Sequence
from dagster import (
DagsterEventType,
Definitions,
EventLogRecord,
RunFailureSensorContext,
SkipReason,
asset,
run_failure_sensor,
)
@asset
def asset_to_fail():
raise Exception("This asset is designed to fail")
# This monitor all runs in the current code location
@run_failure_sensor(minimum_interval_seconds=5)
def on_asset_failure(context: RunFailureSensorContext):
failed_run_id = context.dagster_run.run_id
records: Sequence[EventLogRecord] = context.instance.get_records_for_run(
failed_run_id,
of_type={
DagsterEventType.ASSET_MATERIALIZATION,
DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
},
).records
# Can change the values to include more info like failure message
planned_asset_keys = {
record.event_log_entry.dagster_event.asset_key
for record in records
if record.event_log_entry.dagster_event.event_type_value
== DagsterEventType.ASSET_MATERIALIZATION_PLANNED
}
successful_asset_keys = {
record.event_log_entry.dagster_event.asset_key
for record in records
if record.event_log_entry.dagster_event.event_type_value
== DagsterEventType.ASSET_MATERIALIZATION
}
# Skip if all assets were successfully materialized. the run failure sensor is triggered because of non-asset related
if len(planned_asset_keys) == len(successful_asset_keys):
return SkipReason("All assets were successfully materialized")
# find asset materialization failure
failed_asset_keys = planned_asset_keys.difference(successful_asset_keys)
print("failed_asset_keys", failed_asset_keys)
# Do something with the failed asset keys
defs = Definitions(
assets=[asset_to_fail],
sensors=[on_asset_failure],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment