import inspect # Used for storing the input
from abc import ABC, abstractmethod
import numpy as np
from .invlapnumba import invlapcomp
[docs]
class Element(ABC):
def __init__(
self,
model,
nparam=1,
nunknowns=0,
layers=0,
tsandbc=[(0, 0)],
type="z",
name="",
label=None,
inhomelement=False,
):
"""Element base class.
Types of elements:
* 'g': strength is given through time
* 'v': boundary condition is variable through time
* 'z': boundary condition is zero through time
Definition of nlayers, Ncp, Npar, nunknowns:
* nlayers: Number of layers that the element is screened in,
as set in Element
* Ncp: Number of control points along the element
* nparam: Number of parameters, commonly nlayers * Ncp
* nunknowns: Number of unknown parameters, commonly zero or Npar
"""
self.model = model
self.aq = None # Set in the initialization function
self.nparam = nparam # Number of parameters
self.nunknowns = nunknowns
self.layers = np.atleast_1d(layers)
self.nlayers = len(self.layers)
self.inhomelement = inhomelement # tag inhomelements
#
tsandbc = np.atleast_2d(tsandbc).astype(float)
tsandbc_error = (
"tsandQ or tsandh need to be 2D lists"
+ " or arrays, like [(0, 1), (2, 5), (8, 0)] "
)
assert tsandbc.shape[1] == 2, tsandbc_error
self.tstart, self.bcin = tsandbc[:, 0] - self.model.tstart, tsandbc[:, 1]
if self.tstart[0] > 0:
self.tstart = np.hstack((np.zeros(1), self.tstart))
self.bcin = np.hstack((np.zeros(1), self.bcin))
#
# 'z' boundary condition thru time or 'v' boundary condition thru time
self.type = type
self.name = name
self.label = label
if self.label is not None:
assert self.label not in self.model.elementdict.keys(), (
"TTim error: label " + self.label + " already exists"
)
self.rzero = 30
[docs]
def setbc(self):
if len(self.tstart) > 1:
self.bc = np.zeros_like(self.bcin)
self.bc[0] = self.bcin[0]
self.bc[1:] = self.bcin[1:] - self.bcin[:-1]
else:
self.bc = self.bcin.copy()
self.ntstart = len(self.tstart)
[docs]
@abstractmethod
def initialize(self):
"""Initialize the element.
Initialization of terms that cannot be initialized before other elements or
the aquifer is defined.
As we don't want to require a certain order of entering elements, these terms
are initialized when Model.solve is called The initialization class needs to be
overloaded by all derived classes
"""
[docs]
@abstractmethod
def potinf(self, x, y, aq=None):
"""Returns complex array of size (nparam, naq, npval)."""
[docs]
def potential(self, x, y, aq=None):
"""Returns complex array of size (ngvbc, naq, npval)."""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
return np.sum(
self.parameters[:, :, np.newaxis, :] * self.potinf(x, y, aq), axis=1
)
[docs]
def unitpotential(self, x, y, aq=None):
"""Returns complex array of size (naq, npval).
Can be more efficient for given elements.
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
return np.sum(self.potinf(x, y, aq), 0)
[docs]
def unitpotentialone(self, x, y, jtime, aq=None):
"""Returns complex array of size (naq, npval).
Can be more efficient for given elements.
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
return np.sum(self.potinfone(x, y, jtime, aq), 0)
[docs]
@abstractmethod
def disvecinf(self, x, y, aq=None):
"""Returns 2 complex arrays of size (nparam, naq, npval)."""
[docs]
def disvec(self, x, y, aq=None):
"""Returns 2 complex arrays of size (ngvbc, naq, npval)."""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
qx, qy = self.disvecinf(x, y, aq)
return np.sum(self.parameters[:, :, np.newaxis, :] * qx, 1), np.sum(
self.parameters[:, :, np.newaxis, :] * qy, 1
)
[docs]
def unitdisvec(self, x, y, aq=None):
"""Returns 2 complex arrays of size (naq, npval).
Can be more efficient for given elements.
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
qx, qy = self.disvecinf(x, y, aq)
return np.sum(qx, 0), np.sum(qy, 0)
# Functions used to build equations
[docs]
def potinflayers(self, x, y, layers=0, aq=None):
"""Layers can be scalar, list, or array.
returns array of size (len(layers),nparam,npval) only used in building equations
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
pot = self.potinf(x, y, aq)
rv = np.sum(pot[:, np.newaxis, :, :] * aq.eigvec, 2)
# first axis needs to be the number of layers
rv = rv.swapaxes(0, 1)
return rv[layers, :]
[docs]
def potentiallayers(self, x, y, layers=0, aq=None):
"""Returns complex array of size (ngvbc, len(layers),npval).
Only used in building equations.
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
pot = self.potential(x, y, aq)
phi = np.sum(pot[:, np.newaxis, :, :] * aq.eigvec, 2)
return phi[:, layers, :]
[docs]
def unitpotentiallayers(self, x, y, layers=0, aq=None):
"""Returns complex array of size (len(layers), npval).
Only used in building equations.
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
pot = self.unitpotential(x, y, aq)
phi = np.sum(pot[np.newaxis, :, :] * aq.eigvec, 1)
return phi[layers, :]
[docs]
def disvecinflayers(self, x, y, layers=0, aq=None):
"""Layers can be scalar, list, or array.
returns 2 arrays of size (len(layers),nparam,npval) only used in building
equations
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
qx, qy = self.disvecinf(x, y, aq)
rvx = np.sum(qx[:, np.newaxis, :, :] * aq.eigvec, 2)
rvy = np.sum(qy[:, np.newaxis, :, :] * aq.eigvec, 2)
# first axis needs to be the number of layers
rvx = rvx.swapaxes(0, 1)
rvy = rvy.swapaxes(0, 1)
return rvx[layers, :], rvy[layers, :]
[docs]
def disveclayers(self, x, y, layers=0, aq=None):
"""Returns 2 complex array of size (ngvbc, len(layers), npval).
Only used in building equations.
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
qx, qy = self.disvec(x, y, aq)
rvx = np.sum(qx[:, np.newaxis, :, :] * aq.eigvec, 2)
rvy = np.sum(qy[:, np.newaxis, :, :] * aq.eigvec, 2)
return rvx[:, layers, :], rvy[:, layers, :]
[docs]
def unitdisveclayers(self, x, y, layers=0, aq=None):
"""Returns complex array of size (len(layers), npval).
Only used in building equations.
"""
if aq is None:
aq = self.model.aq.find_aquifer_data(x, y)
qx, qy = self.unitdisvec(x, y, aq)
rvx = np.sum(qx[np.newaxis, :, :] * aq.eigvec, 1)
rvy = np.sum(qy[np.newaxis, :, :] * aq.eigvec, 1)
return rvx[layers, :], rvy[layers, :]
[docs]
def discharge(self, t, derivative=0):
"""The discharge in each layer.
Parameters
----------
t : scalar, list or array
times at which discharge is computed.
t must be ordered and tmin <= t <= tmax
Returns
-------
array of discharges (nlayers,len(t))
Discharge in each screen with zeros for layers that are not
screened
"""
# Could potentially be more efficient if s is pre-computed for
# all elements, but may not be worthwhile to store as it is quick now
time = np.atleast_1d(t).astype(float)
rv = np.zeros((self.nlayers, len(time)))
if self.type == "g":
s = self.dischargeinflayers * self.model.p**derivative
rv = invlapcomp(
time,
s[np.newaxis, :],
self.model.npint,
self.model.M,
self.model.tintervals,
np.zeros(self.ntstart, dtype="int"),
self.tstart,
self.bc,
self.nlayers,
)
else:
s = np.sum(self.parameters[:, :, np.newaxis, :] * self.dischargeinf, 1)
s = np.sum(s[:, np.newaxis, :, :] * self.aq.eigvec, 2)
s = s[:, self.layers, :] * self.model.p**derivative
rv = invlapcomp(
time,
s,
self.model.npint,
self.model.M,
self.model.tintervals,
self.model.enumber,
self.model.etstart,
self.model.ebc,
self.nlayers,
)
return rv
# this function is kept for testing of version 0.6.6
[docs]
def dischargeold(self, t, derivative=0):
"""The discharge in each layer.
Parameters
----------
t : scalar, list or array
times at which discharge is computed.
t must be ordered and tmin <= t <= tmax
Returns
-------
array of discharges (nlayers,len(t))
Discharge in each screen with zeros for layers that are not
screened
"""
# Could potentially be more efficient if s is pre-computed for
# all elements, but may not be worthwhile to store as it is quick now
time = np.atleast_1d(t).astype(float)
if (time[0] < self.model.tmin) or (time[-1] > self.model.tmax):
print(
"Warning, some of the times are smaller than tmin or"
+ "larger than tmax; zeros are substituted"
)
rv = np.zeros((self.nlayers, np.size(time)))
if self.type == "g":
s = self.dischargeinflayers * self.model.p**derivative
for itime in range(self.ntstart):
t = time - self.tstart[itime]
for i in range(self.nlayers):
rv[i] += self.bc[itime] * self.model.inverseLapTran(s[i], t)
else:
s = np.sum(self.parameters[:, :, np.newaxis, :] * self.dischargeinf, 1)
s = np.sum(s[:, np.newaxis, :, :] * self.aq.eigvec, 2)
s = s[:, self.layers, :] * self.model.p**derivative
for k in range(self.model.ngvbc):
e = self.model.gvbclist[k]
for itime in range(e.ntstart):
t = time - e.tstart[itime]
if t[-1] >= self.model.tmin: # Otherwise all zero
for i in range(self.nlayers):
rv[i] += e.bc[itime] * self.model.inverseLapTran(s[k, i], t)
return rv
[docs]
def headinside(self, t):
print("This function not implemented for this element")
return
[docs]
def write(self):
rv = self.name + "(" + self.model.modelname + ",\n"
for key in self.inputargs[2:]: # The first two are ignored
if isinstance(self.inputvalues[key], np.ndarray):
rv += (
key
+ " = "
+ np.array2string(self.inputvalues[key], separator=",")
+ ",\n"
)
elif isinstance(self.inputvalues[key], str):
rv += key + " = '" + self.inputvalues[key] + "',\n"
else:
rv += key + " = " + str(self.inputvalues[key]) + ",\n"
rv += ")\n"
return rv
[docs]
def run_after_solve(self): # noqa: B027
"""Function to run after a solution is completed.
For most elements nothing needs to be done, but for strings of elements some
arrays may need to be filled.
"""
[docs]
@abstractmethod
def plot(self, ax=None):
"""Plot the element."""