Source code for climind.stats.utils

#  Climate indicator manager - a package for managing and building climate indicator dashboards.
#  Copyright (c) 2022 John Kennedy
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import numpy as np
from typing import List, Union
from climind.data_types.timeseries import TimeSeriesMonthly, TimeSeriesAnnual


[docs] def table_by_year(datasets, match_year: int, years_to_show: int = 20) -> str: out_text = '' for year in range(match_year - years_to_show, match_year + 1): out_line = f'{year} ' for ds in datasets: rank = ds.get_rank_from_year(year) value = ds.get_value_from_year(year) if rank is not None: out_line += f'{value:.2f} ({rank:2d}) ' else: out_line += f'_.__ (__) ' out_text += f'{out_line}\n' return out_text
[docs] def record_margin_table_by_year(datasets, match_year: int, years_to_show: int = 50) -> str: out_text = '' for year in range(match_year - years_to_show, match_year + 1): out_line = f'{year} ' for ds in datasets: first_year, last_year = ds.get_first_and_last_year() index = year - first_year if year > last_year: margin = "XXXXXXXXX" else: subset = ds.df['data'][0: index + 1] latest = ds.df['data'][index] if np.max(subset) == latest: margin = latest - max(ds.df['data'][0:index]) margin = f"{margin:.2f} " else: margin = "---------" out_line += f'{margin} ' out_text += f'{out_line}\n' return out_text
[docs] def get_values(datasets, match_year): all_match_values = [] for ds in datasets: value = ds.get_value_from_year(match_year) if value is not None: all_match_values.append(value) return all_match_values
[docs] def get_ranks(datasets, match_year): all_match_ranks = [] for ds in datasets: rank = ds.get_rank_from_year(match_year) if rank is not None: all_match_ranks.append(rank) return all_match_ranks
[docs] def run_the_numbers(datasets: List[Union[TimeSeriesMonthly, TimeSeriesAnnual]], match_year: int, title: str, output_dir: Path, ipcc_unc: bool = True): """ Given a list of datasets calculate various statistics relating to ranking and values Parameters ---------- datasets: list List of data sets match_year: int Year of interest. Stats will be calculated up to the year of interest, but note that rankings will include years after the year of interest if it is not the most recent year in the data sets title: str name for the file output_dir: Path Returns ------- None """ with open(output_dir / f'{title}_{match_year}.txt', 'w') as output_file: # Table summary of all data sets out_line = 'Year ' for ds in datasets: out_line += f"{ds.metadata['name']:10.10} " output_file.write(f'{out_line}\n') output_file.write(table_by_year(datasets, match_year)) all_match_values = get_values(datasets, match_year) all_match_ranks = get_ranks(datasets, match_year) if len(all_match_values) > 0: sd = np.std(all_match_values) * 1.645 if ipcc_unc: sd = np.sqrt((sd ** 2) + ((0.25 / 2) ** 2)) mean_value = np.mean(all_match_values) min_value = np.min(all_match_values) max_value = np.max(all_match_values) min_rank = np.min(all_match_ranks) max_rank = np.max(all_match_ranks) output_file.write('\n') output_file.write(f'Mean for {match_year}: {mean_value :.2f} +- {sd:.2f} degC ' f'[{min_value :.2f}-{max_value :.2f}]\n') output_file.write(f'(alt rep) Mean for {match_year}: {mean_value :.2f} ' f'[{mean_value - sd:.2f} - {mean_value + sd:.2f}] degC\n') output_file.write(f'Rank between {min_rank} and {max_rank}\n') output_file.write(f'Based on {len(all_match_values)} data sets.\n') output_file.write('\n') output_file.write(f'Mean for {match_year}: {mean_value :.4f} +- {sd:.4f} degC ' f'[{min_value :.4f}-{max_value :.4f}]\n') output_file.write(f'(alt rep) Mean for {match_year}: {mean_value :.4f} ' f'[{mean_value - sd:.4f} - {mean_value + sd:.4f}] degC\n') output_file.write(f'Rank between {min_rank} and {max_rank}\n') output_file.write(f'Based on {len(all_match_values)} data sets.\n') else: output_file.write('NO DATA\n')
[docs] def record_margins(datasets: List[Union[TimeSeriesMonthly, TimeSeriesAnnual]], match_year: int, title: str, output_dir: Path): with open(output_dir / f'{title}_{match_year}.txt', 'w') as output_file: # Table summary of all data sets out_line = 'Year ' for ds in datasets: out_line += f"{ds.metadata['name']:10.10} " output_file.write(f'{out_line}\n') output_file.write(record_margin_table_by_year(datasets, match_year, years_to_show=50))
[docs] def get_latitudes(resolution): """ Generate a "latitude" array running from -90 + half the resolution to 90 - half the resolution. Parameters ---------- resolution: float Resolution of the grid Returns ------- ndarray """ return np.arange(-90.0 + resolution / 2., 90.0 + resolution / 2., resolution)
[docs] def get_n_years_from_n_months(n_months): """ For a given number of months, count the number of full years Parameters ---------- n_months: int Number of months to convert to whole years Returns ------- int Number of whole years """ n_years = int(np.floor(n_months / 12.)) return n_years
[docs] def monthly_to_annual_array(monthly_means): """ Calculate 12-month averages from a monthly array of shape (n_months, 3). For use in the IPCC averaging method Parameters ---------- monthly_means: ndarray(n_months, 3) Returns ------- ndarray(n_years, 3) Returns the annual averages """ annual_means = np.mean(monthly_means.reshape((-1, 12, 3)), axis=1) return annual_means
[docs] def rolling_average(input_array, window_length): """ Calculate a rolling average of specified window_length Parameters ---------- input_array: ndarray input array for which rolling averages are to be calculated window_length: int length of rolling average window Returns ------- ndarray """ out = np.zeros((len(input_array))) out[:] = np.nan for i in range(window_length, len(input_array) + 1): out[i - int(window_length / 2) - 1] = np.mean(input_array[i - window_length:i]) return out