Skip to content

Instantly share code, notes, and snippets.

@dcherian
Created March 2, 2020 06:57
Show Gist options
  • Save dcherian/130ba22d0fbadb616837deb914eaa67e to your computer and use it in GitHub Desktop.
Save dcherian/130ba22d0fbadb616837deb914eaa67e to your computer and use it in GitHub Desktop.
map_blocks_for_metsim_test
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
#!/usr/bin/env python
# coding: utf-8
# <h1>Table of Contents<span class="tocSkip"></span></h1>
# <div class="toc"><ul class="toc-item"><li><span><a href="#define-'schema'-functions" data-toc-modified-id="define-'schema'-functions-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>define 'schema' functions</a></span></li><li><span><a href="#change-size-of-existing-dimension" data-toc-modified-id="change-size-of-existing-dimension-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>change size of existing dimension</a></span></li><li><span><a href="#decimate-existing-dimension" data-toc-modified-id="decimate-existing-dimension-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>decimate existing dimension</a></span></li><li><span><a href="#add-new-unindexed-dimension" data-toc-modified-id="add-new-unindexed-dimension-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>add new unindexed dimension</a></span></li><li><span><a href="#add-new-chunked-dimension" data-toc-modified-id="add-new-chunked-dimension-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>add new chunked dimension</a></span></li><li><span><a href="#Notes" data-toc-modified-id="Notes-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Notes</a></span></li></ul></div>
# In[1]:
import xarray as xr
import pandas as pd
import numpy as np
def metsim_1block(ds, new_index):
ds_out = xr.Dataset()
ds_out["time"] = ("time", new_index)
var_shape = list(ds["tmax"].shape)
var_shape[0] = len(new_index)
print(var_shape)
dims = ds["tmax"].dims
coords = dict(ds["tmax"].coords)
coords["time"] = ds_out.coords["time"]
for var in [
"temp",
"prec",
"shortwave",
"vapor_pressure",
"rel_humid",
"spec_humid",
"longwave",
"tsck",
]:
ds_out[var] = xr.DataArray(np.random.rand(*var_shape), dims=dims, coords=coords)
print(ds_out.dims)
return ds_out
ds = xr.tutorial.load_dataset("air_temperature")
ds_daily = xr.Dataset()
ds_daily["tmax"] = (
ds["air"].resample(time="1D").max("time").chunk({"lat": 5, "lon": 5, "time": -1})
)
ds_daily["tmin"] = (
ds["air"].resample(time="1D").min("time").chunk({"lat": 5, "lon": 5, "time": -1})
)
ds_daily["pcp"] = xr.zeros_like(ds_daily["tmin"])
ds_daily
# # define 'schema' functions
#
# `to_schema` and `from_schema` are modified from `to_dict` and `from_dict`
# In[2]:
def from_schema(d):
"""
Convert a dictionary into an xarray.Dataset.
Input dict can take several forms::
d = {'t': {'dims': ('t'), 'data': t},
'a': {'dims': ('t'), 'data': x},
'b': {'dims': ('t'), 'data': y}}
d = {'coords': {'t': {'dims': 't', 'data': t,
'attrs': {'units':'s'}}},
'attrs': {'title': 'air temperature'},
'dims': 't',
'data_vars': {'a': {'dims': 't', 'data': x, },
'b': {'dims': 't', 'data': y}}}
where 't' is the name of the dimesion, 'a' and 'b' are names of data
variables and t, x, and y are lists, numpy.arrays or pandas objects.
Parameters
----------
d : dict, with a minimum structure of {'var_0': {'dims': [..], \
'data': [..]}, \
...}
Returns
-------
obj : xarray.Dataset
See also
--------
Dataset.to_dict
DataArray.from_dict
"""
from xarray import Dataset
if not {"coords", "data_vars"}.issubset(set(d)):
variables = d.items()
else:
import itertools
variables = itertools.chain(
d.get("coords", {}).items(), d.get("data_vars", {}).items()
)
chunks = d["chunks"]
try:
variable_dict = {}
for k, v in variables:
shape = [d["dims"][k] for k in v["dims"]]
data = v.get("data")
if data is None:
if chunks is None: # TODO: this check needs to be better
data = np.empty(shape, dtype=v["dtype"])
else:
import dask.array
chunks = [d["chunks"][k] for k in v["dims"]]
data = dask.array.empty(shape, dtype=v["dtype"], chunks=chunks)
variable_dict[k] = (v["dims"], data, v.get("attrs"))
except KeyError as e:
raise ValueError(
"cannot convert dict without the key "
"'{dims_data}'".format(dims_data=str(e.args[0]))
)
obj = Dataset(variable_dict)
# what if coords aren't dims?
coords = set(d.get("coords", {})) - set(d.get("dims", {}))
obj = obj.set_coords(coords)
obj.attrs.update(d.get("attrs", {}))
return obj
def to_schema(ds):
dicty = ds.to_dict(data=False)
# remove shape on data_vars, coords since that is redundant
# This makes it easier to modify schema of input variable
# when making schema for output variable
for var in dicty["data_vars"]:
dicty["data_vars"][var].pop("shape")
for var in dicty["coords"]:
dicty["coords"][var].pop("shape")
for var in dicty["dims"]:
if var in ds.coords:
dicty["coords"][var].update(data=ds.coords[var].values)
# unify chunks so that it can be a global property?
# This is required for map_blocks anyway...
ds = ds.unify_chunks()
dicty["chunks"] = dict(ds.chunks)
return dicty
schema = to_schema(ds_daily)
schema
# In[4]:
new_index = pd.date_range('2013-01-01 00:00:00', '2014-12-31 23:00:00', freq='1H')
print(len(new_index))
# In[5]:
# update the schema
# 1. Update time dimension
# Should this be easier? in general if an indexed dimension changes size,
# we need to provide new size, new data and new chunk size
schema["dims"]["time"] = len(new_index)
schema["coords"]["time"]["data"] = new_index
schema["chunks"]["time"] = len(new_index)
# 2. Add new variables
for var in [
"temp",
"prec",
"shortwave",
"vapor_pressure",
"rel_humid",
"spec_humid",
"longwave",
"tsck",
]:
schema["data_vars"][var] = schema["data_vars"]["pcp"]
# 3. remove old variables
for var in ["pcp", "tmax", "tmin"]:
schema["data_vars"].pop(var)
schema
# In[7]:
template = from_schema(schema)
template
# # change size of existing dimension
# In[50]:
new_index = pd.date_range('2013-01-01 00:00:00', '2014-12-31 23:00:00', freq='1H')
template = ds_daily.drop_vars(ds_daily.data_vars).assign(time=new_index)
# 2. Add new variables
for var in [
"temp",
"prec",
"shortwave",
"vapor_pressure",
"rel_humid",
"spec_humid",
"longwave",
"tsck",
]:
template[var] = xr.ones_like(ds_daily.pcp)
template
# In[51]:
xr.map_blocks(metsim_1block, ds_daily, args=[new_index], template=template).compute()
# # decimate existing dimension
# In[48]:
ds = xr.tutorial.load_dataset("air_temperature").chunk({"lat": 5})
template = ds.isel(lat=slice(1, None, 5))
# result = xr.map_blocks(lambda x: x.isel(lat=[1]), ds) # fails
result = xr.map_blocks(lambda x: x.isel(lat=[1]), ds, template=template)
print(result)
xr.testing.assert_equal(result, template)
# In[100]:
# raise nice error
xr.map_blocks(lambda x: x.isel(lat=1), ds_daily, template=template).compute()
# # add new unindexed dimension
# In[50]:
xr.map_blocks(lambda x: x.expand_dims(k=3), ds_daily).compute()
# # add new chunked dimension
#
# this doesn't make sense I don't think. Should see if dask supports this.
#
# For now, we raise an error
# In[66]:
def expand(ds):
return ds.expand_dims(lat2=(ds.lat / 2).values)
template = expand(ds_daily).chunk({"lat2": 5})
xr.map_blocks(expand, ds_daily, template=template).compute(
scheduler="single-threaded"
)
# In[60]:
print(template.chunks)
print(ds_daily.chunks)
# # Notes
#
# 1. `to_dict` saves shape but this is unnecessary
# 2. `from_dict` requires data. This could be `np.empty` by default?
# 3. Does `np.empty` actually allocate memory?
# 4. Current schema doesn't include chunks
# 5. What if we just provided one block to the function? I guess the problem is if the function isn't lazy... since it expects to receive numpy arrays?
I think there are two things that are going on here. The main one is the shape of the arrays in the returned dataset are not right.
The size of the time dimension shoud be 17520 but instead it is (still) 730.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment