Source code for pyoomph.equations.navier_stokes

#  @file
#  @author Christian Diddens <c.diddens@utwente.nl>
#  @author Duarte Rocha <d.rocha@utwente.nl>
#  
#  @section LICENSE
# 
#  pyoomph - a multi-physics finite element framework based on oomph-lib and GiNaC 
#  Copyright (C) 2021-2025  Christian Diddens & Duarte Rocha
# 
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
# 
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# 
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>. 
#
#  The authors may be contacted at c.diddens@utwente.nl and d.rocha@utwente.nl
#
# ========================================================================
 


from .. import WeakContribution, GlobalLagrangeMultiplier
from ..generic import Equations, InterfaceEquations
from .generic import get_interface_field_connection_space, TestScaling, Scaling
from ..meshes.bcs import BoundaryCondition,DirichletBC
from ..meshes.mesh import AnyMesh, InterfaceMesh
from ..expressions import *  # Import grad et al
from ..expressions.units import degree


if TYPE_CHECKING:
    from _pyoomph import Node
    from ..solvers.generic import GenericEigenSolver
    from ..generic.codegen import EquationTree
    from ..materials.generic import AnyFluidProperties,PureLiquidProperties
    from ..generic.problem import Problem



###################################

# PRESSURE FIXATIONS ##
# if you have a Navier-Stokes domain with pure Dirichlet-BCs,
# you need exactly one degree of pressure to be pinned to remove the null-space
# These will take care of it by either pinning one nodal pressure dof (TH) or one elemental pressure dof (CR)
# The corresponding BC can be created via StokesElement.create_pressure_fixation()

class PressureFixationTaylorHood(BoundaryCondition):
    def __init__(self, pname,value:Optional[float]):
        super().__init__()
        self.value:Optional[float] = value
        self.node:Optional["Node"] = None
        self.pname=pname
        

    def setup(self):
        assert self.mesh is not None
        self.pindex = self.mesh.element_pt(0).get_code_instance().get_nodal_field_index(self.pname)
        if self.pindex < 0:
            allfields = self.mesh.element_pt(0).get_code_instance().get_nodal_field_indices()
            for k,v in allfields.items():
                print(k,v,self.mesh.element_pt(0).get_code_instance().get_nodal_field_index(k))
            raise RuntimeError("Missing nodal data for 'pressure'. Found only: "+str(allfields))
        if self.node is None:
            self.node = self.mesh.element_pt(0).node_pt(0)  # Is definitely a C1 node
            ps=[self.node.x(i) for i in range(self.node.ndim())]
            print("Got Node at " + str(ps))

    def apply(self):
        assert self.node is not None
        self.node.pin(self.pindex)
        if self.value is not None:
            self.node.set_value(self.pindex, self.value)
        print("PINNING some pressure with value",self.value)

    def _before_eigen_solve(self, eqtree:"EquationTree", eigensolver:"GenericEigenSolver",angular_m:Optional[int]) -> bool:
        if angular_m is not None:
            raise RuntimeError("Do not use pressure_fixation with angular eigensolving. Use [Navier]StokesEquation(...).with_pressure_integral_constraint(problem) instead...")
        return False



class PressureFixationScottVogelius(BoundaryCondition):
    def __init__(self, pname,value:Optional[float]):
        super().__init__()
        self.value = value
        self.pname= pname
    
    def apply(self):
        assert self.mesh is not None
        for e in self.mesh.elements():
            fl=e.get_field_data_list(self.pname,False)
            for d,ind in fl:
                d.unpin(ind)
        fl=self.mesh.element_pt(0).get_field_data_list(self.pname,False)[0]
        fl[0].pin(fl[1])
        if self.value is not None:
            fl[0].set_value(fl[1], self.value)

    def _before_eigen_solve(self, eqtree:"EquationTree", eigensolver:"GenericEigenSolver",angular_m:Optional[float]) -> bool:
        if angular_m is not None:
            raise RuntimeError("Do not use pressure_fixation with angular eigensolving. Use [Navier]StokesEquation(...).with_pressure_integral_constraint(problem) instead...")
        return False

