Source code for pyoomph.equations.advection_diffusion

#  @file
#  @author Christian Diddens <c.diddens@utwente.nl>
#  @author Duarte Rocha <d.rocha@utwente.nl>
#  
#  @section LICENSE
# 
#  pyoomph - a multi-physics finite element framework based on oomph-lib and GiNaC 
#  Copyright (C) 2021-2025  Christian Diddens & Duarte Rocha
# 
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
# 
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# 
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>. 
#
#  The authors may be contacted at c.diddens@utwente.nl and d.rocha@utwente.nl
#
# ========================================================================
from ..materials.generic import MixtureLiquidProperties,MixtureGasProperties
from .. import GlobalLagrangeMultiplier, WeakContribution
from ..generic import Equations,InterfaceEquations
from ..expressions import * #Import grad et al

from ..typings import *

if TYPE_CHECKING:
   from ..generic import Problem

[docs] class AdvectionDiffusionEquations(Equations): r""" .. _AdvectionDiffusionEquations: Represents the advection-diffusion equation in the form: .. math:: \partial_t u + \mathbf{b} \cdot \nabla u - \nabla \cdot (D \nabla u) = f where :math:`u` is the dependent scalar variable, :math:`D` is the diffusion coefficient, :math:`\mathbf{b}` is the advection velocity, and :math:`f` is the source term. In the weak form, the equation reads: .. math:: (\partial_t u, v) + (\mathbf{b} \cdot \nabla u, v) + (D \nabla u, \nabla v) - \langle \nabla u \cdot \mathbf{n} , v \rangle = (f, v) Args: fieldnames(Union[str,List[str]]): The name(s) of the dependent variable(s). Default is "advdiffu". diffusivity(ExpressionOrNum): The diffusion coefficient. Default is 1. space(FiniteElementSpaceEnum): The finite element space. Default is "C2", i.e. second order continuous Lagrangian elements. consider_scaling(bool): Whether to consider scaling. Default is True. fluid_props(Optional[Union[MixtureLiquidProperties,MixtureGasProperties]]): The fluid properties. Default is None. wind(ExpressionOrNum): The advection velocity. Default is var("velocity"). dt_factor(ExpressionOrNum): Multiplicative time step factor. Default is 1. time_scheme(Optional[TimeSteppingScheme]): The time stepping scheme. Default is None. source(Union[ExpressionOrNum,Dict[str,ExpressionOrNum]]): The source term. Default is {}. advection_by_parts(Union[bool,Literal["skew"]]): Whether to integrate by partsthe weak form of the advective term. Default is False. velocity_name_for_scaling(str): The name of the velocity for scaling. Default is "velocity". """ def __init__(self,fieldnames:Union[str,List[str]]="advdiffu",*,diffusivity:ExpressionOrNum=1,space:"FiniteElementSpaceEnum"="C2",consider_scaling:bool=True,fluid_props:Optional[Union[MixtureLiquidProperties,MixtureGasProperties]]=None,wind:ExpressionOrNum=var("velocity"),dt_factor:ExpressionOrNum=1,time_scheme:Optional[TimeSteppingScheme]=None,source:Union[ExpressionOrNum,Dict[str,ExpressionOrNum]]={},advection_by_parts:Union[bool,Literal["skew"]]=False,velocity_name_for_scaling="velocity"): super().__init__() self.dt_factor=dt_factor self.diffusivity=diffusivity self.space:"FiniteElementSpaceEnum"=space self.wind=wind self.velocity_name_for_scaling=velocity_name_for_scaling self.time_scheme:Optional[TimeSteppingScheme]=time_scheme self.advection_by_parts=advection_by_parts if isinstance(fieldnames,str): self.fieldnames=[fieldnames] else: self.fieldnames=fieldnames if not isinstance(source,dict): self.source={n:source for n in self.fieldnames} else: self.source=source self.consider_scaling=consider_scaling self.fluid_props=fluid_props self.component_names:Dict[str,str]={} if self.fluid_props is not None: self.fieldnames:List[str]=[] for n in self.fluid_props.required_adv_diff_fields: self.component_names["massfrac_"+n]=n self.fieldnames.append("massfrac_"+n) #print(self.fluid_props.required_adv_diff_fields) #print(dir(self.fluid_props)) #exit() #self.diffusivity=fluid_props.diffusivity self.spatial_error_estimators=True def define_fields(self): remaining:Expression=Expression(1) remaining_test:Optional[Expression]=None mydom=self.get_my_domain() for f in self.fieldnames: ts=scale_factor("spatial")/scale_factor(self.velocity_name_for_scaling)/scale_factor(f) if self.consider_scaling else 1 self.define_scalar_field(f,self.space,testscale=ts) remaining-=var(f,domain=mydom) if remaining_test is None: remaining_test=-testfunction(f,domain=mydom) if self.fluid_props is not None: assert self.fluid_props.passive_field is not None and isinstance(self.fluid_props.passive_field,str) assert remaining_test is not None self.define_field_by_substitution("massfrac_"+self.fluid_props.passive_field,remaining,also_on_interface=True) self.define_testfunction_by_substitution("massfrac_"+self.fluid_props.passive_field,remaining_test,also_on_interface=True) sum_massfrac_by_molar_mass=0 for n,c in self.fluid_props.pure_properties.items(): sum_massfrac_by_molar_mass+=var("massfrac_"+n,domain=mydom)/c.molar_mass self.define_field_by_substitution("molefrac_"+n, (var("massfrac_"+n,domain=mydom)/c.molar_mass)/var("_sum_massfrac_by_molar_mass",domain=mydom),also_on_interface=True) sum_massfrac_by_molar_mass=subexpression(sum_massfrac_by_molar_mass) self.define_field_by_substitution("_sum_massfrac_by_molar_mass",sum_massfrac_by_molar_mass,also_on_interface=True) def get_diffusion_coefficient(self,f1:str,f2:Optional[str]=None) -> ExpressionNumOrNone: if f2 is None: f2=f1 if self.fluid_props is not None: return self.fluid_props.get_diffusion_coefficient(f1,f2,default=0) if f1!=f2: raise RuntimeError("Implement mixed diffusion between "+str(f1)+" and "+str(f2) ) return self.diffusivity def define_residuals(self): if self.time_scheme is None: ts:Callable[[Expression],Expression]=lambda x :x else: ts:Callable[[Expression],Expression]=lambda x: time_scheme(cast(TimeSteppingScheme,self.time_scheme),x) if self.fluid_props is not None: for fn in self.fieldnames: f, f_test = var_and_test(fn) self.add_residual(weak(ts(self.dt_factor*partial_t(f)-self.source.get(fn,0)),f_test)) if self.advection_by_parts=="skew": self.add_residual(-weak( ts(self.wind* f),grad(f_test))/2) self.add_residual(weak(ts(dot(self.wind, grad(f))), f_test)/2) elif self.advection_by_parts: self.add_residual(-weak( ts(self.wind* f),grad(f_test))) else: self.add_residual(weak(ts(dot(self.wind, grad(f))), f_test)) for fn2 in self.fieldnames: f2,_=var_and_test(fn2) diffuD=self.get_diffusion_coefficient(self.component_names[fn],self.component_names[fn2]) assert diffuD is not None self.add_residual(weak(ts(diffuD*grad(f2)),grad(f_test))) else: for fn in self.fieldnames: f, f_test = var_and_test(fn) self.add_residual(weak(ts(self.dt_factor * partial_t(f)),f_test)-weak(ts(self.source.get(fn, 0)),f_test)) if self.advection_by_parts=="skew": self.add_residual(-weak( ts(self.wind* f),grad(f_test))/2) self.add_residual(weak(ts(dot(self.wind, grad(f))), f_test)/2) elif self.advection_by_parts: self.add_residual(-weak(ts(self.wind*f), grad(f_test))) else: self.add_residual(weak(ts(dot(self.wind,grad(f))),f_test)) diffuD=self.diffusivity self.add_residual(weak(ts(diffuD*grad(f)),grad(f_test))) # Use this to either fix the average or the total integral of the field, i.e. add eqs+=AdvectionDiffusionEquations(...).with_integral_constraint(...) def with_integral_constraint(self,problem:"Problem",*,average:Optional[Union[Dict[str,ExpressionOrNum],ExpressionOrNum]]=None,integral:Optional[Union[Dict[str,ExpressionOrNum],ExpressionOrNum]]=None,ode_domain_name:str="globals",lagrange_prefix:Union[str,Dict[str,str]]="lagr_intconstr_",set_zero_on_normal_mode_eigensolve:bool=True) -> Equations: eq_additions=self if average is None and integral is None: raise ValueError("Please either specify average= or integral=") if average is None: average={} elif isinstance(average,dict): average=average.copy() else: if len(self.fieldnames)==1: average={self.fieldnames[0]:average} else: raise RuntimeError("Cannot set all averages like this") if integral is None: integral={} elif isinstance(integral,dict): integral=integral.copy() else: integral = {self.fieldnames[0]: integral} possible_fields=self.fieldnames lagr_mults:Dict[str,ExpressionOrNum]={} lagr_names:Dict[str,str]={} for k in possible_fields: if k in average.keys() and k in integral.keys(): raise ValueError("Cannot set simultaneously average and integral for the field "+str(k)) if k in average.keys(): lagr_names[k]=(lagrange_prefix+k) if isinstance(lagrange_prefix,str) else lagrange_prefix[k] lagr_mults[lagr_names[k]]=0 eq_additions+=WeakContribution(var(k)-average[k],testfunction(lagr_names[k],domain=ode_domain_name)) eq_additions+=WeakContribution(var(lagr_names[k],domain=ode_domain_name),testfunction(k)) elif k in integral.keys(): lagr_names[k]=(lagrange_prefix+k) if isinstance(lagrange_prefix,str) else lagrange_prefix[k] lagr_mults[lagr_names[k]]=-integral[k] eq_additions+=WeakContribution(var(k),testfunction(lagr_names[k],domain=ode_domain_name),dimensional_dx=True) eq_additions+=WeakContribution(var(lagr_names[k],domain=ode_domain_name),testfunction(k),dimensional_dx=True) ode_additions=GlobalLagrangeMultiplier(**lagr_mults,set_zero_on_normal_mode_eigensolve=set_zero_on_normal_mode_eigensolve) problem.add_equations(ode_additions@ode_domain_name) return eq_additions
[docs] class AdvectionDiffusionFluxInterface(Equations): """ Represents the flux through the interface that naturally arises from the integration by parts of the diffusion term in the advection-diffusion equation. Args: **kwargs: name of the flux and its value. """ def __init__(self, **kwargs:ExpressionOrNum): super(AdvectionDiffusionFluxInterface, self).__init__() self.fluxes=kwargs.copy() def define_residuals(self): for name,flux in self.fluxes.items(): test=testfunction(name) self.add_residual(weak(flux,test))
# In the weak form, the equation reads: # .. math:: # (D (u - u_{\\infty}) \dfrac{(r-R) \cdot \mathbf{n}}{||(r-R)||^2}, v)
[docs] class AdvectionDiffusionInfinity(InterfaceEquations): """ Represents the condition at infinity for the advection-diffusion equation in the form: .. math:: u + R \\dfrac{\\partial u}{\\partial r} = u_{\\infty} where :math:`u` is the dependent variable, :math:`u_{\\infty}` is the value at infinity, and :math:`R` is the distance from the origin and :math:`r` is the radial coordinate. This class requires the parent equations to be of type :ref:`AdvectionDiffusionEquations <AdvectionDiffusionEquations>`, meaning that if :ref:`AdvectionDiffusionEquations <AdvectionDiffusionEquations>` (or subclasses) are not defined in the parent domain, an error will be raised. Args: **kwargs: name of the field and its value. """ required_parent_type = AdvectionDiffusionEquations def __init__(self,**kwargs:ExpressionOrNum): super(AdvectionDiffusionInfinity, self).__init__() self.inftyvals={**kwargs} self.origin=vector([0]) def define_residuals(self): n = self.get_normal() d = var("coordinate") - self.origin parents=self.get_parent_equations(AdvectionDiffusionEquations) assert parents is not None if not isinstance(parents,(list,tuple)): parents=[parents] for fn,val in self.inftyvals.items(): diffuD=None for p in parents: assert isinstance(p,AdvectionDiffusionEquations) if fn in p.fieldnames: diffuD = p.get_diffusion_coefficient(fn) break if diffuD is None: raise RuntimeError("Cannot find any diffusion coefficient for field "+fn) y, y_test = var_and_test(fn) self.add_residual(weak(diffuD * (y - val) * dot(n, d) / dot(d, d) , y_test) )