# Climate indicator manager - a package for managing and building climate indicator dashboards.
# Copyright (c) 2022 John Kennedy
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy
import pandas as pd
import pkg_resources
import itertools
import xarray as xa
import numpy as np
import logging
import regionmask
from pathlib import Path
from datetime import datetime
from typing import List, Tuple, Callable
from climind.data_manager.metadata import CombinedMetadata
import climind.data_types.timeseries as ts
[docs]
def get_1d_transfer(
zero_point_original: float,
grid_space_original: float,
zero_point_target: float,
grid_space_target: float,
index_in_original: int
) -> tuple:
"""
Find the overlapping grid spacings for a new grid based on an index in the old grid
Parameters
----------
zero_point_original: float
longitude or latitude of the zero-indexed grid cell
grid_space_original: float
grid spacing in degrees
zero_point_target: float
longitude or latitude of the zero-indexed grid cells in the targe grid
grid_space_target: float
grid spacing in degrees of the target grid
index_in_original: int
index of the gridcell in the original grid
Returns
-------
tuple
Returns, the longitude of the first grid cell in the new grid, the number of steps,
and the first and last indices on the new grid.
"""
llon = zero_point_target + index_in_original * grid_space_target
hlon = zero_point_target + (index_in_original + 1) * grid_space_target
mlon_lo = (llon - zero_point_original) / grid_space_original
mlon_hi = (hlon - zero_point_original) / grid_space_original
lonindexlo = int(np.floor(mlon_lo))
lonindexhi = int(np.floor(mlon_hi))
final_cell = (hlon - (zero_point_original + lonindexhi * grid_space_original)) / grid_space_original
if final_cell == 0:
lonindexhi -= 1
final_cell = (hlon - (zero_point_original + lonindexhi * grid_space_original)) / grid_space_original
nlonsteps = lonindexhi - lonindexlo + 1
transfer_lon = np.zeros((nlonsteps)) + 1.0
if nlonsteps == 1:
transfer_lon[0] = grid_space_target / grid_space_original
else:
transfer_lon[0] = ((zero_point_original + (lonindexlo + 1) * grid_space_original) - llon) / grid_space_original
transfer_lon[nlonsteps - 1] = final_cell
return transfer_lon, nlonsteps, lonindexlo, lonindexhi
[docs]
def simple_regrid(ingrid: np.ndarray, lon0: float, lat0: float, dx: float, target_dy: float) -> np.ndarray:
"""
Perform a simple regridding, using a simple average of grid cells from the original grid
that fall within the target grid cell.
Parameters
----------
ingrid: np.ndarray
Starting grid which we want to regrid
lon0: float
Longitude of zero-indexed grid cell in longitudinal direction
lat0: float
Latitude of zero-indexed grid cell in latitudinal direction
dx: float
Grid spacing in degrees
target_dy: float
Target grid spacing
Returns
-------
np.ndarray
Returns regridded array.
"""
nlat = int(180 / target_dy)
nlon = int(360 / target_dy)
outgrid = np.zeros((nlat, nlon))
for xlon, ylat in itertools.product(range(nlon), range(nlat)):
# Longitudes
y0 = -180.0
transfer_lon, nlonsteps, lolon, hilon = get_1d_transfer(lon0, dx, y0, target_dy, xlon)
# Latitudes
y0 = -90.0
transfer_lat, nlatsteps, lolat, hilat = get_1d_transfer(lat0, dx, y0, target_dy, ylat)
transfer_lon = np.repeat(np.reshape(transfer_lon, (1, nlonsteps)), nlatsteps, 0)
transfer_lat = np.repeat(np.reshape(transfer_lat, (nlatsteps, 1)), nlonsteps, 1)
transfer = transfer_lat * transfer_lon
outgrid[ylat, xlon] = np.sum(ingrid[lolat:hilat + 1, lolon:hilon + 1] * transfer) / np.sum(transfer)
return outgrid
[docs]
def make_xarray(target_grid, times, latitudes, longitudes, variable: str = 'tas_mean') -> xa.Dataset:
"""
Make a xarray Dataset for a regular lat-lon grid from a numpy grid (ntime, nlat, nlon),
and arrays of time (ntime), latitude (nlat) and longitude (nlon).
Parameters
----------
target_grid: np.ndarray
numpy array of shape (ntime, nlat, nlon)
times: np.ndarray
Array of times, shape (ntime)
latitudes: np.ndarray
Array of latitudes, shape (nlat)
longitudes: np.ndarray
Array of longitudes, shape (nlon)
variable: str
Variable name
Returns
-------
xa.Dataset
Dataset built from the input components
"""
ds = xa.Dataset({
variable: xa.DataArray(
data=target_grid,
dims=['time', 'latitude', 'longitude'],
coords={'time': times, 'latitude': latitudes, 'longitude': longitudes},
attrs={'long_name': '2m air temperature', 'units': 'K'}
)
},
attrs={'project': 'NA'}
)
return ds
[docs]
def make_standard_grid(out_grid: np.ndarray, start_date: datetime, freq: str, number_of_times: int) -> xa.Dataset:
"""
Make the standard 5x5 grid from a numpy array, start date, temporal frequency and
number of time steps.
Parameters
----------
out_grid: np.ndarray
Numpy array containing the data. Shape should be (number_of_times, 36, 72)
start_date: datetime
Date of the first time step
freq: str
Temporal frequency
number_of_times: int
Number of time steps, should match the first dimension of the out_grid
Returns
-------
xa.Dataset
xarray Dataset containing the data in out_grid with the specified temporal
frequency and number of time steps
"""
times = pd.date_range(start=start_date, freq=freq, periods=number_of_times)
latitudes = np.linspace(-87.5, 87.5, 36)
longitudes = np.linspace(-177.5, 177.5, 72)
dataset = make_xarray(out_grid, times, latitudes, longitudes)
return dataset
[docs]
def rank_array(in_array: np.ndarray) -> int:
"""
Rank array
Parameters
----------
in_array: np.ndarray
Array to be ranked
Returns
-------
int
"""
in_array[np.isnan(in_array)] = -9999.9999
ntime = len(in_array)
temp = in_array.argsort()
ranks = np.empty_like(temp)
ranks[temp] = np.arange(ntime)
rank = ntime - ranks
return rank
[docs]
class GridMonthly:
"""
A :class:`GridMonthly` combines an xarray Dataset with a
:class:`.CombinedMetadata` to bring together data and
metadata in one object. It represents monthly averages of data on a regular grid.
"""
def __init__(self, input_data: xa.Dataset, metadata: CombinedMetadata):
"""
Create a :class:'.GridMonthly` object from an xarray
Dataset and a :class:`.CombinedMetadata` object.
Parameters
----------
input_data: xa.Dataset
xarray dataset
metadata: CombinedMetadata
CombinedMetadata object
"""
self.df = input_data
if metadata is None:
self.metadata = {"name": "", "history": []}
else:
self.metadata = metadata
self.metadata.dataset['last_month'] = str(self.get_last_month())
[docs]
def get_last_month(self) -> datetime:
"""
Get the date of the last month in the dataset
Returns
-------
datetime
Date of the last month in the dataset
"""
year = self.df.time.dt.year.data[-1]
month = self.df.time.dt.month.data[-1]
day = self.df.time.dt.day.data[-1]
last_month = datetime(year, month, day)
return last_month
[docs]
def get_start_year(self) -> int:
"""
Get the first year in the dataset
Returns
-------
int
First year in the dataset
"""
start_year = self.df.time.dt.year.data[0]
return start_year
[docs]
def get_end_year(self) -> int:
"""
Get the last year in the dataset
Returns
-------
int
Last year in the dataset
"""
start_year = self.df.time.dt.year.data[-1]
return start_year
[docs]
def rebaseline(self, first_year: int, final_year: int) -> xa.Dataset:
"""
Change the baseline of the data to the period between first_year and final_year by
subtracting the average of the available data between those two years (inclusive).
Parameters
----------
first_year: int
First year of climatology period
final_year: int
Final year of climatology period
Returns
-------
xa.Dataset
Changes the dataset in place, but also returns the dataset if needed
"""
dsg = self.df.groupby('time.month')
gb = self.df.sel(time=slice(f'{first_year}-01-01', f'{final_year}-12-31')).groupby('time.month')
clim = gb.mean(dim='time')
anom = dsg - clim
self.df = anom
self.metadata['climatology_start'] = first_year
self.metadata['climatology_end'] = final_year
self.metadata['actual'] = False
self.metadata['derived'] = True
self.update_history(f'Rebaselined to {first_year}-{final_year}')
return self
[docs]
def make_annual(self):
"""
Calculate an annual average from a monthly grid by taking the arithmetic mean of
available monthly anomalies.
Returns
-------
GridAnnual
Return annual average of the grid
"""
dsg = self.df.groupby('time.year').mean(dim='time')
annual = GridAnnual(dsg, self.metadata)
annual.update_history('Calculated annual average')
annual.metadata['time_resolution'] = 'annual'
annual.metadata['derived'] = True
return annual
[docs]
def select_year_and_month(self, year: int, month: int):
"""
Select a particular month from the data set and throw away the rest.
Parameters
----------
year: int
Year of selection
month: int
Month of selection
Returns
-------
GridMonthly
Returns a :class:`GridMonthly` containing only data within the specified year range.
"""
self.df = self.df.sel(time=slice(f'{year}-{month:02d}-01', f'{year}-{month:02d}-28'))
self.update_history(f'Selected single month {month:02d}/{year}')
return self
[docs]
def select_period(self, start_year: int, start_month: int, end_year: int, end_month: int):
"""
Select a period from the grid specifed by start year and month and end year and month, inclusive.
Parameters
----------
start_year: int
Year of start date
start_month: int
Month of start date
end_year: int
Year of end date
end_month: int
Month of end date
Returns
-------
GridMonthly
Returns a :class:`GridMonthly` containing only data within the specified date range.
"""
self.df = self.df.sel(time=slice(f'{start_year}-{start_month:02d}-01', f'{end_year}-{end_month:02d}-28'))
self.update_history(f'Selected period from {start_month:02d}/{start_year} to {end_month:02d}/{end_year}')
return self
[docs]
def calculate_time_mean(self, cumulative=False):
"""
Calculate the time mean of the map
Returns
-------
GridMonthly
Returns a :class:`GridMonthly` containing the time mean of the data.
"""
if cumulative:
time_mean = self.df.sum(dim='time', keepdims=True)
else:
time_mean = self.df.mean(dim='time', keepdims=True)
main_variable_list = list(time_mean.keys())
if len(main_variable_list) > 1:
raise RuntimeError('Cant take time mean of xarray dataset with more than one variable')
main_variable = main_variable_list[0]
# save the start time and create a time axis (time meaning destroys this)
start_year = self.df.time.dt.year.data[0]
start_month = self.df.time.dt.month.data[0]
times = pd.date_range(start=f'{start_year}-{start_month:02d}-01', freq='1MS', periods=1)
# Extract dimensions and the main variable
latitudes = self.df.latitude.data
longitudes = self.df.longitude.data
target_grid = time_mean[main_variable].data
# Make the output grid and update the metadata
out_array = make_xarray(target_grid, times, latitudes, longitudes, variable=main_variable)
output_grid = GridMonthly(out_array, copy.deepcopy(self.metadata))
output_grid.update_history('Calculated time mean')
return output_grid
[docs]
def calculate_regional_average(self, regions, region_number, land_only=True) -> ts.TimeSeriesMonthly:
"""
Calculate a regional average from the grid. The region is specified by a geopandas
Geodataframe and the index (region_number) of the chosen shape. By default, the output
is masked to land areas only, this can be switched off by setting land_only to False.
Parameters
----------
regions: Geodataframe
geopandas Geodataframe specifying the region to be average over
region_number: int
the index of the particular region in the Geodataframe
land_only: bool
By defauly output is masked to land areas only, to calculate a full area average set
land_only to False
Returns
-------
ts.TimeSeriesMonthly
Returns time series of area averages.
"""
mask = regionmask.mask_3D_geopandas(regions,
self.df.longitude,
self.df.latitude, drop=False, overlap=True)
r1 = mask.sel(region=region_number)
selected_variable = self.df.tas_mean.where(r1)
if land_only:
land_110 = regionmask.defined_regions.natural_earth_v5_0_0.land_110
land_mask = land_110.mask_3D(self.df.longitude, self.df.latitude)
land_mask = land_mask.sel(region=0)
selected_variable = selected_variable.where(land_mask)
weights = np.cos(np.deg2rad(selected_variable.latitude))
regional_ts = selected_variable.weighted(weights).mean(dim=("latitude", "longitude"))
# It's such a struggle extracting time information from these blasted xarrays
years = regional_ts.time.dt.year.data.tolist()
months = regional_ts.time.dt.month.data.tolist()
data = regional_ts.values.tolist()
timeseries_metadata = copy.deepcopy(self.metadata)
timeseries_metadata['type'] = 'timeseries'
timeseries_metadata['history'].append('Calculated area-average')
out_series = ts.TimeSeriesMonthly(years, months, data, timeseries_metadata)
return out_series
[docs]
def calculate_regional_average_missing(self, regions, region_number, threshold=0.3,
land_only=True, ocean_only=False) -> ts.TimeSeriesMonthly:
"""
Calculate a regional average from the grid. The region is specified by a geopandas
Geodataframe and the index (region_number) of the chosen shape. By default, the output
is masked to land areas only, this can be switched off by setting land_only to False.
Parameters
----------
regions: Geodataframe
geopandas Geodataframe specifying the region to be average over
region_number: int
the index of the particular region in the Geodataframe
threshold: float
If the area covered by data in the region drops below this threshold then NaN is returned.
land_only: bool
By defauly output is masked to land areas only, to calculate a full area average set
land_only to False
Returns
-------
ts.TimeSeriesMonthly
Returns time series of area averages.
"""
if land_only and ocean_only:
raise RuntimeError('Selected both land_only and ocean_only. This combination is not allowed.')
main_variable_list = list(self.df.keys())
main_variable = main_variable_list[0]
mask = regionmask.mask_3D_geopandas(regions,
self.df.longitude,
self.df.latitude, drop=False, overlap=True)
r1 = mask.sel(region=region_number)
selected_variable = self.df[main_variable].where(r1)
# copy the variable array, set values to one and missing data to zero then mask that
missing = copy.deepcopy(self.df[main_variable])
replacer = (np.isnan(missing.data))
missing.data[:, :, :] = 1.0
missing.data[replacer] = 0.0
missing = missing.where(r1)
if land_only:
land_110 = regionmask.defined_regions.natural_earth_v5_0_0.land_110
land_mask = land_110.mask_3D(self.df.longitude, self.df.latitude)
land_mask = land_mask.sel(region=0)
selected_variable = selected_variable.where(land_mask)
missing = missing.where(land_mask)
if ocean_only:
land_110 = regionmask.defined_regions.natural_earth_v5_0_0.land_110
land_mask = land_110.mask_3D(self.df.longitude, self.df.latitude)
land_mask = land_mask.sel(region=0)
land_mask.data = ~land_mask.data
selected_variable = selected_variable.where(land_mask)
missing = missing.where(land_mask)
# Calculate the area weighted average
weights = np.cos(np.deg2rad(selected_variable.latitude))
regional_ts = selected_variable.weighted(weights).mean(dim=("latitude", "longitude"))
missing_ts = missing.weighted(weights).mean(dim=("latitude", "longitude"))
regional_ts[missing_ts < threshold] = np.nan
# It's such a struggle extracting time information from these blasted xarrays
years = regional_ts.time.dt.year.data.tolist()
months = regional_ts.time.dt.month.data.tolist()
data = regional_ts.values.tolist()
timeseries_metadata = copy.deepcopy(self.metadata)
timeseries_metadata['type'] = 'timeseries'
timeseries_metadata['history'].append('Calculated area-average')
out_series = ts.TimeSeriesMonthly(years, months, data, timeseries_metadata)
return out_series
[docs]
def update_history(self, message: str) -> None:
"""
Update the history metadata with a message.
Parameters
----------
message : str
Message to be added to history
Returns
-------
None
"""
self.metadata['history'].append(message)
[docs]
class GridAnnual:
"""
A :class:`GridAnnual` combines an xarray Dataset with a
:class:`.CombinedMetadata` to bring together data and
metadata in one object. It represents annual averages of data.
"""
def __init__(self, input_data, metadata: CombinedMetadata):
"""
Create an annual gridded data set from an xarray Dataset and
:class:`.CombinedMetadata` object.
Parameters
----------
input_data: xa.Dataset
xarray dataset
metadata: CombinedMetadata
CombinedMetadata object
"""
self.df = input_data
if metadata is None:
self.metadata = {"name": "", "history": []}
else:
self.metadata = metadata
[docs]
def update_history(self, message: str) -> None:
"""
Update the history metadata
Parameters
----------
message : str
Message to be added to history
Returns
-------
None
"""
self.metadata['history'].append(message)
[docs]
def write_grid(self, filename: Path, metadata_filename: Path = None, name: str = None) -> None:
"""
Write the grid to file.
Parameters
----------
filename: Path
Filename to write grid to
metadata_filename: Path
Filename to write metadata to
name: str
Optional name to give the data set being written. Note that names should be unique in any
data archive.
Returns
-------
None
"""
if metadata_filename is not None:
if name is not None:
self.metadata['name'] = name
self.metadata['filename'] = [str(filename.name)]
self.metadata['url'] = [""]
self.metadata['reader'] = "reader_standard_grid"
self.metadata['fetcher'] = "fetcher_no_url"
self.metadata['history'].append(f"Wrote to file {str(filename.name)}")
self.metadata.write_metadata(metadata_filename)
self.df.to_netcdf(filename, format="NETCDF4")
[docs]
def select_year_range(self, start_year: int, end_year: int):
"""
Select a particular range of consecutive years from the data set and throw away the rest.
Parameters
----------
start_year: int
First year of selection
end_year: int
Final year of selction
Returns
-------
GridAnnual
Returns a :class:`GridAnnual` containing only data within the specified year range.
"""
self.df = self.df.where(self.df['year'] >= start_year, drop=True)
self.df = self.df.where(self.df.year <= end_year, drop=True)
self.update_history(f'Selected year range {start_year} to {end_year}')
return self
[docs]
def get_year_range(self, start_year: int, end_year: int):
"""
Select a range of consecutive years from the data set.
Parameters
----------
start_year: int
start year
end_year: int
end year
Returns
-------
GridAnnual
Returns a :class:`GridAnnual` containing only data within the specified year range.
"""
out = copy.deepcopy(self)
out.df = out.df.where(out.df['year'] >= start_year, drop=True)
out.df = out.df.where(out.df.year <= end_year, drop=True)
out.update_history(f'Selected year range {start_year} to {end_year}')
return out
[docs]
def rank(self):
"""
Return a data set where the values are the ranks of each grid cell value.
Returns
-------
GridAnnual
Return a :class:`GridAnnual` containing the values as ranks from highest (1) to lowest.
"""
output = copy.deepcopy(self)
out_grid = np.zeros(output.df['tas_mean'].data.shape)
for xx, yy in itertools.product(range(72), range(36)):
selection = output.df['tas_mean'].data[:, yy, xx]
rank = rank_array(selection)
out_grid[:, yy, xx] = rank
output.df['tas_mean'].data = out_grid
return output
[docs]
def get_start_year(self) -> int:
"""
Get the first year in the dataset
Returns
-------
int
First year in the dataset
"""
start_date = self.df.year.data[0]
return start_date
[docs]
def get_end_year(self) -> int:
"""
Get the last year in the dataset
Returns
-------
int
Last year in the data set
"""
end_date = self.df.year.data[-1]
return end_date
[docs]
def running_average(self, n_year: int):
"""
Calculate an n_year running average of the data in the dataset
Parameters
----------
n_year: int
Number of years for which the running average is calculated
Returns
-------
GridAnnual
Annual gridded dataset which contains the running averages
"""
self.df['tas_mean'] = self.df['tas_mean'].rolling(year=n_year).mean()
self.update_history(f'Calculate rolling {n_year}-year average')
return self
[docs]
def get_start_and_end_year(all_datasets: List[GridAnnual]) -> Tuple[int, int]:
"""
Given a list of :class:`GridAnnual` datasets, find the earliest start year and the latest end year
Parameters
----------
all_datasets: List[GridAnnual]
List of datasets for which we want to find the first and last year
Returns
-------
Tuple[int, int]
"""
start_dates = []
end_dates = []
for ds in all_datasets:
start_dates.append(ds.get_start_year())
end_dates.append(ds.get_end_year())
start_date = min(start_dates)
end_date = max(end_dates)
return start_date, end_date
[docs]
def process_datasets(all_datasets: List[GridAnnual], grid_type: str) -> GridAnnual:
"""
Calculate the median or range (depending on selected type) of a list of :class:`GridAnnual` data sets.
Medians are calculated on a grid cell by grid cell basis based on all available data in the
list of data sets.
Parameters
----------
all_datasets: List[GridAnnual]
list of GridAnnual data sets
grid_type: str
Either 'median' or 'range'
Returns
-------
GridAnnual
Data set containing the median (or half-range) values from all the data sets supplied
"""
start_date, end_date = get_start_and_end_year(all_datasets)
number_of_years = end_date - start_date + 1
# create a dataset from the earliest start date to the latest end date
out_grid = np.zeros((number_of_years, 36, 72))
# for each time step calculate the median of available data sets.
for year in range(start_date, start_date + number_of_years):
n_data_sets = len(all_datasets)
stack = np.zeros((n_data_sets, 36, 72))
for i, ds in enumerate(all_datasets):
temp_df = ds.get_year_range(year, year)
if temp_df.df['tas_mean'].data.shape[0] != 0:
stack[i, :, :] = temp_df.df['tas_mean'].data[0, :, :]
else:
stack[i, :, :] = np.nan
for xx, yy in itertools.product(range(72), range(36)):
select = stack[:, yy, xx]
if grid_type == 'median':
out_grid[year - start_date, yy, xx] = np.median(select[~np.isnan(select)])
elif grid_type == 'range':
full_range = np.max(select[~np.isnan(select)]) - np.min(select[~np.isnan(select)])
out_grid[year - start_date, yy, xx] = full_range / 2.
dataset = make_standard_grid(out_grid, str(start_date), '1YS', number_of_years)
dataset = dataset.groupby('time.year').mean(dim='time')
dataset = GridAnnual(dataset, all_datasets[0].metadata)
return dataset
[docs]
def range_of_datasets(all_datasets: List[GridAnnual]) -> GridAnnual:
"""
Calculate the half-range of a list of :class:`GridAnnual` data sets
Parameters
----------
all_datasets: List[GridAnnual]
List of :class:`GridAnnual` datasets from which the ranges will be calculated.
Returns
-------
GridAnnual
"""
return process_datasets(all_datasets, 'range')