Skip to content

Instantly share code, notes, and snippets.

@strickvl
Last active October 20, 2023 07:07
Show Gist options
  • Save strickvl/2178d93c8693f068768a82587fd4db75 to your computer and use it in GitHub Desktop.
Save strickvl/2178d93c8693f068768a82587fd4db75 to your computer and use it in GitHub Desktop.
Script to fix ZenML database for 0.45.2
import json
from sqlalchemy import text
from sqlmodel import Session
from zenml.client import Client
# Store behind the active ZenML client; the migration below opens its
# database session through this store's `engine`.
zs = Client().zen_store

# Parameterized UPDATE statements: each writes a re-serialized JSON document
# (:data) back to the single row identified by :id_.
update_query_pd = text(
    "UPDATE pipeline_deployment"
    " SET step_configurations = :data"
    " WHERE id = :id_"
)
update_query_sr = text(
    "UPDATE step_run"
    " SET step_configuration = :data"
    " WHERE id = :id_"
)
def _wrap_external_input_artifacts(step):
    """Normalize ``config.external_input_artifacts`` of one step in place.

    Every entry whose value is not already a dict is replaced by
    ``{"id": <old value>}``. If ``config`` exists but has no
    ``external_input_artifacts`` key, an empty dict is stored under it.

    Args:
        step: One decoded step configuration (expected to be a dict with a
            ``"config"`` dict inside).

    Returns:
        True if ``step`` was modified, False otherwise.
    """
    # A step without a usable "config" dict is left untouched. Previously
    # this case raised KeyError and aborted the whole migration; skipping
    # mirrors how undecodable JSON rows are already skipped by the callers.
    if not isinstance(step, dict):
        return False
    config = step.get("config")
    if not isinstance(config, dict):
        return False

    if "external_input_artifacts" not in config:
        config["external_input_artifacts"] = {}
        return True

    external_inputs = config["external_input_artifacts"]
    if not isinstance(external_inputs, dict):
        # NOTE(review): unexpected shape (e.g. null or a list); left as-is
        # rather than guessing a conversion -- confirm whether this can occur.
        return False

    changed = False
    # Re-assigning the value of an existing key does not resize the dict, so
    # it is safe to do while iterating over items().
    for eip_name, current in external_inputs.items():
        if not isinstance(current, dict):
            external_inputs[eip_name] = {"id": current}
            changed = True
    return changed


with Session(zs.engine) as session:
    # pipeline_deployment.step_configurations: a JSON object whose values are
    # the individual step configurations.
    # fetchall() materializes the rows first so that the UPDATEs below are
    # never issued while the SELECT is still being iterated on this session.
    rows_pd = session.execute(
        text(
            "SELECT id, step_configurations FROM pipeline_deployment WHERE step_configurations IS NOT NULL"
        )
    ).fetchall()
    for id_, data in rows_pd:
        try:
            data_dict = json.loads(data)
        except json.JSONDecodeError:
            continue
        if not isinstance(data_dict, dict):
            # Valid JSON but not an object (e.g. null): nothing to migrate.
            continue
        has_changes = False
        # Explicit loop (not any()) so that every step is normalized, not
        # just the steps up to the first one that changes.
        for step in data_dict.values():
            if _wrap_external_input_artifacts(step):
                has_changes = True
        if has_changes:
            session.execute(
                update_query_pd,
                params={"data": json.dumps(data_dict), "id_": id_},
            )

    # update step_run
    # step_run.step_configuration: a JSON object holding one step configuration.
    rows_sr = session.execute(
        text(
            "SELECT id, step_configuration FROM step_run WHERE step_configuration IS NOT NULL"
        )
    ).fetchall()
    for id_, data in rows_sr:
        try:
            data_dict = json.loads(data)
        except json.JSONDecodeError:
            continue
        if _wrap_external_input_artifacts(data_dict):
            session.execute(
                update_query_sr,
                params={"data": json.dumps(data_dict), "id_": id_},
            )

    # Single commit at the end: either every rewritten row is persisted or,
    # if anything above raised, none are.
    session.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment