Source code for pyoomph.generic.assembly

#  @file
#  @author Christian Diddens <c.diddens@utwente.nl>
#  @author Duarte Rocha <d.rocha@utwente.nl>
#  
#  @section LICENSE
# 
#  pyoomph - a multi-physics finite element framework based on oomph-lib and GiNaC 
#  Copyright (C) 2021-2025  Christian Diddens & Duarte Rocha
# 
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
# 
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# 
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>. 
#
#  The authors may be contacted at c.diddens@utwente.nl and d.rocha@utwente.nl
#
# ========================================================================
 
from ..typings import *
import numpy
from scipy.sparse import csr_matrix #type:ignore
import time

if TYPE_CHECKING:
    from .problem import Problem
    import _pyoomph

class CustomAssemblyBase:    
    def __init__(self) -> None:
        self.problem:Optional["Problem"]=None            

    def _set_problem(self,problem:"Problem"):
        self.problem=problem
        
    def has_custom_solve_routine(self)->bool:
        # You can override the default solve routine for J*dU=R
        return False
    
    def custom_solve_routine(self, solve_Jx_b:Callable[[NPFloatArray],NPFloatArray], b:NPFloatArray) -> NPFloatArray:
        # You can override the default solve routine for J*dU=R
        # This is only called if has_custom_solve_routine() returns True
        raise NotImplementedError("Custom solve routine not implemented")

    def invalidate_cache(self)->None:
        pass

    def actions_after_adapt(self)->None:
        self.invalidate_cache()

    def actions_after_remeshing(self)->None:
        self.invalidate_cache()

    def actions_after_equation_numbering(self)->None:
        self.invalidate_cache()

    def actions_after_setting_initial_condition(self)->None:
        self.invalidate_cache()

    def actions_after_successful_newton_solve(self)->None:
        pass

    def initialize(self)->None:
        pass
    
    def finalize(self)->None:
        pass

    @overload
    def get_residuals_and_jacobian(self,require_jacobian:Literal[False],dparameter:Optional[str]=None)->NPFloatArray: ...

    @overload
    def get_residuals_and_jacobian(self,require_jacobian:Literal[True],dparameter:Optional[str]=None)->Tuple[NPFloatArray,csr_matrix]: ...

    def get_residuals_and_jacobian(self,require_jacobian:bool,dparameter:Optional[str]=None)->Union[NPFloatArray,Tuple[NPFloatArray,csr_matrix]]:
        raise RuntimeError("Must be implemented")
        pass

    # Optionally: Return mass and jacobian matrices of the last step, can be used e.g. for Lyapunov exponent calculation
    def get_last_mass_and_jacobian_matrices(self)->Tuple[Optional[csr_matrix],Optional[csr_matrix]]:
        return None,None
        


