Created
May 17, 2021 17:54
-
-
Save TomAugspurger/c1773efa06d71443f135eb1af8832b82 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dask | |
import os | |
import logging | |
from collections.abc import MutableMapping | |
from numpy.core.fromnumeric import var | |
import azure.storage.blob | |
from azure.core.exceptions import ResourceNotFoundError | |
from pangeo_forge_recipes import patterns | |
from pangeo_forge_recipes.recipes import XarrayZarrRecipe | |
from pangeo_forge_recipes.storage import FSSpecTarget | |
from pangeo_forge_recipes.executors import DaskPipelineExecutor | |
from dask_gateway import GatewayCluster | |
from distributed.diagnostics.plugin import PipInstall | |
def init_logging(dask_worker=None):
    """Attach a verbose INFO-level stream handler to the pangeo_forge_recipes logger.

    Intended to run both locally and on Dask workers (via ``client.run``);
    when a worker is supplied, its logging hook is wired up as well.
    """
    log = logging.getLogger("pangeo_forge_recipes")
    log.setLevel(logging.INFO)

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)
    fmt = (
        "[%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s"
        " - %(funcName)10s() ] %(message)s"
    )
    stream_handler.setFormatter(logging.Formatter(fmt))
    log.addHandler(stream_handler)

    if dask_worker:
        # Route this logger through the worker's own logging setup too.
        dask_worker._setup_logging(log)
class AzureBlobStorageStore(MutableMapping):
    """A ``MutableMapping`` view of an Azure Blob Storage container.

    Keys are blob names relative to ``root``; values are raw ``bytes``.
    This shape lets the container act as a zarr-style store.

    Parameters
    ----------
    container_client : azure.storage.blob.ContainerClient
        Client for the target container.
    root : str, optional
        Prefix under which all keys live. When non-empty it must not start
        with "/" and must end with "/", so ``os.path.join`` yields valid
        blob names.
    """

    def __init__(self, container_client, root=""):
        if root:
            # Enforce the "prefix/" shape described above.
            assert root[0] != "/"
            assert root[-1] == "/"
        self.container_client = container_client
        self.root = root

    def __getitem__(self, key):
        key = os.path.join(self.root, key)
        with self.container_client.get_blob_client(key) as bc:
            try:
                stream = bc.download_blob()
            except ResourceNotFoundError as e:
                # Mapping protocol: a missing blob surfaces as KeyError.
                raise KeyError(key) from e
            data = stream.readall()
        return data

    def __setitem__(self, key, value):
        key = os.path.join(self.root, key)
        # Workaround for a bug in zarr/xarray: scalar ndarrays can arrive
        # here instead of bytes, so serialize them explicitly.
        if hasattr(value, "size") and value.size == 1 and hasattr(value, "tobytes"):
            value = value.tobytes()
        with self.container_client.get_blob_client(key) as bc:
            bc.upload_blob(value, overwrite=True)

    def __delitem__(self, key):
        key = os.path.join(self.root, key)
        with self.container_client.get_blob_client(key) as bc:
            bc.delete_blob()

    def __iter__(self):
        # Yield keys relative to the root prefix.
        # (Removed an unreachable `return self.lisdir()` that followed this
        # return in the original — dead code with a typo'd attribute name.)
        prefix_len = len(self.root)
        return (
            x["name"][prefix_len:]
            for x in self.container_client.list_blobs(self.root)
        )

    def __len__(self):
        # Count lazily instead of materializing the whole listing in memory.
        return sum(1 for _ in self.container_client.list_blobs(self.root))
class AzureBlobStorageFS:
    """Minimal fsspec-like filesystem shim over an Azure container client.

    Implements only the pieces (``get_mapper`` / ``isdir``) that
    ``FSSpecTarget`` needs in order to use it as a target filesystem.
    """

    def __init__(self, container_client):
        self.container_client = container_client

    def get_mapper(self, root):
        # Hand back a mutable-mapping store rooted at the given prefix.
        return AzureBlobStorageStore(self.container_client, root)

    def isdir(self, path):
        # Blob storage has no real directories, so every path "is" one.
        return True
