Source code for climind.readers.reader_gistemp_ts

#  Climate indicator manager - a package for managing and building climate indicator dashboards.
#  Copyright (c) 2022 John Kennedy
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import itertools
import xarray as xa
import numpy as np
from typing import List

import climind.data_types.grid as gd
import climind.data_types.timeseries as ts

from climind.data_manager.metadata import CombinedMetadata

from climind.readers.generic_reader import read_ts


def build_transfer(xx: int, yy: int):
    """
    Build the 3x3 weight matrix and source-grid index bounds for one 5x5 grid cell.

    The weights start at 1.0 everywhere. One edge column and one edge row are
    then multiplied by 0.5: the last column/row when the corresponding index is
    even, the first column/row when it is odd. The corner shared by the halved
    row and halved column therefore ends up at 0.25.

    Parameters
    ----------
    xx: int
        Longitudinal index of grid cell in range 0, 71
    yy: int
        Latitudinal index of grid cell in range 0, 35

    Returns
    -------
    tuple
        ``(transfer, lox, hix, loy, hiy)``. ``transfer`` is the 3x3 float array
        of weights; ``lox``/``hix`` and ``loy``/``hiy`` are the low and high
        integer indices (``high == low + 2``) of the three source cells along
        the x and y directions.
    """
    weights = np.ones((3, 3))

    x_is_even = xx % 2 == 0
    y_is_even = yy % 2 == 0

    # Even indices halve the trailing edge of the window, odd indices the leading edge.
    # NOTE(review): presumably because a 5-degree cell spans two and a half
    # 2-degree source cells - confirm against the source grid definition.
    half_column = 2 if x_is_even else 0
    half_row = 2 if y_is_even else 0
    weights[:, half_column] *= 0.5
    weights[half_row, :] *= 0.5

    lox = int(5 * (xx / 2)) if x_is_even else int(5 * ((xx - 1) / 2) + 2)
    loy = int(5 * (yy / 2)) if y_is_even else int(5 * ((yy - 1) / 2) + 2)

    return weights, lox, lox + 2, loy, loy + 2
def read_monthly_grid(filename: List[Path], metadata: CombinedMetadata, **kwargs) -> gd.GridMonthly:
    """
    Open the monthly gridded dataset and wrap it in a GridMonthly.

    The variable ``tempanomaly`` is renamed to ``tas_mean`` and the coordinates
    ``lat``/``lon`` to ``latitude``/``longitude``.

    Parameters
    ----------
    filename: List[Path]
        Paths for the dataset; only the first entry is opened.
    metadata: CombinedMetadata
        Metadata passed to the returned grid. Its ``creation_message`` method
        is called before the grid is built.
    kwargs
        Accepted but not used by this reader.

    Returns
    -------
    gd.GridMonthly
        Grid built from the renamed dataset and ``metadata``.
    """
    renames = {'tempanomaly': 'tas_mean', 'lat': 'latitude', 'lon': 'longitude'}
    dataset = xa.open_dataset(filename[0]).rename(renames)

    metadata.creation_message()

    return gd.GridMonthly(dataset, metadata)
def read_monthly_1x1_grid(filename: List[Path], metadata: CombinedMetadata, **kwargs) -> gd.GridMonthly:
    """
    Convert 2x2 grid to 1x1 grid by copying each 2x2 value into all four
    1x1 grid cells it contains.

    Parameters
    ----------
    filename: List[Path]
        Paths for the dataset; only the first entry is opened.
    metadata: CombinedMetadata
        Metadata passed to the returned grid. Its ``creation_message`` method
        is called and a regridding note is appended to ``metadata['history']``.
    kwargs
        Accepted but not used by this reader.

    Returns
    -------
    gd.GridMonthly
        Grid on 180 latitudes (-89.5 to 89.5) and 360 longitudes
        (-179.5 to 179.5).
    """
    source = xa.open_dataset(filename[0])

    # Duplicate every value along axis 1, then along axis 2
    doubled = np.repeat(np.repeat(source.tempanomaly, 2, 1), 2, 2)

    cell_centre_lats = np.linspace(-89.5, 89.5, 180)
    cell_centre_lons = np.linspace(-179.5, 179.5, 360)
    regridded = gd.make_xarray(doubled, source.time.data, cell_centre_lats, cell_centre_lons)

    # update encoding
    encoding_settings = {'zlib': True, '_FillValue': -1e30}
    for variable_name in regridded.data_vars:
        regridded[variable_name].encoding.update(encoding_settings)

    metadata.creation_message()
    metadata['history'].append("Regridded to 1 degree latitude-longitude resolution")

    return gd.GridMonthly(regridded, metadata)
def read_monthly_5x5_grid(filename: List[Path], metadata: CombinedMetadata, **kwargs) -> gd.GridMonthly:
    """
    Regrid the monthly dataset to a 36 x 72 (5 degree) latitude-longitude grid.

    Each target cell is the weighted mean of the non-missing values in a 3x3
    window of source cells, using the weights and window bounds returned by
    ``build_transfer``. A target cell whose window is entirely NaN is set to NaN.

    Parameters
    ----------
    filename: List[Path]
        Paths for the dataset; only the first entry is opened.
    metadata: CombinedMetadata
        Metadata passed to the returned grid. Its ``creation_message`` method
        is called and a regridding note is appended to ``metadata['history']``.
    kwargs
        Accepted but not used by this reader.

    Returns
    -------
    gd.GridMonthly
        Grid on 36 latitudes (-87.5 to 87.5) and 72 longitudes
        (-177.5 to 177.5).
    """
    gistemp = xa.open_dataset(filename[0])
    times = gistemp.time.data
    number_of_months = len(times)

    # Fetch the underlying array once instead of on every loop iteration
    anomalies = gistemp.tempanomaly.data

    target_grid = np.zeros((number_of_months, 36, 72))

    for xx, yy in itertools.product(range(72), range(36)):
        # Weights and window bounds depend only on the cell, not on the month,
        # so they are computed once per cell rather than once per cell per month
        transfer, lox, hix, loy, hiy = build_transfer(xx, yy)
        window = anomalies[:, loy:hiy + 1, lox:hix + 1]

        for m in range(number_of_months):
            selection = window[m]
            index = ~np.isnan(selection)
            if np.count_nonzero(index) > 0:
                weighted = transfer[index] * selection[index]
                grid_mean = np.sum(weighted) / np.sum(transfer[index])
            else:
                grid_mean = np.nan
            target_grid[m, yy, xx] = grid_mean

    latitudes = np.linspace(-87.5, 87.5, 36)
    longitudes = np.linspace(-177.5, 177.5, 72)

    ds = gd.make_xarray(target_grid, times, latitudes, longitudes)

    # update encoding
    for key in ds.data_vars:
        ds[key].encoding.update({'zlib': True, '_FillValue': -1e30})

    metadata.creation_message()
    metadata['history'].append("Regridded to 5 degree latitude-longitude resolution")

    return gd.GridMonthly(ds, metadata)
def read_monthly_ts(filename: List[Path], metadata: CombinedMetadata) -> ts.TimeSeriesMonthly:
    """
    Read a monthly time series from a comma-separated text file.

    The first two lines of the file are skipped. On every following line,
    column 0 is read as the year and columns 1 to 12 as the values for months
    1 to 12. Entries equal to ``***`` are skipped.

    Parameters
    ----------
    filename: List[Path]
        Paths for the dataset; only the first entry is opened.
    metadata: CombinedMetadata
        Metadata passed to the returned series. Its ``creation_message``
        method is called before the series is built.

    Returns
    -------
    ts.TimeSeriesMonthly
        Series built from the parsed years, months and anomalies.
    """
    years, months, anomalies = [], [], []

    with open(filename[0], 'r') as handle:
        # Discard the two header lines
        next(handle, None)
        next(handle, None)

        for record in handle:
            fields = record.split(',')
            for month in range(1, 13):
                value = fields[month]
                if value == '***':
                    continue
                years.append(int(fields[0]))
                months.append(month)
                anomalies.append(float(value))

    metadata.creation_message()

    return ts.TimeSeriesMonthly(years, months, anomalies, metadata=metadata)
def read_annual_ts(filename: List[Path], metadata: CombinedMetadata) -> ts.TimeSeriesAnnual:
    """
    Read an annual time series and its uncertainties from two comma-separated files.

    ``filename[1]`` holds the uncertainties: its first line is skipped, then
    column 0 is read as the year and column 2 as the uncertainty.
    ``filename[0]`` holds the anomalies: its first two lines are skipped, then
    column 0 is read as the year and column 13 as the annual value. Entries
    equal to ``***`` are skipped.

    A year with no entry in the uncertainty file is given the uncertainty
    listed for 2018.

    Parameters
    ----------
    filename: List[Path]
        ``filename[0]`` is the anomaly file, ``filename[1]`` the uncertainty file.
    metadata: CombinedMetadata
        Metadata passed to the returned series. Its ``creation_message``
        method is called before the series is built.

    Returns
    -------
    ts.TimeSeriesAnnual
        Series built from the parsed years, anomalies and uncertainties.

    Raises
    ------
    ValueError
        If a year is missing from the uncertainty file and the fallback year
        2018 is missing from it as well.
    """
    # Year whose uncertainty stands in for years absent from the uncertainty file
    fallback_year = 2018

    # Map year -> uncertainty for constant-time lookup; setdefault keeps the
    # first entry if a year is repeated, matching a first-match list search
    uncertainty_by_year = {}
    with open(filename[1], 'r') as f:
        f.readline()
        for line in f:
            columns = line.split(',')
            uncertainty_by_year.setdefault(int(columns[0]), float(columns[2]))

    years = []
    anomalies = []
    uncs = []

    with open(filename[0], 'r') as f:
        for _ in range(2):
            f.readline()
        for line in f:
            columns = line.split(',')
            if columns[13] == '***':
                continue

            year = int(columns[0])
            anomaly = float(columns[13])

            if year in uncertainty_by_year:
                uncertainty = uncertainty_by_year[year]
            elif fallback_year in uncertainty_by_year:
                uncertainty = uncertainty_by_year[fallback_year]
            else:
                raise ValueError(
                    f"No uncertainty for {year} and fallback year {fallback_year} "
                    f"is not in {filename[1]}"
                )

            years.append(year)
            anomalies.append(anomaly)
            uncs.append(uncertainty)

    metadata.creation_message()

    return ts.TimeSeriesAnnual(years, anomalies, metadata=metadata, uncertainty=uncs)