map_blocks_for_metsim_test
#!/usr/bin/env python
# coding: utf-8

# Table of Contents
# 1. define 'schema' functions
# 2. change size of existing dimension
# 3. decimate existing dimension
# 4. add new unindexed dimension
# 5. add new chunked dimension
# 6. Notes
# In[1]:

import xarray as xr
import pandas as pd
import numpy as np
def metsim_1block(ds, new_index):
    """Stand-in for a MetSim run on one block: takes a daily dataset and
    returns a dataset of hourly output variables on `new_index`."""
    ds_out = xr.Dataset()
    ds_out["time"] = ("time", new_index)
    # output variables have the same shape as "tmax", except along time
    var_shape = list(ds["tmax"].shape)
    var_shape[0] = len(new_index)
    print(var_shape)
    dims = ds["tmax"].dims
    coords = dict(ds["tmax"].coords)
    coords["time"] = ds_out.coords["time"]
    for var in [
        "temp",
        "prec",
        "shortwave",
        "vapor_pressure",
        "rel_humid",
        "spec_humid",
        "longwave",
        "tsck",
    ]:
        ds_out[var] = xr.DataArray(np.random.rand(*var_shape), dims=dims, coords=coords)
    print(ds_out.dims)
    return ds_out
ds = xr.tutorial.load_dataset("air_temperature")

ds_daily = xr.Dataset()
ds_daily["tmax"] = (
    ds["air"].resample(time="1D").max("time").chunk({"lat": 5, "lon": 5, "time": -1})
)
ds_daily["tmin"] = (
    ds["air"].resample(time="1D").min("time").chunk({"lat": 5, "lon": 5, "time": -1})
)
ds_daily["pcp"] = xr.zeros_like(ds_daily["tmin"])
ds_daily
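
# Quick eager check of metsim_1block (a sketch, not in the original gist):
# with a short illustrative hourly index, the returned variables should be
# shaped (len(short_index), lat, lon).
short_index = pd.date_range("2013-01-01", periods=48, freq="1H")
out = metsim_1block(ds_daily, short_index)
assert out.sizes["time"] == 48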
# # define 'schema' functions
#
# `to_schema` and `from_schema` are modified from `to_dict` and `from_dict`

# In[2]:
def from_schema(d):
    """
    Convert a dictionary into an xarray.Dataset.

    Input dict can take several forms::

        d = {'t': {'dims': ('t'), 'data': t},
             'a': {'dims': ('t'), 'data': x},
             'b': {'dims': ('t'), 'data': y}}

        d = {'coords': {'t': {'dims': 't', 'data': t,
                              'attrs': {'units': 's'}}},
             'attrs': {'title': 'air temperature'},
             'dims': 't',
             'data_vars': {'a': {'dims': 't', 'data': x},
                           'b': {'dims': 't', 'data': y}}}

    where 't' is the name of the dimension, 'a' and 'b' are names of data
    variables and t, x, and y are lists, numpy.arrays or pandas objects.

    Parameters
    ----------
    d : dict, with a minimum structure of {'var_0': {'dims': [..], \
                                                     'data': [..]}, \
                                           ...}

    Returns
    -------
    obj : xarray.Dataset

    See also
    --------
    Dataset.to_dict
    DataArray.from_dict
    """
    from xarray import Dataset

    if not {"coords", "data_vars"}.issubset(set(d)):
        variables = d.items()
    else:
        import itertools

        variables = itertools.chain(
            d.get("coords", {}).items(), d.get("data_vars", {}).items()
        )
    chunks = d.get("chunks")
    try:
        variable_dict = {}
        for k, v in variables:
            shape = [d["dims"][dim] for dim in v["dims"]]
            data = v.get("data")
            if data is None:
                # no data in the schema: allocate an empty (numpy or dask) array
                if chunks is None:  # TODO: this check needs to be better
                    data = np.empty(shape, dtype=v["dtype"])
                else:
                    import dask.array

                    var_chunks = [chunks[dim] for dim in v["dims"]]
                    data = dask.array.empty(shape, dtype=v["dtype"], chunks=var_chunks)
            variable_dict[k] = (v["dims"], data, v.get("attrs"))
    except KeyError as e:
        raise ValueError(
            "cannot convert dict without the key "
            "'{dims_data}'".format(dims_data=str(e.args[0]))
        ) from e

    obj = Dataset(variable_dict)

    # what if coords aren't dims?
    coords = set(d.get("coords", {})) - set(d.get("dims", {}))
    obj = obj.set_coords(coords)

    obj.attrs.update(d.get("attrs", {}))
    return obj
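
# A minimal round-trip sketch (names "x" and "a" are illustrative, not from
# the original gist): from_schema builds a lazy dataset, backed by empty dask
# arrays, from a hand-written schema dict.
tiny_schema = {
    "coords": {"x": {"dims": ("x",), "data": np.arange(4)}},
    "data_vars": {"a": {"dims": ("x",), "dtype": "float64"}},
    "dims": {"x": 4},
    "chunks": {"x": 2},
}
tiny = from_schema(tiny_schema)
print(tiny["a"].chunks)  # ((2, 2),)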
def to_schema(ds):
    dicty = ds.to_dict(data=False)

    # remove shape on data_vars, coords since that is redundant
    # This makes it easier to modify schema of input variable
    # when making schema for output variable
    for var in dicty["data_vars"]:
        dicty["data_vars"][var].pop("shape")
    for var in dicty["coords"]:
        dicty["coords"][var].pop("shape")

    for var in dicty["dims"]:
        if var in ds.coords:
            dicty["coords"][var].update(data=ds.coords[var].values)

    # unify chunks so that it can be a global property?
    # This is required for map_blocks anyway...
    ds = ds.unify_chunks()
    dicty["chunks"] = dict(ds.chunks)
    return dicty
schema = to_schema(ds_daily)
schema
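
# A peek at the schema structure (a sketch): the top-level keys should be the
# usual to_dict entries plus the "chunks" mapping added by to_schema above,
# and each variable entry should carry dims, attrs, and dtype but no data.
print(sorted(schema))  # ['attrs', 'chunks', 'coords', 'data_vars', 'dims']
print(schema["data_vars"]["tmax"])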
# In[4]:

new_index = pd.date_range('2013-01-01 00:00:00', '2014-12-31 23:00:00', freq='1H')
print(len(new_index))
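# Why 17520 (a note, not in the original gist): two non-leap years of hourly
# steps, 2 * 365 * 24 = 17520, versus the 730 daily steps in ds_daily.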
# In[5]:

# update the schema
# 1. Update time dimension
#    Should this be easier? In general, if an indexed dimension changes size,
#    we need to provide the new size, new data, and new chunk size
schema["dims"]["time"] = len(new_index)
schema["coords"]["time"]["data"] = new_index
schema["chunks"]["time"] = len(new_index)

# 2. Add new variables
#    (copy the entry so the new variables don't all alias the same dict)
for var in [
    "temp",
    "prec",
    "shortwave",
    "vapor_pressure",
    "rel_humid",
    "spec_humid",
    "longwave",
    "tsck",
]:
    schema["data_vars"][var] = dict(schema["data_vars"]["pcp"])

# 3. remove old variables
for var in ["pcp", "tmax", "tmin"]:
    schema["data_vars"].pop(var)
schema
# In[7]:

template = from_schema(schema)
template
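
# Sanity checks (a sketch): the template should be fully lazy, carry the new
# hourly time axis, and have a single chunk along time, as set in the schema.
assert template.sizes["time"] == len(new_index)
assert template.chunks["time"] == (len(new_index),)
assert set(template.data_vars) == {
    "temp", "prec", "shortwave", "vapor_pressure",
    "rel_humid", "spec_humid", "longwave", "tsck",
}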
# # change size of existing dimension

# In[50]:

new_index = pd.date_range('2013-01-01 00:00:00', '2014-12-31 23:00:00', freq='1H')
template = ds_daily.drop_vars(ds_daily.data_vars).assign(time=new_index)

# 2. Add new variables
#    NB: xr.ones_like(ds_daily.pcp) carries the *old* 730-step daily time
#    coordinate rather than the new hourly one, which is likely related to
#    the shape mismatch (730 vs 17520) raised in the comment at the end.
for var in [
    "temp",
    "prec",
    "shortwave",
    "vapor_pressure",
    "rel_humid",
    "spec_humid",
    "longwave",
    "tsck",
]:
    template[var] = xr.ones_like(ds_daily.pcp)
template
# In[51]:

xr.map_blocks(metsim_1block, ds_daily, args=[new_index], template=template).compute()
# # decimate existing dimension

# In[48]:

ds = xr.tutorial.load_dataset("air_temperature").chunk({"lat": 5})
# each lat block holds 5 values, so taking index 1 from every block selects
# global indices 1, 6, 11, ... -- exactly what slice(1, None, 5) produces
template = ds.isel(lat=slice(1, None, 5))
# result = xr.map_blocks(lambda x: x.isel(lat=[1]), ds)  # fails
result = xr.map_blocks(lambda x: x.isel(lat=[1]), ds, template=template)
print(result)
xr.testing.assert_equal(result, template)
# In[100]:

# raise nice error: isel(lat=1) drops the "lat" dimension entirely, which
# does not match the template, so this call is expected to fail
xr.map_blocks(lambda x: x.isel(lat=1), ds_daily, template=template).compute()
# # add new unindexed dimension

# In[50]:

xr.map_blocks(lambda x: x.expand_dims(k=3), ds_daily).compute()
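
# Sanity check (a sketch, recomputing the call above): every variable should
# gain an unindexed "k" dimension of length 3, with no coordinate labels.
result = xr.map_blocks(lambda x: x.expand_dims(k=3), ds_daily).compute()
assert result.sizes["k"] == 3
assert "k" not in result.coords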
# # add new chunked dimension
#
# I don't think this makes sense; should check whether dask supports this.
#
# For now, we raise an error

# In[66]:

def expand(ds):
    return ds.expand_dims(lat2=(ds.lat / 2).values)

template = expand(ds_daily).chunk({"lat2": 5})

# expected to raise, per the note above
xr.map_blocks(expand, ds_daily, template=template).compute(
    scheduler="single-threaded"
)
# In[60]:

print(template.chunks)
print(ds_daily.chunks)
# # Notes
#
# 1. `to_dict` saves shape but this is unnecessary
# 2. `from_dict` requires data. This could be `np.empty` by default?
# 3. Does `np.empty` actually allocate memory? (see the sketch after these notes)
# 4. Current schema doesn't include chunks
# 5. What if we just provided one block to the function? I guess the problem
#    arises if the function isn't lazy... since it expects to receive numpy arrays?
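
# Re: notes 2 and 3 -- a small sketch (not in the original gist). np.empty
# returns an uninitialized array: memory is reserved, but on most OSes the
# pages aren't physically committed until written, so untouched pages cost
# little RAM. dask.array.empty, by contrast, is fully lazy: it only builds a
# task graph, and nothing is allocated until compute time.
import dask.array

eager = np.empty((1000, 1000))                     # usable immediately, contents arbitrary
lazy = dask.array.empty((1000, 1000), chunks=500)  # just a graph, no allocation yet
print(type(eager), type(lazy))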
I think there are two things going on here. The main one is that the shape of the arrays in the returned dataset is not right.
The size of the time dimension should be 17520 but instead it is (still) 730.