# @file
# @author Christian Diddens <c.diddens@utwente.nl>
# @author Duarte Rocha <d.rocha@utwente.nl>
#
# @section LICENSE
#
# pyoomph - a multi-physics finite element framework based on oomph-lib and GiNaC
# Copyright (C) 2021-2025 Christian Diddens & Duarte Rocha
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The authors may be contacted at c.diddens@utwente.nl and d.rocha@utwente.nl
#
# ========================================================================
import _pyoomph
import inspect
from ..expressions.generic import Expression, ExpressionOrNum, FiniteElementSpaceEnum
from ..meshes.mesh import InterfaceMesh, MeshFromTemplate1d,MeshFromTemplate2d,MeshFromTemplate3d, assert_spatial_mesh
from ..generic.codegen import Equations, InterfaceEquations, BaseEquations
from ..expressions import testfunction, weak, var_and_test, test_scale_factor, scale_factor
from ..utils.smallest_circle import make_circle
import scipy.spatial #type:ignore
import numpy
from ..expressions.coordsys import AxisymmetryBreakingCoordinateSystem,AxisymmetricCoordinateSystem
from ..typings import *
if TYPE_CHECKING:
from ..meshes.mesh import AnySpatialMesh,Node,AnyMesh
from ..solvers.generic import GenericEigenSolver
from ..generic.codegen import EquationTree,FiniteElementCodeGenerator
class BoundaryCondition(Equations):
def __init__(self):
super(BoundaryCondition, self).__init__()
self.mesh = None
self.active = True
self.mesh:Optional["AnySpatialMesh"]=None
def setup(self):
pass
def set_mesh(self, mesh:"AnySpatialMesh"):
self.mesh = mesh
self.setup()
def apply(self):
pass
def on_apply_boundary_conditions(self, mesh:"AnyMesh"):
mesh=assert_spatial_mesh(mesh)
if (self.mesh is None) or (self.mesh != mesh):
self.set_mesh(mesh)
self.apply()
[docs]
class NeumannBC(InterfaceEquations):
"""
Class to impose Neumann boundary condition. The particular meaning of the Neumann flux depends on the bulk equations, i.e. how the integration by parts was performed for the weak formulation of the bulk equations.
For a Poisson equation implemented by the residual weak(grad(u),grad(utest)), the Neumann condition
``NeumannBC(u=1) @ "boundary"``
does not neither mean setting ``u=1``, but rather dot(grad(u),var("normal"))=-1, where normal vector is pointing outward the domain at the boundary.
Parameters:
**fluxes: Dictionary of fluxes, where the keys are the names of the fluxes and the values are expressions or numbers.
"""
def __init__(self, **fluxes:ExpressionOrNum):
super(NeumannBC, self).__init__()
self.fluxes = fluxes.copy()
def define_residuals(self):
for name, flux in self.fluxes.items():
test = testfunction(name)
self.add_residual(weak(flux, test))
# Automatic Neumann condition either takes a single flux as pos. arg (when there is only one equation of the required_parent_type)
# or by named fluxes
class AutomaticNeumannCondition(InterfaceEquations):
neumann_sign = 1
def __init__(self, flux:Optional[ExpressionOrNum]=None, **named_fluxes:ExpressionOrNum):
super(AutomaticNeumannCondition, self).__init__()
self.flux = flux
self.named_fluxes = named_fluxes.copy()
if (self.flux is None) and (len(self.named_fluxes) != 0):
raise RuntimeError("AutomaticNeumannCondition must be either constructed with a single pos. arg or with **kwargs")
def get_parent_var_name(self, parent_eq:BaseEquations)->str:
assert hasattr(parent_eq,"name")
assert isinstance(parent_eq.name,str) #type:ignore
return parent_eq.name #type:ignore
def define_residuals(self):
bulkeqs = self.get_parent_equations()
if isinstance(bulkeqs, (list, tuple)):
if self.flux is not None:
raise RuntimeError(
"Cannot use " + self.__class__.__name__ + " with a positional argument only if there are multiple bulk equations of type " + str(
self.required_parent_type))
else:
for k, v in self.named_fluxes.items():
_, u_test = var_and_test(k)
self.add_residual(self.neumann_sign * weak(v, u_test))
else:
assert self.flux is not None
assert bulkeqs is not None
_, u_test = var_and_test(self.get_parent_var_name(bulkeqs))
self.add_residual(self.neumann_sign * weak(self.flux, u_test))
[docs]
class EnforcedBC(InterfaceEquations):
"""
Enforce rather arbitrary boundary conditions by a field of Lagrange multipliers.
As an example,
EnforcedBC(u=var("u")-var("v")) @ "boundary"
will set u=v on the boundary by adjusting u. The rhs must be a constraint in residual formulation, here u-v=0.
Args:
only_for_stationary_solve (bool, optional): Flag indicating if the enforced boundary conditions should only be applied during stationary solves. Defaults to False.
set_zero_on_normal_mode_eigensolve (bool, optional): Flag indicating if the enforced boundary conditions should be set to zero during azimuthal eigensolves. Defaults to False.
**constraints (Expression): Keyword arguments representing the enforced boundary conditions as pair of variable name to adjust and constraint expression to fulfill in residual form.
"""
def __init__(self,*, only_for_stationary_solve:bool=False, set_zero_on_normal_mode_eigensolve=False,**constraints:Expression):
super(EnforcedBC, self).__init__()
self.constraints = constraints.copy()
self.lagrangian:bool = False
self.only_for_stationary_solve=only_for_stationary_solve
self.set_zero_on_normal_mode_eigensolve=set_zero_on_normal_mode_eigensolve
def get_lagrange_multiplier_name(self, varname:str)->str:
return "_lagr_enf_bc_" + varname
def define_fields(self):
allowed_spaces= {"C1","C1TB","C2","C2TB","D1","D1TB","D2","D2TB"}
for k, _ in self.constraints.items():
sp = self.get_parent_domain().get_space_of_field(k)
if sp == "":
ppdom=self.get_parent_domain().get_parent_domain()
if ppdom is not None:
sp = ppdom.get_space_of_field(k)
if sp == "":
# Test if it is a vector field
# print(dir(self.get_parent_domain().get_equations()))
# expanded = self.expand_additional_field(k, True, 0, self.get_current_code_generator(),False,False)
# peqs = self.get_parent_domain().get_equations()
raise RuntimeError("Cannot use EnforcedBC on an unknown field " + k)
if sp not in allowed_spaces:
if sp == "Pos":
sp = self.get_current_code_generator()._coordinate_space
else:
raise RuntimeError("EnforcedBC only works the following bulk spaces:"+", ".join(allowed_spaces)+". problem for field " + k + " on space " + sp)
self.define_scalar_field(self.get_lagrange_multiplier_name(k), cast(FiniteElementSpaceEnum, sp), scale=1 / test_scale_factor(k),testscale=1 / scale_factor(k))
aziinfo=self.get_azimuthal_r0_info()
for k in self.constraints.keys():
ln=self.get_lagrange_multiplier_name(k)
for i in [0,1,2]:
if k in aziinfo[i]:
aziinfo[i].add(ln)
else:
if ln in aziinfo[i]:
aziinfo[i].remove(ln)
def define_residuals(self):
for k, v in self.constraints.items():
lagr_name=self.get_lagrange_multiplier_name(k)
l, ltest = var_and_test(lagr_name) # get the Lagrange multiplier
utest = testfunction(k)
self.add_residual(weak(v, ltest, lagrangian=self.lagrangian))
self.add_residual(weak(l, utest,lagrangian=self.lagrangian)) # Lagrange multiplier pair to enforce it
if self.only_for_stationary_solve:
self.set_Dirichlet_condition(lagr_name,0)
def before_assigning_equations_postorder(self, mesh:"AnyMesh"):
# Pin redundant Lagrange multipliers
assert isinstance(mesh,InterfaceMesh)
assert mesh._eqtree._parent is not None #type: ignore
bulkmesh = mesh._eqtree._parent._mesh #type: ignore
assert bulkmesh is not None
codeinst_inside = mesh.element_pt(0).get_code_instance()
for k, _ in self.constraints.items():
index = [codeinst_inside.get_nodal_field_index(k)] # TODO: Vectors
psindex = None
nfi=None
spaceDG=None
if any(i < 0 for i in index):
if k == "mesh_x":
psindex = 0
elif k == "mesh_y":
psindex = 1
elif k == "mesh_z":
psindex = 2
elif mesh.has_interface_dof_id(k)>=0:
nfi=mesh.has_interface_dof_id(k)
else:
spc=mesh._eqtree.get_code_gen().get_space_of_field(k)
if spc in {"D2TB","D2","D1"}:
spaceDG=spc
else:
raise RuntimeError("Cannot find a nodal index for field " + k+". Defined on space: "+spc)
lname = self.get_lagrange_multiplier_name(k)
if spaceDG is None:
interfid = bulkmesh.has_interface_dof_id(lname)
if interfid < 0:
raise RuntimeError(
f"Something strange here. We have the bulk mesh '{bulkmesh.get_name()}' and it does not have the interface id '{lname}'") #
for n in mesh.nodes():
if psindex is not None:
if n.variable_position_pt().is_pinned(psindex):
lind = n.additional_value_index(interfid)
n.pin(lind)
n.set_value(lind, 0)
elif nfi is not None:
nfind = n.additional_value_index(nfi)
if n.is_pinned(nfind):
lind = n.additional_value_index(interfid)
n.pin(lind)
n.set_value(lind, 0)
elif all(n.is_pinned(i) for i in index):
lind = n.additional_value_index(interfid)
n.pin(lind)
n.set_value(lind, 0)
else:
for e in mesh.elements():
dg_data=e.get_field_data_list(k,False)
l_data=e.get_field_data_list(lname,False)
for dg,l in zip(dg_data,l_data):
if dg[0].is_pinned(dg[1]):
l[0].pin(l[1])
l[0].set_value(l[1],0)
def after_compilation(self,codegen:"FiniteElementCodeGenerator"):
super().after_compilation(codegen)
assert codegen._mesh is not None
if self.only_for_stationary_solve:
for k, _ in self.constraints.items():
# Do not activate by default to allow for initial conditions
lagr_name=self.get_lagrange_multiplier_name(k)
codegen._mesh._set_dirichlet_active(lagr_name,False)
def _before_stationary_or_transient_solve(self, eqtree:"EquationTree", stationary:bool)->bool:
must_reapply=False
if self.set_zero_on_normal_mode_eigensolve:
pr=self.get_mesh()._problem
from ..generic.bifurcation_tools import _NormalModeBifurcationTrackerBase
if pr.get_bifurcation_tracking_mode() == "azimuthal" or (pr.get_custom_assembler() is not None and isinstance(pr.get_custom_assembler(),_NormalModeBifurcationTrackerBase)):
#if self.get_mesh()._problem._azimuthal_mode_param_m.value!=0:
return False # Don't do anything in this case. It would mess up everything!
mesh=eqtree._mesh
assert mesh is not None
for k in self.constraints.keys():
lagr_name=self.get_lagrange_multiplier_name(k)
if self.only_for_stationary_solve:
if mesh._get_dirichlet_active(lagr_name) == stationary:
mesh._set_dirichlet_active(lagr_name, not stationary)
must_reapply = True
else:
if mesh._get_dirichlet_active(lagr_name)==True:
mesh._set_dirichlet_active(lagr_name,False)
must_reapply=True
return must_reapply
def _get_forced_zero_dofs_for_eigenproblem(self, eqtree:"EquationTree", eigensolver:"GenericEigenSolver", angular_mode:Optional[int],normal_k:Optional[float])->Set[Union[str,int]]:
if (not self.set_zero_on_normal_mode_eigensolve) or (angular_mode is None and normal_k is None):
return cast(Set[str],set())
else:
if angular_mode is not None and normal_k is not None:
raise RuntimeError("Cannot have both angular and normal mode set")
if angular_mode is not None:
mode=int(angular_mode)
elif normal_k is not None:
mode=normal_k
fullpath = eqtree.get_full_path().lstrip("/")
if mode == 0:
return cast(Set[str],set())
else:
for_my_m = [self.get_lagrange_multiplier_name(k) for k in self.constraints.keys()]
lst=[fullpath + "/" + k for k in for_my_m]
res:Set[str] = set(lst)
return res
[docs]
class DirichletBC(BaseEquations):
"""
Class to impose one or more Dirichlet boundary condition.
Args:
prefer_weak_for_DG (bool, optional): Flag indicating whether to prefer weak contributions for Discontinuous Galerkin (DG) methods. If set and the bulk equations provide a specific implementation of get_weak_dirichlet_terms_for_DG, these terms are used to enforce the condition in a weak sense. Otherwise, just stronly. Defaults to True.
**kwargs (ExpressionOrNum): Keyword arguments representing the Dirichlet conditions, where the keys are the variable names and the values are the corresponding expressions or numbers. Expressions for strong DirichletBCs may not depend on unknowns.
"""
def __init__(self, *, prefer_weak_for_DG: bool = True, **kwargs: ExpressionOrNum):
super(DirichletBC, self).__init__()
self._dcs: Dict[str, ExpressionOrNum] = {}
self._dcs.update(kwargs)
self.prefer_weak_for_DG = prefer_weak_for_DG
def define_residuals(self):
pdom = self.get_parent_domain()
peqs = pdom.get_equations() if pdom is not None else None
if not isinstance(peqs, Equations):
peqs = None
for n, val in self._dcs.items():
if self.prefer_weak_for_DG and (peqs is not None) and (val is not True):
# Check if some equation is defining weak contributions instead
weak_DBC = peqs.get_weak_dirichlet_terms_for_DG(n, val)
if weak_DBC is not None:
self.add_residual(weak_DBC) # Add the weak Dirichlet
continue
# Otherwise, strong Dirichlet
self.set_Dirichlet_condition(n, val)
def get_information_string(self) -> str:
return ", ".join([str(n) + "=" + str(v) for n, v in self._dcs.items()])
[docs]
class EnforcedDirichlet(EnforcedBC):
"""
Enforces a DirichletBC by Lagrange multipliers.
As an example,
``EnforcedDirichlet(u=var("v")) @ "boundary"``
will just be the same as :py:class:`~pyoomph.meshes.bcs.EnforcedBC` ``(u=var("u")-var("v")) @ "boundary"``
Args:
only_for_stationary_solve (bool, optional): Flag indicating if the enforced boundary conditions should only be applied during stationary solves. Defaults to False.
set_zero_on_normal_mode_eigensolve (bool, optional): Flag indicating if the enforced boundary conditions should be set to zero during azimuthal eigensolves. Defaults to False.
**constraints (Expression): Keyword arguments representing the enforced boundary conditions as pair of variable name to adjust and constraint expression to fulfill in residual form.
"""
def __init__(self,*, only_for_stationary_solve:bool=False, set_zero_on_normal_mode_eigensolve=False,**constraints:Expression):
from ..expressions import var
new_kwargs={k:var(k)-v for k,v in constraints.items()}
super(EnforcedDirichlet, self).__init__(only_for_stationary_solve=only_for_stationary_solve, set_zero_on_normal_mode_eigensolve=set_zero_on_normal_mode_eigensolve,**new_kwargs.copy())
[docs]
class InactiveDirichletBC(DirichletBC):
"""
Same as 'DirichletBC', but it starts deactivated, i.e. the Neumann term will be active by default.
To activate the Dirichlet condition, you must call the set_dirichlet_active(...) method of the Mesh class, which you can obtain by problem.get_mesh(...).
Afterwards, it is important to call Problem.reapply_boundary_conditions(), e.g.
problem.get_mesh("domain/interface").set_dirichlet_active(u=True) # activate the BC
problem.reapply_boundary_conditions() # Renumber the equations and apply BCs
problem.solve() # solve with active DirichletBC
"""
def __init__(self, **kwargs: ExpressionOrNum):
super().__init__(**kwargs)
self._init_setup_for_mesh:Set[AnyMesh]=set()
def before_assigning_equations_preorder(self, mesh: "AnyMesh"):
if mesh in self._init_setup_for_mesh: # Only init it once during problem init. Someone might have switched it later on
return super().before_assigning_equations_preorder(mesh)
mesh.set_dirichlet_active(**{k:False for k in self._dcs.keys()})
self._init_setup_for_mesh.add(mesh) # Don't ever set it again for this mesh
# TODO: Check redefine_problem and/or remeshing
return super().before_assigning_equations_preorder(mesh)
def after_remeshing(self, eqtree: "EquationTree"):
raise RuntimeError("Check remeshing settings here...")
return super().after_remeshing(eqtree)
[docs]
class AxisymmetryBC(InterfaceEquations):
r"""
Add this to the axis of symmetry to automatically enforce the boundary condition required by symmetry.
Also automatically sets the correct boundary conditions for azimuthal eigenvalue problems.
For normal solving, it sets radial (and azimuthal) components of vector fields (also mesh_x) to zero.
For azimuthal eigenvalue problems, it depends on the azimuthal mode m:
:math:`m=0`: As for normal solving.
:math:`|m|=1`: scalar fields and axial vector components are set to zero, radial and azimuthal components are not.
:math:`|m|\geq 2`: scalar fields and all vector components are set to zero
If you write an equation, where you want to change this behavior, you can manually change the conditions by obtaining the (writeable) field information via :py:meth:`~pyoomph.generic.codegen.Equations.get_azimuthal_r0_info` after the definition via :py:meth:`~pyoomph.generic.codegen.Equations.define_scalar_field` or :py:meth:`~pyoomph.generic.codegen.Equations.define_vector_field`.
Notes:
Must be also added to intersections of other boundaries with the axis of symmetry, when the other boundaries define additional fields, e.g.
bulk=NavierStokesEquations(...)
bulk+=AxisymmetryBC()@"axis"
bulk+=NavierStokesFreeSurface()@"interface"
bulk+=AxisymmetryBC()@"interface/axis" # This is important, since the free surface introduces new fields at the interface
This is, however, done automatically if the recurse flag is set to True.
"""
def __init__(self,verbose:bool=True,recurse:bool=True):
super().__init__()
self.verbose=verbose
self.recurse=recurse
def _fill_interinter_connections(self, eqtree:"EquationTree", interinter):
if self.recurse:
from ..generic.codegen import EquationTree
# Now find the reversed connections. We get e.g. domain/axis/interface, but we must add it to domain/interface/axis
revconns=list()
trunk=eqtree.get_parent().get_full_path().lstrip("/")
myname=eqtree.get_my_path_name()
for conn in interinter:
rest=conn[len(eqtree.get_full_path().lstrip("/")):].lstrip("/")
path=trunk+"/"+rest+"/"+myname
revconns.append(path)
revconns.sort(key=lambda x: x.count("/")) # Sort by number of slashes to get it in good order
root=eqtree
while root.get_parent() is not None:
root=root.get_parent()
for rc in revconns:
splt=rc.split("/")
dom=root
is_present=True
for s in splt[:-1]:
if s in dom._children:
dom=dom.get_child(s)
else:
is_present=False
break
if not is_present:
continue # Nothing to be done. There is no interface added
if splt[-1] in dom._children:
iface=dom.get_child(splt[-1])
if iface.get_equations() is not None:
axieq_list=iface.get_equations().get_equation_of_type(AxisymmetryBC,always_as_list=True)
if len(axieq_list)>0:
continue # Already added
else:
oldeqs=dom._children[splt[-1]]._equations
dom._children[splt[-1]]._equations+=AxisymmetryBC(verbose=self.verbose,recurse=self.recurse)
dom._children[splt[-1]]._equations._problem=oldeqs._problem
else:
dom._children[splt[-1]]=EquationTree(AxisymmetryBC(verbose=self.verbose,recurse=self.recurse),dom)
dom._children[splt[-1]]._equations._problem=dom._equations._problem
return super()._fill_interinter_connections(eqtree, interinter)
def define_residuals(self):
if self.verbose:
print("AxisymmetryBC: Setting zero DirichletBCs at",self.get_current_code_generator().get_full_name(),"for",self.get_azimuthal_r0_info()[0])
for k in self.get_azimuthal_r0_info()[0]:
self.set_Dirichlet_condition(k,0)
def _before_stationary_or_transient_solve(self, eqtree:"EquationTree", stationary:bool)->bool:
must_reapply=False
#if self.get_mesh()._problem.get_bifurcation_tracking_mode() == "azimuthal":
from ..generic.bifurcation_tools import _NormalModeBifurcationTrackerBase
pr=self.get_mesh()._problem
if pr.get_bifurcation_tracking_mode() == "azimuthal" or (pr.get_custom_assembler() is not None and isinstance(pr.get_custom_assembler(),_NormalModeBifurcationTrackerBase)):
return False # Don't do anything in this case. It would mess up everything!
mesh=eqtree._mesh
assert mesh is not None
activated_bcs=set()
for k in self.get_azimuthal_r0_info()[0]:
if mesh._get_dirichlet_active(k) == False:
activated_bcs.add(k)
mesh._set_dirichlet_active(k, True)
must_reapply = True
if len(activated_bcs)>0 and self.verbose:
print("AxisymmetryBC: Activating zero DirichletBCs at",self.get_current_code_generator().get_full_name(),"for",activated_bcs)
return must_reapply
def _before_eigen_solve(self, eqtree:"EquationTree", eigensolver:"GenericEigenSolver",angular_m:Optional[float]=None,normal_k:Optional[float]=None) -> bool:
if angular_m is None or angular_m==0:
return False
must_reapply = False
assert eqtree._mesh is not None
deactivated_bcs=set()
for k in self.get_azimuthal_r0_info()[0]:
if eqtree._mesh._get_dirichlet_active(k):
deactivated_bcs.add(k)
eqtree._mesh._set_dirichlet_active(k, False)
must_reapply = True
if len(deactivated_bcs)>0 and self.verbose:
print("AxisymmetryBC: Deactivating strong zero DirichletBCs at",self.get_current_code_generator().get_full_name(),"for",deactivated_bcs)
return must_reapply
def _get_forced_zero_dofs_for_eigenproblem(self, eqtree, eigensolver, angular_mode, normal_k):
if angular_mode is None:
return set()
angular_mode=int(angular_mode)
info=None
if angular_mode==0:
info=self.get_azimuthal_r0_info()[0]
elif abs(angular_mode)==1:
info=self.get_azimuthal_r0_info()[1]
elif abs(angular_mode)>1:
info=self.get_azimuthal_r0_info()[2]
if info is None:
res=set()
else:
res=set([eqtree.get_full_path().lstrip("/")+"/"+m for m in info])
if len(info)>0 and self.verbose:
print("AxisymmetryBC (mode m="+str(angular_mode)+"): Imposed zero by matrix manipulation at",self.get_current_code_generator().get_full_name(),"for",info)
return res
# Scalar fields on DG space => set the corresponding eigenfunctions
class AxisymmetryBCForScalarD0Field(InterfaceEquations):
def __init__(self,*fields:str):
super().__init__()
self.fields=[f for f in fields]
def _get_forced_zero_dofs_for_eigenproblem(self, eqtree: "EquationTree", eigensolver: "GenericEigenSolver", angular_mode: Union[int,None],normal_k:Optional[float]) -> Set[Union[str,int]]:
eqs=set()
if angular_mode!=0:
for ie in eqtree._mesh.elements():
be=ie.get_bulk_element()
for f in self.fields:
fi=be.get_code_instance().get_discontinuous_field_index(f)
if fi<0:
raise RuntimeError("Discontinuous parent field '"+str(f)+"' not known here")
eqs.add(be.internal_data_pt(fi).eqn_number(0))
return eqs
[docs]
class PeriodicBC(InterfaceEquations):
"""
Introduces a periodic boundary condition between two interfaces. It will hold for all continuous fields!
The mesh must be generated that way that for each node on this interface, there is a corresponding node on the other interface when adding offset to the position.
Attributes:
other_interface (str): The name of the other interface to which this boundary is periodic.
offset (Optional[List[ExpressionOrNum]]): The offset to find the corresponding nodes on the other interface.
"""
def __init__(self, other_interface: str, offset: Optional[List[ExpressionOrNum]] = None):
super(PeriodicBC, self).__init__()
self.other_interface = other_interface
if offset is None:
raise RuntimeError("Please supply an offset")
elif not isinstance(offset,(list,tuple)):
self.offset=[offset]
else:
self.offset = offset
def before_finalization(self, codegen: "FiniteElementCodeGenerator"):
bulkdom = self.get_parent_domain()
if bulkdom.get_nodal_dimension()!=len(self.offset):
raise RuntimeError("The offset of the PeriodicBC must have the same dimension as the nodal dimension of the mesh")
while bulkdom.get_parent_domain() is not None:
raise RuntimeError("Cannot yet apply periodic boundaries on interfaces on interfaces")
pmesh = bulkdom.get_equations().get_mesh()
assert isinstance(pmesh, (MeshFromTemplate1d, MeshFromTemplate2d, MeshFromTemplate3d))
bnames = pmesh.get_boundary_names()
my_name = self.get_mesh().get_name()
ss = self.get_scaling("spatial")
offs = [float(o / ss) for o in self.offset]
if my_name not in bnames:
raise RuntimeError("Cannot find boundary '" + my_name + "' in bulk mesh")
if self.other_interface not in bnames:
raise RuntimeError("Cannot find boundary '" + self.other_interface + "' in bulk mesh")
my_nodes_by_pos: Dict[Tuple[float, ...], _pyoomph.Node] = {}
for n in pmesh.boundary_nodes(my_name):
ps = [n.x(i) + offs[i] for i in range(n.ndim())]
my_nodes_by_pos[tuple(ps)] = n
dataG: List[List[float]] = []
master_nodes: List[_pyoomph.Node] = []
for n in pmesh.boundary_nodes(self.other_interface):
ps = [n.x(i) for i in range(n.ndim())]
dataG.append(ps)
master_nodes.append(n)
data = numpy.array(dataG) # type:ignore
if len(data) != len(my_nodes_by_pos): # type:ignore
raise RuntimeError("Mismatch in number of nodes for a periodic boundary")
kdtree = scipy.spatial.KDTree(data) # type:ignore
slave_to_master:Dict[_pyoomph.Node,_pyoomph.Node]=dict()
master_to_slave:Dict[_pyoomph.Node,_pyoomph.Node]=dict()
for ps, nslave in my_nodes_by_pos.items():
qres = kdtree.query(ps) # type:ignore
if qres[0] > 1e-6: # type:ignore
raise RuntimeError("Cannot find a periodic node at the position " + str(ps))
nmaster: Node = master_nodes[qres[1]] # type:ignore
if len(nmaster.get_boundary_indices()) >= 2:
if not len(nslave.get_boundary_indices()) >= 2:
raise RuntimeError(
"A periodic node on a single boundary shall be copied from a master that lies on more than one boundary")
pmesh._periodic_corner_node_info[nslave] = nmaster # type:ignore
slave_to_master[nslave]=nmaster
master_to_slave[nmaster]=nslave
continue
elif len(nslave.get_boundary_indices()) >= 2:
raise RuntimeError(
"A periodic corner node located on multiple boundaries shall be attached to a periodic master on a single boundary")
slave_to_master[nslave]=nmaster
master_to_slave[nmaster]=nslave
nslave._make_periodic(nmaster, pmesh)
# If we have a quad tree, we must also connect the quad tree here
if pmesh.refinement_possible():
myind=bnames.index(my_name)
oppind=bnames.index(self.other_interface)
oppnodes_to_oppelem:Dict[_pyoomph.Node,_pyoomph.Element]=dict()
for oppelem,direct in pmesh.boundary_elements(self.other_interface,with_directions=True):
oppnodes_to_oppelem[tuple(oppelem.boundary_nodes(oppind))]=(oppelem,direct)
for myelem,direct in pmesh.boundary_elements(my_name,with_directions=True):
my_nodes_on_bind=myelem.boundary_nodes(myind)
search_for=tuple(slave_to_master[n] for n in my_nodes_on_bind)
oppelem=oppnodes_to_oppelem.get(search_for,None)
if oppelem is None:
raise RuntimeError("Cannot identify the corresponding periodic boundary element on the other interface.")
myelem._connect_periodic_tree(oppelem[0],direct,oppelem[1])
class PythonDirichletBC(BoundaryCondition):
def __init__(self, **kwargs:Union[ExpressionOrNum,Literal[True],Tuple[Callable[...,ExpressionOrNum],List[int],Expression]]):
super(PythonDirichletBC, self).__init__()
self.vals = kwargs.copy()
self.unpin_instead:bool = False
def _is_ode(self):
return None
def get_information_string(self) -> str:
return ", ".join([str(k) + "=" + str(v) for k, v in self.vals.items()])
def setup(self):
assert self.mesh is not None
self.indexvals:Dict[int,Union[float,Literal[True],Tuple[Callable[...,ExpressionOrNum],List[int],Expression]]] = {}
self.indexval_arginds = {}
self.additional_vals:Dict[int,Union[float,Literal[True],Tuple[Callable[...,ExpressionOrNum],List[int],Expression]]] = {}
self.pinnedpositions:Dict[int,Union[float,Literal[True],Tuple[Callable[...,ExpressionOrNum],List[int],Expression]]] = {}
self.internal_vals:Dict[int,Union[float,Literal[True],Tuple[Callable[...,ExpressionOrNum],List[int],Expression]]] = {}
codeinst = self.mesh.element_pt(0).get_code_instance()
currcodegen = self.get_current_code_generator()
vals:Dict[str,Union[ExpressionOrNum,Literal[True],Tuple[Callable[...,ExpressionOrNum],List[int],Expression]]] = {}
for k, val in self.vals.items():
if k == "mesh_x" or k == "mesh_y" or k == "mesh_z":
vals[k] = val
continue
nodalfield = codeinst.get_nodal_field_index(k)
if nodalfield < 0:
internalfield = codeinst.get_discontinuous_field_index(k)
if internalfield < 0:
interfid = self.mesh.has_interface_dof_id(k)
if interfid == -1:
# Last chance: is is an additional field, e.g a vector:
if k == "mesh":
dim = currcodegen.get_nodal_dimension()
if dim == 1:
vals[k + "_x"] = val
continue
elif dim == 2:
vals[k + "_x"] = val
vals[k + "_y"] = val
continue
elif dim == 3:
vals[k + "_x"] = val
vals[k + "_y"] = val
vals[k + "_z"] = val
continue
replaced = None
if k in currcodegen.get_equations()._additional_fields.keys():
replaced = self._additional_fields[k]
elif self.get_parent_domain() is not None:
bulk = self.get_parent_domain()
while bulk is not None:
bulkeq = bulk.get_equations()
if k in bulkeq._additional_fields_also_on_interface.keys():
replaced = bulkeq._additional_fields_also_on_interface[k]
break
bulk = bulk.get_parent_domain()
if replaced is not None:
if not isinstance(replaced,Expression):
replaced=Expression(replaced)
if _pyoomph.GiNaC_is_a_matrix(replaced):
for index in range(replaced.nops()):
if not replaced[index].is_zero():
print(replaced[index])
raise RuntimeError("Cannot set a boundary condition by expanding yet")
else:
raise RuntimeError("Cannot set a boundary condition for an unknown field " + str(k))
else:
raise RuntimeError("Cannot set a boundary condition for an unknown field " + str(k))
else:
vals[k] = val
else:
vals[k] = val
else:
vals[k] = val
assert self.mesh._codegen is not None
for k, val in vals.items():
if k == "mesh_x" or k == "mesh_y" or k == "mesh_z":
scal = self.mesh._codegen.get_scaling("spatial")
if val is True:
fval = True
else:
assert not isinstance(val,tuple)
fval = float(val / scal)
if k == "mesh_x":
self.pinnedpositions[0] = fval
continue
elif k == "mesh_y":
self.pinnedpositions[1] = fval
continue
elif k == "mesh_z":
self.pinnedpositions[2] = fval
continue
nodalfield = codeinst.get_nodal_field_index(k)
scal = self.mesh._codegen.get_scaling(k)
fval:Union[float,bool,Tuple[Callable[...,ExpressionOrNum],List[int],Expression]]
if not isinstance(val, bool) or val != True:
try:
fval = float(val / scal) #type:ignore ##TODO Functions here
except:
if callable(val):
arg_inds:List[int] = []
for a in inspect.signature(val).parameters:
if a == "mesh_x" or a == "coordinate_x":
arg_inds.append(-1)
elif a == "mesh_y" or a == "coordinate_y":
arg_inds.append(-2)
else:
raise RuntimeError("Lambda argument " + a + " not yet resolved")
fval = (val, arg_inds, scal)
else:
raise
else:
fval=True
if nodalfield < 0:
internalfield = codeinst.get_discontinuous_field_index(k)
if internalfield < 0:
# Last chance: Get the index from an additional interface field
interfid = self.mesh.has_interface_dof_id(k)
if interfid == -1:
raise RuntimeError(
"Cannot find a nodal field, and elemental field or and additional interface field with name '" + k + "' to set a DirichletBC")
else:
self.additional_vals[interfid] = True if (isinstance(val, bool) and val == True) else fval
else:
self.internal_vals[internalfield] = True if (isinstance(val, bool) and val == True) else fval
else:
self.indexvals[nodalfield] = True if (isinstance(val, bool) and val == True) else fval
def apply(self):
if not self.active:
return
assert self.mesh is not None
for n in self.mesh.nodes():
for i, val in self.indexvals.items():
if self.unpin_instead:
n.unpin(i)
else:
n.pin(i)
if not (isinstance(val, bool) and val == True):
if isinstance(val, tuple) and callable(val[0]):
arglst = [0.0] * len(val[1])
for j, ind in enumerate(val[1]):
if ind == -1:
arglst[j] = n.x(0)
elif ind == -2:
arglst[j] = n.x(1)
elif ind == -3:
arglst[j] = n.x(2)
v = float(val[0](*arglst) / val[2])
n.set_value(i, v)
else:
assert isinstance(val,float)
n.set_value(i, val)
for i, val in self.pinnedpositions.items():
assert not isinstance(val,tuple)
if self.unpin_instead:
n.unpin_position(i)
else:
n.pin_position(i)
if not (isinstance(val, bool) and val == True):
n.set_x(i, val)
for id, val in self.additional_vals.items():
i = n.additional_value_index(id)
assert not isinstance(val,tuple)
if i >= 0:
if self.unpin_instead:
n.unpin(i)
else:
n.pin(i)
if not (isinstance(val, bool) and val == True):
n.set_value(i, val)
if len(self.internal_vals) > 0:
for ei in range(self.mesh.nelement()):
e = self.mesh.element_pt(ei)
# TODO: Where
for idi, val in self.internal_vals.items():
d = e.internal_data_pt(idi)
for vi in range(d.nvalue()):
if self.unpin_instead:
d.unpin(vi)
else:
d.pin(vi)
if not (isinstance(val, bool) and val == True):
raise RuntimeError("TODO: Setting DL values")
class PinWhere(PythonDirichletBC):
def __init__(self, where:Callable[...,bool], **kwargs:Union[ExpressionOrNum,Literal[True]]):
super(PinWhere, self).__init__(**kwargs)
self.where = where
def apply(self):
if not self.active:
return
assert self.mesh is not None
if len(self.additional_vals) > 0:
raise RuntimeError("Cannot use PinWhere yet on interface fields")
for n in self.mesh.nodes():
# Check the where condition
xv:List[float] = []
for xi in range(n.ndim()):
xv.append(n.x(xi))
if self.where(*xv) == False:
continue
for i, val in self.indexvals.items():
if self.unpin_instead:
n.unpin(i)
else:
n.pin(i)
if not (isinstance(val, bool) and val == True):
assert isinstance(val,float)
n.set_value(i, val)
for i, val in self.pinnedpositions.items():
if self.unpin_instead:
n.unpin_position(i)
else:
n.pin_position(i)
if not (isinstance(val, bool) and val == True) and isinstance(val,float):
n.set_x(i, val)
# Pin the mesh (2d only) for points that are further away than distance from the interface
# At the moment, we only allow to make the smallest circle around all interface points, add the distance to the radius
# and take this as reference. However, it will not co-move (we always have to do a setup_pinning to refresh)
# Further modes (e.g. "pointwise" may follow)
class PinMeshAtDistanceToInterface(PinWhere):
def __init__(self, interface_names:Union[str,Set[str]], distance:ExpressionOrNum, mode:str="smallest_circle"):
super(PinMeshAtDistanceToInterface, self).__init__(where=lambda : False, mesh_x=True, mesh_y=True)
if isinstance(interface_names, str):
self.interface_names = {interface_names}
else:
self.interface_names = set(self.interface_names)
self.distance = distance
self._circle_x = 0
self._circle_y = 0
self._circle_radius = 0
def _build_where_func(self):
assert self.mesh is not None
pts:List[Tuple[float,float]] = []
for inter in self.interface_names:
for n in self.mesh.boundary_nodes(inter):
pts.append((n.x(0), n.x(1),))
self._circle_x, self._circle_y, self._circle_radius = make_circle(pts)
self._circle_radius += float(self.distance / self.mesh.get_problem().get_scaling("spatial"))
self.where:Callable[[float,float],bool] = lambda x, y: (x - self._circle_x) ** 2 + (y - self._circle_y) ** 2 > self._circle_radius ** 2
def apply(self):
if not self.active:
return
self._build_where_func()
super(PinMeshAtDistanceToInterface, self).apply()
class UnpinDofs(PythonDirichletBC):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.unpin_instead=True
[docs]
class InteriorBoundaryOrientation(InterfaceEquations):
"""
Named interior boundaries within a domain are by default double-layered, i.e. interface elements are added from both sides.
This can usually cause problems. In order to avoid this, we have to specify the orientation of the boundary, i.e. only interface elements are added from one side, namely where the indicator function is positive.
For a unit circle ``"circle"`` embedded in a domain, you could e.g. add ``InteriorBoundaryOrientation(dot(var("coordinate"),var("normal)))@"circle"`` to only add interface elements with an outward pointing normal.
Args:
InterfaceEquations (_type_): _description_
"""
def __init__(self,indicator:ExpressionOrNum):
super().__init__()
self.indicator=indicator
def define_residuals(self):
self.add_local_function("__interface_constraint",self.indicator)