Source code for aeolus.calc.stats

"""Statistical functions."""

from collections.abc import Iterable, Mapping

import iris.analysis
from iris.analysis.maths import apply_ufunc
from iris.coords import AuxCoord
from iris.cube import Cube
from iris.exceptions import CoordinateCollapseError as CoColErr
from iris.exceptions import CoordinateNotFoundError as CoNotFound
import iris.util
from iris.util import broadcast_to_shape, promote_aux_coord_to_dim_coord
import numpy as np

from ..coord import area_weights_cube, coord_to_cube, ensure_bounds
from ..exceptions import ArgumentError, _warn
from ..model import um
from ..subset import (
    extract_after_n_days,
    extract_between_days,
    extract_last_n_days,
)
from .calculus import integrate

__all__ = (
    "abs_coord_mean",
    "after_n_day_mean",
    "between_day_mean",
    "cumsum",
    "last_n_day_mean",
    "meridional_mean",
    "minmaxdiff",
    "normalize_cube",
    "region_mean_diff",
    "spatial",
    "spatial_mean",
    "spatial_quartiles",
    "time_mean",
    "vertical_mean",
    "zonal_mean",
)


[docs] def abs_coord_mean(cube, coord): """ Calculate cube's average over absolute values of a coordinate. For example, applying this to a cube with N latitudes ranging from SP to NP returns a cube with (N+1)//2 latitudes. Parameters ---------- cube: iris.cube.Cube Cube with the specified coordinate. coord: str or iris.coords.Coord Coordinate to apply abs(). Returns ------- iris.cube.Cube Cube with a reduced dimension. """ sign_lat = apply_ufunc( np.sign, coord_to_cube(cube, coord, broadcast=False) ) sign_cube = sign_lat * cube _coord = sign_cube.coord(coord) _coord_dim = sign_cube.coord_dims(_coord) abs_coord = AuxCoord.from_coord(_coord) abs_coord.rename(f"abs_{abs_coord.name()}") abs_coord.points = np.abs(_coord.points) abs_coord.bounds = np.abs(_coord.bounds) sign_cube.add_aux_coord(abs_coord, data_dims=_coord_dim) out = sign_cube.aggregated_by(abs_coord, iris.analysis.MEAN) out.remove_coord(_coord) out.rename(cube.name()) out.units = cube.units promote_aux_coord_to_dim_coord(out, abs_coord.name()) out.coord(abs_coord).rename(_coord.name()) return out
[docs] def cumsum(cube, axis, axis_weights=False, model=um): """ Cumulative sum of a cube. Parameters ---------- cube: iris.cube.Cube Input cube. axis: str Coordinate axis of operation (t|z|y|x). axis_weights: bool, optional If True, multiply data by the coordinate spacings. model: aeolus.model.Model, optional Model class with a relevant coordinate name. Returns ------- iris.cube.Cube Cube of cumulative sums with the same dimensions as the input cube. """ try: c = cube.coord(getattr(model, axis)).copy() except (AttributeError, CoNotFound): c = cube.coord(axis=axis).copy() dim = cube.coord_dims(c) if axis_weights: if not c.has_bounds(): c.guess_bounds() weights = broadcast_to_shape( c.bounds[:, 1] - c.bounds[:, 0], cube.shape, dim ) data = cube.data * weights units = cube.units * c.units else: data = cube.data units = cube.units data = np.nancumsum(data, axis=dim[0]) res = cube.copy(data=data) res.rename(f"cumulative_sum_of_{cube.name()}_along_{axis}") res.units = units return res
[docs] def last_n_day_mean(cube, days=365, model=um): """Average the cube over the last `n` days of its time dimension.""" cube_sub = time_mean( extract_last_n_days(cube, days=days, model=model), model=model ) return cube_sub
[docs] def after_n_day_mean(cube, days=365, model=um): """Average the cube over the last `n` days of its time dimension.""" cube_sub = time_mean( extract_after_n_days(cube, days=days, model=model), model=model ) return cube_sub
[docs] def between_day_mean(cube, day_start, day_end, model=um): """Average the cube over a subset of days.""" cube_sub = time_mean( extract_between_days( cube, day_start=day_start, day_end=day_end, model=model ), model=model, ) return cube_sub
[docs] def meridional_mean(cube, model=um): """ Calculate cube's meridional average. Parameters ---------- cube: iris.cube.Cube Cube with a latitude coordinate. model: aeolus.model.Model, optional Model class with a relevant coordinate name. Returns ------- iris.cube.Cube Collapsed cube. """ lat_name = model.y coslat = np.cos(np.deg2rad(cube.coord(lat_name).points)) coslat2d = broadcast_to_shape( coslat, cube.shape, cube.coord_dims(lat_name) ) cube_mean = (cube * coslat2d).collapsed( lat_name, iris.analysis.SUM ) / np.sum(coslat) return cube_mean
[docs] def minmaxdiff(cubelist, name): """ Spatial maximum minus spatial minimum for a given cube. Parameters ---------- cubelist: iris.cube.CubeList Input list of cubes. name: str Cube name. Returns ------- iris.cube.Cube Difference between the extrema with collapsed spatial dimensions. """ _min = spatial(cubelist.extract_cube(name), "min") _max = spatial(cubelist.extract_cube(name), "max") diff = _max - _min diff.rename(f"{name}_difference") return diff
[docs] def normalize_cube(cube): """ Normalize cube data, i.e. make the values range from 0 to 1. .. math:: z_i = (x_i - min(x)) / (max(x) - min(x)) Parameters ---------- cube: iris.cube.Cube The input cube. """ cube_min = cube.collapsed(cube.dim_coords, iris.analysis.MIN) cube_max = cube.collapsed(cube.dim_coords, iris.analysis.MAX) norm = (cube - cube_min) / (cube_max - cube_min) norm.rename(f"normalized_{cube.name()}") return norm
[docs] def region_mean_diff(cubelist, name, region_a, region_b): """ Difference between averages over two regions for a given cube. Parameters ---------- cubelist: iris.cube.CubeList Input list of cubes. name: str Cube name. region_a: aeolus.region.Region First region. region_b: aeolus.region.Region Second region. Returns ------- iris.cube.Cube Difference between the region averages with collapsed spatial dims. """ mean_a = spatial_mean( cubelist.extract_cube(name).extract(region_a.constraint) ) mean_b = spatial_mean( cubelist.extract_cube(name).extract(region_b.constraint) ) diff = mean_a - mean_b diff.rename(f"{name}_mean_diff_{region_a}_{region_b}") return diff
[docs] def spatial(cube, aggr, model=um): """ Calculate spatial statistic with geographic grid weights. Parameters ---------- cube: iris.cube.Cube Cube with longitude and latitude coordinates. aggr: str Statistical aggregator (see iris.analysis for available aggregators). model: aeolus.model.Model, optional Model class with relevant coordinate names. Returns ------- iris.cube.Cube Collapsed cube. Examples -------- >>> spatial(my_data_cube, "mean") """ ensure_bounds(cube, coords=("x", "y"), model=model) coords = (model.y, model.x) flag = all(cube.coord(c).has_bounds() for c in coords) aggregator = getattr(iris.analysis, aggr.upper()) if flag and isinstance(aggregator, iris.analysis.WeightedAggregator): kw = {"weights": area_weights_cube(cube, normalize=True).data} else: kw = {} try: out = cube.collapsed(coords, aggregator, **kw) except CoColErr as e: _warn(f"Caught exception in spatial():\n{e}") # out = iris.util.squeeze(cube) out = cube return out
[docs] def spatial_mean(cube, model=um): """Shortcut for spatial(cube, "mean").""" return spatial(cube, "mean", model=model)
[docs] def spatial_quartiles(cube, model=um): """Calculate quartiles over horizontal coordinates.""" _warn("No weights are applied!") q25 = cube.collapsed( (model.y, model.x), iris.analysis.PERCENTILE, percent=25 ) q75 = cube.collapsed( (model.y, model.x), iris.analysis.PERCENTILE, percent=75 ) return q25, q75
[docs] def time_mean(obj, squeeze=False, model=um): """Time average of a cube or a container with cubes.""" if isinstance(obj, Cube): try: out = obj.collapsed(model.t, iris.analysis.MEAN) except CoColErr as e: _warn(f"Caught exception in time_mean():\n{e}") out = obj if squeeze: out = iris.util.squeeze(out) elif isinstance(obj, Mapping): out = {} for key, cube in obj.items(): out[key] = time_mean(cube, model=model) out = obj.__class__(out) elif isinstance(obj, Iterable): out = [] for cube in obj: out.append(time_mean(cube, model=model)) out = obj.__class__(out) else: raise ArgumentError(f"Unrecognised type of obj: {type(obj)}") return out
[docs] def vertical_mean(cube, weight_by=None, model=um): """ Vertical mean of a cube with optional weighting. Parameters ---------- cube: iris.cube.Cube Cube to average. weight_by: str or iris.coords.Coord or iris.cube.Cube, optional Coordinate of the given cube or another cube used for weighting. model: aeolus.model.Model, optional Model class with a relevant coordinate name. Returns ------- iris.cube.Cube Collapsed cube. """ coord = model.z if len(cube.coord_dims(coord)) == 0: _warn( f"{repr(coord)} does not describe any dimension in {repr(cube)}." ) return cube if weight_by is None: vmean = cube.collapsed(coord, iris.analysis.MEAN) else: if isinstance(weight_by, (str, iris.coords.Coord)): weights = broadcast_to_shape( cube.coord(weight_by).points.squeeze(), cube.shape, cube.coord_dims(weight_by), ) vmean = cube.collapsed(coord, iris.analysis.MEAN, weights=weights) elif isinstance(weight_by, iris.cube.Cube): a_copy = cube.copy() b_copy = weight_by.copy() a_copy.coord(coord).bounds = None b_copy.coord(coord).bounds = None prod = b_copy * a_copy vmean = integrate(prod, coord) / integrate(weight_by, coord) else: raise ArgumentError( f"Unrecognised type of weight_by: {type(weight_by)}" ) vmean.rename(f"vertical_mean_of_{cube.name()}") return vmean
[docs] def zonal_mean(cube, model=um): """ Calculate cube's zonal average. Parameters ---------- cube: iris.cube.Cube Cube with a latitude coordinate. model: aeolus.model.Model, optional Model class with a relevant coordinate name. Returns ------- iris.cube.Cube Collapsed cube. """ lon_name = model.x cube_mean = cube.collapsed(lon_name, iris.analysis.MEAN) return cube_mean