Source code for climind.data_types.timeseries

#  Climate indicator manager - a package for managing and building climate indicator dashboards.
#  Copyright (c) 2022 John Kennedy
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Tuple, List, Callable, Union
import warnings
import pandas as pd
import numpy as np
import logging
import copy
import pkg_resources
from abc import ABC, abstractmethod
from pathlib import Path
from functools import reduce
from jinja2 import Environment, FileSystemLoader, select_autoescape
from datetime import datetime
import cftime as cf
from climind.data_manager.metadata import CombinedMetadata
from climind.definitions import ROOT_DIR
from statsmodels.nonparametric.smoothers_lowess import lowess


def log_activity(in_function: Callable) -> Callable:
    """
    Decorator function to log name of function run and with which arguments.
    This aims to provide some traceability in the output.

    The wrapper logs, at INFO level, the name of the decorated function, the string
    representation of every positional argument and the names (not the values) of the
    keyword arguments. It then calls the decorated function and returns its result
    unchanged. The wrapper keeps the name and docstring of the decorated function.

    Parameters
    ----------
    in_function: Callable
        The function to be decorated

    Returns
    -------
    Callable
        Wrapper which logs the call and then delegates to ``in_function``.
    """
    # Imported locally: the module-level import only brings in ``reduce`` from functools.
    from functools import wraps

    @wraps(in_function)
    def wrapper(*args, **kwargs):
        logging.info(f"Running: {in_function.__name__}")

        msg = []
        for a in args:
            # Time series get an extra line identifying the data set by name
            if isinstance(a, TimeSeries):
                logging.info(f"on {a.metadata['name']}")
            msg.append(str(a))
        if len(msg) > 0:
            logging.info("With arguments:")
            logging.info(', '.join(msg))

        # Only the keyword names are logged, not their values
        msg = [str(k) for k in kwargs]
        if len(msg) > 0:
            logging.info("And keyword arguments:")
            logging.info(', '.join(msg))

        return in_function(*args, **kwargs)

    return wrapper
class TimeSeries(ABC):
    """
    A base class for representing time series data sets. Note that this class should not generally be used
    and only its subclasses :class:`TimeSeriesMonthly`, :class:`TimeSeriesAnnual` and
    :class:`TimeSeriesIrregular` should be used. This class contains shared functionality from these
    classes but does not work on its own.
    """

    def __init__(self, metadata: CombinedMetadata = None):
        """
        Set up the shared state of a time series.

        Parameters
        ----------
        metadata: CombinedMetadata
            Metadata for the data set. If None, a minimal dictionary containing an empty "name" and an
            empty "history" list is used instead.
        """
        # Subclasses are responsible for building the data frame
        self.df = None
        if metadata is None:
            self.metadata = {"name": "", "history": []}
        else:
            self.metadata = metadata

    def select_year_range(self, start_year: int, end_year: int):
        """
        Select consecutive years in the specified range and throw away the rest.

        Parameters
        ----------
        start_year: int
            First year in the selected range
        end_year: int
            Final year in the selected range

        Returns
        -------
        TimeSeries
            Return time series which only contains years in the specified range. This is the same object
            (the selection happens in place), returned to allow chaining.
        """
        in_range = (self.df['year'] >= start_year) & (self.df['year'] <= end_year)
        # drop=True: the old row labels are discarded rather than being inserted as an extra 'index'
        # column, which would otherwise accumulate and make repeated selections fail.
        self.df = self.df[in_range].reset_index(drop=True)
        self.update_history(f'Selected years within the range {start_year} to {end_year}.')
        return self

    def manually_set_baseline(self, baseline_start_year: int, baseline_end_year: int) -> None:
        """
        Manually set baseline. This changes the baseline in the metadata, but does not change the data
        themselves.

        Parameters
        ----------
        baseline_start_year: int
            Start of baseline period
        baseline_end_year: int
            End of baseline period

        Returns
        -------
        None
        """
        # update attributes
        self.metadata['climatology_start'] = baseline_start_year
        self.metadata['climatology_end'] = baseline_end_year
        self.metadata['actual'] = False
        self.update_history(f'Manually changed baseline to {baseline_start_year}-{baseline_end_year}. '
                            f'Note that data values remain unchanged.')

    def get_first_and_last_year(self) -> Tuple[int, int]:
        """
        Get the first and last year in the series

        Returns
        -------
        Tuple[int, int]
            first and last year, taken from the first and last rows of the 'year' column

        Raises
        ------
        IndexError
            If the series contains no rows.
        """
        # Positional selection of both ends at once; tolist() yields plain Python values
        first_year, last_year = self.df['year'].iloc[[0, -1]].tolist()
        return first_year, last_year

    def update_history(self, message: str) -> None:
        """
        Update the history metadata

        Parameters
        ----------
        message : str
            Message to be added to history

        Returns
        -------
        None
        """
        self.metadata['history'].append(message)

    def add_offset(self, offset: float) -> None:
        """
        Add an offset to the data set.

        Parameters
        ----------
        offset : float
            offset to be added to all values in the data set.

        Returns
        -------
        None
            The 'data' column is modified in place and the metadata is flagged as derived.
        """
        self.df['data'] = self.df['data'] + offset
        self.metadata['derived'] = True
        self.update_history(f'Added offset of {offset} to all data values.')

    def write_generic_csv(self, filename: Path, metadata_filename: Path, monthly: bool, uncertainty: bool,
                          irregular: bool, columns_to_write: List[str]) -> None:
        """
        Write the dataset out into csv format. A header rendered from the ``badc_boilerplate.jinja2``
        template is written first, followed by the selected columns of the data frame and a closing
        "end data" line.

        This method calls ``self.generate_dates``, which is not defined on this base class and must be
        provided by the subclass. As a side effect a 'time' column is added to (or overwritten in) the
        data frame.

        Parameters
        ----------
        filename: Path
            Path of the csv file to which the data will be written.
        metadata_filename: Path
            Path of the json file to which the metadata will be written. If None, no metadata file is
            written and the metadata is left untouched.
        monthly: bool
            Set to True for monthly data
        uncertainty: bool
            Set to True to print uncertainties
        irregular: bool
            Set to True for irregular data
        columns_to_write: List[str]
            List of the columns from the dataframe to be written to the data file

        Returns
        -------
        None
        """
        if metadata_filename is not None:
            # Point the metadata at the file about to be written so that it can be read back in later
            self.metadata['filename'] = [str(filename.name)]
            self.metadata['url'] = [""]
            self.metadata['reader'] = "reader_badc_csv"
            self.metadata['fetcher'] = "fetcher_no_url"
            self.update_history(f"Wrote to file {str(filename.name)}")
            self.metadata.write_metadata(metadata_filename)

        now = datetime.today()
        climind_version = pkg_resources.get_distribution("climind").version

        time_units = 'days since 1800-01-01 00:00:00.0'
        self.df['time'] = self.generate_dates(time_units)

        # render the file header from the boilerplate template
        env = Environment(
            loader=FileSystemLoader(ROOT_DIR / "climind" / "data_types" / "jinja_templates"),
            autoescape=select_autoescape()
        )
        template = env.get_template("badc_boilerplate.jinja2")

        rendered = template.render(now=now, climind_version=climind_version,
                                   metadata=self.metadata, monthly=monthly, irregular=irregular,
                                   time_units=time_units, uncertainty=uncertainty)

        with open(filename, 'w') as f:
            f.write(rendered)
            f.write(self.df.to_csv(index=False, lineterminator='\n', float_format='%.4f',
                                   header=False, columns=columns_to_write))
            f.write("end data\n")

    @abstractmethod
    def get_string_date_range(self) -> str:
        """
        Create a string which specifies the date range covered by the time series

        Returns
        -------
        str
        """
class TimeSeriesIrregular(TimeSeries):
    """
    A :class:`TimeSeriesIrregular` combines a pandas Dataframe with a
    :class:`.CombinedMetadata` to bring together data and metadata in one object. It
    represents non-monthly, non-annual averages of data such as weekly, or 5-day averages.
    """

    def __init__(self, years: List[int], months: List[int], days: List[int], data: List[float],
                 metadata: CombinedMetadata = None, uncertainty: Optional[List[float]] = None):
        """
        Create :class:`TimeSeriesIrregular` object.

        Parameters
        ----------
        years: List[int]
            List of integers specifying the year of each data point
        months: List[int]
            List of integers specifying the month of each data point
        days: List[int]
            List of integers specifying the day of each data point
        data: List[float]
            List of floats with the data values
        metadata: CombinedMetadata
            CombinedMetadata object holding the metadata for the dataset
        uncertainty: List[float]
            List of floats with the uncertainty values for each data point
        """
        super().__init__(metadata)

        dico = {'year': years, 'month': months, 'day': days, 'data': data}
        if uncertainty is not None:
            dico['uncertainty'] = uncertainty

        self.df = pd.DataFrame(dico)
        # Keep a proper datetime column alongside the separate year, month and day columns
        self.df['date'] = pd.to_datetime(
            dict(
                year=self.df['year'],
                month=self.df['month'],
                day=self.df['day']
            )
        )

    def __str__(self) -> str:
        out_str = f'TimeSeriesIrregular: {self.metadata["name"]}'
        return out_str

    def fill_daily(self) -> None:
        """
        Ensure that a daily time series has data for every day between the start and end years.
        The data frame is re-indexed on a daily DatetimeIndex running from 1 January of the first year
        to 31 December of the last year; days that were not present are filled with NaN. After this call
        the dates are held in the data frame index rather than in the 'date' column.

        Returns
        -------
        None
        """
        self.df = self.df.set_index('date')

        start_date, end_date = self.get_start_and_end_dates()
        start_year = start_date.year
        final_year = end_date.year

        t_index = pd.DatetimeIndex(pd.date_range(start=f'{start_year}-01-01', end=f'{final_year}-12-31', freq='D'))
        df_rsmpld = self.df.reindex(t_index, method=None)
        self.df = df_rsmpld

        # Newly inserted rows have NaN date parts, so rebuild them from the index
        self.df.year = t_index.year
        self.df.month = t_index.month
        self.df.day = t_index.day

        self.update_history(f"Time series expanded with NaN to include all days between {start_year} and {final_year}")

    def get_climatology(self, climatology_start_year, climatology_end_year):
        """
        Calculate the mean and standard deviation of the data for each calendar day (month and day pair)
        over the climatology period, and repeat them so that they cover the full length of the series.

        The data frame index must provide ``month`` and ``day`` attributes, i.e. be a DatetimeIndex such
        as the one set up by :meth:`fill_daily`.

        Parameters
        ----------
        climatology_start_year : int
            The first year of the climatology period
        climatology_end_year : int
            The last year of the climatology period

        Returns
        -------
        Tuple[pd.Series, pd.Series]
            Climatological mean and climatological standard deviation, both indexed like the data frame.
        """
        # Calculate climatology and fill out repeating climatology to full length of series
        df2 = self.df[self.df['year'] >= climatology_start_year]
        df2 = df2[df2['year'] <= climatology_end_year]

        climatology = df2.groupby([df2.index.month, df2.index.day]).mean()
        climatology = climatology.data[zip(self.df.index.month, self.df.index.day)]
        climatology.index = self.df.index

        climatology_stdev = df2.groupby([df2.index.month, df2.index.day]).std()
        climatology_stdev = climatology_stdev.data[zip(self.df.index.month, self.df.index.day)]
        climatology_stdev.index = self.df.index

        return climatology, climatology_stdev

    def make_monthly(self):
        """
        Calculate a :class:`TimeSeriesMonthly` from the :class:`TimeSeriesIrregular`. The monthly average
        is calculated from the mean of values within the month. As a side effect a 'yearmonth' column is
        added to the data frame of this series.

        Returns
        -------
        TimeSeriesMonthly
            Return a :class:`TimeSeriesMonthly` containing the monthly averages.
        """
        # A single integer key (YYYYMM) identifies each calendar month
        self.df['yearmonth'] = 100 * self.df['year'] + self.df['month']

        grouped_data = self.df.groupby(['yearmonth'])['data'].mean().reset_index()
        grouped_years = self.df.groupby(['yearmonth'])['year'].mean().reset_index()
        grouped_months = self.df.groupby(['yearmonth'])['month'].mean().reset_index()

        grouped_months = grouped_months['month'].tolist()
        grouped_data = grouped_data['data'].tolist()
        grouped_years = grouped_years['year'].tolist()

        # The group means of year and month are floats; convert them back to integers
        grouped_months = list(map(int, grouped_months))
        grouped_years = list(map(int, grouped_years))

        monthly_series = TimeSeriesMonthly(grouped_years, grouped_months, grouped_data, self.metadata)
        monthly_series.update_history('Calculated monthly average from values using arithmetic mean '
                                      'of all dates that fall within each month')

        # update attributes
        monthly_series.metadata['time_resolution'] = 'monthly'
        monthly_series.metadata['derived'] = True

        return monthly_series

    def get_start_and_end_dates(self) -> Tuple[datetime, datetime]:
        """
        Get the first and last dates in the dataset. As a side effect a 'time' column holding the
        timestamp of each row is added to (or overwritten in) the data frame.

        Returns
        -------
        Tuple[datetime, datetime]
            Timestamps of the first and last rows of the data frame.
        """
        time_str = self.df.year.astype(str) + \
                   self.df.month.map('{:02d}'.format) + \
                   self.df.day.map('{:02d}'.format)
        self.df['time'] = pd.to_datetime(time_str, format='%Y%m%d')

        n_time = len(self.df['time'])

        start_date = self.df['time'].iloc[0]
        end_date = self.df['time'].iloc[n_time - 1]

        return start_date, end_date

    def generate_dates(self, time_units: str) -> List[int]:
        """
        Given a string specifying the required time units (something like days since 1800-01-01 00:00:00.0),
        generate a list of times from the time series corresponding to those units. As a side effect a
        'time' column holding the timestamp of each row is added to (or overwritten in) the data frame.

        Parameters
        ----------
        time_units: str
            String specifying the units to use for generating the times e.g. "days since 1800-01-01 00:00:00.0"

        Returns
        -------
        List[int]
            Times of the rows expressed in the requested units (the value returned by ``cftime.date2num``).
        """
        time_str = self.df.year.astype(str) + self.df.month.map('{:02d}'.format) + self.df.day.map('{:02d}'.format)
        self.df['time'] = pd.to_datetime(time_str, format='%Y%m%d')
        dates = cf.date2num(self.df['time'].tolist(),
                            units=time_units,
                            has_year_zero=False,
                            calendar='standard')
        return dates

    def write_csv(self, filename: Path, metadata_filename: Path = None) -> None:
        """
        Write the timeseries to a csv file with the specified filename. The format used for writing is
        given by the BADC CSV format. This has a lot of upfront metadata before the data section. An option
        for writing a metadata file is also provided.

        Parameters
        ----------
        filename: Path
            Path of the filename to write the data to
        metadata_filename: Path
            Path of the filename to write the metadata to

        Returns
        -------
        None
        """
        monthly = False
        uncertainty = False
        irregular = True

        columns_to_write = ['time', 'year', 'month', 'day', 'data']

        super().write_generic_csv(filename, metadata_filename, monthly, uncertainty, irregular, columns_to_write)

    def get_year_axis(self) -> List[float]:
        """
        Return a year axis in which all dates are represented as decimal years. January 1st 1984 is
        1984.00. The conversion is approximate: each month counts as 1/12 of a year and each day as
        1/365 of a year.

        Returns
        -------
        List[float]
            Dates represented as decimal years (returned as a pandas Series).
        """
        year_axis = self.df['year'] + (self.df['month'] - 1) / 12. + (self.df['day'] - 1) / 365.
        return year_axis

    def get_string_date_range(self) -> str:
        """
        Create a string which specifies the date range covered by the :class:`TimeSeriesIrregular`
        in the format YYYY.MM.DD-YYYY.MM.DD

        Returns
        -------
        str
            String that specifies the date range covered
        """
        start_date, end_date = self.get_start_and_end_dates()
        date_range = f"{start_date.year}.{start_date.month:02d}.{start_date.day:02d}-" \
                     f"{end_date.year}.{end_date.month:02d}.{end_date.day:02d}"
        return date_range

    def zero_on_year(self, baseline_year):
        """
        Shift the series so that the first value found in the specified year becomes zero, by
        subtracting that value from all data values.

        Parameters
        ----------
        baseline_year : int
            Year whose first time step is used as the zero point

        Returns
        -------
        None
            Action occurs in place

        Raises
        ------
        IndexError
            If there are no rows for ``baseline_year``.
        """
        df_copy = copy.deepcopy(self.df)
        df_copy = df_copy.set_index('date')

        df2 = df_copy[df_copy['year'] == baseline_year]
        min_value = -1 * df2.data.iloc[0]

        self.update_history(f"Zeroed at first time step of {baseline_year}.")
        self.add_offset(min_value)

    def rebaseline(self, baseline_start_year, baseline_end_year) -> None:
        """
        Shift the time series to a new baseline, specified by start and end years (inclusive).
        Each calendar day (month and day pair) is rebaselined separately, allowing for changes in
        seasonality. If years are incomplete, this might give a different result to the annual and
        monthly versions.

        Parameters
        ----------
        baseline_start_year : int
            The first year of the climatology period
        baseline_end_year : int
            The last year of the climatology period

        Returns
        -------
        None
            Action occurs in place
        """
        # Work on a date-indexed copy so that the month and day of each row are available from the index
        df_copy = copy.deepcopy(self.df)
        df_copy = df_copy.set_index('date')

        df2 = df_copy[df_copy['year'] >= baseline_start_year]
        df2 = df2[df2['year'] <= baseline_end_year]

        # Mean for each (month, day) pair, then expanded to one value per row of the series
        climatology = df2.groupby([df2.index.month, df2.index.day]).mean()
        climatology = climatology.data[zip(df_copy.index.month, df_copy.index.day)]
        climatology.index = df_copy.index

        df_copy.data = df_copy.data - climatology
        df_copy = df_copy.reset_index()

        self.df['data'] = df_copy['data']

        # update attributes
        self.metadata['climatology_start'] = baseline_start_year
        self.metadata['climatology_end'] = baseline_end_year
        self.metadata['actual'] = False

        # NOTE(review): the history text below speaks of months although the climatology above is
        # calculated for each calendar day; the text is left unchanged here.
        self.update_history(
            f'Rebaselined to {baseline_start_year}-{baseline_end_year} for each month separately by calculating the '
            f'arithmetic mean of the data over the baseline period and subtracting the mean from all data values. '
            f'This is done for each month separately (Januarys, Februarys etc).'
        )

    def lowess(self, number_of_points: int = 60):
        """
        Lowess smooth the series

        Parameters
        ----------
        number_of_points: int
            Number of points to use in the lowess smoother. If this exceeds the number of points in
            the series, all points are used for each fit.

        Returns
        -------
        TimeSeriesIrregular
            Deep copy of the series in which the data have been replaced by the smoothed values.
        """
        moving_average = copy.deepcopy(self)

        snippet = self.df.data[:]
        time = self.get_year_axis()[:]

        # The smoother only accepts fractions up to 1, so cap the fraction for series that are shorter
        # than the requested number of points.
        fraction_of_data = min(1.0, number_of_points / len(snippet))
        fit = lowess(snippet, time, fraction_of_data)

        # Column 1 of the result holds the fitted values. Smoothing is different at the ends of the
        # series (effectively extrapolation) but the ends are currently kept.
        fit = fit[:, 1]

        moving_average.df.data = fit

        moving_average.update_history(
            f'Calculated lowess smoothed series with {fraction_of_data} of data used for each fit')
        moving_average.metadata['derived'] = True

        return moving_average
