Source code for ttim.fit

import re
import warnings
from typing import Iterable

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.optimize import least_squares, leastsq


[docs] class Calibrate: def __init__(self, model): """Initialize Calibration class. Parameters ---------- model : ttim.Model model to calibrate """ self.model = model self.parameters = pd.DataFrame( columns=[ "layers", "optimal", "std", "perc_std", "pmin", "pmax", "initial", "inhoms", "parray", ] ) self.seriesdict = {} self.seriesinwelldict = {}
[docs] def set_parameter( self, name=None, layers=None, initial=0, pmin=-np.inf, pmax=np.inf, inhoms=None ): """Set parameter to be optimized. Parameters ---------- name : str name can be 'kaq', 'Saq', 'c', 'Sll' or 'kzoverkh'. layers : int or list of ints layer number(s) for which the parameter is set. If an integer is passed, parameter is associated with a single layer. If a list of layers is passed, layers must be consecutive and parameter is set for each layer from min(layers) up to and including max(layers). initial : float, optional initial value for the parameter (the default is 0) pmin : float, optional lower bound for parameter value (the default is -np.inf) pmax : float, optional upper bound for paramater value (the default is np.inf) inhoms : str, list string with name of inhomogeneity, list with string names of inhomogeneities or list of inhomogeneities inhomogeneity(ies) for which the parameter is set. If a string is passed, parameter is associated with a single inhomogeneity. If a list of strings of inhomogeneities (or list of inhomogeneities) is passed, parameter is set for each inhomogeneity in the list. This allows linking of parameters across inhomogeneities. """ assert isinstance(name, str), "Error: name must be string" if isinstance(layers, Iterable): from_lay = min(layers) to_lay = max(layers) if (np.diff(layers) > 1).any(): warnings.warn( "Non-consecutive layers are not supported. " f"Setting parameter '{name}' for layers {from_lay} - {to_lay}.", stacklevel=1, ) elif isinstance(layers, int): from_lay = layers to_lay = layers else: warnings.warn( "Setting layers in the parameter name is deprecated. " f"Set the layers= keyword argument for parameter '{name}' to silence " "this warning. The parameter name can still include layer info, but " "this will be ignored in a future version of TTim.", DeprecationWarning, stacklevel=2, ) # find numbers in name str for support layer ranges layers_from_name = re.findall(r"\d+", name) if len(layers_from_name) == 0: raise ValueError( "No layer information found in parameter name. " "Please specify layers explicitly." ) elif len(layers_from_name) == 1: from_lay = int(layers_from_name[0]) to_lay = from_lay elif len(layers_from_name) == 2: from_lay, to_lay = map(int, layers_from_name) # get aquifer information and create list if necessary if inhoms is None: aq = [self.model.aq] elif isinstance(inhoms, tuple): aq = list(inhoms) elif not isinstance(inhoms, list): aq = [inhoms] else: aq = inhoms # convert aquifer names to aquifer objects for i, iaq in enumerate(aq): if isinstance(iaq, str): aq[i] = self.model.aq.inhomdict[iaq] plist = [] for iaq in aq: if name[:3] == "kaq": p = iaq.kaq[from_lay : to_lay + 1] elif name[:3] == "Saq": p = iaq.Saq[from_lay : to_lay + 1] elif name[0] == "c": p = iaq.c[from_lay : to_lay + 1] elif name[:3] == "Sll": p = iaq.Sll[from_lay : to_lay + 1] elif name[0:8] == "kzoverkh": p = iaq.kzoverkh[from_lay : to_lay + 1] else: raise ValueError( f"Parameter name '{name}' not recognized. " "Supported parameters are 'kaq', 'Saq', 'c', 'Sll' or 'kzoverkh'." ) plist.append(p[:]) if p is None: # no parameter set print("parameter name not recognized or no parameter ref supplied") return if inhoms is None: pname = f"{name}_{from_lay}_{to_lay}" else: pname = f"{name}_{from_lay}_{to_lay}_{'_'.join([iaq.name for iaq in aq])}" self.parameters.loc[pname] = { "layers": layers, "optimal": float(initial), "std": None, "perc_std": None, "pmin": float(pmin), "pmax": float(pmax), "initial": float(initial), "inhoms": aq if inhoms is not None else None, "parray": plist, }
[docs] def set_parameter_by_reference( self, name=None, parameter=None, initial=0, pmin=-np.inf, pmax=np.inf ): """Set parameter to be optimized. Parameters ---------- name : str parameter name parameter : np.array array reference containing the parameter to be optimized. must be specified as reference, i.e. w.rc[0:] initial : float, optional initial value for the parameter (the default is 0) pmin : float, optional lower bound for parameter value (the default is -np.inf) pmax : float, optional upper bound for paramater value (the default is np.inf) """ assert isinstance(name, str), "Error: name must be string" if parameter is not None: assert isinstance(parameter, np.ndarray), ( "Error: parameter needs to be numpy array" ) p = parameter self.parameters.loc[name] = { "optimal": float(initial), "std": None, "perc_std": None, "pmin": float(pmin), "pmax": float(pmax), "initial": float(initial), "parray": [p[:]], }
[docs] def series(self, name, x, y, layer, t, h, weights=None): """Method to add observations to Calibration object. Parameters ---------- name : str name of series x : float x-coordinate y : float y-coordinate layer : int layer number, 0-indexed t : np.array array containing timestamps of timeseries h : np.array array containing timeseries values, i.e. head observations """ s = Series(x, y, layer, t, h, weights=weights) self.seriesdict[name] = s
[docs] def seriesinwell(self, name, element, t, h): """Method to add observations to Calibration object. Parameters ---------- name : str name of series element: element object with headinside function t : np.array array containing timestamps of timeseries h : np.array array containing timeseries values, i.e. head observations """ e = SeriesInWell(element, t, h) self.seriesinwelldict[name] = e
[docs] def residuals(self, p, printdot=False, weighted=True, layers=None, series=None): """Method to calculate residuals given certain parameters. Parameters ---------- p : np.array array containing parameter values printdot : bool, optional print dot for each function call Returns ------- np.array array containing all residuals """ if printdot: print(".", end="") # set the values of the variables if printdot == 7: print(p) if layers is None: layers = range(self.model.aq.naq) for i, k in enumerate(self.parameters.index): parraylist = self.parameters.loc[k, "parray"] for parray in parraylist: # [:] needed to do set value in array parray[:] = p[i] self.model.solve(silent=True) rv = np.empty(0) cal_series = self.seriesdict.keys() if series is None else series for key in cal_series: s = self.seriesdict[key] if s.layer not in layers: continue h = self.model.head(s.x, s.y, s.t, layers=s.layer) w = s.weights if ((s.weights is not None) and weighted) else np.ones_like(h) rv = np.append(rv, (s.h - h) * w) for key in self.seriesinwelldict: s = self.seriesinwelldict[key] h = s.element.headinside(s.t)[0] rv = np.append(rv, s.h - h) return rv
[docs] def residuals_lmfit(self, lmfitparams, printdot=False): vals = lmfitparams.valuesdict() p = np.array([vals[k] for k in self.parameters.index]) return self.residuals(p, printdot)
[docs] def fit_least_squares(self, report=True, diff_step=1e-4, xtol=1e-8, method="lm"): self.fitresult = least_squares( self.residuals, self.parameters.initial.values, args=(True,), bounds=(self.parameters.pmin.values, self.parameters.pmax.values), method=method, diff_step=diff_step, xtol=xtol, x_scale="jac", ) print("", flush=True) # Call residuals to specify optimal values for model res = self.residuals(self.fitresult.x) for ipar in self.parameters.index: self.parameters.loc[ipar, "optimal"] = self.parameters.loc[ipar, "parray"][0] nparam = len(self.fitresult.x) H = self.fitresult.jac.T @ self.fitresult.jac sigsq = np.var(res, ddof=nparam) self.covmat = np.linalg.inv(H) * sigsq self.sig = np.sqrt(np.diag(self.covmat)) D = np.diag(1 / self.sig) self.cormat = D @ self.covmat @ D self.parameters["std"] = self.sig self.parameters["perc_std"] = self.sig / self.parameters["optimal"] * 100 if report: print(self.parameters) print(self.sig) print(self.covmat) print(self.cormat)
[docs] def fit_lmfit(self, report=False, printdot=True, **kwargs): import lmfit self.lmfitparams = lmfit.Parameters() for name in self.parameters.index: p = self.parameters.loc[name] self.lmfitparams.add(name, value=p["initial"], min=p["pmin"], max=p["pmax"]) fit_kws = {"epsfcn": 1e-4} # this is essential to specify step for the Jacobian self.fitresult = lmfit.minimize( self.residuals_lmfit, self.lmfitparams, method="leastsq", kws={"printdot": printdot}, **fit_kws, **kwargs, ) print("", flush=True) print(self.fitresult.message) if self.fitresult.success: for name in self.parameters.index: self.parameters.loc[name, "optimal"] = self.fitresult.params.valuesdict()[ name ] if hasattr(self.fitresult, "covar"): self.parameters["std"] = np.sqrt(np.diag(self.fitresult.covar)) self.parameters["perc_std"] = ( 100 * self.parameters["std"] / np.abs(self.parameters["optimal"]) ) else: self.parameters["std"] = np.nan self.parameters["perc_std"] = np.nan if report: print(lmfit.fit_report(self.fitresult))
[docs] def residuals_leastsq(self, logparams, printdot=False): params = 10**logparams print("params ", params) return self.residuals(params, printdot)
[docs] def fit_leastsq(self, report=True, diff_step=1e-4, xtol=1e-8): params_initial = np.log10(self.parameters.initial.values) print("params_initial ", params_initial) plog, mes = leastsq(self.residuals_leastsq, params_initial, epsfcn=1e-3) print("", flush=True) params = 10**plog # Call residuals to specify optimal values for model self.residuals(params) for ipar in self.parameters.index: self.parameters.loc[ipar, "optimal"] = self.parameters.loc[ipar, "parray"][0]
# nparam = len(self.fitresult.x) # H = self.fitresult.jac.T @ self.fitresult.jac # sigsq = np.var(res, ddof=nparam) # self.covmat = np.linalg.inv(H) * sigsq # self.sig = np.sqrt(np.diag(self.covmat)) # D = np.diag(1 / self.sig) # self.cormat = D @ self.covmat @ D # self.parameters["std"] = self.sig # self.parameters["perc_std"] = self.sig / self.parameters["optimal"] * 100 # if report: # print(self.parameters) # print(self.sig) # print(self.covmat) # print(self.cormat)
[docs] def fit(self, report=False, printdot=True, **kwargs): # current default fitting routine is lmfit # return self.fit_least_squares(report) # does not support bounds by default return self.fit_lmfit(report, printdot, **kwargs)
[docs] def rmse(self, weighted=True, layers=None): """Calculate root-mean-squared-error. Returns ------- float return rmse value """ r = self.residuals( self.parameters["optimal"].values, weighted=weighted, layers=layers ) return np.sqrt(np.mean(r**2))
[docs] def topview(self, ax=None, layers=None, labels=True): """Plot topview of model with calibration points. Parameters ---------- ax : matplotlib.axes.Axes, optional axes to plot on (the default is None, which creates a new figure) """ if ax is None: _, ax = plt.subplots() # self.model.plots.topview(ax=ax) for key, s in self.seriesdict.items(): if layers is None or s.layer in layers: ax.plot(s.x, s.y, "ko") if labels: ax.text(s.x, s.y, key, ha="left", va="bottom") return ax
[docs] def xsection(self, ax=None, labels=True): """Plot cross-section of model with calibration points. Parameters ---------- ax : matplotlib.axes.Axes, optional axes to plot on (the default is None, which creates a new figure) """ if ax is None: _, ax = plt.subplots() for key, s in self.seriesdict.items(): aq = self.model.aq.find_aquifer_data(s.x, s.y) ztop = aq.z[0] zpb_top = aq.zaqtop[s.layer] zpb_bot = aq.zaqbot[s.layer] ax.plot([s.x, s.x], [zpb_top, zpb_bot], c="k", ls="dotted") ax.plot([s.x, s.x], [ztop + 1, zpb_top], c="k", ls="solid", lw=1.0) if labels: ax.text(s.x, ztop + 1, key, ha="left", va="bottom", rotation=45) return ax
[docs] class Series: def __init__(self, x, y, layer, t, h, weights=None): self.x = x self.y = y self.layer = layer self.t = t self.h = h self.weights = weights
[docs] class SeriesInWell: def __init__(self, element, t, h): self.element = element self.t = t self.h = h