[docs] class FixedMeshMaxQuadraticNonlinearAssembly(CustomAssemblyBase): """ For a problem with max. quadratic non-linearities with non-moving, non-adaptive meshes, this assembly hander saves a lot of time. It only works for first order time stepping and assumes BDF2 time-stepping, where the first step is degraded. The residuals must be writeable as :math:`\\vec{R}(\\vec{U})=\\vec{R}_0+\\mathbf{M}_0\\partial_t \\vec{U}+\\mathbf{J}_0\\vec{U}+\\frac{1}{2}\\vec{U}\\cdot\\mathbf{H}_0\\cdot\\vec{U}` where :math:`\\vec{R}_0`, :math:`\\mathbf{M}_0`, :math:`\\mathbf{J}_0` and :math:`\\mathbf{H}_0` are the residual vector, mass matrix, Jacobian and Hessian rank-3-tensor evaluated at the trivial dof vector U=0. In particular, :math:`\\mathbf{H}_0` must be independent of :math:`\\vec{U}`. Parameters may appear in all terms, however, it depends on the passed parameter of `cache_at_fixed_parameters`. This controlls whether the tensor cache is evaluated at fixed or zero parameters. In the latter case, the parameter contribution is added, which requires some overhead, but allows to vary the parameter without the demanding update of the tensor cache. If a global parameter frequenly changes its value, it is better to use cache_at_fixed_parameters=False or to pass a set of all other (rather constant) parameters to cache_at_fixed_parameters. However, frequently varying parameters excluded by cache_at_fixed_parameters (either by passing False or a set not including those) may not appear in the Hessian term H, i.e. they may appear at maximum as factor for linear or constant terms. Furthermore, you must activate the analytical Hessian via problem.set_analytic_hessian_products(True). In particular, the mesh(es) must be non-moving, as moving meshes are usually highly nonlinear. """ def __init__(self,cache_at_fixed_parameters:Union[bool,Set[str]]=True) -> None: super().__init__() self._tensor_cache_valid=False self._M0:csr_matrix self._J0:csr_matrix self._R0:NPFloatArray self._HMat:csr_matrix self._HTens:_pyoomph.SparseRank3Tensor self._ndof:int=-1 self._history_dofs:Dict[float,NPFloatArray]={} self._last_current_dofs=None self._cache_at_fixed_parameters=cache_at_fixed_parameters self._param_values:Dict[str,float]={} # Parameter values at cache assembly self._param_contribs:Dict[str,Tuple[NPFloatArray,csr_matrix,csr_matrix]] # Parameter contributions R,J,M self._lastMatM:Optional[csr_matrix]=None self._lastMatJ:Optional[csr_matrix]=None
[docs] def invalidate_time_history(self)->None: """Since obtaining the history dofs takes some time, we store them in a ring buffer. This routine clears the buffer """ self._history_dofs={} self._last_current_dofs=None
[docs] def invalidate_cache(self)->None: """Called when the system changes. This invokes a demanding recalculation of all cached tensors. """ self._tensor_cache_valid=False self.invalidate_time_history()
[docs] def actions_after_successful_newton_solve(self) -> None: """Whenever we successfully take a time step, we can backup the degrees of freedom for the next step to save some time """ if self._last_current_dofs is not None: assert self.problem t=self.problem.time_stepper_pt().time_pt().time(0) self._history_dofs[t]=self._last_current_dofs # Store the degrees at the current time
[docs] def update_tensor_cache(self)->None: """Recalculate all tensors R0, J0, M0 and H0 and potential parameter contributions to R, J and M (but not H)""" assert self.problem oldsetting=self.problem.use_custom_residual_jacobian self.problem.use_custom_residual_jacobian=False # Set it to false to get the right matrices by elemental assembly t0=time.time() dofs=self.problem.get_history_dofs(0) #store the dofs self.problem.set_current_dofs([0.0]*len(dofs)) # zero dofs ntstep=self.problem.ntime_stepper() was_steady=[False]*ntstep for i in range(ntstep): ts=self.problem.time_stepper_pt(i) was_steady[i]=ts.is_steady() ts.make_steady() pnames=self.problem.get_global_parameter_names() paramvals:Dict[str,float]={} self._param_values={} self._param_contribs={} needs_contrib:List[str]=[] for pname in pnames: paramvals[pname]=self.problem.get_global_parameter(pname).value for pname in pnames: param=self.problem.get_global_parameter(pname) if isinstance(self._cache_at_fixed_parameters,set): if pname in self._cache_at_fixed_parameters: self._param_values[pname]=param.value continue elif self._cache_at_fixed_parameters: self._param_values[pname]=param.value continue param.value=0.0 needs_contrib.append(pname) t1=time.time() print("UPDATING TENSOR CACHE") self._R0=numpy.array(self.problem.get_residuals()) #type:ignore # Initial residual n, M_nzz, M_nr, M_val, M_ci, M_rs, J_nzz, J_nr, J_val, J_ci, J_rs = self.problem.assemble_eigenproblem_matrices(0.0) #type:ignore # Mass and zero Jacobian self._M0=csr_matrix((M_val, M_ci, M_rs), shape=(n, n)).copy() #type:ignore self._J0=csr_matrix((J_val, J_ci, J_rs), shape=(n, n)).copy() #type:ignore self._J0.eliminate_zeros() #type:ignore self._M0.eliminate_zeros() #type:ignore self._J0.sort_indices() #type:ignore self._M0.sort_indices() #type:ignore self._M0._has_canonical_format=True self._J0._has_canonical_format=True t2=time.time() print("JACOBIAN AND MASS MATRIX DONE IN "+str(t2-t1)+" s") print("ASSEMBLING HESSIAN TENSOR") self._HTens=self.problem._assemble_hessian_tensor(False) print("PREPARE FOR HESSIAN VECTOR PRODUCT") colinds,rowstart=self._HTens.finalize_for_vector_product() tempvals=numpy.zeros((rowstart[-1],),dtype=numpy.float64) #type:ignore self._HMat=csr_matrix((tempvals,colinds,rowstart),shape=self._J0.shape) #type:ignore self._HMat._has_canonical_format=True #type:ignore t3=time.time() print("HESSIAN TENSOR DONE in "+str(t3-t2)+" s") for pname in needs_contrib: print("CALCULATING CONTRIBUTION OF PARAMTER "+pname) self.problem._replace_RJM_by_param_deriv(pname,True) dRdp:NPFloatArray=numpy.array(self.problem.get_residuals()) #type:ignore # Initial residual n, M_nzz, M_nr, M_val, M_ci, M_rs, J_nzz, J_nr, J_val, J_ci, J_rs = self.problem.assemble_eigenproblem_matrices(0.0) #type:ignore # Mass and zero Jacobian dMdp:csr_matrix=csr_matrix((M_val, M_ci, M_rs), shape=(n, n)).copy() #type:ignore dJdp:csr_matrix=csr_matrix((J_val, J_ci, J_rs), shape=(n, n)).copy() #type:ignore dMdp.eliminate_zeros() dJdp.eliminate_zeros() dMdp.sort_indices() dJdp.sort_indices() dMdp._has_canonical_format=True dJdp._has_canonical_format=True self._param_contribs[pname]=(dRdp,dJdp,dMdp) #type:ignore self.problem._replace_RJM_by_param_deriv(pname,False) self.problem.set_current_dofs(dofs) #type:ignore # restore dofs for i in range(ntstep): if not was_steady[i]: self.problem.time_stepper_pt(i).undo_make_steady() for pname in pnames: self.problem.get_global_parameter(pname).value=paramvals[pname] self.problem.use_custom_residual_jacobian=oldsetting #reset setting self._ndof=n self._tensor_cache_valid=True t4=time.time() print("TENSOR CACHE UPDATED in "+str(t4-t0)+" s")
[docs] def get_history_dofs(self,index:int) -> NPFloatArray: """Get the history dofs at previous time step 'index'. These are buffered to save some time, Args: index (int): 0 means current step, 1 means last step and 2 means the degrees of freedom two time steps ago. Returns: NPFloatArray: Degrees of freedom of the system. """ assert self.problem t=self.problem.timestepper.time_pt().time(index) if index==0: res=self.problem.get_history_dofs(index) # Index 0 can never be be cached. They change in each Newton step. But these are quickly obtainable self._last_current_dofs=res else: t=self.problem.timestepper.time_pt().time(index) if t in self._history_dofs.keys(): # We have buffered the dofs at this time res=self._history_dofs[t] else: res=self.problem.get_history_dofs(index) # We have to obtain them, which might require some time self._history_dofs[t]=res if len(self._history_dofs)>3: #Get rid of some history to not overcrowd memory times:List[float]=list(sorted(self._history_dofs.keys())) times_to_rem=times[:-3] # Let 3 entries alive for trem in times_to_rem: del self._history_dofs[trem] return res
[docs] def get_residuals_and_jacobian(self,require_jacobian:bool,dparameter:Optional[str]=None)->Union[NPFloatArray,Tuple[NPFloatArray,csr_matrix]]: """Get the residual vector (and potentially the Jacobian) based on the current and history dofs using the cached tensors. When a parameter changes, for which we have evaluted the tensor, we have to recalculate the cache. Args: require_jacobian (bool): Do we require the Jacobian or not? Returns: Union[NPFloatArray,Tuple[NPFloatArray,csr_matrix]]: Either the residual vector or the residual vector and the Jacobian. """ assert self.problem assert dparameter is None for pname,oldval in self._param_values.items(): param=self.problem.get_global_parameter(pname) if param.value!=oldval: self._tensor_cache_valid=False print("Invalidating Tensor cache, since parameter "+pname+" has changed. For often changing parameters, consider using cache_at_fixed_parameters=False or set a set of constant parameters") break if not self._tensor_cache_valid: self.update_tensor_cache() #t0=time.time() dofs0=self.get_history_dofs(0) if len(dofs0)!=self._ndof: self.invalidate_cache() self.update_tensor_cache() dofs0=self.get_history_dofs(0) dofs1=self.get_history_dofs(1) dofs2=self.get_history_dofs(2) #t1=time.time(); print("Getting history: "+str(t1-t0)); t0=t1 if self.problem.timestepper.get_num_unsteady_steps_done()==0: # The first step is degraded to BDF1 by default w0=self.problem.timestepper.weightBDF1(1,0) w1=self.problem.timestepper.weightBDF1(1,1) w2=0 else: w0=self.problem.timestepper.weightBDF2(1,0) w1=self.problem.timestepper.weightBDF2(1,1) w2=self.problem.timestepper.weightBDF2(1,2) matM=self._M0 matJ=self._J0 vecR=self._R0 for pname,contrib in self._param_contribs.items(): param=self.problem.get_global_parameter(pname) val=param.value vecR+=contrib[0]*val matJ+=contrib[1]*val #type:ignore matM+=contrib[2]*val #type:ignore #t1=time.time(); print("Before tens mult "+str(t1-t0)); t0=t1 self._HMat.data=self._HTens.right_vector_mult(dofs0) #t1=time.time(); print("Tens mult: "+str(t1-t0)); t0=t1 half_H_times_dof0:csr_matrix=0.5*self._HMat #type:ignore #print("HESSCONTRIB",H_times_dof0) #r=r0+JU+1/2*HUU #J=J+HU #t1=time.time(); print("Before mat asm: "+str(t1-t0)); t0=t1 matJ:csr_matrix=matJ+w0*matM+half_H_times_dof0 #type:ignore dtUold:NPFloatArray=matM@(w1*dofs1+w2*dofs2) #type:ignore residual:NPFloatArray=vecR+dtUold+matJ@dofs0 #type:ignore #t1=time.time(); print("RES asm: "+str(t1-t0)); t0=t1 #self.use_custom_residual_jacobian=False if not require_jacobian: return residual #type:ignore else: matJ+=half_H_times_dof0 #type:ignore #t1=time.time(); print("Mat asm: "+str(t1-t0)); t0=t1 self._lastMatJ=matJ self._lastMatM=matM return residual,matJ #type:ignore
def get_last_mass_and_jacobian_matrices(self)->Tuple[Optional[csr_matrix],Optional[csr_matrix]]: return self._lastMatM,self._lastMatJ