class TimeSeriesMonthly(TimeSeries):
    """
    A :class:`TimeSeriesMonthly` combines a pandas Dataframe with a
    :class:`CombinedMetadata` to bring together data and metadata in one object.
    It represents monthly averages of data.
    """

    def __init__(self, years: List[int], months: List[int], data: List[float],
                 metadata: CombinedMetadata = None, uncertainty: Optional[List[float]] = None):
        """
        Create :class:`TimeSeriesMonthly` object.

        Parameters
        ----------
        years : List[int]
            List of years
        months : List[int]
            List of months
        data : List[float]
            List of data values
        metadata : CombinedMetadata
            CombinedMetadata object containing the metadata
        uncertainty: Optional[List[float]]
            List of uncertainty values, one for each data value

        Attributes
        ----------
        df : pd.DataFrame
            Pandas dataframe used to contain the time and data information.
        metadata : dict
            Dictionary containing metadata. The only guaranteed entry is "history"
        """
        super().__init__(metadata)

        dico = {'year': years, 'month': months, 'data': data}
        if uncertainty is not None:
            dico['uncertainty'] = uncertainty

        self.df = pd.DataFrame(dico)

        # Only metadata objects with a ``dataset`` mapping can record the last month. The plain
        # dictionary used by the base class when no metadata is supplied has no such attribute.
        if hasattr(self.metadata, 'dataset'):
            _, end_date = self.get_start_and_end_dates()
            self.metadata.dataset['last_month'] = str(end_date)

    def __str__(self) -> str:
        out_str = f'TimeSeriesMonthly: {self.metadata["name"]}'
        return out_str

    @staticmethod
    def make_from_df(df: pd.DataFrame, metadata: CombinedMetadata):
        """
        Create a :class:`TimeSeriesMonthly` from a pandas data frame.

        Parameters
        ----------
        df : pd.DataFrame
            Pandas dataframe containing columns 'year' 'month' and 'data' (optionally 'uncertainty')
        metadata : CombinedMetadata
            Object containing the metadata

        Returns
        -------
        TimeSeriesMonthly
            :class:`TimeSeriesMonthly` built from input components.
        """
        years = df['year'].tolist()
        months = df['month'].tolist()
        data = df['data'].tolist()

        if 'uncertainty' in df.columns:
            uncertainty = df['uncertainty'].tolist()
            return TimeSeriesMonthly(years, months, data, metadata, uncertainty=uncertainty)
        else:
            return TimeSeriesMonthly(years, months, data, metadata)

    def _month_start_timestamps(self) -> pd.Series:
        """Build a timestamp for the first day of the month of every row (month zero-padded to two digits)."""
        time_str = self.df.year.astype(int).astype(str) + self.df.month.astype(int).map('{:02d}'.format)
        return pd.to_datetime(time_str, format='%Y%m')

    def change_end_month(self, year, month):
        """
        Truncate the series so that it ends at the specified month, discarding all later months.

        Parameters
        ----------
        year : int
            Year of the last month to keep
        month : int
            Month of the last month to keep

        Returns
        -------
        None
            Action occurs in place
        """
        self.df = self.df[self.df.year * 100 + self.df.month < year * 100 + month + 1]
        # As in the constructor, the last month is only recorded when the metadata can hold it
        if hasattr(self.metadata, 'dataset'):
            _, end_date = self.get_start_and_end_dates()
            self.metadata.dataset['last_month'] = str(end_date)

    def make_annual(self, cumulative: bool = False):
        """
        Calculate a :class:`TimeSeriesAnnual` from the :class:`TimeSeriesMonthly`. The annual average is
        calculated from the mean of available monthly values

        Parameters
        ----------
        cumulative : bool
            Set to true to sum rather than average the monthly values to get the annual value.
            In this case the uncertainty column, if there is one, is not carried over.

        Returns
        -------
        TimeSeriesAnnual
            Return a :class:`TimeSeriesAnnual` object containing the annual averages.
        """
        if cumulative:
            grouped = self.df.groupby(['year'])['data'].sum().reset_index()
        else:
            if 'uncertainty' in self.df.columns:
                grouped = self.df.groupby(['year'])[['data', 'uncertainty']].mean().reset_index()
            else:
                grouped = self.df.groupby(['year'])['data'].mean().reset_index()

        annual_series = TimeSeriesAnnual.make_from_df(grouped, self.metadata)

        if cumulative:
            annual_series.update_history('Calculated annual value from monthly values by summing')
        else:
            annual_series.update_history('Calculated annual average from monthly averages using arithmetic mean')

        # update attributes
        annual_series.metadata['time_resolution'] = 'annual'
        annual_series.metadata['derived'] = True

        return annual_series

    def make_annual_by_selecting_month(self, month: int):
        """
        Calculate a :class:`TimeSeriesAnnual` from the :class:`TimeSeriesMonthly`. The annual value is
        taken from one of the monthly values specified by the user.

        Parameters
        ----------
        month : int
            Month (1-12) to extract from each year

        Returns
        -------
        TimeSeriesAnnual
            Return a :class:`TimeSeriesAnnual` object containing only the selected month from each year.
        """
        month_names = ['January', 'February', 'March', 'April', 'May', 'June',
                       'July', 'August', 'September', 'October', 'November', 'December']

        grouped = self.df[self.df['month'] == month].reset_index()

        annual_series = TimeSeriesAnnual.make_from_df(grouped, self.metadata)
        annual_series.metadata['history'].append(
            f'Calculated annual series by extracting {month_names[month - 1]} from each year'
        )

        # update attributes
        annual_series.metadata['time_resolution'] = 'annual'

        return annual_series

    def calculate_climatology(self, baseline_start_year, baseline_end_year):
        """
        Calculate the mean of the data for each month over the specified period (inclusive).

        Parameters
        ----------
        baseline_start_year : int
            The first year of the climatology period
        baseline_end_year : int
            The last year of the climatology period

        Returns
        -------
        pd.DataFrame
            Data frame with columns 'month' and 'climatology', one row for each month present in
            the period.
        """
        # select part of series in climatology period
        climatology_part = self.df[(self.df['year'] >= baseline_start_year) & (self.df['year'] <= baseline_end_year)]

        # calculate monthly climatology
        climatology = climatology_part.groupby(['month'])['data'].mean().reset_index()
        climatology.rename(columns={'data': 'climatology'}, inplace=True)

        return climatology

    def rebaseline(self, baseline_start_year, baseline_end_year) -> None:
        """
        Shift the time series to a new baseline, specified by start and end years (inclusive).
        Each month is rebaselined separately, allowing for changes in seasonality. If years are
        incomplete, this might give a different result to the annual version.

        Parameters
        ----------
        baseline_start_year : int
            The first year of the climatology period
        baseline_end_year : int
            The last year of the climatology period

        Returns
        -------
        None
            Action occurs in place
        """
        # A climatology column left over from an earlier rebaselining would clash with the merge below
        if 'climatology' in self.df.columns:
            self.df = self.df.drop('climatology', axis=1)

        # select part of series in climatology period
        climatology_part = self.df[(self.df['year'] >= baseline_start_year) & (self.df['year'] <= baseline_end_year)]

        # calculate monthly climatology
        climatology = climatology_part.groupby(['month'])['data'].mean().reset_index()
        climatology.rename(columns={'data': 'climatology'}, inplace=True)

        # join climatology to main time series
        self.df = pd.merge(self.df, climatology, on='month', how='left', validate='m:m')

        # subtract climatology
        self.df['data'] = self.df['data'] - self.df['climatology']

        # update attributes
        self.metadata['climatology_start'] = baseline_start_year
        self.metadata['climatology_end'] = baseline_end_year
        self.metadata['actual'] = False

        self.update_history(
            f'Rebaselined to {baseline_start_year}-{baseline_end_year} for each month separately by calculating the '
            f'arithmetic mean of the data over the baseline period and subtracting the mean from all data values. '
            f'This is done for each month separately (Januarys, Februarys etc).'
        )

    def get_value(self, year: int, month: int) -> Optional[float]:
        """
        Get the current value for a particular year and month

        Parameters
        ----------
        year: int
            Year for which the value is required.
        month: int
            Month for which the value is required.

        Returns
        -------
        Optional[float]
            Value for the specified year and month or None if it does not exist

        Raises
        ------
        KeyError
            If more than one row matches the year and month.
        """
        selection = self.df[(self.df['year'] == year) & (self.df['month'] == month)]
        if len(selection) == 0:
            out_value = None
        elif len(selection) == 1:
            out_value = selection['data'].values[0]
        else:
            raise KeyError(f"Selection is not unique {year} {month}")
        return out_value

    def get_uncertainty(self, year: int, month: int) -> Optional[float]:
        """
        Get the current uncertainty for a particular year and month

        Parameters
        ----------
        year: int
            Year for which the uncertainty is required.
        month: int
            Month for which the uncertainty is required.

        Returns
        -------
        Optional[float]
            Uncertainty for the specified year and month, or None if it does not exist or the series
            has no uncertainty column

        Raises
        ------
        KeyError
            If more than one row matches the year and month.
        """
        if 'uncertainty' not in self.df.columns:
            return None

        selection = self.df[(self.df['year'] == year) & (self.df['month'] == month)]
        if len(selection) == 0:
            out_value = None
        elif len(selection) == 1:
            out_value = selection['uncertainty'].values[0]
        else:
            raise KeyError(f"Selection is not unique {year} {month}")
        return out_value

    def zero_on_month(self, year: int, month: int) -> None:
        """
        Zero data set on the value for a single month in a single year by subtracting the value for
        that month from all values in the dataset.

        Parameters
        ----------
        year: int
            Year of the month on which the data will be zeroed.
        month: int
            Month of the month on which the data will be zeroed.

        Returns
        -------
        None

        Raises
        ------
        TypeError
            If there is no value for the specified year and month.
        """
        month_names = ['January', 'February', 'March', 'April', 'May', 'June',
                       'July', 'August', 'September', 'October', 'November', 'December']

        zero_value = -1. * self.get_value(year, month)
        self.update_history(f'Zeroed series on {month_names[month - 1]} {year} by subtracting the value for that '
                            f'month from all data values (see next entry)')
        self.add_offset(zero_value)
        self.manually_set_baseline(year, year)

    def get_rank_from_year_and_month(self, year: int, month: int, versus_all_months=False) -> Optional[int]:
        """
        Given a year and month, extract the rank of the data for that month. Ties are given the
        same rank, which is the lowest rank of the group. Rank 1 is the highest value. Default
        behaviour is to rank the month against the same month in all other years. Setting
        versus_all_months to True ranks the month against all other months in all other years.

        Parameters
        ----------
        year : int
            Year of year-month pair for which we want the rank
        month : int
            Month of year-month pair for which we want the rank
        versus_all_months : bool
            If set then the ranking is done for the monthly value relative to all other months.

        Returns
        -------
        Optional[int]
            Returns the rank of the specified year-month pair as compared to the same month in
            all other years. If "versus_all_months" is set then returns rank of the anomaly for a
            particular year and month ranked against all other years and months. None is returned
            if the year-month pair is not in the series.
        """
        if versus_all_months:
            month_select = self.df
        else:
            month_select = self.df[self.df['month'] == month]

        ranked = month_select.rank(method='min', ascending=False)
        rank = ranked[(month_select['year'] == year) & (month_select['month'] == month)]['data']

        if len(rank) > 0:
            return int(rank.iloc[0])
        else:
            return None

    def generate_dates(self, time_units: str) -> List[int]:
        """
        Given a string specifying the required time units (something like days since 1800-01-01 00:00:00.0),
        generate a list of times from the time series corresponding to those units. Each month is
        represented by its first day. As a side effect a 'time' column holding the timestamp of each
        row is added to (or overwritten in) the data frame.

        Parameters
        ----------
        time_units: str
            String specifying the units to use for generating the times e.g. "days since 1800-01-01 00:00:00.0"

        Returns
        -------
        List[int]
            Times of the rows expressed in the requested units (the value returned by ``cftime.date2num``).
        """
        self.df['time'] = self._month_start_timestamps()
        dates = cf.date2num(self.df['time'].tolist(),
                            units=time_units,
                            has_year_zero=False,
                            calendar='standard')
        return dates

    def write_csv(self, filename: Path, metadata_filename: Path = None) -> None:
        """
        Write the :class:`TimeSeriesMonthly` to a csv file with the specified filename. The format used
        for writing is given by the BADC CSV format. This has a lot of upfront metadata before the data
        section. An option for writing a metadata file is also provided.

        Parameters
        ----------
        filename: Path
            Path of the filename to write the data to
        metadata_filename: Path
            Path of the filename to write the metadata to

        Returns
        -------
        None
        """
        columns_to_write = ['time', 'year', 'month', 'data']

        monthly = True
        uncertainty = False
        irregular = False

        super().write_generic_csv(filename, metadata_filename, monthly, uncertainty, irregular, columns_to_write)

    def get_start_and_end_dates(self) -> Tuple[datetime, datetime]:
        """
        Get the first and last dates in the dataset. As a side effect a 'time' column holding the
        timestamp of each row is added to (or overwritten in) the data frame.

        Returns
        -------
        Tuple[datetime, datetime]
            Start and end dates, taken from the first and last rows of the data frame.

        Raises
        ------
        IndexError
            If the series contains no rows.
        """
        self.df['time'] = self._month_start_timestamps()

        # Select by position, not by label: the row labels need not start at zero, for example
        # after rows have been dropped by running_mean.
        start_date = self.df['time'].iloc[0]
        end_date = self.df['time'].iloc[-1]

        return start_date, end_date

    def get_year_axis(self) -> List[float]:
        """
        Return a year axis as decimal year. 1st January 1984 is 1984.00.

        Returns
        -------
        List[float]
            Dates expressed as a decimal year (returned as a pandas Series).
        """
        year_axis = self.df['year'] + (self.df['month'] - 1) / 12.
        return year_axis

    def get_string_date_range(self) -> str:
        """
        Create a string which specifies the date range covered by the :class:`TimeSeriesMonthly`
        in the format YYYY.MM-YYYY.MM

        Returns
        -------
        str
            String that specifies the date range covered
        """
        start_date, end_date = self.get_start_and_end_dates()
        date_range = f"{start_date.year}.{start_date.month:02d}-" \
                     f"{end_date.year}.{end_date.month:02d}"
        return date_range

    def running_mean(self, run_length: int, centred: bool = False):
        """
        Calculate running mean of the data for a specified run length

        Parameters
        ----------
        run_length : int
            length of the run
        centred: bool
            Set to True to centre the times associated to the data points, otherwise the time used
            will be the last time in the run.

        Returns
        -------
        TimeSeriesMonthly
            :class:`TimeSeriesMonthly` containing running averages of length run_length. Rows for
            which a running average cannot be calculated are dropped.
        """
        moving_average = copy.deepcopy(self)
        moving_average.df['data'] = moving_average.df['data'].rolling(run_length).mean()

        if centred:
            # NOTE(review): only the 'year' column is averaged here, the 'month' column is left as
            # it is - confirm that this gives the intended centred time.
            moving_average.df['year'] = moving_average.df['year'].rolling(run_length).mean()

        # The first run_length - 1 rows have no complete window and are removed
        moving_average.df.dropna(how='any', inplace=True)

        if centred:
            moving_average.update_history(
                f'Calculated {run_length}-month moving average centred on the middle month of the period')
        else:
            moving_average.update_history(f'Calculated {run_length}-month trailing moving average')

        moving_average.metadata['derived'] = True

        return moving_average

    def lowess(self, number_of_points: int = 60):
        """
        Lowess smooth the series

        Parameters
        ----------
        number_of_points: int
            Number of points to use in the lowess smoother. If this exceeds the number of points in
            the series, all points are used for each fit.

        Returns
        -------
        TimeSeriesMonthly
            Deep copy of the series in which the data have been replaced by the smoothed values.
        """
        moving_average = copy.deepcopy(self)

        snippet = self.df.data[:]
        time = self.get_year_axis()[:]

        # The smoother only accepts fractions up to 1, so cap the fraction for series that are shorter
        # than the requested number of points.
        fraction_of_data = min(1.0, number_of_points / len(snippet))
        fit = lowess(snippet, time, fraction_of_data)

        # Column 1 of the result holds the fitted values. Smoothing is different at the ends of the
        # series (effectively extrapolation) but the ends are currently kept.
        fit = fit[:, 1]

        moving_average.df.data = fit

        moving_average.update_history(
            f'Calculated lowess smoothed series with {fraction_of_data} of data used for each fit')
        moving_average.metadata['derived'] = True

        return moving_average
