-
-
Save strickvl/2178d93c8693f068768a82587fd4db75 to your computer and use it in GitHub Desktop.
Script to fix ZenML database for 0.45.2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
from typing import Any, Optional

from sqlalchemy import text
from sqlmodel import Session

from zenml.client import Client

# SQL statements used by the migration. Both columns are parsed as JSON text.
SELECT_DEPLOYMENT_SQL = (
    "SELECT id, step_configurations FROM pipeline_deployment "
    "WHERE step_configurations IS NOT NULL"
)
UPDATE_DEPLOYMENT_SQL = (
    "UPDATE pipeline_deployment SET step_configurations = :data WHERE id = :id_"
)
SELECT_STEP_RUN_SQL = (
    "SELECT id, step_configuration FROM step_run "
    "WHERE step_configuration IS NOT NULL"
)
UPDATE_STEP_RUN_SQL = (
    "UPDATE step_run SET step_configuration = :data WHERE id = :id_"
)


def wrap_external_input_artifacts(step_config: Any) -> bool:
    """Rewrite one step configuration's ``external_input_artifacts`` in place.

    Every entry of ``step_config["config"]["external_input_artifacts"]`` that
    is not already a dict is replaced by ``{"id": <old value>}``. When the
    field is missing (or null) it is set to an empty dict.

    Args:
        step_config: The decoded JSON value of a single step configuration.

    Returns:
        True if ``step_config`` was modified, False otherwise.
    """
    if not isinstance(step_config, dict):
        return False

    config = step_config.get("config")
    if not isinstance(config, dict):
        # Without a "config" object there is nothing to migrate; leave the
        # entry untouched instead of failing the whole run with a KeyError.
        return False

    artifacts = config.get("external_input_artifacts")
    if artifacts is None:
        config["external_input_artifacts"] = {}
        return True
    if not isinstance(artifacts, dict):
        # Unexpected shape: do not guess, and do not discard the data.
        return False

    changed = False
    # Only values of existing keys are reassigned, so the dict's size never
    # changes while it is being iterated.
    for name, current in artifacts.items():
        if not isinstance(current, dict):
            artifacts[name] = {"id": current}
            changed = True
    return changed


def migrate_payload(raw: Any, *, keyed_by_step: bool) -> Optional[str]:
    """Migrate one JSON column value.

    Args:
        raw: The JSON text read from the database.
        keyed_by_step: True when the JSON object maps keys to step
            configurations (``pipeline_deployment.step_configurations``),
            False when the JSON object is itself a single step configuration
            (``step_run.step_configuration``).

    Returns:
        The re-serialized JSON text if anything changed, otherwise None
        (including when ``raw`` is not valid JSON or is not a JSON object).
    """
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None

    if not isinstance(payload, dict):
        return None

    if keyed_by_step:
        changed = False
        # Deliberately not any(...): every step must be visited, not just the
        # ones up to the first change.
        for step_config in payload.values():
            if wrap_external_input_artifacts(step_config):
                changed = True
    else:
        changed = wrap_external_input_artifacts(payload)

    return json.dumps(payload) if changed else None


def migrate_table(
    session: "Session",
    select_sql: str,
    update_sql: str,
    *,
    keyed_by_step: bool,
) -> int:
    """Apply ``migrate_payload`` to every row returned by ``select_sql``.

    Args:
        session: Open database session; the caller is responsible for commit.
        select_sql: Query returning ``(id, json_text)`` rows.
        update_sql: Statement with ``:data`` and ``:id_`` bind parameters.
        keyed_by_step: Forwarded to ``migrate_payload``.

    Returns:
        The number of rows for which an UPDATE was issued.
    """
    # Fetch everything up front so the UPDATEs below never run while the
    # SELECT result is still being consumed on the same connection.
    rows = session.execute(text(select_sql)).fetchall()
    update_query = text(update_sql)

    updated = 0
    for id_, data in rows:
        new_data = migrate_payload(data, keyed_by_step=keyed_by_step)
        if new_data is None:
            continue
        session.execute(update_query, params={"data": new_data, "id_": id_})
        updated += 1
    return updated


def main() -> None:
    """Migrate both tables of the active ZenML store in one transaction."""
    zen_store = Client().zen_store
    with Session(zen_store.engine) as session:
        migrate_table(
            session,
            SELECT_DEPLOYMENT_SQL,
            UPDATE_DEPLOYMENT_SQL,
            keyed_by_step=True,
        )
        # update step_run
        migrate_table(
            session,
            SELECT_STEP_RUN_SQL,
            UPDATE_STEP_RUN_SQL,
            keyed_by_step=False,
        )
        # Commit inside the `with` block: once the session is closed there is
        # nothing left to commit.
        session.commit()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment