"""Dataset module for functions related to an xarray.Dataset."""
import pathlib
from datetime import datetime
from functools import partial
from glob import glob
from typing import Any, Callable, Dict, Hashable, List, Literal, Optional, Tuple, Union
import numpy as np
import xarray as xr
from dateutil import parser
from dateutil import relativedelta as rd
from xarray.coding.cftime_offsets import get_date_type
from xarray.coding.times import convert_times
from xcdat import bounds # noqa: F401
from xcdat.axis import center_times as center_times_func
from xcdat.axis import get_axis_coord, get_axis_dim, swap_lon_axis
from xcdat.logger import setup_custom_logger
logger = setup_custom_logger(__name__)
#: List of non-CF compliant time units.
NON_CF_TIME_UNITS: List[str] = ["months", "years"]
# Type annotation for the `paths` arg.
Paths = Union[
str,
pathlib.Path,
List[str],
List[pathlib.Path],
List[List[str]],
List[List[pathlib.Path]],
]
[docs]def open_dataset(
path: str,
data_var: Optional[str] = None,
add_bounds: bool = True,
decode_times: bool = True,
center_times: bool = False,
lon_orient: Optional[Tuple[float, float]] = None,
**kwargs: Dict[str, Any],
) -> xr.Dataset:
"""Wraps ``xarray.open_dataset()`` with post-processing options.
Parameters
----------
path : str
Path to Dataset.
data_var: Optional[str], optional
The key of the non-bounds data variable to keep in the Dataset,
alongside any existing bounds data variables, by default None.
add_bounds: bool, optional
If True, add bounds for supported axes (X, Y, T) if they are missing in
the Dataset, by default True. Bounds are required for many xCDAT
features.
decode_times: bool, optional
If True, attempt to decode times encoded in the standard NetCDF
datetime format into datetime objects. Otherwise, leave them encoded
as numbers. This keyword may not be supported by all the backends,
by default True.
center_times: bool, optional
If True, center time coordinates using the midpoint between its upper
and lower bounds. Otherwise, use the provided time coordinates, by
default False.
lon_orient: Optional[Tuple[float, float]], optional
The orientation to use for the Dataset's longitude axis (if it exists),
by default None.
Supported options:
* None: use the current orientation (if the longitude axis exists)
* (-180, 180): represents [-180, 180) in math notation
* (0, 360): represents [0, 360) in math notation
kwargs : Dict[str, Any]
Additional arguments passed on to ``xarray.open_dataset``. Refer to the
[1]_ xarray docs for accepted keyword arguments.
Returns
-------
xr.Dataset
Dataset after applying operations.
Notes
-----
``xarray.open_dataset`` opens the file with read-only access. When you
modify values of a Dataset, even one linked to files on disk, only the
in-memory copy you are manipulating in xarray is modified: the original file
on disk is never touched.
References
----------
.. [1] https://xarray.pydata.org/en/stable/generated/xarray.open_dataset.html
"""
if decode_times:
cf_compliant_time: Optional[bool] = _has_cf_compliant_time(path)
# xCDAT attempts to decode non-CF compliant time coordinates.
if cf_compliant_time is False:
ds = xr.open_dataset(path, decode_times=False, **kwargs) # type: ignore
ds = decode_non_cf_time(ds)
else:
ds = xr.open_dataset(path, decode_times=True, **kwargs) # type: ignore
else:
ds = xr.open_dataset(path, decode_times=False, **kwargs) # type: ignore
ds = _postprocess_dataset(ds, data_var, center_times, add_bounds, lon_orient)
return ds
[docs]def open_mfdataset(
paths: Paths,
data_var: Optional[str] = None,
add_bounds: bool = True,
decode_times: bool = True,
center_times: bool = False,
lon_orient: Optional[Tuple[float, float]] = None,
data_vars: Union[Literal["minimal", "different", "all"], List[str]] = "minimal",
preprocess: Optional[Callable] = None,
**kwargs: Dict[str, Any],
) -> xr.Dataset:
"""Wraps ``xarray.open_mfdataset()`` with post-processing options.
Parameters
----------
path : Union[str, pathlib.Path, List[str], List[pathlib.Path], \
List[List[str]], List[List[pathlib.Path]]]
Either a string glob in the form ``"path/to/my/files/*.nc"`` or an
explicit list of files to open. Paths can be given as strings or as
pathlib Paths. If concatenation along more than one dimension is desired,
then ``paths`` must be a nested list-of-lists (see ``combine_nested``
for details). (A string glob will be expanded to a 1-dimensional list.)
add_bounds: bool, optional
If True, add bounds for supported axes (X, Y, T) if they are missing in
the Dataset, by default True. Bounds are required for many xCDAT
features.
data_var: Optional[str], optional
The key of the data variable to keep in the Dataset, by default None.
decode_times: bool, optional
If True, decode times encoded in the standard NetCDF datetime format
into datetime objects. Otherwise, leave them encoded as numbers.
This keyword may not be supported by all the backends, by default True.
center_times: bool, optional
If True, center time coordinates using the midpoint between its upper
and lower bounds. Otherwise, use the provided time coordinates, by
default False.
lon_orient: Optional[Tuple[float, float]], optional
The orientation to use for the Dataset's longitude axis (if it exists),
by default None.
Supported options:
* None: use the current orientation (if the longitude axis exists)
* (-180, 180): represents [-180, 180) in math notation
* (0, 360): represents [0, 360) in math notation
data_vars: Union[Literal["minimal", "different", "all"], List[str]], optional
These data variables will be concatenated together:
* "minimal": Only data variables in which the dimension already
appears are included, the default value.
* "different": Data variables which are not equal (ignoring
attributes) across all datasets are also concatenated (as well as
all for which dimension already appears). Beware: this option may
load the data payload of data variables into memory if they are not
already loaded.
* "all": All data variables will be concatenated.
* list of str: The listed data variables will be concatenated, in
addition to the "minimal" data variables.
The ``data_vars`` kwarg defaults to ``"minimal"``, which concatenates
data variables in a manner where only data variables in which the
dimension already appears are included. For example, the time dimension
will not be concatenated to the dimensions of non-time data variables
such as "lat_bnds" or "lon_bnds". `data_vars="minimal"` is required for
some XCDAT functions, including spatial averaging where a reduction is
performed using the lat/lon bounds.
preprocess : Optional[Callable], optional
If provided, call this function on each dataset prior to concatenation.
You can find the file-name from which each dataset was loaded in
``ds.encoding["source"]``.
kwargs : Dict[str, Any]
Additional arguments passed on to ``xarray.open_mfdataset``. Refer to
the [2]_ xarray docs for accepted keyword arguments.
Returns
-------
xr.Dataset
The Dataset.
Notes
-----
``xarray.open_mfdataset`` opens the file with read-only access. When you
modify values of a Dataset, even one linked to files on disk, only the
in-memory copy you are manipulating in xarray is modified: the original file
on disk is never touched.
References
----------
.. [2] https://xarray.pydata.org/en/stable/generated/xarray.open_mfdataset.html
"""
# `xr.open_mfdataset()` drops the time coordinates encoding dictionary if
# multiple files are merged with `decode_times=True` (refer to
# https://github.com/pydata/xarray/issues/2436). The workaround is to store
# the time encoding from the first dataset as a variable, and add the time
# encoding back to final merged dataset.
time_encoding = None
if decode_times:
time_encoding = _keep_time_encoding(paths)
cf_compliant_time: Optional[bool] = _has_cf_compliant_time(paths)
# xCDAT attempts to decode non-CF compliant time coordinates using the
# preprocess keyword arg with `xr.open_mfdataset()`.
if cf_compliant_time is False:
decode_times = False
preprocess = partial(_preprocess_non_cf_dataset, callable=preprocess)
ds = xr.open_mfdataset(
paths,
decode_times=decode_times,
data_vars=data_vars,
preprocess=preprocess,
**kwargs, # type: ignore
)
ds = _postprocess_dataset(ds, data_var, center_times, add_bounds, lon_orient)
if time_encoding is not None:
time_dim = get_axis_dim(ds, "T")
ds[time_dim].encoding = time_encoding
# Update "original_shape" to reflect the final time coordinates shape.
ds[time_dim].encoding["original_shape"] = ds[time_dim].shape
return ds
[docs]def decode_non_cf_time(dataset: xr.Dataset) -> xr.Dataset:
"""Decodes time coordinates and time bounds with non-CF compliant units.
By default, ``xarray`` only supports decoding time with CF compliant units
[3]_. This function enables decoding time with non-CF compliant units.
The time coordinates must have a "calendar" attribute set to a CF calendar
type supported by ``cftime`` ("noleap", "360_day", "365_day", "366_day",
"gregorian", "proleptic_gregorian", "julian", "all_leap", or "standard")
and a "units" attribute set to a supported format ("months since ..." or
"years since ...").
The logic for this function:
1. Extract units and reference date strings from the "units" attribute.
* For example with "months since 1800-01-01", the units are "months" and
reference date is "1800-01-01".
2. Using the reference date, create a reference ``datetime`` object.
3. Starting from the reference ``datetime`` object, use the numerically
encoded time coordinate values (each representing an offset) to create an
array of ``cftime`` objects based on the calendar type.
4. Using the array of ``cftime`` objects, create a new xr.DataArray
of time coordinates to replace the numerically encoded ones.
5. If it exists, create a time bounds DataArray using steps 3 and 4.
Parameters
----------
dataset : xr.Dataset
Dataset with numerically encoded time coordinates and time bounds (if
they exist). If the time coordinates cannot be decoded then the original
dataset is returned.
Returns
-------
xr.Dataset
Dataset with decoded time coordinates and time bounds (if they exist) as
``cftime`` objects.
Notes
-----
Time coordinates are represented by ``cftime.datetime`` objects because
it is not restricted by the ``pandas.Timestamp`` range (years 1678 through
2262). Refer to [4]_ and [5]_ for more information on this limitation.
References
-----
.. [3] https://cfconventions.org/cf-conventions/cf-conventions.html#time-coordinate
.. [4] https://docs.xarray.dev/en/stable/user-guide/weather-climate.html#non-standard-calendars-and-dates-outside-the-timestamp-valid-range
.. [5] https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timestamp-limitations
Examples
--------
Decode the time coordinates with non-CF units in a Dataset:
>>> from xcdat.dataset import decode_non_cf_time
>>>
>>> ds.time
<xarray.DataArray 'time' (time: 3)>
array([0, 1, 2])
Coordinates:
* time (time) int64 0 1 2
Attributes:
units: years since 2000-01-01
bounds: time_bnds
axis: T
long_name: time
standard_name: time
calendar: noleap
>>>
>>> ds_decoded = decode_non_cf_time(ds)
>>> ds_decoded.time
<xarray.DataArray 'time' (time: 3)>
array([cftime.DatetimeNoLeap(1850, 1, 1, 0, 0, 0, 0, has_year_zero=True),
cftime.DatetimeNoLeap(1850, 1, 1, 0, 0, 0, 0, has_year_zero=True),
cftime.DatetimeNoLeap(1850, 1, 1, 0, 0, 0, 0, has_year_zero=True)],
dtype='object')
Coordinates:
* time (time) datetime64[ns] 2000-01-01 2001-01-01 2002-01-01
Attributes:
units: years since 2000-01-01
bounds: time_bnds
axis: T
long_name: time
standard_name: time
calendar: noleap
View time encoding information:
>>> ds_decoded.time.encoding
{'source': None,
'dtype': dtype('int64'),
'original_shape': (3,),
'units': 'years since 2000-01-01',
'calendar': 'noleap'}
"""
ds = dataset.copy()
time = get_axis_coord(ds, "T")
time_attrs = time.attrs
# NOTE: When opening datasets with `decode_times=False`, the "calendar" and
# "units" attributes are stored in `.attrs` (unlike `decode_times=True`
# which stores them in `.encoding`). Since xCDAT manually decodes non-CF
# compliant time coordinates by first setting `decode_times=False`, the
# "calendar" and "units" attrs are popped from the `.attrs` dict and stored
# in the `.encoding` dict to mimic xarray's behavior.
calendar = time_attrs.pop("calendar", None)
units_attr = time_attrs.pop("units", None)
if calendar is None:
logger.warning(
"This dataset's time coordinates do not have a 'calendar' attribute set, "
"so time coordinates could not be decoded. Set the 'calendar' attribute "
f"(`ds.{time.name}.attrs['calendar]`) and try decoding the time "
"coordinates again."
)
return ds
if units_attr is None:
logger.warning(
"This dataset's time coordinates do not have a 'units' attribute set, "
"so the time coordinates could not be decoded. Set the 'units' attribute "
f"(`ds.{time.name}.attrs['units']`) and try decoding the time "
"coordinates again."
)
return ds
try:
units, ref_date = _split_time_units_attr(units_attr)
except ValueError:
logger.warning(
f"This dataset's time coordinates 'units' attribute ('{units_attr}') is "
"not in a supported format ('months since...' or 'years since...'), so the "
"time coordinates could not be decoded."
)
return ds
data = _get_cftime_coords(ref_date, time.values, calendar, units)
decoded_time = xr.DataArray(
name=time.name,
data=data,
dims=time.dims,
coords={time.name: data},
# As mentioned in a comment above, the units and calendar attributes are
# popped from the `.attrs` dict.
attrs=time_attrs,
)
decoded_time.encoding = {
"source": ds.encoding.get("source", "None"),
"dtype": time.dtype,
"original_shape": time.shape,
# The units and calendar attributes are now saved in the `.encoding`
# dict.
"units": units_attr,
"calendar": calendar,
}
ds = ds.assign_coords({time.name: decoded_time})
try:
time_bounds = ds.bounds.get_bounds("T")
except KeyError:
time_bounds = None
if time_bounds is not None:
lowers = _get_cftime_coords(ref_date, time_bounds.values[:, 0], calendar, units)
uppers = _get_cftime_coords(ref_date, time_bounds.values[:, 1], calendar, units)
data_bounds = np.vstack((lowers, uppers)).T
decoded_time_bnds = xr.DataArray(
name=time_bounds.name,
data=data_bounds,
dims=time_bounds.dims,
coords=time_bounds.coords,
attrs=time_bounds.attrs,
)
decoded_time_bnds.coords[time.name] = decoded_time
decoded_time_bnds.encoding = time_bounds.encoding
ds = ds.assign({time_bounds.name: decoded_time_bnds})
return ds
def _keep_time_encoding(paths: Paths) -> Dict[Hashable, Any]:
"""
Returns the time encoding attributes from the first dataset in a list of
paths.
Time encoding information is critical for several xCDAT operations such as
temporal averaging (e.g., uses the "calendar" attr). This function is a
workaround to the undesired xarray behavior/quirk with
`xr.open_mfdataset()`, which drops the `.encoding` dict from the final
merged dataset (refer to https://github.com/pydata/xarray/issues/2436).
Parameters
----------
paths: Paths
The paths to the dataset(s).
Returns
-------
Dict[Hashable, Any]
The time encoding dictionary.
"""
first_path = _get_first_path(paths)
# xcdat.open_dataset() is called instead of xr.open_dataset() because
# we want to handle decoding non-CF compliant as well.
# FIXME: Remove `type: ignore` comment after properly handling the type
# annotations in `_get_first_path()`.
ds = open_dataset(first_path, decode_times=True, add_bounds=False) # type: ignore
time_coord = get_axis_coord(ds, "T")
time_encoding = time_coord.encoding
time_encoding["source"] = paths
return time_coord.encoding
def _has_cf_compliant_time(paths: Paths) -> Optional[bool]:
"""Checks if a dataset has time coordinates with CF compliant units.
If the dataset does not contain a time dimension, None is returned.
Otherwise, the units attribute is extracted from the time coordinates to
determine whether it is CF or non-CF compliant.
Parameters
----------
path : Union[str, pathlib.Path, List[str], List[pathlib.Path], \
List[List[str]], List[List[pathlib.Path]]]
Either a file (``"file.nc"``), a string glob in the form
``"path/to/my/files/*.nc"``, or an explicit list of files to open.
Paths can be given as strings or as pathlib Paths. If concatenation
along more than one dimension is desired, then ``paths`` must be a
nested list-of-lists (see ``combine_nested`` for details). (A string
glob will be expanded to a 1-dimensional list.)
Returns
-------
Optional[bool]
None if time dimension does not exist, True if CF compliant, or False if
non-CF compliant.
Notes
-----
This function only checks one file for multi-file datasets to optimize
performance because it is slower to combine all files then check for CF
compliance.
"""
first_path = _get_first_path(paths)
ds = xr.open_dataset(first_path, decode_times=False) # type: ignore
if ds.cf.dims.get("T") is None:
return None
time = ds.cf["T"]
# If the time units attr cannot be split, it is not cf_compliant.
try:
units = _split_time_units_attr(time.attrs.get("units"))[0]
except ValueError:
return False
cf_compliant = units not in NON_CF_TIME_UNITS
return cf_compliant
def _get_first_path(path: Paths) -> Optional[Union[pathlib.Path, str]]:
"""Returns the first path from a list of paths.
Parameters
----------
path : Paths
A list of paths.
Returns
-------
str
Returns the first path from a list of paths.
"""
# FIXME: This function should throw an exception if the first file
# is not a supported type.
# FIXME: The `type: ignore` comments should be removed after properly
# handling the types.
first_file: Optional[Union[pathlib.Path, str]] = None
if isinstance(path, str) and "*" in path:
first_file = glob(path)[0]
elif isinstance(path, str) or isinstance(path, pathlib.Path):
first_file = path
elif isinstance(path, list):
if any(isinstance(sublist, list) for sublist in path):
first_file = path[0][0] # type: ignore
else:
first_file = path[0] # type: ignore
return first_file
def _postprocess_dataset(
dataset: xr.Dataset,
data_var: Optional[str] = None,
center_times: bool = False,
add_bounds: bool = True,
lon_orient: Optional[Tuple[float, float]] = None,
) -> xr.Dataset:
"""Post-processes a Dataset object.
Parameters
----------
dataset : xr.Dataset
The dataset.
data_var: Optional[str], optional
The key of the data variable to keep in the Dataset, by default None.
center_times: bool, optional
If True, center time coordinates using the midpoint between its upper
and lower bounds. Otherwise, use the provided time coordinates, by
default False.
add_bounds: bool, optional
If True, add bounds for supported axes (X, Y, T) if they are missing in
the Dataset, by default False.
lon_orient: Optional[Tuple[float, float]], optional
The orientation to use for the Dataset's longitude axis (if it exists),
by default None.
Supported options:
* None: use the current orientation (if the longitude axis exists)
* (-180, 180): represents [-180, 180) in math notation
* (0, 360): represents [0, 360) in math notation
Returns
-------
xr.Dataset
The Dataset.
Raises
------
ValueError
If ``center_times==True`` but there are no time coordinates.
ValueError
If ``lon_orient is not None`` but there are no longitude coordinates.
"""
if data_var is not None:
dataset = _keep_single_var(dataset, data_var)
if center_times:
if dataset.cf.dims.get("T") is not None:
dataset = center_times_func(dataset)
else:
raise ValueError("This dataset does not have a time coordinates to center.")
if add_bounds:
dataset = dataset.bounds.add_missing_bounds()
if lon_orient is not None:
if dataset.cf.dims.get("X") is not None:
dataset = swap_lon_axis(dataset, to=lon_orient, sort_ascending=True)
else:
raise ValueError(
"This dataset does not have longitude coordinates to reorient."
)
return dataset
def _keep_single_var(dataset: xr.Dataset, key: str) -> xr.Dataset:
"""Keeps a single non-bounds data variable in the Dataset.
This function checks if the ``data_var`` key exists in the Dataset and
it is not related to bounds. If those checks pass, it will subset the
Dataset to retain that non-bounds ``data_var`` and all bounds data vars.
Parameters
----------
dataset : xr.Dataset
The Dataset.
key: str
The key of the non-bounds data variable to keep in the Dataset.
Returns
-------
xr.Dataset
The Dataset.
Raises
------
ValueError
If the dataset only contains bounds data variables.
ValueError
If the specified key does not exist in the dataset.
ValueError
If the specified key matches a bounds data variable.
"""
all_vars = dataset.data_vars.keys()
bounds_vars = dataset.bounds.keys
non_bounds_vars = sorted(list(set(all_vars) ^ set(bounds_vars)))
if len(non_bounds_vars) == 0:
raise ValueError("This dataset only contains bounds data variables.")
if key not in all_vars:
raise ValueError(f"The data variable '{key}' does not exist in the dataset.")
if key in bounds_vars:
raise ValueError("Please specify a non-bounds data variable.")
return dataset[[key] + bounds_vars]
def _get_data_var(dataset: xr.Dataset, key: str) -> xr.DataArray:
"""Get a data variable in the Dataset by key.
Parameters
----------
dataset : xr.Dataset
The Dataset.
key : str
The data variable key.
Returns
-------
xr.DataArray
The data variable.
Raises
------
KeyError
If the data variable does not exist in the Dataset.
"""
dv = dataset.get(key, None)
if dv is None:
raise KeyError(f"The data variable '{key}' does not exist in the Dataset.")
return dv.copy()
def _preprocess_non_cf_dataset(
ds: xr.Dataset, callable: Optional[Callable] = None
) -> xr.Dataset:
"""Preprocessing for each non-CF compliant dataset in ``open_mfdataset()``.
This function accepts a user specified preprocess function, which is
executed before additional internal preprocessing functions.
One call is performed to ``decode_non_cf_time()`` for decoding each
dataset's time coordinates and time bounds (if they exist) with non-CF
compliant units. By default, if ``decode_times=False`` is passed, xarray
will concatenate time values using the first dataset's ``units`` attribute.
This is an issue for cases where the numerically encoded time values are the
same and the ``units`` attribute differs between datasets.
For example, two files have the same time values, but the units of the first
file is "months since 2000-01-01" and the second is "months since
2001-01-01". Since the first dataset's units are used in xarray for
concatenating datasets, the time values corresponding to the second file
will be dropped since they appear to be the same as the first file.
Calling ``decode_non_cf_time()`` on each dataset individually before
concatenating solves the aforementioned issue.
Parameters
----------
ds : xr.Dataset
The Dataset.
callable : Optional[Callable], optional
A user specified optional callable function for preprocessing.
Returns
-------
xr.Dataset
The preprocessed Dataset.
"""
ds_new = ds.copy()
if callable:
ds_new = callable(ds)
# Attempt to decode non-cf-compliant time axis.
ds_new = decode_non_cf_time(ds_new)
return ds_new
def _split_time_units_attr(units_attr: str) -> Tuple[str, str]:
"""Splits the time coordinates' units attr into units and reference date.
Parameters
----------
units_attr : str
The units attribute (e.g., "months since 1800-01-01").
Returns
-------
Tuple[str, str]
The units (e.g, "months") and the reference date (e.g., "1800-01-01").
Raises
------
KeyError
If the time units attribute was not found.
ValueError
If the time units attribute is not of the form `X since Y`.
"""
if "since" in units_attr:
units, reference_date = units_attr.split(" since ")
else:
raise ValueError(
"This dataset does not have time coordinates of the form 'X since Y'."
)
return units, reference_date
def _get_cftime_coords(
ref_date: str, offsets: np.ndarray, calendar: str, units: str
) -> np.ndarray:
"""Get an array of `cftime` coordinates starting from a reference date.
Parameters
----------
ref_date : str
The starting reference date.
offsets : np.ndarray
An array of numerically encoded time offsets from the reference date.
calendar : str
The CF calendar type supported by ``cftime``. This includes "noleap",
"360_day", "365_day", "366_day", "gregorian", "proleptic_gregorian",
"julian", "all_leap", and "standard".
units : str
The time units.
Returns
-------
np.ndarray
An array of `cftime` coordinates.
"""
# Starting from the reference date, create an array of `datetime` objects
# by adding each offset (a numerically encoded value) to the reference date.
# The `parse.parse` default is set to datetime(2000, 1, 1), with each
# component being a placeholder if the value does not exist. For example, 1
# and 1 are placeholders for month and day if those values don't exist.
ref_datetime: datetime = parser.parse(ref_date, default=datetime(2000, 1, 1))
offsets = np.array(
[ref_datetime + rd.relativedelta(**{units: offset}) for offset in offsets],
dtype="object",
)
# Convert the array of `datetime` objects into `cftime` objects based on
# the calendar type.
date_type = get_date_type(calendar)
coords = convert_times(offsets, date_type=date_type)
return coords