class TimeSeriesAnnual(TimeSeries):
    """
    A :class:`TimeSeriesAnnual` combines a pandas Dataframe with a
    :class:`.CombinedMetadata` to bring together data and metadata in one object.
    It represents annual averages of data.
    """

    def __init__(self, years: list, data: list, metadata=None, uncertainty: Optional[list] = None):
        """
        Create :class:`TimeSeriesAnnual` object from its components.

        Parameters
        ----------
        years : list
            List of years
        data : list
            List of data values
        metadata : CombinedMetadata
            Object containing the metadata
        uncertainty : Optional[list]
            List of uncertainty values, one for each data value. If given, it is stored in an
            'uncertainty' column.

        Attributes
        ----------
        df : pd.DataFrame
            Pandas dataframe containing the time and data information
        metadata : dict
            Dictionary containing the metadata. The only guaranteed entry is 'history'
        """
        super().__init__(metadata)

        columns = dict(year=years, data=data)
        if uncertainty is not None:
            columns['uncertainty'] = uncertainty

        self.df = pd.DataFrame(columns)

    def __str__(self):
        return f'TimeSeriesAnnual: {self.metadata["name"]}'
@staticmethod
def make_from_df(df: pd.DataFrame, metadata: CombinedMetadata):
    """
    Create a :class:`TimeSeriesAnnual` from a pandas data frame.

    Parameters
    ----------
    df : pd.DataFrame
        Pandas dataframe containing columns 'year' and 'data' (optionally 'uncertainty')
    metadata : CombinedMetadata
        Object containing the metadata

    Returns
    -------
    TimeSeriesAnnual
        :class:`TimeSeriesAnnual` created from the elements in the dataframe and metadata.
    """
    # None is the constructor's default, so passing it is the same as leaving the uncertainty out
    uncertainty = df['uncertainty'].tolist() if 'uncertainty' in df.columns else None
    return TimeSeriesAnnual(df['year'].tolist(), df['data'].tolist(), metadata, uncertainty=uncertainty)