def make_format_function(region, frequency):
    """Build a ``(variable, time) -> URL`` formatter for Daymet v4 NetCDF files.

    Daily files use a different naming scheme than the aggregated
    (monthly/annual) ones; aggregated names also encode whether the variable
    is a total ("annttl", precipitation) or an average ("annavg", the rest).
    """
    base = "https://daymeteuwest.blob.core.windows.net/daymet-nc"
    if frequency == "daily":
        template = (
            f"{base}/daymet_v4_{frequency}/"
            f"daymet_v4_daily_{region}_{{variable}}_{{time}}.nc"
        )
    else:
        template = (
            f"{base}/daymet_v4_{frequency}/"
            f"daymet_v4_{{variable}}_{{kind}}_{region}_{{time}}.nc"
        )

    def format_function(variable, time):
        # Precipitation aggregates are annual totals; everything else averages.
        # (The daily template simply ignores the `kind` placeholder.)
        kind = "annttl" if variable == "prcp" else "annavg"
        return template.format(variable=variable, kind=kind, time=time)

    return format_function
def make_recipe(region, frequency):
    """Construct an ``XarrayZarrRecipe`` for one Daymet v4 region/frequency pair.

    Parameters
    ----------
    region : str
        One of "hi", "pr", "na".
    frequency : str
        One of "daily", "monthly", "annual".

    Returns
    -------
    XarrayZarrRecipe
    """
    assert region in {"hi", "pr", "na"}
    assert frequency in {"daily", "monthly", "annual"}

    if frequency == "daily":
        variable_keys = ["prcp", "swe", "tmax", "tmin", "vp", "srad"]
        n_items_per_file = 365  # one (non-leap-padded) year per daily file
    else:
        # Aggregated products drop "srad".
        variable_keys = ["prcp", "swe", "tmax", "tmin", "vp"]
        n_items_per_file = 12 if frequency == "monthly" else 1

    variable_merge_dim = patterns.MergeDim("variable", keys=variable_keys)
    # FIX: n_items_per_file was computed but unused — nitems_per_file was
    # hard-coded to 1 for every frequency. Wire the computed value through.
    # NOTE(review): confirm the daily files really contain 365 time steps.
    time_concat_dim = patterns.ConcatDim(
        "time", keys=list(range(1980, 2021)), nitems_per_file=n_items_per_file
    )
    # (Removed a leftover bare `pattern` expression — a no-op notebook remnant.)
    pattern = patterns.FilePattern(
        make_format_function(region, frequency), variable_merge_dim, time_concat_dim
    )
    recipe = XarrayZarrRecipe(
        pattern,
        cache_inputs=False,
        copy_input_to_local_file=True,
    )
    return recipe
def run(region, frequency, container_name="daymet-zarr-test"):
    """Build the recipe for (region, frequency) and point it at Azure Blob Storage.

    NOTE(review): the connection string argument below is a literal ``...``
    placeholder and the entire execution section is commented out, so as
    written this only assigns the recipe's target and returns None — confirm
    before relying on it.
    """
    recipe = make_recipe(region, frequency)
    # Placeholder: a real connection string must replace ``...``.
    cc = azure.storage.blob.ContainerClient.from_connection_string(..., container_name)
    fs_remote = AzureBlobStorageFS(cc)
    # Each (region, frequency) dataset lands under its own blob prefix.
    target = FSSpecTarget(fs_remote, f"{region}/{frequency}/")
    recipe.target = target
    # Commented-out execution path: run the recipe on a Dask Gateway cluster,
    # pip-installing pangeo-forge-recipes on the workers first.
    # executor = DaskPipelineExecutor()
    # result = executor.pipelines_to_plan(recipe.to_pipelines())
    # cluster = GatewayCluster()
    # plugin = PipInstall(['git+https://github.com/pangeo-forge/pangeo-forge-recipes'])
    # client = cluster.get_client()
    # client.register_worker_plugin(plugin)
    # N_WORKERS=4
    # cluster.scale(N_WORKERS)
    # client.wait_for_workers(N_WORKERS)
    # client.run(init_logging)
    # print(client.dashboard_link)
    # result.compute()
    # return recipe
I think it's passed through from https://gist.github.com/TomAugspurger/c1773efa06d71443f135eb1af8832b82#file-daymet-py-L131, so the `f"{region}/{frequency}"`
string ends up being the root.
Okay, what would that equate to in my case?
I'm pretty lost wiring in your stuff into the flow we've got
pangeo-forge/pangeo-forge-azure-bakery@5868189 is the diff to show what I've done
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@TomAugspurger what would you expect the value of
root
here https://gist.github.com/TomAugspurger/c1773efa06d71443f135eb1af8832b82#file-daymet-py-L33 to be? Would something like
abfs://ciarandev-bakery-flow-storage-container/target/dask_transform_flow.zarr/
be correct?