import numpy as np
import csv
import sys
import os
from . import toolBox
[docs]
class mapper:
'''
**TBE**
Due to several reasons (e.g. coarse resolutions) the river corridors
within TSMP (as all other models) does not have to match the real corridor
for all pixels along the river.
[...]
'''
###########################################################################
############################# Definition ##################################
###########################################################################
def __init__(self, SimLons=None, SimLats=None,
ObsLons=None, ObsLats=None,
ObsIDs=None,
SimMeanQ=None, ObsMeanQ=None,
SimMeanArea=None, ObsMeanArea=None):
''' Default constructor of mapper-class
Parameters
----------
SimLons : 2D ndarray
Two dimensional ndarray containing the lon value for each point of
SimGrid
SimLats : 2D ndarray
Two dimensional ndarray containing the lat value for each point of
SimGrid
ObsLons : 1D ndarray
One dimensional ndarray containing the lon values for each individual
GRDC station stored with the object
ObsLats : 1D ndarray
One dimensional ndarray containing the lat values for each individual
GRDC station stored with the object
ObsID : 1D ndarray
One dimensional ndarray containing the station ID for each individual
GRDC station stored with the object
SimMeanQ : 2D ndarray
Two dimensional ndarray containing the mean discharge for each point
of SimGrid (mean Q for entire period or ref period)
ObsMeanQ : 1D ndarray
One dimensional ndarray containing the mean discharge for each individual
GRDC station stored with the object
SimMeanArea : 1D ndarray
One dimensional ndarray containing the best fitting catchment area
for each individual GRDC station stored with the object (calculation
based on Sim data)
ObsMeanArea : 1D ndarray
One dimensional ndarray containing the mean catchment area of each individual
GRDC station stored with the object
'''
# input
self.SimLons = SimLons
self.SimLats = SimLats
self.ObsLons = ObsLons
self.ObsLats = ObsLats
self.ObsIDs = ObsIDs
self.SimMeanQ = SimMeanQ
self.ObsMeanQ = ObsMeanQ
# output
self.MapXIdx_raw = None
self.MapYIdx_raw = None
self.MapXIdx_fit = None
self.MapYIdx_fit = None
self.ObsMeanArea = None
self.SimMeanArea = None
@property
def SimLons(self):
return self.__SimLons
@SimLons.setter
def SimLons(self, SimLons):
""" SimLons is expected to be an 2D ndarray """
if SimLons is None:
print(f'Initialize SimLons as NoneType')
self.__SimLons = None
return None
if not isinstance(SimLons, np.ndarray):
print(f'SimLons is of type {type(SimLons)} but <class "numpy.ndarray"> is required!')
self.__SimLons = None
return None
if not SimLons.ndim == 2:
print(f'SimLons is of dimension {SimLons.ndim} but dimension 2 is required!')
self.__SimLons = None
return None
self.__SimLons = SimLons
# set mapped idx to None again, since the SimLons was changed!
self.MapXIdx_raw = None
self.MapYIdx_raw = None
self.MapXIdx_fit = None
self.MapYIdx_fit = None
@property
def SimLats(self):
return self.__SimLats
@SimLats.setter
def SimLats(self, SimLats):
""" SimLats is expected to be an 2D ndarray """
if SimLats is None:
print(f'Initialize SimLats as NoneType')
self.__SimLats = None
return None
if not isinstance(SimLats, np.ndarray):
print(f'SimLats is of type {type(SimLats)} but <class "numpy.ndarray"> is required!')
self.__SimLats = None
return None
if not SimLats.ndim == 2:
print(f'SimLats is of dimension {SimLats.ndim} but dimension 2 is required!')
self.__SimLats = None
return None
self.__SimLats = SimLats
# set mapped idx to None again, since the SimLats was changed!
self.MapXIdx_raw = None
self.MapYIdx_raw = None
self.MapXIdx_fit = None
self.MapYIdx_fit = None
@property
def ObsLons(self):
return self.__ObsLons
@ObsLons.setter
def ObsLons(self, ObsLons):
""" ObsLons is expected to be an 2D ndarray """
if ObsLons is None:
print(f'Initialize ObsLons as NoneType')
self.__ObsLons = None
return None
if not isinstance(ObsLons, np.ndarray):
print(f'ObsLons is of type {type(ObsLons)} but <class "numpy.ndarray"> is required!')
self.__ObsLons = None
return None
if not ObsLons.ndim == 1:
print(f'ObsLons is of dimension {ObsLons.ndim} but dimension 1 is required!')
self.__ObsLons = None
return None
self.__ObsLons = ObsLons
# set mapped idx to None again, since the ObsLons was changed!
self.MapXIdx_raw = None
self.MapYIdx_raw = None
self.MapXIdx_fit = None
self.MapYIdx_fit = None
@property
def ObsLats(self):
return self.__ObsLats
@ObsLats.setter
def ObsLats(self, ObsLats):
""" ObsLats is expected to be an 2D ndarray """
if ObsLats is None:
print(f'Initialize ObsLats as NoneType')
self.__ObsLats = None
return None
if not isinstance(ObsLats, np.ndarray):
print(f'ObsLats is of type {type(ObsLats)} but <class "numpy.ndarray"> is required!')
self.__ObsLats = None
return None
if not ObsLats.ndim == 1:
print(f'ObsLats is of dimension {ObsLats.ndim} but dimension 1 is required!')
self.__ObsLats = None
return None
self.__ObsLats = ObsLats
# set mapped idx to None again, since the ObsLats was changed!
self.MapXIdx_raw = None
self.MapYIdx_raw = None
self.MapXIdx_fit = None
self.MapYIdx_fit = None
@property
def ObsIDs(self):
return self.__ObsIDs
@ObsIDs.setter
def ObsIDs(self, ObsIDs):
""" ObsIDs is expected to be an 2D ndarray """
if ObsIDs is None:
print(f'Initialize ObsIDs as NoneType')
self.__ObsIDs = None
return None
if not isinstance(ObsIDs, np.ndarray):
print(f'ObsIDs is of type {type(ObsIDs)} but <class "numpy.ndarray"> is required!')
self.__ObsIDs = None
return None
if not ObsIDs.ndim == 1:
print(f'ObsIDs is of dimension {ObsIDs.ndim} but dimension 1 is required!')
self.__ObsIDs = None
return None
self.__ObsIDs = ObsIDs
# set mapped idx to None again, since the ObsIDs was changed!
self.MapXIdx_raw = None
self.MapYIdx_raw = None
self.MapXIdx_fit = None
self.MapYIdx_fit = None
@property
def SimMeanQ(self):
return self.__SimMeanQ
@SimMeanQ.setter
def SimMeanQ(self, SimMeanQ):
""" SimMeanQ is expected to be an 2D ndarray """
if SimMeanQ is None:
print(f'Initialize SimMeanQ as NoneType')
self.__SimMeanQ = None
return None
if not isinstance(SimMeanQ, np.ndarray):
print(f'SimMeanQ is of type {type(SimMeanQ)} but <class "numpy.ndarray"> is required!')
self.__SimMeanQ = None
return None
if not SimMeanQ.ndim == 2:
print(f'SimMeanQ is of dimension {SimMeanQ.ndim} but dimension 2 is required!')
self.__SimMeanQ = None
return None
self.__SimMeanQ = SimMeanQ
# set mapped idx to None again, since the SimMeanQ was changed!
self.MapXIdx_raw = None
self.MapYIdx_raw = None
self.MapXIdx_fit = None
self.MapYIdx_fit = None
@property
def ObsMeanQ(self):
return self.__ObsMeanQ
@ObsMeanQ.setter
def ObsMeanQ(self, ObsMeanQ):
""" ObsMeanQ is expected to be an 2D ndarray """
if ObsMeanQ is None:
print(f'Initialize ObsMeanQ as NoneType')
self.__ObsMeanQ = None
return None
if not isinstance(ObsMeanQ, np.ndarray):
print(f'ObsMeanQ is of type {type(ObsMeanQ)} but <class "numpy.ndarray"> is required!')
self.__ObsMeanQ = None
return None
if not ObsMeanQ.ndim == 1:
print(f'ObsMeanQ is of dimension {ObsMeanQ.ndim} but dimension 1 is required!')
self.__ObsMeanQ = None
return None
self.__ObsMeanQ = ObsMeanQ
# set mapped idx to None again, since the ObsMeanQ was changed!
self.MapXIdx_raw = None
self.MapYIdx_raw = None
self.MapXIdx_fit = None
self.MapYIdx_fit = None
"""
One can think about additional functions like:
setGridFromDef
readNetCDF
[...]
or similar
"""
###########################################################################
########################## Auxiliary tools ################################
###########################################################################
[docs]
def spher_dist(self, lon1, lat1, lon2, lat2, Rearth=6373):
""" calculate the spherical / haversine distance
Source: https://www.kompf.de/gps/distcalc.html
This function is supposed to proper handle different shaped coords
latX and lonX is supposed to be passed in rad
return 2D ndarray
"""
term1 = np.sin(lat1) * np.sin(lat2)
term2 = np.cos(lat1) * np.cos(lat2)
term3 = np.cos(lon2 - lon1)
# tmp_bool = ( (-1 <= (term1+term2*term3)) & ((term1+term2*term3) >= 1) & ((term1+term2*term3) == 0) & (np.isnan(term1+term2*term3)) ).all()
# print(f'arccos: {tmp_bool}')
return Rearth * np.arccos(term1+term2*term3)
###########################################################################
########################### Core functions ################################
###########################################################################
def __checkt4MapRaw(self):
''' This is a separate function to keep MapRaw() functions readable.
This function checks if the passed data fulfills some basic
requirements as e.g. if all needed data are defined and if
SimMeanQ and SimLons are of same shape.
This is basically to avoid trivial errors while using this class.
Returns
-------
boolean
True if check is passed, False if some errors are detected.
'''
# are all variables defined?
if self.SimLons is None:
print('self.SimLons is not defined yet, but required by function self.MapRaw()!')
return False
if self.SimLats is None:
print('self.SimLats is not defined yet, but required by function self.MapRaw()!')
return False
if self.ObsLons is None:
print('self.ObsLons is not defined yet, but required by function self.MapRaw()!')
return False
if self.ObsLats is None:
print('self.ObsLats is not defined yet, but required by function self.MapRaw()!')
return False
if self.ObsIDs is None:
print('self.ObsIDs is not defined yet, but required by function self.MapRaw()!')
return False
# are the variables / shapes reasonable?
if self.SimLons.shape != self.SimLats.shape:
print(f'The shape of self.SimLons {self.SimLons.shape} is not equal the shape of self.SimLats {self.SimLats.shape}!')
return False
if self.ObsLons.shape != self.ObsLats.shape:
print(f'The shape of self.ObsLons {self.ObsLons.shape} is not equal the shape of self.ObsLats {self.ObsLats.shape}!')
return False
if self.ObsLons.shape != self.ObsIDs.shape:
print(f'The shape of self.ObsLons / self.ObsLats {self.ObsLons.shape} is not equal the shape of self.ObsIDs {self.ObsIDs.shape}!')
return False
return True
[docs]
def MapRaw(self):
''' This functions straight forward maps OBS on SimGrid.
This function maps OBS data according to its Lat/Lon values
to SimGrid, by calculating the 'real' distance between OBS location
and all points in SimGrid. The point with the smallest distance is
choose as the correct location of OBS on SimGrid.
The 'real' distance is hereby the distance in [m] between OBS and SimGrind
calculated on a sphere.
Returns
-------------
None
Notes
-----
This function sets / updates the object variables self.MapYIdx_fit,
and self.MapXIdx_fit directly
'''
#check if all needed data are already defined
if not self.__checkt4MapRaw():
print('checkt4MapRaw() failed --> self.MapRaw() canceled!')
return None
# Create empty lists for results
tmp_MapXIdx_raw = []
tmp_MapYIdx_raw = []
# Loop over all ObsIDs stored with the object
print('MapRaw start', end='')
numObsIDs = len(self.ObsIDs)
for idx, ObsID in enumerate(self.ObsIDs):
print(f'\rMapRaw ObsID: {ObsID} ({idx} / {numObsIDs})', end='', flush=True)
ObsLon = self.ObsLons[idx]
ObsLat = self.ObsLats[idx]
# Calculate distance between Obs and SimGrid on Earth surface
dist = self.spher_dist(np.deg2rad(self.SimLons),
np.deg2rad(self.SimLats),
np.deg2rad(ObsLon),
np.deg2rad(ObsLat))
# Find index of smallest distance
mapped_idx = np.unravel_index(np.argmin(dist, axis=None),dist.shape)
# Temporally store found idx in list
tmp_MapYIdx_raw.append(mapped_idx[0])
tmp_MapXIdx_raw.append(mapped_idx[1])
print('MapRaw done', flush=True)
# update object-variables with found information
self.MapYIdx_raw = np.array(tmp_MapYIdx_raw)
self.MapXIdx_raw = np.array(tmp_MapXIdx_raw)
def __check4MapQ(self):
''' This is a separate function to keep MapXXX functions readable.
This function checks if the passed data fulfills some basic
requirements as e.g. SimMeanQ and SimLons are of same shape.
This is basically to avoid trivial errors while using this class.
Because self.MapRaw() is called as part of all self.MapBestXXX()
functions, this function only checks additional requirements
Returns
-------
boolean
True if check is passed, False if some errors are detected.
'''
# are all variables defined?
if self.SimMeanQ is None:
print('self.SimMeanQ is not defined yet, but required by function self.check4MapQ()!')
return False
if self.ObsMeanQ is None:
print('self.ObsMeanQ is not defined yet, but required by function self.check4MapQ()!')
return False
if self.SimMeanQ.shape != self.SimLons.shape:
print(f'The shape of self.SimMeanQ {self.SimMeanQ.shape} is not equal the shape of self.SimLons / self.SimLats {self.SimLons.shape}!')
return False
if self.ObsMeanQ.shape != self.ObsLons.shape:
print(f'The shape of self.ObsMeanQ {self.ObsMeanQ.shape} is not equal the shape of self.ObsLons / self.ObsLats {self.ObsLons.shape}!')
return False
return True
[docs]
def MapBestQ(self, search_rad=1):
''' This functions maps OBS on SimGrid by choosing that pixel with best
fitting discharge (Q) within given radius.
The best fitting Q is found by calculating the discharge of
each pixel within a given radius around the origin pixel.
The origin pixel is hereby defined by MapRaw().
That pixel with Q closes to GRDC data is than set.
Parameters
----------
search_rad : int
defining the radius around the origin pixel to search for best fitting Q.
Returns
-------
None
Notes
-----
This function sets / updates the object variables self.MapYIdx_fit,
and self.MapXIdx_fit directly
'''
# First MapRaw(), than adjust according to best fitting Q
self.MapRaw()
#check if all needed data are already defined
if not self.__check4MapQ():
print('check4MapQ() failed --> self.MapBestQ() canceled!')
return None
# Create empty lists for results
tmp_MapXIdx_fit = []
tmp_MapYIdx_fit = []
# Loop over all ObsIDs stored with the object
for idx, ObsID in enumerate(self.ObsIDs):
# Set the original pixel around which to search
y = self.MapYIdx_raw[idx]
x = self.MapXIdx_raw[idx]
# extract subset of self.SimMeanQ around origin pixel
# and search radius search_rad (+1 cause last is exclusive)
sub_SimMeanQ = self.SimMeanQ[y-search_rad:y+search_rad+1, x-search_rad:x+search_rad+1]
sub_ObsMeanQ = self.ObsMeanQ[idx]
# Calculate difference between SimMeanQ and ObsMeanQ (GRDC) within subset
dist = np.abs(sub_SimMeanQ - sub_ObsMeanQ)
# Get index of best fitting Q (min(dist)) within subset
mapped_idx = np.unravel_index(np.argmin(dist, axis=None),dist.shape)
# Convert subset-index to global index and
# store (x|y) of best fitting Q in results list
tmp_realYidx = (y - search_rad) + mapped_idx[0]
tmp_realXidx = (x - search_rad) + mapped_idx[1]
tmp_MapYIdx_fit.append(tmp_realYidx)
tmp_MapXIdx_fit.append(tmp_realXidx)
# update object-variables with found information
self.MapYIdx_fit = np.array(tmp_MapYIdx_fit)
self.MapXIdx_fit = np.array(tmp_MapXIdx_fit)
[docs]
def MapHighQ(self, search_rad=1):
''' This functions maps OBS on SimGrid by choosing that pixel with highest
discharge (Q) within given radius.
The highest discharge (Q) is simply found by applying np.argmax() to
the subset (search_rad around origin pixel) of SimMeanQ.
The origin pixel is hereby defined by MapRaw()
SimMeanQ is part of the objects-variables
Parameters
----------
search_rad : int
defining the radius around the origin pixel to search for highest Q.
Returns
-------
None
Notes
-----
This function sets / updates the object variables self.MapYIdx_fit,
and self.MapXIdx_fit directly
'''
# First MapRaw(), than adjust according to highest Q
self.MapRaw()
#check if all needed data are already defined
if not self.__check4MapQ():
print('check4MapQ() failed --> self.MapBestQ() canceled!')
return None
# Create empty lists for results
tmp_MapXIdx_fit = []
tmp_MapYIdx_fit = []
# Loop over all ObsIDs stored with the object
for idx, ObsID in enumerate(self.ObsIDs):
# Set the original pixel around which to search
y = self.MapYIdx_raw[idx]
x = self.MapXIdx_raw[idx]
# extract sub area of self.SimMeanQ according to current ObsID
# and search radius search_rad (+1 because last is exclusive)
sub_SimMeanQ = self.SimMeanQ[y-search_rad:y+search_rad+1, x-search_rad:x+search_rad+1]
# sub_ObsMeanQ = self.ObsMeanQ[idx] # REMOVE
# Get index of Q within subset
mapped_idx = np.unravel_index(np.argmax(sub_SimMeanQ, axis=None),sub_SimMeanQ.shape)
# Convert subset-index to global index and
# store (x|y) of highest Q in results list
tmp_realYidx = (y - search_rad) + mapped_idx[0]
tmp_realXidx = (x - search_rad) + mapped_idx[1]
tmp_MapYIdx_fit.append(tmp_realYidx)
tmp_MapXIdx_fit.append(tmp_realXidx)
# update object-variables with found information
self.MapYIdx_fit = np.array(tmp_MapYIdx_fit)
self.MapXIdx_fit = np.array(tmp_MapXIdx_fit)
def __check4MapArea(self):
''' This is a separate function to keep MapXXX functions readable.
This function checks if the passed data fulfills some basic
requirements as e.g. self.ObsMeanArea is defnined.
This is basically to avoid trivial errors while using this class.
Because self.MapRaw() is called as part of all self.MapBestXXX()
functions, this function only checks additional requirements
Returns
-------
boolean
True if check is passed, False if some errors are detected.
'''
# are all variables defined?
if self.ObsMeanArea is None:
print('self.ObsMeanArea is not defined yet, but required by function self.check4MapArea()!')
return False
return True
[docs]
def MapBestCatchment(self, search_rad=1, dx=12500., dy=12500., slopey=None, slopex=None):
''' This functions maps OBS on SimGrid by choosing that pixel which related
catchment area fits best to GRDC.
The best fitting catchment is found by calculating the catchment of
each pixel within a given radius around the origin pixel.
The origin pixel is hereby defined by MapRaw().
That pixel with catchment area closes to GRDC data is than set.
Parameters
----------
search_rad : int
defining the radius around the origin pixel to search for best fitting catchment.
dy: float
defining the y-resolution of slope-grid (in [m])
dx: float
defining the x-resolution of slope-grid (in [m])
slopey: 2D ndarray
defining the ParFlow slopes in y-direction used to calculate the catchment
slopex: 2D ndarray
defining the ParFlow slopes in x-direction used to calculate the catchment
Returns
-------
None
Notes
-----
This function sets / updates the object variables self.MapYIdx_fit,
self.MapXIdx_fit, and self.SimMeanArea directly
'''
# First MapRaw(), than adjust according to best fitting catchment-size
self.MapRaw()
#check if all needed data are already defined
if not self.__check4MapArea():
print('check4MapArea() failed --> self.MapBestCatchment() canceled!')
return None
# Create empty lists for results
tmp_MapXIdx_fit = []
tmp_MapYIdx_fit = []
tmp_SimManArea = []
# Loop over all ObsIDs stored with the object
print(f'MapBestCatchment ObsID: start', end='')
numObsIDs = len(self.ObsIDs)
for idx, ObsID in enumerate(self.ObsIDs):
print(f'\rMapBestCatchment ObsID: {ObsID} ({idx} / {numObsIDs})', end='', flush=True)
# Set the original pixel around which to search
y = self.MapYIdx_raw[idx]
x = self.MapXIdx_raw[idx]
# Set catchment area defined by GRDC to compare with
meanArea = self.ObsMeanArea[idx]
# Create lists for temporary results
tmp_catchmentAreaList = []
tmp_catchmentXList = []
tmp_catchmentYList = []
# Loop over all pixel within given search-radius around the
# origin pixel, including the original pixel.
for x_inc in range(-search_rad, search_rad+1):
for y_inc in range(-search_rad, search_rad+1):
tmp_x = x + x_inc
tmp_y = y + y_inc
# find catchment for (tmp_x|tmp_y)
# for more information on calculation of catchment see:
# catcyNAME/toolBox.py --> calc_catchment()
tmp_catchmentMask = toolBox.calc_catchment(slopex, slopey, tmp_x, tmp_y)
# Calculate area of catchment by multiplying with dx and dy
# Than change units from [m^2] to [km^2] to stay compatible to GRDC
tmp_catchmentMask *= dx*dy
tmp_catchmentMask *= 1./(1000.*1000.)
tmp_catchmentArea= np.nansum(tmp_catchmentMask)
# Add information for current inspected pixel in
# temporary results list
tmp_catchmentAreaList.append(tmp_catchmentArea)
tmp_catchmentXList.append(tmp_x)
tmp_catchmentYList.append(tmp_y)
# Convert temporary results list to ndarray
tmp_catchmentArea = np.asarray(tmp_catchmentAreaList)
tmp_catchmentX = np.asarray(tmp_catchmentXList)
tmp_catchmentY = np.asarray(tmp_catchmentYList)
# Now tmp_catchmentArea is a numpy.ndarray holding the catchment-size
# of the pixel related to X and Y values stored in tmp_catchmentX
# and tmp_catchmentY;
# X and Y are absolute pixels of passed slopex and slopey and NOT of
# a subset.
# Calculate difference between calculated catchment areas and GRDC data
dist = np.abs( tmp_catchmentArea - meanArea )
# Get index of best fitting catchment (min(dist)) within temporary
# result list
mapped_idx = np.unravel_index(np.argmin(dist, axis=None),dist.shape)
# Store information of best fitting pixel with results list
tmp_best_fitting_area_x = tmp_catchmentX[mapped_idx[0]]
tmp_best_fitting_area_y = tmp_catchmentY[mapped_idx[0]]
tmp_best_fitting_area = tmp_catchmentArea[mapped_idx[0]]
tmp_MapYIdx_fit.append(tmp_best_fitting_area_y)
tmp_MapXIdx_fit.append(tmp_best_fitting_area_x)
tmp_SimManArea.append(tmp_best_fitting_area)
print(f'MapBestCatchment done', flush=True)
# update object-variables with found information
self.MapYIdx_fit = np.array(tmp_MapYIdx_fit)
self.MapXIdx_fit = np.array(tmp_MapXIdx_fit)
self.SimMeanArea = np.array(tmp_SimManArea)
[docs]
def writeMap2File(self, file):
''' Write mapped coordinates to a given file.
To save mapped coordinates for later use or better comparison,
one can write / dump those to a file.
Currently this function does only write data to CSV-format.
Currently already existing files are overwritten.
Parameters
----------
file : str
defining the file-path to which data should be written.
Returns
-------
None
'''
with open(file,'w', newline='') as outFile:
writer = csv.writer(outFile, delimiter=',')
header=['ObsID',
'ObsLon', 'ObsLat',
'MapXIdx_raw', 'MapYIdx_raw',
'MapXIdx_fit', 'MapYIdx_fit',
'related SimLon', 'related SimLat']
writer.writerow(header)
for idx, ObsID in enumerate(self.ObsIDs):
row = [ObsID,
self.ObsLons[idx], self.ObsLats[idx],
self.MapXIdx_raw[idx], self.MapYIdx_raw[idx],
self.MapXIdx_fit[idx], self.MapYIdx_fit[idx],
# NWR error? Y=X and X=Y?
self.SimLons[self.MapYIdx_raw[idx], self.MapXIdx_raw[idx]], self.SimLats[self.MapYIdx_raw[idx], self.MapXIdx_raw[idx]]]
#self.SimLons[self.MapXIdx_raw[idx], self.MapYIdx_raw[idx]], self.SimLats[self.MapXIdx_raw[idx], self.MapYIdx_raw[idx]]]
writer.writerow(row)