def rebaseline(self, baseline_start_year: int, baseline_end_year: int) -> None:
    """
    Shift the :class:`TimeSeriesAnnual` to a new baseline, specified by start and end years (inclusive).
    The mean of the data over the baseline period is subtracted from every data value and the
    new baseline is recorded in the metadata.

    Parameters
    ----------
    baseline_start_year : int
        First year of the climatology period
    baseline_end_year : int
        Last year of the climatology period

    Returns
    -------
    None
        Action occurs in place.
    """
    # mean of the data over the (inclusive) baseline period
    in_baseline = self.df['year'].between(baseline_start_year, baseline_end_year)
    baseline_mean = self.df.loc[in_baseline, 'data'].mean()

    # subtract the baseline mean from the whole series
    self.df['data'] = self.df['data'].sub(baseline_mean)

    # update attributes
    self.metadata.update({'climatology_start': baseline_start_year,
                          'climatology_end': baseline_end_year,
                          'actual': False})

    self.update_history(f'Rebaselined to {baseline_start_year}-{baseline_end_year} by subtracting the arithemtic '
                        f'mean for that period from all data values.')
[docs] def time_average(self, start_year, end_year) -> float: # select part of series in climatology period climatology_part = self.df[(self.df['year'] >= start_year) & (self.df['year'] <= end_year)] # calculate monthly climatology climatology = climatology_part['data'].mean() return climatology
[docs] def get_rank_from_year(self, year: int) -> Optional[int]: """ Given a year, extract the rank of the data for that year. Ties are given the same rank, which is the lowest rank of the group. Parameters ---------- year : int Year for which we want the rank Returns ------- Optional[int] Rank of specified year or None if year is not available. """ ranked = self.df.rank(method='min', ascending=False) rank = ranked[self.df['year'] == year]['data'] if len(rank) == 0: return None return int(rank.iloc[0])
[docs] def get_value_from_year(self, year: int) -> Optional[float]: """ Get the data value for a specified year. Parameters ---------- year : int Year for which a value is desired Returns ------- Optional[float] Value for the year, or None if year is not in the data set """ val = self.df[self.df['year'] == year]['data'] if len(val) == 0: return None return val.iloc[0]
[docs] def get_uncertainty_from_year(self, year: int) -> Optional[float]: """ Get the data value for a specified year. Parameters ---------- year : int Year for which a value is desired Returns ------- Optional[float] Uncertainty for the year, or None if year is not in the data set """ if 'uncertainty' not in self.df.columns: return None val = self.df[self.df['year'] == year]['uncertainty'] if len(val) == 0: return None return val.iloc[0]
[docs] def get_year_from_rank(self, rank: int) -> List[int]: """ Given a particular rank, extract a list of years which match that rank. Returns a list because years can (theoretically) be tied with each other. Rank 1 corresponds to the highest value in the dataset. Parameters ---------- rank : int Rank for which we want the year which has that rank Returns ------- List[int] List of years that have the specified rank """ ranked = self.df.rank(method='min', ascending=False) years = self.df[ranked['data'] == rank]['year'].tolist() return years
[docs] def running_mean(self, run_length: int, centred: bool = False): """ Calculate running mean of the data for a specified run length Parameters ---------- run_length : int length of the run centred: bool Set to True to centre the times associated to the data points, otherwise the time used will be the last time in the n-year run. Returns ------- TimeSeriesAnnual :class:`TimeSeriesAnnual` containing running averages of length run_length. Where there are too few years to calculate a running average, np.nan appears in the data column of the data frame """ moving_average = copy.deepcopy(self) moving_average.df['data'] = moving_average.df['data'].rolling(run_length).mean() if centred: moving_average.df['year'] = moving_average.df['year'].rolling(run_length).mean() moving_average.df.dropna(how='any', inplace=True) if centred: moving_average.update_history( f'Calculated {run_length}-year moving average centred on the middle year of the period') else: moving_average.update_history(f'Calculated {run_length}-year moving average') moving_average.metadata['derived'] = True return moving_average
[docs] def running_trend(self, run_length: int): """ Calculate a smoothed series by fitting a straight line to the past 30 years of data and taking the final point as the data value instead Parameters ---------- run_length: int Number of years for which the trend should be calculated Returns ------- TimeSeriesAnnual :class:`TimeSeriesAnnual` containing the end point of trends of length run_length. Where there are too few years to calculate a trend, np.nan appears in the data column of the data frame """ moving_average = copy.deepcopy(self) moving_average.df.data[0:run_length] = np.nan for i in range(run_length - 1, len(self.df.data)): snippet = self.df.data[i - run_length + 1:i + 1] time = self.get_year_axis()[i - run_length + 1:i + 1] m, b = np.polyfit(time, snippet, 1) moving_average.df.data[i] = b + m * time.values[-1] moving_average.update_history(f'Calculated smoothed series with {run_length}-year trends') moving_average.metadata['derived'] = True return moving_average
[docs] def running_lowess(self, number_of_points: int = 10): """ Lowess smooth time point t by running a lowess smoother from t=0 to t=t. For a regular lowess smoother see method lowess. Parameters ---------- number_of_points: int Number of points to use in the lowess smoother Returns ------- """ moving_average = copy.deepcopy(self) moving_average.df.data[0:number_of_points] = np.nan for i in range(number_of_points, len(self.df.data)): snippet = self.df.data[0:i + 1] time = self.get_year_axis()[0:i + 1] fraction_of_data = number_of_points / len(snippet) fit = lowess(snippet, time, fraction_of_data) moving_average.df.data[i] = fit[i, 1] moving_average.update_history( f'Calculated lowess smoothed series with {fraction_of_data} of data used for each fit') moving_average.metadata['derived'] = True return moving_average
[docs] def lowess(self, number_of_points: int = 10): """ Lowess smooth the series Parameters ---------- number_of_points: int Number of points to use in the lowess smoother Returns ------- """ moving_average = copy.deepcopy(self) snippet = self.df.data[:] time = self.get_year_axis()[:] fraction_of_data = number_of_points / len(snippet) fit = lowess(snippet, time, fraction_of_data) moving_average.df.data[:] = fit[:, 1] moving_average.update_history( f'Calculated lowess smoothed series with {fraction_of_data} of data used for each fit') moving_average.metadata['derived'] = True return moving_average
[docs] def running_stdev(self, run_length: int, centred: bool = False): """ Calculate running standard deviation of the data for a specified run length Parameters ---------- run_length : int length of the run centred: bool Set to True to centre the times associated to the data points, otherwise the time used will be the last time in the n-year run. Returns ------- TimeSeriesAnnual :class:`TimeSeriesAnnual` containing running standard deviation of length run_length. Where there are too few years to calculate a running average, np.nan appears in the data column of the data frame """ moving_average = copy.deepcopy(self) moving_average.df['data'] = moving_average.df['data'].rolling(run_length).std() if centred: moving_average.df['year'] = moving_average.df['year'].rolling(run_length).mean() moving_average.df.dropna(how='any', inplace=True) if centred: moving_average.update_history( f'Calculated {run_length}-year moving standard deviation centred on the middle year of the period') else: moving_average.update_history(f'Calculated {run_length}-year moving standard deviation') moving_average.metadata['derived'] = True return moving_average
[docs] def record_margins(self): n_years = len(self.df) out_series = copy.deepcopy(self) out_series.df.loc[0, "data"] = np.nan for i in range(1, n_years): over_margin = self.df.data[i] - np.max(self.df.data[0:i]) under_margin = self.df.data[i] - np.min(self.df.data[0:i]) if over_margin > 0: out_series.df.loc[i, "data"] = over_margin elif under_margin < 0: out_series.df.loc[i, "data"] = under_margin else: out_series.df.loc[i, "data"] = np.nan return out_series
[docs] def select_decade(self, end_year: int = 0): """ Select every tenth year from the :class:`TimesSeriesAnnual`, the last digit of the years can be selected using the end_year keyword argument. The default is to select all years ending in 0, e.g. 1850, 1860, 1870... 2020. Parameters ---------- end_year: int Last digit of the years to be selected. e.g. set to 0 to pick 1850, 1860... 2010, 2020 etc. Returns ------- TimeSeriesAnnual :class:`TimeSeriesAnnual` containing every tenth year """ self.df = self.df[self.df['year'] % 10 == end_year] self.df = self.df.reset_index() self.metadata['derived'] = True self.update_history(f'Selected years ending in {end_year}') return self
[docs] def generate_dates(self, time_units: str) -> List[datetime]: """ Given a string specifying the required time units (something like days since 1800-01-01 00:00:00.0), generate a list of times from the time series corresponding to those units. Parameters ---------- time_units: str String specifying the units to use for generating the times e.g. "days since 1800-01-01 00:00:00.0" Returns ------- List[datetime] List of dates """ self.df['time'] = pd.to_datetime(self.df.year, format='%Y') dates = cf.date2num(self.df['time'].tolist(), units=time_units, has_year_zero=False, calendar='standard') return dates
[docs] def write_csv(self, filename, metadata_filename=None): """ Write the timeseries to a csv file with the specified filename. The format used for writing is given by the BADC CSV format. This has a lot of upfront metadata before the data section. An option for writing a metadata file is also provided. Parameters ---------- filename: Path Path of the filename to write the data to metadata_filename: Path Path of the filename to write the metadata to Returns ------- None """ monthly = False irregular = False uncertainty = False columns_to_write = ['time', 'year', 'data'] if 'uncertainty' in self.df.columns: uncertainty = True columns_to_write = ['time', 'year', 'data', 'uncertainty'] super().write_generic_csv(filename, metadata_filename, monthly, uncertainty, irregular, columns_to_write)
[docs] def write_simple_csv(self, filename): self.df.to_csv(filename)
[docs] def get_year_axis(self) -> List[float]: """ Return a year axis with dates represented as decimal years. Returns ------- List[float] List of dates as decimal years. """ year_axis = self.df['year'] return year_axis
[docs] def get_string_date_range(self) -> str: """ Create a string which specifies the date range covered by the :class:`TimeSeriesAnnual` in the format YYYY-YYYY Returns ------- str String that specifies the date range covered """ start_year, end_year = self.get_first_and_last_year() date_range = f"{start_year}-{end_year}" return date_range
[docs] def add_year(self, year: int, value: float, uncertainty: float = None) -> None: """ Add a year of data. Parameters ---------- year: int the year to be added value: float the data value to be added uncertainty: the uncertainty of the data value to be added (optional) Returns ------- None """ if year not in self.df['year'].tolist(): dict_to_add = {'year': year, 'data': value} if uncertainty is not None: dict_to_add['uncertainty'] = uncertainty mini_df = pd.DataFrame([dict_to_add]) self.df = pd.concat([self.df, mini_df], ignore_index=True) else: warnings.warn(f"Year {year} already exists. No change")
def get_start_and_end_year(all_datasets: List[TimeSeriesAnnual]) -> Tuple[Optional[int], Optional[int]]:
    """
    Find the overall time span covered by a list of :class:`TimeSeriesAnnual`: the earliest first
    year and the latest last year found in any of the data sets.

    Parameters
    ----------
    all_datasets: List[TimeSeriesAnnual]
        List of datasets from which to extract the earliest first year and latest final year.

    Returns
    -------
    Tuple[Optional[int], Optional[int]]
        The earliest first year and the latest last year, or (None, None) for an empty list
    """
    if len(all_datasets) == 0:
        return None, None

    # One (first_year, last_year) pair per dataset
    spans = [ds.get_first_and_last_year() for ds in all_datasets]

    earliest = min(first for first, _ in spans)
    latest = max(last for _, last in spans)

    return earliest, latest
def make_combined_series(all_datasets: List[TimeSeriesAnnual], augmented_uncertainty=True,
                         baseline_uncertainty: float = 0.12) -> TimeSeriesAnnual:
    """
    Combine a list of datasets into a single :class:`TimeSeriesAnnual` by taking the arithmetic mean of all available
    datasets for each year. Merges the metadata for all the input time series.

    The uncertainty of the combined series is built from the spread of the datasets (the standard
    deviation across datasets in each year multiplied by 1.645, or zero if there is only one dataset)
    combined in quadrature with a second term: either the fixed baseline_uncertainty or the largest
    uncertainty reported by any of the datasets in that year.

    Parameters
    ----------
    all_datasets: List[TimeSeriesAnnual]
        List of datasets to be combined. Must contain at least one dataset (an empty list raises
        IndexError).
    augmented_uncertainty: bool
        Set to True to combine the spread with the fixed baseline_uncertainty. Set to False to
        combine it with the largest uncertainty reported by the input datasets instead.
    baseline_uncertainty: float
        Size of the additional uncertainty used when augmented_uncertainty is True.

    Returns
    -------
    TimeSeriesAnnual
        :class:`TimeSeriesAnnual` which is the mean of all available datasets in each year.
    """
    metadata = copy.deepcopy(all_datasets[0].metadata)

    metadata['name'] = 'Combined'
    metadata['display_name'] = 'Combined series'
    metadata['version'] = ''
    metadata['colour'] = '#ff0000'
    metadata['zpos'] = 0

    list_attributes = ['citation', 'citation_url', 'data_citation', 'url', 'filename', 'history']

    data_frames = []
    data_columns = []
    unc_columns = []

    for i, ds in enumerate(all_datasets):
        # Give every dataset its own uniquely numbered columns so that merging any number of
        # datasets cannot produce clashing column names, and keep track of the names explicitly
        # rather than searching for them afterwards.
        wanted = ['year', 'data']
        renames = {'data': f'data_{i}'}
        data_columns.append(f'data_{i}')
        if 'uncertainty' in ds.df.columns:
            wanted.append('uncertainty')
            renames['uncertainty'] = f'uncertainty_{i}'
            unc_columns.append(f'uncertainty_{i}')
        data_frames.append(ds.df[wanted].rename(columns=renames))

        if i > 0:
            for att in list_attributes:
                metadata[att].extend(ds.metadata[att])

    merged = reduce(lambda left, right: pd.merge(left, right, on='year', how='outer'), data_frames)
    merged = merged.sort_values(by='year', ascending=True).reset_index(drop=True)

    combined = merged[data_columns].mean(axis=1)

    # Spread between datasets; undefined for a single dataset so it is set to zero in that case
    if len(data_columns) == 1:
        spread = pd.Series(0.0, index=merged.index)
    else:
        spread = merged[data_columns].std(axis=1)
    spread = spread * 1.645

    if augmented_uncertainty:
        uncertainty = np.sqrt(spread ** 2 + baseline_uncertainty ** 2)
    else:
        reported = merged[unc_columns].max(axis=1)
        uncertainty = np.sqrt(spread ** 2 + reported ** 2)

    combined_df = pd.DataFrame({'year': merged['year'], 'data': combined, 'uncertainty': uncertainty})

    return TimeSeriesAnnual.make_from_df(combined_df, metadata)
