Skip to content

Instantly share code, notes, and snippets.

@TomAugspurger
Created May 17, 2021 17:54
Show Gist options
  • Save TomAugspurger/c1773efa06d71443f135eb1af8832b82 to your computer and use it in GitHub Desktop.
Save TomAugspurger/c1773efa06d71443f135eb1af8832b82 to your computer and use it in GitHub Desktop.
import dask
import os
import logging
from collections.abc import MutableMapping
from numpy.core.fromnumeric import var
import azure.storage.blob
from azure.core.exceptions import ResourceNotFoundError
from pangeo_forge_recipes import patterns
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.executors import DaskPipelineExecutor
from dask_gateway import GatewayCluster
from distributed.diagnostics.plugin import PipInstall
def init_logging(dask_worker=None):
    """Attach an INFO-level stream handler to the ``pangeo_forge_recipes`` logger.

    Designed to be shipped to Dask workers via ``client.run(init_logging)`` so
    each worker emits recipe log records to its own stream.

    Parameters
    ----------
    dask_worker : distributed.Worker, optional
        When given, the logger is also registered with the worker's
        logging machinery.
    """
    log = logging.getLogger("pangeo_forge_recipes")
    log.setLevel(logging.INFO)

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(
        logging.Formatter(
            "[%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(funcName)10s() ] %(message)s"
        )
    )
    log.addHandler(stream_handler)

    if dask_worker:
        # NOTE(review): this is a private distributed API — confirm it still
        # exists in the deployed distributed version.
        dask_worker._setup_logging(log)
class AzureBlobStorageStore(MutableMapping):
    """A zarr-style ``MutableMapping`` backed by an Azure Blob Storage container.

    Keys are blob names relative to ``root``; values are raw bytes. Intended
    to be handed to zarr/xarray as a store via ``AzureBlobStorageFS.get_mapper``.

    Parameters
    ----------
    container_client : azure.storage.blob.ContainerClient
        Client for the target container.
    root : str, optional
        Prefix under which all keys live. Must not start with ``/`` and must
        end with ``/`` (when non-empty) so joined keys form valid blob names.
    """

    def __init__(self, container_client, root=""):
        if len(root):
            assert root[0] != "/"
            assert root[-1] == "/"
        self.container_client = container_client
        self.root = root

    def __getitem__(self, key):
        key = os.path.join(self.root, key)
        with self.container_client.get_blob_client(key) as bc:
            try:
                stream = bc.download_blob()
            except ResourceNotFoundError as e:
                # Translate the Azure "missing blob" error into the KeyError
                # the MutableMapping contract requires.
                raise KeyError(key) from e
            data = stream.readall()
        return data

    def __setitem__(self, key, value):
        key = os.path.join(self.root, key)
        # bug in zarr? xarray? — scalar ndarray values arrive un-serialized,
        # so convert single-element arrays to bytes before uploading.
        if hasattr(value, "size") and value.size == 1 and hasattr(value, "tobytes"):
            value = value.tobytes()
        with self.container_client.get_blob_client(key) as bc:
            bc.upload_blob(value, overwrite=True)

    def __delitem__(self, key):
        key = os.path.join(self.root, key)
        with self.container_client.get_blob_client(key) as bc:
            bc.delete_blob()

    def __iter__(self):
        # BUG FIX: removed an unreachable ``return self.lisdir()`` that
        # followed this return (dead code referencing a nonexistent method).
        # NOTE(review): ``list_blobs``'s first positional parameter is assumed
        # to act as a name prefix — confirm against the azure SDK version used.
        prefix_len = len(self.root)
        return (
            x["name"][prefix_len:] for x in self.container_client.list_blobs(self.root)
        )

    def __len__(self):
        return len(list(self.container_client.list_blobs(self.root)))
class AzureBlobStorageFS:
    """Minimal fsspec-like shim exposing just what ``FSSpecTarget`` needs.

    Wraps a single ``ContainerClient`` and hands out blob-backed mappings.
    """

    def __init__(self, container_client):
        self.container_client = container_client

    def get_mapper(self, root):
        # Hand back a zarr-compatible store rooted at ``root`` in the container.
        return AzureBlobStorageStore(self.container_client, root)

    def isdir(self, path):
        # Blob storage has no real directories; claim everything is one so
        # callers never fail a directory check.
        return True