class PressureFixationCrouzeixRaviart(BoundaryCondition):
    def __init__(self, pname, value:Optional[float]):
        super().__init__()
        self.value = value
        self.pname=pname

    def setup(self):
        assert self.mesh is not None
        self.pindex = self.mesh.element_pt(0).get_code_instance().get_discontinuous_field_index(self.pname)
        if self.pindex < 0:
            raise RuntimeError("Missing internal data for 'pressure'")

    def apply(self):
        assert self.mesh is not None
        for ei in range(self.mesh.nelement()):
            self.mesh.element_pt(ei).internal_data_pt(self.pindex).unpin(0)
        self.mesh.element_pt(0).internal_data_pt(self.pindex).pin(0)
        if self.value is not None:
            self.mesh.element_pt(0).internal_data_pt(self.pindex).set_value(0, self.value)

    def _before_eigen_solve(self, eqtree:"EquationTree", eigensolver:"GenericEigenSolver",angular_m:Optional[float]) -> bool:
        if angular_m is not None:
            raise RuntimeError("Do not use pressure_fixation with angular eigensolving. Use [Navier]StokesEquation(...).with_pressure_integral_constraint(problem) instead...")
        return False


###################################

class PFEMOptions:
    def __init__(self,*,active:bool=True,first_order_system:bool=False,direct_position_update:bool=False) -> None:
        self.active=active
        self.first_order_system=first_order_system
        self.direct_position_update=direct_position_update

###################################

[docs] class StokesEquations(Equations): """ .. _StokesEquations: Represents the Stokes equations, defined by the second-order partial differential equations (PDEs): .. math:: \\partial_t \\rho + \\nabla \\cdot (\\rho \\vec{u}) = 0 .. math:: \\nabla \\cdot [-\\nabla p \\vec{\\vec{I}} + \\mu (\\nabla \\vec{u} + (\\nabla \\vec{u})^\\text{T})] + f = 0 \\, where :math:`p` is the pressure, :math:`\\vec{\\vec{I}}` is the identity matrix, :math:`\\vec{u}` the velocity, :math:`\\mu` the dynamic viscosity, :math:`\\rho` is the mass density and :math:`f` the bulk force. In the weak form, the equations are written as: .. math:: (\\partial_t \\rho, q) + (\\rho \\vec{u}, \\nabla q) = 0 .. math:: - (-p \\vec{\\vec{I}} + \\mu [\\nabla \\vec{u} + (\\nabla \\vec{u})^\\text{T}], \\nabla \\vec{v} + (\\nabla \\vec{v})^\\text{T}) + (f,\\vec{v}) + \\langle \\vec{n} \\cdot [-p I + \\mu (\\nabla \\vec{u} + (\\nabla \\vec{u})^\\text{T})], \\vec{v} \\rangle = 0 \\, where :math:`\\vec{v}` and :math:`q` are test functions of the velocity and pressure, respectively. Args: dynamic_viscosity (ExpressionOrNum, optional): Dynamic viscosity of the fluid (if `fluid_props==None`). Defaults to 1.0. mode (Literal["TH","CR"], optional): Use Taylor-Hood ("TH") or Crouzeix-Raviart ("CR") elements. Defaults to "TH". bulkforce (ExpressionNumOrNone, optional): Bulk force term. Defaults to None. fluid_props (Optional[AnyFluidProperties], optional): Fluid properties. Defaults to None, meaning mass density and dynamic viscosity will be set by the "mass_density" and "dynamic_viscosity" arguments. gravity (ExpressionNumOrNone, optional): Gravity term in vector form. Defaults to None. boussinesq (bool, optional): Use Boussinesq approximation, i.e. use constant density in continuity equation. Defaults to False. mass_density (ExpressionNumOrNone, optional): Mass density of the fluid (if 'fluid_props==None'). Only used for gravity and the continuity equation. Defaults to None. pressure_sign_flip (bool, optional): Reverse the pressure sign to get a symmetric matrix. Defaults to False. momentum_scheme (TimeSteppingScheme, optional): Time-stepping scheme for the momentum equation. Defaults to "BDF2". continuity_scheme (TimeSteppingScheme, optional): Time-stepping scheme for the continuity equation. Defaults to "BDF2". wrong_strain (bool, optional): Use mu*grad(u) instead of 2*mu*sym(grad(u)) for strain. Defaults to False. pressure_factor (ExpressionOrNum, optional): Multiplicative factor for the pressure in the momentum equation. Defaults to 1. PFEM (Union[PFEMOptions,bool], optional): Options for Particle Finite Element Method (PFEM). Defaults to False. stress_tensor (ExpressionNumOrNone, optional): Custom stress tensor. Defaults to None. velocity_name (str, optional): Name of the velocity field. Defaults to "velocity". pressure_name (str, optional): Name of the pressure field. Defaults to "pressure". DG_alpha (ExpressionNumOrNone, optional): If using Discontinuous Galerkin discretisation, set penalty coefficient alpha for jump terms of the stress tensor. Defaults to None. symmetric_test_function (Union[Literal['auto'],bool], optional): Use symmetric test functions for the momentum equation. Defaults to 'auto'. pressure_test_scaling_factor (float, optional): Multiplicative scaling factor for the pressure test function. Defaults to 1.0. """ def __init__(self, *, dynamic_viscosity:ExpressionOrNum=1.0, mode:Literal["TH","CR","SV","C1","D2D1","D1D0","D2TBD1","mini","C2DL"]="TH", bulkforce:ExpressionNumOrNone=None, fluid_props:Optional["AnyFluidProperties"]=None, gravity:ExpressionNumOrNone=None, boussinesq:bool=False, mass_density:ExpressionNumOrNone=None, pressure_sign_flip:bool=False,momentum_scheme:TimeSteppingScheme="BDF2",continuity_scheme:TimeSteppingScheme="BDF2",wrong_strain:bool=False,pressure_factor:ExpressionOrNum=1, PFEM:Union[PFEMOptions,bool]=False, stress_tensor:ExpressionNumOrNone=None,velocity_name="velocity",pressure_name="pressure",DG_alpha:ExpressionNumOrNone=None,symmetric_test_function:Union[Literal['auto'],bool]='auto',pressure_test_scaling_factor:float=1): super().__init__() self.bulkforce = bulkforce # Some arbitrary bulk-force vector self.gravity = gravity # Some gravity direction, i.e. g*<unit vector of direction> if mode not in {"CR","TH","C1","C2","SV","D2TBD1","D2D1","D1D0","mini","C2DL"}: raise ValueError( "(Navier-)Stokes equations argument 'mode' needs to be 'CR' for Crouzeix-Raviart element or 'TH' for Taylor-Hood equations. Experimentally also 'C1' and 'C2' are possible. If the mesh is constructed correctly, 'SV' for Scott-Vogelius also works. 'mini' elements are only possible on triangles. Also, discontinuous variants 'D2D1' and 'D1D0' are currently in development, i.e. experimental") self.mode:Literal["TH","CR","C1","C2","SV","D2D1","D1D0","mini","C2DL"] = mode self.requires_interior_facet_terms=self.mode in {"D2D1","D1D0","D2TBD1"} self.DG_alpha=DG_alpha self.pressure_test_scaling_factor=pressure_test_scaling_factor if self.mode in {"D2D1","D1D0"}: if self.DG_alpha is None: raise RuntimeError(f"Must set DG_alpha if mode=='{self.mode}'") if symmetric_test_function=="auto": symmetric_test_function=True else: if symmetric_test_function=="auto": symmetric_test_function=False self.symmetric_test_function:bool=symmetric_test_function self.boussinesq = boussinesq # If set, we only solve div(u)=0, else we solve div(u)=-1/rho*(partial_t(rho)+u*grad(rho)) if fluid_props is not None: self.fluid_props = fluid_props self.dynamic_viscosity = fluid_props.dynamic_viscosity self.mass_density = fluid_props.mass_density else: self.fluid_props = None self.dynamic_viscosity = dynamic_viscosity self.mass_density = mass_density self.pressure_sign_flip=pressure_sign_flip self.momentum_scheme:TimeSteppingScheme=momentum_scheme self.continuity_scheme:TimeSteppingScheme=continuity_scheme if PFEM: if PFEM==True: self.PFEM_options=PFEMOptions() else: self.PFEM_options=PFEM if not self.PFEM_options.first_order_system: self.momentum_scheme="Newmark2" self.continuity_scheme="Newmark2" else: self.PFEM_options=False self.wrong_strain=wrong_strain self.pressure_factor=pressure_factor self.stress_tensor=stress_tensor self.velocity_name=velocity_name self.pressure_name=pressure_name def get_velocity_space_from_mode(self,for_interface=False): velospace={"C1":"C1","CR":"C2TB","TH":"C2","SV":"C2","D2D1":"D2","D1D0":"D1","D2TBD1":"D2TB","mini":"C1TB","C2DL":"C2","C2":"C2"} res=velospace[self.mode] if for_interface: if res=="C2TB": res="C2" elif res=="C1TB": res="C1" elif res[0]=="D": raise RuntimeError("Discont here") return res def get_pressure_space_from_mode(self): pspace={"C1":"C1","CR":"DL","TH":"C1","SV":"D1","D2D1":"D1","D1D0":"D0","D2TBD1":"D1","mini":"C1","C2DL":"DL","C2":"C2"} return pspace[self.mode] def define_fields(self): vspace=self.get_velocity_space_from_mode() pspace=self.get_pressure_space_from_mode() if self.PFEM_options and self.PFEM_options.active: self.activate_coordinates_as_dofs(coordinate_space=vspace) if self.PFEM_options.first_order_system: self.define_vector_field(self.velocity_name, vspace) else: self.define_vector_field(self.velocity_name, vspace) self.define_scalar_field(self.pressure_name, pspace) def define_scaling(self): U = self.get_scaling(self.velocity_name) X = self.get_scaling("spatial") P = self.get_scaling(self.pressure_name) self.set_test_scaling(**{self.velocity_name:X / P}) if self.PFEM_options and self.PFEM_options.active: if self.PFEM_options.first_order_system: self.set_test_scaling(mesh_x=scale_factor("temporal")/scale_factor(self.velocity_name)) self.set_test_scaling(mesh_y=scale_factor("temporal")/scale_factor(self.velocity_name)) self.set_test_scaling(mesh_z=scale_factor("temporal")/scale_factor(self.velocity_name)) else: self.set_test_scaling(mesh_x=self.velocity_name) self.set_test_scaling(mesh_y=self.velocity_name) self.set_test_scaling(mesh_z=self.velocity_name) self.set_test_scaling(pressure=X / U*self.pressure_test_scaling_factor) self.add_named_numerical_factor(p_in_momentum_eq=scale_factor(self.pressure_name)*test_scale_factor(self.velocity_name)/scale_factor("spatial")*self.pressure_test_scaling_factor) self.add_named_numerical_factor(div_u__in_conti_eq=scale_factor(self.velocity_name) * test_scale_factor(self.pressure_name) / scale_factor("spatial")) def define_stress_tensor(self): u = var(self.velocity_name) p = var(self.pressure_name) visc = self.dynamic_viscosity if visc is None: raise RuntimeError("viscosity not set") if self.wrong_strain: strain=grad(u)/2 else: strain = sym(grad(u)) # 1/2*(grad(u)+grad(u)^t) # Newtonian fluid if self.stress_tensor==None: stress_tensor = 2 * visc * strain - identity_matrix() * self.pressure_factor*p*(-1 if self.pressure_sign_flip else 1) else: stress_tensor = self.stress_tensor return stress_tensor def define_residuals(self): if self.PFEM_options and self.PFEM_options.active: if not self.PFEM_options.first_order_system: x, u_test = var_and_test("mesh") u=mesh_velocity(scheme=self.momentum_scheme) vectcomps:List[str]=[] for i,direct in enumerate((["x","y","z"])[:self.get_nodal_dimension()]): vectcomps.append("velocity_"+direct) self.define_field_by_substitution("velocity_"+direct,mesh_velocity(scheme=self.momentum_scheme)[i],also_on_interface=True) self.define_testfunction_by_substitution("velocity_"+direct,testfunction("mesh_"+direct),also_on_interface=True) self.define_testfunction_by_substitution(self.velocity_name,vector(*[testfunction(f)/test_scale_factor(self.velocity_name) for f in vectcomps]),also_on_interface=True) self.define_testfunction_by_substitution("mesh",vector(*[testfunction(f)/test_scale_factor(self.velocity_name) for f in vectcomps]),also_on_interface=True) self.define_field_by_substitution(self.velocity_name,vector(*[var(f)/scale_factor(self.velocity_name) for f in vectcomps]),also_on_interface=True) self.add_local_function(self.velocity_name,var(self.velocity_name)) self._get_combined_element()._vectorfields[self.velocity_name]=vectcomps else: u, u_test = var_and_test(self.velocity_name) if self.PFEM_options.direct_position_update: self.add_residual(weak(var("mesh")-evaluate_in_past(var("mesh"))-var(self.velocity_name)*timestepper_weight(1,0,"BDF2"),testfunction("mesh"))) else: self.add_residual(weak(partial_t(var("mesh"))-var(self.velocity_name),testfunction("mesh"))) else: u, u_test = var_and_test(self.velocity_name) p, p_test = var_and_test(self.pressure_name) # Get stress tensor stress_tensor = self.define_stress_tensor() # Residuals if self.symmetric_test_function: Dv=sym(grad(u_test)) else: Dv=grad(u_test) self.add_residual(weak(time_scheme(self.momentum_scheme,stress_tensor), Dv)) # total stress self.add_residual(weak(time_scheme(self.continuity_scheme,div(u)), p_test)) # Incompressibility if not self.boussinesq and self.mass_density is not None: rho = self.mass_density self.add_residual(weak(time_scheme(self.continuity_scheme,partial_t(rho, ALE=False) / rho + dot(u, grad(rho)) / rho), p_test)) # Incompressibility if self.bulkforce is not None: self.add_residual(-weak(time_scheme(self.momentum_scheme,self.bulkforce), u_test)) # bulk force if self.gravity is not None: if self.mass_density is None: raise RuntimeError("Must set mass_density if using gravity") self.add_residual(-weak(time_scheme(self.momentum_scheme,self.gravity * self.mass_density), u_test)) # gravity force force if self.requires_interior_facet_terms: n,h=var(["normal","element_length_h"]) # optionally, at_facet=True can be added to all jumps/averages that don't have grad in their argument Du=grad(u)/2 if self.wrong_strain else sym(grad(u)) Dv=grad(u_test) if not self.symmetric_test_function else sym(grad(u_test)) facet_terms=weak(-matproduct(2*self.dynamic_viscosity*avg(Du),n),jump(u_test)) facet_terms+=weak(-jump(u),matproduct(avg(Dv),n)) facet_terms+=weak(self.DG_alpha*2**(2 if self.mode=="D2D1" else 1)/h*jump(u),jump(u_test)) facet_terms+=weak(dot(jump(u),n),avg(p_test)) # okay facet_terms+=weak(avg(p),dot(jump(u_test),n)) self.add_interior_facet_residual(facet_terms) # In case of complete Dirichlet velocity conditions, we need to fix a single dof of pressure # A single node is selected in case of Taylor hood, otherwise a single element is selected def create_pressure_fixation(self, *, value:Optional[float]=None)->Union[PressureFixationTaylorHood,PressureFixationCrouzeixRaviart,PressureFixationScottVogelius]: if self.mode in {"TH","C1","mini"}: return PressureFixationTaylorHood(self.pressure_name,value) elif self.mode == "CR": return PressureFixationCrouzeixRaviart(self.pressure_name,value) elif self.mode == "SV": return PressureFixationScottVogelius(self.pressure_name,value) else: raise RuntimeError("Cannot add a pressure fixation for this mode") # To be used a StokesEquation(...).with_pressure_fixation()
[docs] def with_pressure_fixation(self,*,nondim_p_value:Optional[float]=None) -> Equations: """ Instead of adding ``StokesEquation``, add ``StokesEquation.with_pressure_fixation(...)`` to remove the pressure nullspace in case of pure Dirichlet boundary conditions for the normal flow. With this method, a single pressure dof is pinned to remove the nullspace. Args: problem: The problem where the equations are added. integral_value: The integral value of the pressure over the domain. ode_domain_name: Domain name for the Lagrange multiplier enforcing the pressure integral. Defaults to "globals". lagrange_name: Name of the global Lagrange multiplier. Defaults to "lagr_intconstr_pressure". set_zero_on_normal_mode_eigensolve: Deactivate when solving angular eigenvalue problems. Defaults to True. Returns: The (Navier-)Stokes equations with the pressure integral constraint. """ fix=self.create_pressure_fixation(value=nondim_p_value) return self+fix
[docs] def with_pressure_integral_constraint(self, problem:"Problem", integral_value:ExpressionOrNum=0, *, ode_domain_name:str="globals",lagrange_name:str="lagr_intconstr_pressure", set_zero_on_normal_mode_eigensolve:bool=True) -> Equations: """ Instead of adding ``StokesEquation``, add ``StokesEquation.with_pressure_integral_constraint(...)`` to remove the pressure nullspace in case of pure Dirichlet boundary conditions for the normal flow. With this method, the integral of the pressure is constrained to a given value via a global Lagrange multiplier. Args: problem: The problem where the equations are added. integral_value: The integral value of the pressure over the domain. ode_domain_name: Domain name for the Lagrange multiplier enforcing the pressure integral. Defaults to "globals". lagrange_name: Name of the global Lagrange multiplier. Defaults to "lagr_intconstr_pressure". set_zero_on_normal_mode_eigensolve: Deactivate when solving angular eigenvalue problems. Defaults to True. Returns: The (Navier-)Stokes equations with the pressure integral constraint. """ eq_additions = self eq_additions += WeakContribution(var(self.pressure_name), testfunction(lagrange_name, domain=ode_domain_name),dimensional_dx=False) eq_additions += WeakContribution(var(lagrange_name, domain=ode_domain_name), testfunction(self.pressure_name),dimensional_dx=False) ode_additions = GlobalLagrangeMultiplier(**{lagrange_name:integral_value},set_zero_on_normal_mode_eigensolve=set_zero_on_normal_mode_eigensolve) ode_additions +=TestScaling(**{lagrange_name:1/scale_factor(self.pressure_name)}) ode_additions += Scaling(**{lagrange_name: 1 / test_scale_factor(self.pressure_name)}) problem.add_equations(ode_additions @ ode_domain_name) return eq_additions
################################## class NavierStokesNormalTraction(InterfaceEquations): required_parent_type = StokesEquations def __init__(self, normal_traction:ExpressionOrNum): super(NavierStokesNormalTraction, self).__init__() self.normal_traction = normal_traction def define_residuals(self): peqs=self.get_parent_equations(StokesEquations) assert isinstance(peqs,StokesEquations) _, utest = var_and_test(peqs.velocity_name) n = self.get_normal() self.add_residual(weak(self.normal_traction, dot(n, utest))) # TODO: Merge this with the free surface and activate it if there is an opposite side
[docs] class ConnectVelocityAtInterface(InterfaceEquations): """ Connects the velocity field at the interface between two domain. This is done by introducing Lagrange multipliers for each velocity component. The Lagrange multipliers are then used to enforce the continuity of the velocity field across the interface. This class requires the parent equations to be of type StokesEquations, meaning that if StokesEquations (or subclasses) are not defined in the parent domain, an error will be raised. Args: lagr_mult_prefix (str, optional): Prefix for name of the Lagrange multipliers. Defaults to "_lagr_velo_conn". mass_transfer_rate (ExpressionOrNum, optional): Mass transfer rate in case there is mass transfer across the interface. Defaults to 0. use_highest_space (bool, optional): If True, the highest space is used for the Lagrange multipliers. Defaults to False. """ required_parent_type = StokesEquations def __init__(self, lagr_mult_prefix:str="_lagr_velo_conn", mass_transfer_rate:ExpressionOrNum=0,use_highest_space:bool=False,normal_velocity_jump:ExpressionOrNum=0): super(ConnectVelocityAtInterface, self).__init__() self.lagr_mult_prefix = lagr_mult_prefix self.mass_transfer_rate = mass_transfer_rate self.use_highest_space=use_highest_space self.normal_velocity_jump=normal_velocity_jump def get_required_fields(self) -> List[str]: flow_eqs=self.get_parent_equations(StokesEquations) assert isinstance(flow_eqs,StokesEquations) fields = [flow_eqs.velocity_name+ "_x", flow_eqs.velocity_name+"_y", flow_eqs.velocity_name+"_z"] if isinstance(self.get_coordinate_system(),AxisymmetryBreakingCoordinateSystem): return fields[0:self.get_nodal_dimension()]+[flow_eqs.velocity_name+"_phi"] elif isinstance(self.get_coordinate_system(),CartesianCoordinateSystemWithAdditionalNormalMode): return fields[0:self.get_nodal_dimension()]+[flow_eqs.velocity_name+"_normal"] else: return fields[0:self.get_nodal_dimension()] def define_fields(self): fields=self.get_required_fields() for f in fields: if self.get_opposite_side_of_interface(raise_error_if_none=False) is None: raise RuntimeError("Cannot connect any fields at the interface if no opposite side is present") inside_space = cast(Union[FiniteElementSpaceEnum,Literal[""]], self.get_parent_domain().get_space_of_field(f)) if inside_space == "": raise RuntimeError( "Cannot connect field " + f + " at the interface, since it cannot find in the inner domain") opp_parent=self.get_opposite_side_of_interface().get_parent_domain() assert opp_parent is not None outside_space = cast(Union[FiniteElementSpaceEnum,Literal[""]], opp_parent.get_space_of_field(f)) if outside_space == "": raise RuntimeError( "Cannot connect field " + f + " at the interface, since it cannot find in the outer domain") space=get_interface_field_connection_space(inside_space,outside_space,self.use_highest_space) assert space!="" self.define_scalar_field(self.lagr_mult_prefix + f, space) # However, we must remove the m=1 connection for the radial component aziinfo=self.get_azimuthal_r0_info() csys=self.get_coordinate_system() for f in fields: for i in [0,1,2]: if f in aziinfo[i]: aziinfo[i].add(self.lagr_mult_prefix + f) else: if self.lagr_mult_prefix + f in aziinfo[i]: aziinfo[i].remove(self.lagr_mult_prefix + f) def define_scaling(self): fields=self.get_required_fields() flow_eqs=self.get_parent_equations(StokesEquations) assert isinstance(flow_eqs,StokesEquations) for f in fields: self.set_test_scaling(**{self.lagr_mult_prefix + f:1/scale_factor(flow_eqs.velocity_name)}) self.set_scaling(**{self.lagr_mult_prefix + f: 1 / test_scale_factor(flow_eqs.velocity_name)}) def define_residuals(self): fields = self.get_required_fields() n = self.get_normal() ins_ns=self.get_parent_domain().get_equations().get_equation_of_type(StokesEquations) opp_par=self.get_opposite_side_of_interface().get_parent_domain() assert opp_par is not None out_ns=opp_par.get_equations().get_equation_of_type(StokesEquations) assert isinstance(ins_ns,StokesEquations) assert isinstance(out_ns,StokesEquations) rho_inside = ins_ns.mass_density rho_outside = out_ns.mass_density if rho_outside is not None: rho_outside = evaluate_in_domain(rho_outside, self.get_opposite_side_of_interface()) else: rho_outside=0 if (self.mass_transfer_rate is not None) and self.mass_transfer_rate != 0: assert rho_inside is not None masstrans = subexpression(self.mass_transfer_rate / rho_inside - self.mass_transfer_rate / rho_outside) else: masstrans = 0 for i, f in enumerate(fields): l, l_test = var_and_test(self.lagr_mult_prefix + f) inside, inside_test = var_and_test(f) outside, outside_test = var_and_test(f, domain=self.get_opposite_side_of_interface()) self.add_residual(weak(inside - outside - masstrans * n[i]+self.normal_velocity_jump*n[i], l_test )) self.add_residual(weak(l, inside_test) ) self.add_residual(-weak(l , outside_test)) def before_assigning_equations_postorder(self, mesh:"AnyMesh"): assert isinstance(mesh,InterfaceMesh) assert mesh._opposite_interface_mesh is not None if mesh.nelement() == 0 or mesh._opposite_interface_mesh.nelement() == 0: return fields=self.get_required_fields() for i,_ in enumerate(fields): self.pin_redundant_lagrange_multipliers(mesh,self.lagr_mult_prefix + fields[i],[fields[i]],opposite_interface=[fields[i]])
[docs] class NoSlipBC(DirichletBC): """ No-slip boundary condition for the Navier-Stokes equations. This is enforced by setting the degrees of freedom of the velocity field to zero at the specified boundary. It is a subclass of DirichletBC and inherits all its arguments. Args: velocity_name (str, optional): Name of the velocity field. Defaults to "velocity". """ def __init__(self): super().__init__() self.veloname="velocity" def define_residuals(self): self._dcs={} dim = self.get_nodal_dimension() dirs = ["x", "y", "z"] ns=self.get_parent_domain().get_equations().get_equation_of_type(StokesEquations) lagr=False if isinstance(ns,StokesEquations) and ns.PFEM_options and ns.PFEM_options.active: for i in range(dim): self._dcs["mesh_"+dirs[i]]=True lagr=True if ns.PFEM_options.first_order_system: for i in range(dim): self._dcs[self.veloname+"_"+dirs[i]]=0 else: for i in range(dim): self._dcs[self.veloname+"_"+dirs[i]]=0 cs=self.get_coordinate_system() if cs is not None: if isinstance(cs,AxisymmetryBreakingCoordinateSystem): if lagr: raise RuntimeError("TODO") self._dcs[self.veloname+"_phi"]=0 elif isinstance(cs,CartesianCoordinateSystemWithAdditionalNormalMode): if lagr: raise RuntimeError("TODO") self._dcs[self.veloname+"_normal"]=0 super(NoSlipBC, self).define_residuals()
[docs] class StokesFlowRadialFarField(InterfaceEquations): """ Enforces a far-field condition for a radialsymmetric inward or outward flow. Args: infinity_pressure (ExpressionOrNum): Pressure at infinity. Defaults to 0. """ def __init__(self,infinity_pressure:ExpressionOrNum=0): super().__init__() self.pinfty=infinity_pressure required_parent_type=StokesEquations def define_residuals(self): stokes_eqs=self.get_parent_equations() if not isinstance(stokes_eqs,StokesEquations): raise RuntimeError("Must be applied on StokesEquations") u,utest=var_and_test(stokes_eqs.velocity_name) uB,uBtest=var_and_test(stokes_eqs.velocity_name,domain="..") strain=2*stokes_eqs.dynamic_viscosity*sym(grad(uB)) n=var("normal") normstrain=matproduct(strain,n) p,ptest=var_and_test(stokes_eqs.pressure_name) self.add_weak(-normstrain+self.pinfty*n,utest)