def get_list_of_unique_variables(all_datasets: List[TimeSeriesAnnual]) -> List[str]:
    """
    Collect the distinct variable names found in the metadata of a list of
    :class:`TimeSeriesAnnual`, in order of first appearance.

    Parameters
    ----------
    all_datasets: List[TimeSeriesAnnual]
        List of datasets whose 'variable' metadata entries are to be collected

    Returns
    -------
    List[str]
        List of the unique variable names.
    """
    # A dict keeps insertion order and discards repeats
    return list(dict.fromkeys(ds.metadata['variable'] for ds in all_datasets))
def superset_dataset_list(all_datasets: List[TimeSeriesAnnual], variables: List[str]) -> List[List[TimeSeriesAnnual]]:
    """
    Group datasets by variable. The output has one entry per name in variables, and that entry is
    the list of all :class:`TimeSeriesAnnual` objects whose 'variable' metadata matches the name
    at the same index position.

    Parameters
    ----------
    all_datasets: List[TimeSeriesAnnual]
        List of datasets
    variables: List[str]
        List of variable names

    Returns
    -------
    List[List[TimeSeriesAnnual]]
        List of lists of :class:`TimeSeriesAnnual`.

    Raises
    ------
    ValueError
        If a dataset has a variable which is not in variables.
    """
    grouped = [[] for _ in variables]

    for ds in all_datasets:
        position = variables.index(ds.metadata['variable'])
        grouped[position].append(ds)

    return grouped
def write_dataset_summary_file(all_datasets, csv_filename):
    """
    Write the data from a list of annual or monthly time series to a single csv file with one
    column per dataset, named after the 'name' entry in the dataset's metadata.

    Parameters
    ----------
    all_datasets: list
        List of time series. The type of the first dataset decides whether the output is
        treated as annual or monthly.
    csv_filename: str or Path
        Name of the csv file to be written

    Returns
    -------
    Optional[pd.DataFrame]
        The combined data frame that was written, or None (and no file is written) if the list is
        empty or the first dataset is neither annual nor monthly.
    """
    if len(all_datasets) == 0:
        return None

    frames = [ds.df for ds in all_datasets]
    names = [ds.metadata['name'] for ds in all_datasets]

    reference = all_datasets[0]

    # Overall span of years covered by the inputs
    earliest = min(frame['year'].min() for frame in frames)
    latest = max(frame['year'].max() for frame in frames)

    # Build the common time axis and note which columns identify a time step
    if isinstance(reference, TimeSeriesAnnual):
        key_columns = ['year']
        summary = pd.DataFrame({'year': range(earliest, latest + 1)})
    elif isinstance(reference, TimeSeriesMonthly):
        key_columns = ['year', 'month']
        # every unique year-month pair present in the input datasets
        summary = pd.concat([frame[['year', 'month']] for frame in frames]).drop_duplicates()
    else:
        return None

    # Attach the data column of each dataset under the dataset's name
    for frame, name in zip(frames, names):
        if 'data' not in frame.columns:
            continue
        summary = pd.merge(summary, frame[[*key_columns, 'data']],
                           on=key_columns, how='left', validate='m:m')
        summary.rename(columns={'data': name}, inplace=True)

    summary.to_csv(csv_filename, index=False, float_format='%.4f')

    return summary
def create_common_dataframe(
        dataframes: List[pd.DataFrame],
        monthly: bool = False,
        annual: bool = False,
        irregular: bool = False
) -> pd.DataFrame:
    """
    Build a data frame holding every distinct time step that occurs in any of the input data frames.

    Parameters
    ----------
    dataframes: List[pd.DataFrame]
        List of dataframes which are to be used as the basis for the common data frame
    monthly: bool
        Set to true for monthly data (time steps identified by 'year' and 'month')
    annual: bool
        Set to true for annual data (time steps identified by 'year')
    irregular: bool
        Set to true for daily/irregular data (time steps identified by 'year', 'month' and 'day')

    Returns
    -------
    pd.DataFrame
        Pandas dataframe with one row for each distinct time step in the input dataframes. A
        'time' column is included as well if the first data frame has one.

    Raises
    ------
    ValueError
        If the number of flags set to True is not exactly one.
    """
    if sum([annual, monthly, irregular]) != 1:
        raise ValueError("One and only one flag must be set to True")

    # Exactly one flag is set, so the time columns can be built up cumulatively
    time_columns = ['year']
    if monthly or irregular:
        time_columns.append('month')
    if irregular:
        time_columns.append('day')

    if 'time' in dataframes[0].columns:
        time_columns.append('time')

    stacked = pd.concat([frame[time_columns] for frame in dataframes])

    return stacked.drop_duplicates()
def equalise_datasets(
        all_datasets: List[Union[TimeSeriesAnnual, TimeSeriesMonthly, TimeSeriesIrregular]],
        uncertainty: bool = False
) -> pd.DataFrame:
    """
    Given a list of datasets, put their data on a common time axis in a single data frame.

    Parameters
    ----------
    all_datasets: List[Union[TimeSeriesAnnual, TimeSeriesMonthly, TimeSeriesIrregular]]
        List of time series datasets whose data is to be combined in a single data frame. The data column from each
        data set will be combined into a single data from with each data column becoming a column identified by the
        "name" of the data set from its metadata. The type of the first dataset decides which columns
        identify a time step. Datasets without a 'data' column are left out. An empty list raises
        IndexError.
    uncertainty: bool
        Set to True to also include the uncertainty of each dataset, in a column called
        "<name>_uncertainty". Datasets which have no uncertainty get a column of missing values.

    Returns
    -------
    pd.DataFrame
        Pandas dataframe containing the data columns from all the input datasets, sorted by time.
        This holds for a single dataset too: its data column is renamed to the dataset name so
        that the layout does not depend on the number of datasets.
    """
    reference = all_datasets[0]
    monthly = isinstance(reference, TimeSeriesMonthly)
    annual = isinstance(reference, TimeSeriesAnnual)
    irregular = isinstance(reference, TimeSeriesIrregular)

    on_columns = ['year']
    if monthly:
        on_columns = ['year', 'month']
    if irregular:
        on_columns = ['year', 'month', 'day']

    # Only datasets that actually have data can contribute; the same selection is used for
    # building the common time axis and for the merges below.
    usable_datasets = [ds for ds in all_datasets if 'data' in ds.df.columns]

    combined_df = create_common_dataframe(
        [ds.df for ds in usable_datasets],
        monthly=monthly, annual=annual, irregular=irregular
    )

    # Merge each dataset onto the common time axis, naming its columns after the dataset
    for ds in usable_datasets:
        name = ds.metadata['name']
        has_uncertainty = uncertainty and 'uncertainty' in ds.df.columns

        wanted = [*on_columns, 'data']
        renames = {'data': name}
        if has_uncertainty:
            wanted.append('uncertainty')
            renames['uncertainty'] = f"{name}_uncertainty"

        combined_df = pd.merge(combined_df, ds.df[wanted], on=on_columns, how='left')
        combined_df = combined_df.rename(columns=renames)

        if uncertainty and not has_uncertainty:
            # Keep the column layout predictable when a dataset reports no uncertainty
            combined_df[f"{name}_uncertainty"] = np.nan

    combined_df = combined_df.sort_values(by=on_columns, ascending=True)
    combined_df = combined_df.reset_index(drop=True)

    return combined_df
def write_dataset_summary_file_with_metadata(
        all_datasets: List[Union[TimeSeriesAnnual, TimeSeriesMonthly, TimeSeriesIrregular]],
        csv_filename: Union[str, Path]
) -> None:
    """
    Write a list of time series to one BADC CSV format file: a metadata header rendered from a
    template followed by the data of all the series on a common time axis.

    A 'time' column is added to (or replaced in) the data frame of every input dataset as part of
    the process.

    Parameters
    ----------
    all_datasets: List[Union[TimeSeriesAnnual, TimeSeriesMonthly, TimeSeriesIrregular]]
        A list of time series which are going to be equalised
    csv_filename: str or Path
        The name of the file to which the summary will be written.

    Returns
    -------
    None
    """
    # Information needed to fill the template
    now = datetime.today()
    climind_version = pkg_resources.get_distribution("climind").version
    time_units = 'days since 1800-01-01 00:00:00.0'

    # Every dataset needs a time column in the same units before they can be combined
    for dataset in all_datasets:
        dataset.df['time'] = dataset.generate_dates(time_units)

    reference = all_datasets[0]

    # Put all the datasets on the same time axis
    common_datasets = equalise_datasets(all_datasets)

    # Render the header from the template
    template_directory = ROOT_DIR / "climind" / "data_types" / "jinja_templates"
    env = Environment(
        loader=FileSystemLoader(template_directory),
        autoescape=select_autoescape()
    )
    template = env.get_template("badc_boilerplate_multiple.jinja2")

    monthly = isinstance(reference, TimeSeriesMonthly)
    annual = isinstance(reference, TimeSeriesAnnual)
    irregular = isinstance(reference, TimeSeriesIrregular)

    rendered = template.render(now=now, climind_version=climind_version,
                               time_units=time_units, datasets=all_datasets,
                               monthly=monthly, annual=annual, irregular=irregular)

    # Time columns first, then one data column per dataset
    if monthly:
        columns_to_write = ['time', 'year', 'month']
    elif irregular:
        columns_to_write = ['time', 'year', 'month', 'day']
    else:
        columns_to_write = ['time', 'year']
    columns_to_write.extend(dataset.metadata['name'] for dataset in all_datasets)

    # Header, data section, then the closing marker
    with open(csv_filename, 'w') as out_file:
        out_file.write(rendered)
        out_file.write(
            common_datasets.to_csv(index=False, lineterminator='\n', float_format='%.4f',
                                   header=False, columns=columns_to_write)
        )
        out_file.write("end data\n")
class AveragesCollection:
    """
    A simple class to perform specific tasks on lists of :class:`.TimeSeriesAnnual`

    For every dataset whose first year is 1850, the 51-year running mean and running standard
    deviation are read at index label 50 of the result. These are stored together with the mean
    plus and minus 1.645 times the standard deviation ("upper" lists) and plus and minus 1.645
    times the standard deviation divided by the square root of 51 ("lower" lists).

    The expand and widest attributes (both False initially) select which of these lists the range
    methods work on.
    """

    def __init__(self, all_datasets):
        self.expand = False
        self.widest = False

        self.averages = []
        self.stdevs = []
        self.plus_unc_lower = []
        self.minus_unc_lower = []
        self.plus_unc_upper = []
        self.minus_unc_upper = []

        for ds in all_datasets:
            first_year, _ = ds.get_first_and_last_year()
            # Only datasets starting in 1850 contribute
            if first_year != 1850:
                continue

            mean_value = ds.running_mean(51).df['data'][50]
            stdev_value = ds.running_stdev(51).df['data'][50]

            narrow_margin = 1.645 * stdev_value / np.sqrt(51.)
            wide_margin = 1.645 * stdev_value

            self.averages.append(mean_value)
            self.stdevs.append(stdev_value)
            self.plus_unc_lower.append(mean_value + narrow_margin)
            self.minus_unc_lower.append(mean_value - narrow_margin)
            self.plus_unc_upper.append(mean_value + wide_margin)
            self.minus_unc_upper.append(mean_value - wide_margin)

    def _active_bounds(self):
        """Return the (lower values, upper values) lists selected by the expand and widest flags."""
        if not self.expand:
            return self.averages, self.averages
        if self.widest:
            return self.minus_unc_upper, self.plus_unc_upper
        return self.minus_unc_lower, self.plus_unc_lower

    def count(self):
        """Return the number of datasets that contributed an average."""
        return len(self.averages)

    def best_estimate(self):
        """Return the mean of the stored averages."""
        return np.mean(self.averages)

    def range(self):
        """Return the difference between the largest upper value and the smallest lower value of the selected lists."""
        lower_values, upper_values = self._active_bounds()
        return np.max(upper_values) - np.min(lower_values)

    def lower_range(self):
        """Return the smallest lower value of the selected lists."""
        lower_values, _ = self._active_bounds()
        return np.min(lower_values)

    def upper_range(self):
        """Return the largest upper value of the selected lists."""
        _, upper_values = self._active_bounds()
        return np.max(upper_values)