Source code for aeolus.coord

"""Functionality related to coordinates of cubes."""

from datetime import timedelta

from cartopy.util import add_cyclic_point
import dask.array as da
from iris import Constraint
import iris.analysis
import iris.analysis.cartography
from iris.analysis.cartography import wrap_lons
from iris.coord_categorisation import _months_in_season, add_categorised_coord
from iris.coords import AuxCoord, DimCoord
from iris.cube import Cube, CubeList
from iris.exceptions import CoordinateNotFoundError as CoNotFound
import iris.fileformats
from iris.util import (
    broadcast_to_shape,
    guess_coord_axis,
    is_regular,
    promote_aux_coord_to_dim_coord,
    squeeze,
)
import numpy as np

from .const import get_planet_radius
from .exceptions import ArgumentError, BadCoordinateError, NotFoundError, _warn
from .meta import const_from_attrs
from .model import um

__all__ = (
    "CoordContainer",
    "add_binned_coord",
    "add_cyclic_point_to_cube",
    "add_planet_calendar",
    "area_weights_cube",
    "check_coords",
    "coarsen_cube",
    "coord_delta_to_cube",
    "coord_to_cube",
    "ensure_bounds",
    "get_cube_datetimes",
    "get_cube_rel_days",
    "get_dim_coord",
    "get_xy_coords",
    "isel",
    "interp_cube_from_height_to_pressure_levels",
    "interp_cubelist_from_height_to_pressure_levels",
    "interp_to_cube_time",
    "nearest_coord_value",
    "not_equal_coord_axes",
    "regrid_3d",
    "replace_z_coord",
    "roll_cube_0_360",
    "roll_cube_pm180",
    "vertical_cross_section_area",
    "volume_weights_cube",
)


def _cell_bounds(points, bound_position=0.5):
    """
    Calculate coordinate cell boundaries.

    Taken from SciTools iris package.

    Parameters
    ----------
    points: numpy.ndarray
        One-dimensional array of uniformy spaced values of shape (M,)
    bound_position: bool, optional
        The desired position of the bounds relative to the position
        of the points.

    Returns
    -------
    bounds: numpy.ndarray
        Array of shape (M+1,)

    Examples
    --------
    >>> a = np.arange(-1, 2.5, 0.5)
    >>> a
    array([-1. , -0.5,  0. ,  0.5,  1. ,  1.5,  2. ])
    >>> cell_bounds(a)
    array([-1.25, -0.75, -0.25,  0.25,  0.75,  1.25,  1.75,  2.25])

    See Also
    --------
    aeolus.coord._cell_centres
    """
    assert points.ndim == 1, "Only 1D points are allowed"
    diffs = np.diff(points)
    if not np.allclose(diffs, diffs[0]):
        _warn("_cell_bounds() assumes uniformly spaced points")
    delta = diffs[0] * bound_position
    bounds = np.concatenate([[points[0] - delta], points + delta])
    return bounds


def _cell_centres(bounds, bound_position=0.5):
    """
    Calculate coordinate cell centres.

    Taken from SciTools iris package.

    Parameters
    ----------
    bounds: numpy.ndarray
        One-dimensional array of cell boundaries of shape (M,)
    bound_position: bool, optional
        The desired position of the bounds relative to the position
        of the points.

    Returns
    -------
    centres: numpy.ndarray
        Array of shape (M-1,)

    Examples
    --------
    >>> a = np.arange(-1, 3., 1.)
    >>> a
    array([-1,  0,  1,  2])
    >>> cell_centres(a)
    array([-0.5,  0.5,  1.5])

    See Also
    --------
    aeolus.coord._cell_bounds
    """
    assert bounds.ndim == 1, "Only 1D points are allowed"
    deltas = np.diff(bounds) * bound_position
    centres = bounds[:-1] + deltas
    return centres


def _is_longitude_global(lon_points):
    """Return True if array of longitudes covers the whole sphere."""
    dx = np.diff(lon_points)[0]  # assume regular grid
    case_0_360 = ((lon_points[0] - dx) <= 0) and ((lon_points[-1] + dx) >= 360)
    case_pm180 = ((lon_points[0] - dx) <= -180) and (
        (lon_points[-1] + dx) >= 180
    )
    return case_0_360 or case_pm180


[docs] class CoordContainer: """ Coordinate container. Attributes ---------- {x,y,z,t}: iris.coord.Coord Coordinates in the respective dimensions """
[docs] def __init__(self, cubes, coord_check=False, model=um): """ Instantiate an `AtmosFlow` object. Parameters ---------- cubes: iris.cube.CubeList Atmospheric fields with necessary coordinates. coord_check: bool, optional Check if all cubes have the same set of coordinates. model: aeolus.model.Model, optional Model class with relevant coordinate and variable names. """ if coord_check: check_coords(cubes) # self.x = cubes[0].coord(model.x) self.model = model for axis in ["x", "y", "z", "t"]: try: setattr(self, axis, cubes[0].coord(axis=axis, dim_coords=True)) except CoNotFound: pass
[docs] def add_binned_coord(cube, coord_name, bins): """ Bin coordinate points and add them as an auxiliary coordinate to a cube. Parameters ---------- cube: iris.cube.Cube Cube with the given coordinate. coord_name: str or iris.coords.Coord Coordinate name. bins: array-like Coordinate bins. Returns ------- iris.cube.Cube """ cube_out = cube.copy() binned_points = np.digitize(cube_out.coord(coord_name).points, bins) binned_points = np.clip(binned_points, 0, len(bins) - 1) new_coord = AuxCoord(binned_points, long_name=f"{coord_name}_binned") cube_out.add_aux_coord(new_coord, cube_out.coord_dims(coord_name)) return cube_out
[docs] def add_cyclic_point_to_cube(cube, coord=um.x): """ Add a cyclic point to a cube and a corresponding coordinate. A wrapper for `cartopy.util.add_cyclic_point()` for iris cubes. Parameters ---------- cube: iris.cube.Cube An n-dimensional cube of data to add a cyclic point to. coord: iris.coords.Coord or str A 1-dimensional coordinate which specifies the coordinate values for the dimension the cyclic point is to be added to. The coordinate values must be regularly spaced. Defaults to the "x"-coordinate. Returns ------- iris.cube.Cube The cube with a cyclic point added. """ the_coord = cube.coord(coord) dim = cube.coord_dims(the_coord) # TODO: use core_data()? cy_data, cy_coord_pnts = add_cyclic_point( cube.data, coord=the_coord.points, axis=dim[0] ) dim_coords_and_dims = [ (coord, cube.coord_dims(coord)) for coord in cube.dim_coords if coord != the_coord ] dim_coords_and_dims.append((the_coord.copy(cy_coord_pnts), dim)) aux_coords_and_dims = [ (coord, cube.coord_dims(coord)) for coord in cube.aux_coords if cube.coord_dims(coord) != dim ] other_kwargs = { key: getattr(cube, key, None) for key in [ "attributes", "standard_name", "long_name", "var_name", "units", "cell_methods", "aux_factories", "cell_measures_and_dims", ] } cyclic_cube = Cube( cy_data, dim_coords_and_dims=dim_coords_and_dims, aux_coords_and_dims=aux_coords_and_dims, **other_kwargs, ) return cyclic_cube
[docs] def add_planet_calendar( cube, time_coord, days_in_year, days_in_month, days_in_day, run_start_day=0, seasons=("djf", "mam", "jja", "son"), planet="planet", ): """ Add an auxiliary time axis with the non-Earth period lengths. Parameters ---------- cube: iris.cube.Cube Input cube. time_coord: iris.coords.Coord or str Original time coordinate of the cube. days_in_year: int or float Number of Earth days in one year on the given planet. days_in_month: int or float Number of Earth days in one month on the given planet. days_in_day: int or float Number of Earth days in one day on the given planet. run_start_day: int or float, optional Earth day of the start of the simulation. seasons: tuple, optional Sequences of letters corresponding to month names. planet: str, optional Name of the planet to be used to name the new coordinate. """ def rel_day(coord, value): """Get the relative number of the day.""" start = coord.units.num2date(coord.points[0]) current = coord.units.num2date(value) iday = run_start_day + (current - start).days return iday def determine_season(coord, value): """Determine season from the month number.""" assert coord.name() == f"{planet}_month" for season in seasons: if value + 1 in _months_in_season(season): return season new_coords = { "year": lambda c, v: rel_day(c, v) // days_in_year, "month": lambda c, v: (rel_day(c, v) % days_in_year) // days_in_month, "day": lambda c, v: (rel_day(c, v) % days_in_month) // days_in_day, "season": determine_season, } for key, op in new_coords.items(): if key == "season": coord = f"{planet}_month" else: coord = time_coord add_categorised_coord(cube, f"{planet}_{key}", coord, op)
[docs] def area_weights_cube(cube, r_planet=None, normalize=False, model=um): """ Create a cube of area weights for an arbitrary planet. Parameters ---------- cube: iris.cube.Cube Cube with longitude and latitude coordinates r_planet: float, optional Radius of the planet (m). If not given, an attempt is made to get it from the cube metadata. normalize: bool, optional Normalize areas. model: aeolus.model.Model, optional Model class with relevant coordinate names. Returns ------- iris.cube.Cube Cube of area weights with the same metadata as the input cube """ cube = cube.copy() ensure_bounds(cube, model=model) aw = iris.analysis.cartography.area_weights(cube, normalize=normalize) if normalize: aw = cube.copy(data=aw) aw.rename("normalized_grid_cell_area") aw.units = "1" else: if r_planet is None: r = get_planet_radius(cube) else: r = r_planet aw = cube.copy(data=aw * (r / iris.fileformats.pp.EARTH_RADIUS) ** 2) aw.rename("grid_cell_area") aw.units = "m**2" return aw
[docs] def check_coords(cubes): """Check the cubes coordinates for consistency.""" # get the names of all coords binned into useful comparison groups coord_comparison = iris.analysis._dimensional_metadata_comparison(*cubes) bad_coords = coord_comparison["ungroupable_and_dimensioned"] if bad_coords: raise BadCoordinateError( "Coordinates found in one cube that describe " "a data dimension which weren't in the other " "cube ({}), try removing this coordinate.".format( ", ".join(group.name() for group in bad_coords) ) ) bad_coords = coord_comparison["resamplable"] if bad_coords: raise BadCoordinateError( "Some coordinates are different ({}), " "consider resampling.".format( ", ".join(group.name() for group in bad_coords) ) )
[docs] def coarsen_cube(cube, lon_bins, lat_bins, model=um): """ Block-average cube in longitude and latitude. Note: no weighting is applied! Parameters ---------- cube: iris.cube.Cube Cube with longitude and latitude coordinates. lon_bins: array-like Longitude bins. lat_bins: array-like Latitude bins. model: aeolus.model.Model, optional Model class with relevant coordinate names. Returns ------- iris.cube.Cube """ cube_out = cube.copy() coord_names = [model.y, model.x] cube_out = add_binned_coord(cube_out, model.x, lon_bins) cube_out = add_binned_coord(cube_out, model.y, lat_bins) # To avoid oversampling on the edges, # extract subset within the boundaries of target coords for coord, target_points in zip(coord_names, (lat_bins, lon_bins)): cube_out = cube_out.extract( Constraint( **{ coord: lambda p: target_points.min() <= p <= target_points.max() } ) ) for coord in coord_names: cube_out = cube_out.aggregated_by( [f"{coord}_binned"], iris.analysis.MEAN ) for coord, target_points in zip(coord_names, (lat_bins, lon_bins)): dim = cube_out.coord_dims(coord) units = cube_out.coord(coord).units cube_out.remove_coord(coord) aux = cube_out.coord(f"{coord}_binned") new_points = target_points[aux.points] new_coord = DimCoord.from_coord( aux.copy(points=new_points, bounds=None) ) cube_out.remove_coord(f"{coord}_binned") new_coord.rename(coord) new_coord.units = units cube_out.add_dim_coord(new_coord, dim) return cube_out
[docs] def coord_delta_to_cube(cube, coord, normalize=False): """ Convert 1D coord spacings to a cube of the same dim as the given cube. Parameters ---------- cube: iris.cube.Cube Cube containing the coordinate to be broadcast. coord: str or iris.coords.Coord Coordinate to be broadcast. normalize: bool, optional Normalize the data. Returns ------- iris.cube.Cube Cube of broadcast coordinate deltas. """ # Extract the coordinate and its dimension number if isinstance(coord, str): _coord = cube.coord(coord).copy() else: _coord = coord.copy() dim_map = cube.coord_dims(_coord.name()) # Ensure the coordinate has bounds if not _coord.has_bounds(): _coord.guess_bounds() _data = _coord.bounds[:, 1] - _coord.bounds[:, 0] if normalize: _data_max = np.nanmax(np.abs(_data)) _data /= _data_max units = "1" prefix = "normalized_" else: units = _coord.units prefix = "" if len(dim_map) > 0: _data = broadcast_to_shape(_data, cube.shape, dim_map) dc = [(c.copy(), cube.coord_dims(c)) for c in cube.dim_coords] ac = [(c.copy(), cube.coord_dims(c)) for c in cube.aux_coords] new_cube = Cube( data=_data, units=units, dim_coords_and_dims=dc, aux_coords_and_dims=ac, ) else: new_cube = Cube(data=_data, units=units) new_cube.rename(f"{prefix}delta_{_coord.name()}") return new_cube
[docs] def coord_to_cube(cube, coord, broadcast=True): """ Convert coordinate points to a cube with the same dimensions. Parameters ---------- cube: iris.cube.Cube Cube containing the coordinate to be broadcast. coord: str or iris.coords.Coord Coordinate to be broadcast. broadcast: bool, optional Broadcast the coordinate points to the shape of the cube. Returns ------- iris.cube.Cube Cube of broadcast coordinate. """ if isinstance(coord, str): _coord = cube.coord(coord) else: _coord = coord kw = { "units": _coord.units, "long_name": _coord.long_name, "standard_name": _coord.standard_name, "var_name": _coord.var_name, } dim_map = cube.coord_dims(_coord.name()) _data = _coord.points if len(dim_map) > 0 and broadcast: _data = broadcast_to_shape(_data, cube.shape, dim_map) dc = [(c.copy(), cube.coord_dims(c)) for c in cube.dim_coords] ac = [(c.copy(), cube.coord_dims(c)) for c in cube.aux_coords] new_cube = Cube( data=_data, dim_coords_and_dims=dc, aux_coords_and_dims=ac, **kw ) else: new_cube = Cube( data=_data, dim_coords_and_dims=[(_coord.copy(), 0)], **kw ) return new_cube
[docs] def ensure_bounds(cube, coords=("x", "y"), model=um): """Auto-generate bounds for cube coordinates.""" for coord in coords: try: c_name = getattr(model, coord) except (AttributeError, TypeError): c_name = coord c = cube.coord(c_name) if not c.has_bounds(): if len(c.points) > 1: c.guess_bounds()
[docs] def get_cube_datetimes(cube, model=um): """ Convert points of a cube's time coordinate to datetimes. Parameters ---------- cube: iris.cube.Cube Cube containing a time coordinate. model: aeolus.model.Model, optional Model class with relevant coordinate names. Returns ------- numpy.ndarray Array of datetime-like objects. """ return cube.coord(model.t).units.num2date(cube.coord(model.t).points)
[docs] def get_cube_rel_days(cube, model=um): """ Convert points of a cube's time coordinate to relative number of days. Parameters ---------- cube: iris.cube.Cube Cube containing a time coordinate. model: aeolus.model.Model, optional Model class with relevant coordinate names. Returns ------- numpy.ndarray Array of relative days. """ dts = get_cube_datetimes(cube, model=model) days = ((dts - dts[0]) / timedelta(days=1)).astype(np.float64) return days
[docs] def get_dim_coord(cube, axis): """ Return a coordinate from a cube based on the axis it represents. Uses :py:func:`iris.util.guess_coord_axis` to heuristically match a dimensional coordinate with the requested axis. Adapted from https://github.com/LSaffin/iris-extensions Parameters ---------- cube: iris.cube.Cube Cube with the desired coordinate. axis: str The co-ordinate axis to take from the cube. Must be one of X, Y, Z, T. Returns ------- iris.coords.DimCoord The dim coordinate matching the requested axis on the given cube. Raises ------ ArgumentError: If axis is not one of {X, Y, Z, T}. NotFoundError: If the cube does not contain a coord with the given axis. """ _allowed = ["X", "Y", "Z", "T"] axis = axis.upper() # If the axis supplied is not correct raise an error if axis not in _allowed: raise ArgumentError( f"Axis must be one of {_allowed}, {axis} is given." ) # Loop over dimensional coords in the cube for coord in cube.dim_coords: # Return the coordinate if it matches the axis if axis == guess_coord_axis(coord): return coord # If no coordinate matches raise an error raise NotFoundError(f"Cube has no coordinate for axis {axis}")
[docs] def get_xy_coords(cube, model=um): """ Return X and Y coords tuple using names from a given `model` container. Parameters ---------- cube: iris.cube.Cube Input cube. model: aeolus.model.Model, optional Model class with relevant coordinate and variable names. """ y_coord = cube.coord(model.y) x_coord = cube.coord(model.x) return (x_coord, y_coord)
[docs] @const_from_attrs(strict=False) def interp_cube_from_height_to_pressure_levels( variable, pressure, levels, p_ref_frac=False, const=None, interpolator=None, model=um, ): """ Interpolate a cube from height to pressure level(s). Parameters ---------- variable: iris.cube.Cube A cube to interpolate. Must have the same dimensions as the pressure cube. pressure: iris.cube.Cube A cube of pressure. levels: array-like Sequence of pressure levels (same units as the units of pressure cube in `cubelist`). p_ref_frac: bool, optional If True, `levels` are used as fractions of the surface pressure. const: aeolus.const.const.ConstContainer, optional Required if p_ref_frac=True. If not given, constants are attempted to be retrieved from attributes of a cube in the cube list. interpolator: callable or None The interpolator to use when computing the interpolation. See `relevel()` docs for more. model: aeolus.model.Model, optional Model class with relevant variable names. Returns ------- iris.cube.Cube The variable interpolated to pressure level(s). """ from iris.experimental import stratify if p_ref_frac: p_ref = const.reference_surface_pressure.copy() p_ref.convert_units(pressure.units) p_tgt = np.atleast_1d(levels) * float(p_ref.data) else: p_tgt = np.atleast_1d(levels) cube_plev = stratify.relevel( variable, pressure, p_tgt, axis=model.z, interpolator=interpolator ) cube_plev.coord(model.pres).attributes = {} return squeeze(cube_plev)
[docs] @const_from_attrs(strict=False) def interp_cubelist_from_height_to_pressure_levels( cubelist, levels, p_ref_frac=False, const=None, interpolator=None, model=um, include_pressure=False, ): """ Interpolate all cubes to the given set of pressure levels. Pressure variable is found using the `model.pres` name and then optionally excluded from the output list of interpolated cubes. Parameters ---------- cubelist: iris.cube.CubeList List of cubes, including a cube of pressure. levels: array-like Sequence of pressure levels (same units as the units of pressure cube in `cubelist`). p_ref_frac: bool, optional If True, `levels` are used as fractions of the surface pressure. const: aeolus.const.const.ConstContainer, optional Required if p_ref_frac=True. If not given, constants are attempted to be retrieved from attributes of a cube in the cube list. interpolator: callable or None The interpolator to use when computing the interpolation. See `relevel()` docs for more. model: aeolus.model.Model, optional Model class with relevant variable names. include_pressure: bool, optional If True, include the pressure cube in the output. Returns ------- iris.cube.CubeList List of cubes interpolated to pressure level(s). """ pres = cubelist.extract_cube(model.pres) cl_out = CubeList() for cube in cubelist: if include_pressure or cube != pres: cube_plev = interp_cube_from_height_to_pressure_levels( cube, pres, levels, p_ref_frac=p_ref_frac, const=const, interpolator=interpolator, model=model, ) cl_out.append(cube_plev) return cl_out
[docs] def interp_to_cube_time(cube_src, cube_tgt, model=um): """ Interpolate `cube_src` to `cube_tgt` along the time dimension (`model.t`). Linear interpolation is used. Forecast period is copied from `cube_tgt`. Parameters ---------- cube_src: iris.cube.Cube Cube to interpolate. cube_tgt: iris.cube.Cube Cube with time dimension to interpolate to. model: aeolus.model.Model, optional Model class with relevant variable names. Returns ------- iris.cube.Cube Cube with the time dimension equal to `cube_tgt`. """ target = [(model.t, cube_tgt.coord(model.t).points)] out = cube_src.interpolate(target, iris.analysis.Linear()) # Replace time coordinates, because interpolation removes bounds. for coord in ["t", "fcst_prd", "fcst_ref"]: try: out.replace_coord(cube_tgt.coord(getattr(model, coord))) except (AttributeError, CoNotFound): pass return out
[docs] def isel(obj, coord, idx, skip_not_found=None): """ Slice cubes by an index of a coordinate (index-selector). Parameters ---------- obj: iris.cube.Cube or iris.cube.CubeList Cube or list of cubes. coord: str or iris.coords.Coord Coordinate for selection. idx: int Index along the given coordinate. skip_not_found: bool or str, optional Skip if coordinate not found. By default it is active when dealing with a cube list and inactive if dealing with a single cube. Returns ------- iris.cube.Cube or iris.cube.CubeList Slice along the coordinate. """ if isinstance(obj, Cube): try: # _coord = obj.coord(coord) # val = _coord.points[idx] # try: # val = _coord.units.num2date(val) # except ValueError: # pass # constr = Constraint(**{_coord.name(): lambda x: x.point == val}) # out = obj.extract(constr) val = obj.coord(coord)[idx] out = squeeze(obj.subset(val)) except CoNotFound as e: if skip_not_found: out = obj else: raise e elif isinstance(obj, (list, set, tuple)): out = [] for cube in obj: out.append( isel( cube, coord, idx, skip_not_found=( (skip_not_found is None) or skip_not_found ), ) ) out = obj.__class__(out) return out
[docs] def nearest_coord_value(cube, coord_name, val): """ Get the nearest value of a coordinate. Parameters ---------- cube: iris.cube.Cube Cube with the coordinate coord_name: str or iris.coords.Coord Coordinate where to look the nearest point up val: int or float The value to find Returns ------- int or float Element of the coordinate array closest to the given `val`. """ coord = cube.coord(coord_name) i = coord.nearest_neighbour_index(val) return coord.points[i]
[docs] def not_equal_coord_axes(cube1, cube2): """Given 2 cubes, return axes of unequal dimensional coordinates.""" coord_comp = iris.analysis._dimensional_metadata_comparison(cube1, cube2) neq_dim_coords = set(coord_comp["not_equal"]).intersection( set(coord_comp["dimensioned"]) ) dims = [] for coord_pair in neq_dim_coords: for coord in coord_pair: dims.append(guess_coord_axis(coord)) return set(filter(None, dims))
[docs] def regrid_3d(cube, target, model=um): """ Regrid a cube in the horizontal and in the vertical. Adapted from https://github.com/LSaffin/iris-extensions Parameters ---------- cube: iris.cube.Cube The cube to be regridded. target: iris.cube.Cube The cube to regrid to. model: aeolus.model.Model, optional Model class with relevant coordinate names. Returns ------- iris.cube.Cube """ neq_axes = not_equal_coord_axes(cube, target) if neq_axes.intersection(["X", "Y"]): cube = cube.regrid(target, iris.analysis.Linear()) # Interpolate in the vertical if needed if "Z" in neq_axes: vert_coord = model.z if vert_coord is None: z = get_dim_coord(target, "Z") else: z = target.coord(vert_coord) cube = cube.interpolate([(z.name(), z.points)], iris.analysis.Linear()) ensure_bounds(cube, coords=[z]) # Match coordinate information # XXX is this needed? # newcube = target.copy(data=cube.data) # newcube.rename(cube.name()) # newcube.units = cube.units # Put back correct time information # for coord in newcube.aux_coords: # if iris.util.guess_coord_axis(coord) == "T": # newcube.remove_coord(coord) # for coord in cube.aux_coords: # if iris.util.guess_coord_axis(coord) == "T": # newcube.add_aux_coord(coord) return cube
[docs] def replace_z_coord(cube, model=um): """ Replace dimensional vertical coordinate. Parameters ---------- cube: iris.cube.Cube Input cube. model: aeolus.model.Model, optional Model class with relevant coordinate names. Returns ------- iris.cube.Cube Copy of the input cube with a new vertical coordinate. """ new_cube = cube.copy() new_cube.coord(model.z).bounds = None promote_aux_coord_to_dim_coord(new_cube, model.z) ensure_bounds(new_cube, coords=[model.z]) for coord in [model.s, model.lev]: # By default, model levels and sigma coordinates are removed. try: new_cube.remove_coord(coord) except CoNotFound: pass return new_cube
[docs] def roll_cube_0_360(cube_in, add_shift=0, model=um): """ Take a cube spanning -180...180 degrees and roll it to 0...360 degrees. Works with global model output, and in some cases for regional. Parameters ---------- cube: iris.cube.Cube Cube with longitude and latitude coordinates. add_shift: int Additional shift of data (is not applied to the longitude coordinate). model: aeolus.model.Model, optional Model class with a relevant longitude coordinate name. Returns ------- iris.cube.Cube See Also -------- aeolus.coord.roll_cube_pm180 """ cube = cube_in.copy() coord_name = model.x # get the name of the longitude coordinate lon = cube.coord(coord_name) if (lon.points < 0.0).any(): add = 180 cube.data = da.roll( cube.core_data(), len(lon.points) // 2 + add_shift, axis=-1 ) if lon.has_bounds(): bounds = lon.bounds + add else: bounds = None new_points = lon.points + add cube.replace_coord(lon.copy(points=new_points, bounds=bounds)) return cube
[docs] def roll_cube_pm180(cube_in, add_shift=0, model=um): """ Take a cube spanning 0...360 degrees and roll it to -180...180 degrees. Works with global model output, and in some cases for regional. Parameters ---------- cube: iris.cube.Cube Cube with longitude and latitude coordinates. add_shift: int Additional shift of data (is not applied to the longitude coordinate). model: aeolus.model.Model, optional Model class with a relevant longitude coordinate name. Returns ------- iris.cube.Cube See Also -------- aeolus.coord.roll_cube_0_360 """ cube = cube_in.copy() coord_name = model.x # get the name of the longitude coordinate xcoord = cube.coord(coord_name) if (xcoord.points >= 0.0).all(): assert is_regular( xcoord ), "Operation is only valid for a regularly spaced coordinate." if _is_longitude_global(xcoord.points): # Shift data symmetrically only when dealing with global cubes cube.data = da.roll( cube.core_data(), len(xcoord.points) // 2 + add_shift, axis=-1 ) if xcoord.has_bounds(): bounds = wrap_lons(xcoord.bounds, -180, 360) # + subtract bounds = bounds[bounds[:, 0].argsort(axis=0)] else: bounds = None cube.replace_coord( xcoord.copy( points=np.sort(wrap_lons(xcoord.points, -180, 360)), bounds=bounds, ) ) else: # Nothing to do, the cube is already centered on 0 longitude # unless there is something wrong with longitude msg = ( f"Incorrect {coord_name} values: " f"from {xcoord.points.min()} to {xcoord.points.max()}" ) assert ( (xcoord.points >= -180.0) & (xcoord.points <= 180.0) ).all(), msg return cube
[docs] def vertical_cross_section_area(cube2d, r_planet=None): """Create a cube of vertical cross-section areas in metres.""" cube2d = cube2d.copy() if r_planet is None: r = get_planet_radius(cube2d) else: r = r_planet m_per_deg = (np.pi / 180) * r if guess_coord_axis(cube2d.dim_coords[1]) == "X": m_per_deg *= np.cos(np.deg2rad(cube2d.coord(axis="Y").points[0])) for dim_coord in cube2d.dim_coords: if not dim_coord.has_bounds(): dim_coord.guess_bounds() x_bounds = cube2d.coord(cube2d.dim_coords[1]).bounds z_bounds = cube2d.coord(cube2d.dim_coords[0]).bounds vc_area = cube2d.copy( data=(z_bounds[:, 1] - z_bounds[:, 0])[:, None] * ((x_bounds[:, 1] - x_bounds[:, 0])[None, :] * m_per_deg) ) vc_area.units = "m**2" vc_area.rename("vertical_section_area") for dim_coord in vc_area.dim_coords: dim_coord.bounds = None return vc_area
[docs] def volume_weights_cube(cube, r_planet=None, normalize=False, model=um): """ Create a cube of volume weights from a grid of a given cube. Parameters ---------- cube: iris.cube.Cube Cube with longitude, latitude and height coordinates r_planet: float, optional Radius of the planet (m). If not given, an attempt is made to get it from the cube metadata. normalize: bool, optional Normalize the data. model: aeolus.model.Model, optional Model class with relevant coordinate names. Returns ------- iris.cube.Cube Cube of area weights with the same metadata as the input cube """ area_cube = area_weights_cube( cube, r_planet=r_planet, normalize=normalize, model=model ) height_deltas = coord_delta_to_cube(cube, model.z, normalize=normalize) volume = area_cube * height_deltas if normalize: volume.rename("normalized_volume_weights") volume.convert_units("1") else: volume.rename("volume_weights") volume.convert_units("m**3") return volume