def make_format_function(region, frequency):
    """Return a ``(variable, time) -> URL`` function for Daymet v4 NetCDF files.

    The region and frequency are baked into the template up front; the
    returned closure only fills in the per-file variable and time fields.
    """
    if frequency == "daily":
        template = (
            "https://daymeteuwest.blob.core.windows.net/daymet-nc/"
            "daymet_v4_{frequency}/daymet_v4_daily_{region}_{{variable}}_{{time}}.nc"
        ).format(region=region, frequency=frequency)
    else:
        template = (
            "https://daymeteuwest.blob.core.windows.net/daymet-nc/"
            "daymet_v4_{frequency}/daymet_v4_{{variable}}_{{kind}}_{region}_{{time}}.nc"
        ).format(region=region, frequency=frequency)

    def format_function(variable, time):
        # Non-daily file names mark precip as an annual total and everything
        # else as an annual average; daily templates simply ignore ``kind``.
        kind = "annttl" if variable == "prcp" else "annavg"
        return template.format(variable=variable, kind=kind, time=time)

    return format_function
def make_recipe(region, frequency):
    """Create an ``XarrayZarrRecipe`` for one Daymet v4 region/frequency combo.

    Parameters
    ----------
    region : str
        One of ``{"hi", "pr", "na"}``.
    frequency : str
        One of ``{"daily", "monthly", "annual"}``.

    Returns
    -------
    XarrayZarrRecipe
        Recipe whose pattern merges over variable and concatenates over
        time (one source file per year, 1980-2020).
    """
    assert region in {"hi", "pr", "na"}
    assert frequency in {"daily", "monthly", "annual"}

    if frequency == "daily":
        variable_keys = ["prcp", "swe", "tmax", "tmin", "vp", "srad"]
        # Presumably Daymet's 365-day calendar, so every yearly daily file
        # has 365 time steps — TODO confirm against the source files.
        n_items_per_file = 365
    else:
        n_items_per_file = 12 if frequency == "monthly" else 1
        variable_keys = ["prcp", "swe", "tmax", "tmin", "vp"]

    variable_merge_dim = patterns.MergeDim("variable", keys=variable_keys)
    # BUG FIX: nitems_per_file was hard-coded to 1, silently discarding the
    # n_items_per_file computed above (365 daily / 12 monthly / 1 annual).
    time_concat_dim = patterns.ConcatDim(
        "time", keys=list(range(1980, 2021)), nitems_per_file=n_items_per_file
    )
    pattern = patterns.FilePattern(
        make_format_function(region, frequency), variable_merge_dim, time_concat_dim
    )
    # (removed a stray no-op ``pattern`` expression statement here)
    recipe = XarrayZarrRecipe(
        pattern,
        cache_inputs=False,
        copy_input_to_local_file=True,
    )
    return recipe
def run(region, frequency, container_name="daymet-zarr-test"):
    """Build the recipe for ``region``/``frequency`` and wire it to an Azure
    Blob Storage target. The Dask-cluster execution steps are commented out
    below and kept for reference; as written this function only configures
    the recipe and returns None.
    """
    recipe = make_recipe(region, frequency)
    # NOTE(review): ``...`` is a literal Ellipsis placeholder — a real
    # connection string (scrubbed from this gist) must be supplied before
    # this call can succeed.
    cc = azure.storage.blob.ContainerClient.from_connection_string(..., container_name)
    fs_remote = AzureBlobStorageFS(cc)
    # Store root inside the container, e.g. "na/daily/".
    target = FSSpecTarget(fs_remote, f"{region}/{frequency}/")
    recipe.target = target
    # --- reference: how this was executed on a Dask Gateway cluster ---
    # executor = DaskPipelineExecutor()
    # result = executor.pipelines_to_plan(recipe.to_pipelines())
    # cluster = GatewayCluster()
    # plugin = PipInstall(['git+https://github.com/pangeo-forge/pangeo-forge-recipes'])
    # client = cluster.get_client()
    # client.register_worker_plugin(plugin)
    # N_WORKERS=4
    # cluster.scale(N_WORKERS)
    # client.wait_for_workers(N_WORKERS)
    # client.run(init_logging)
    # print(client.dashboard_link)
    # result.compute()
    # return recipe
@ciaransweet
Copy link

@TomAugspurger what would you expect the value of root here https://gist.github.com/TomAugspurger/c1773efa06d71443f135eb1af8832b82#file-daymet-py-L33 to be?

Would something like abfs://ciarandev-bakery-flow-storage-container/target/dask_transform_flow.zarr/ be correct?

@TomAugspurger
Copy link
Author

I think it's passed through from https://gist.github.com/TomAugspurger/c1773efa06d71443f135eb1af8832b82#file-daymet-py-L131. So the f-string `f"{region}/{frequency}"` ends up being the root.

@ciaransweet
Copy link

Okay, what would that equate to in my case?

I'm pretty lost wiring in your stuff into the flow we've got

pangeo-forge/pangeo-forge-azure-bakery@5868189 is the diff to show what I've done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment