# @file
# @author Christian Diddens <c.diddens@utwente.nl>
# @author Duarte Rocha <d.rocha@utwente.nl>
#
# @section LICENSE
#
# pyoomph - a multi-physics finite element framework based on oomph-lib and GiNaC
# Copyright (C) 2021-2025 Christian Diddens & Duarte Rocha
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The authors may be contacted at c.diddens@utwente.nl and d.rocha@utwente.nl
#
# ========================================================================
import os
from pathlib import Path,PurePath
from ..expressions.generic import Expression, ExpressionOrNum, GlobalParameter
from ..expressions.units import unit_to_string
from ..meshes.mesh import ODEStorageMesh
from ..generic.codegen import BaseEquations
import inspect
import _pyoomph
from scipy.io import savemat,loadmat #type:ignore
from ..typings import *
import numpy
if TYPE_CHECKING:
from ..generic.codegen import EquationTree
from ..generic.problem import Problem
from ..meshes.mesh import AnySpatialMesh
from ..meshes.meshdatacache import MeshDataCacheEntry, MeshDataCacheOperatorBase, MeshDataEigenModes
class _BaseOutputter:
def __init__(self):
self._stages:Optional[Set[str]]=None
self._eqtree:"EquationTree"
self._mpi_rank:int
self.problem:"Problem"
pass
def after_remeshing(self,eqtree:"EquationTree"):
pass
def init(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]]=None,rank:int=0):
self._eqtree=eqtree
self._mpi_rank=rank
pass
def output(self,step:int)->None:
raise NotImplementedError("Not implemented")
def delete_files_from_previous_simulation(self)->None:
pass
def get_time(self,nondimensional:bool=False)->float:
return self.problem.get_current_time(dimensional=not nondimensional,as_float=True)
def clean_up(self)->None:
pass
def set_active_on_stages(self,stages:Optional[Union[str,Set[str]]]):
if stages is not None:
if isinstance(stages,str):
stages={stages}
elif isinstance(stages,set): #type:ignore
try:
stages=set(stages)
except:
stages=set({stages}) #type:ignore
self._stages=stages #type:ignore
def get_active_on_stages(self) -> Optional[Set[str]]:
return self._stages
class _BaseNumpyOutput(_BaseOutputter):
def __init__(self,mesh:"AnySpatialMesh"):
super().__init__()
self.mesh=mesh
def clean_up(self):
pass
def after_remeshing(self,eqtree:"EquationTree"):
#Refresh the mesh!
m=eqtree.get_mesh()
assert not isinstance(m,ODEStorageMesh)
self.mesh=m
def get_cached_mesh_data(self,mesh:"AnySpatialMesh",nondimensional:bool=False,tesselate_tri:bool=False,eigenvector:Optional[Union[int,Sequence[int]]]=None,eigenmode:"MeshDataEigenModes"="abs",history_index:int=0,with_halos:bool=False,operator:Optional["MeshDataCacheOperatorBase"]=None,discontinuous:bool=False,add_eigen_to_mesh_positions:bool=True)->"MeshDataCacheEntry":
pr = self.mesh.get_problem()
cache = pr.get_cached_mesh_data(mesh, tesselate_tri=tesselate_tri, nondimensional=nondimensional,eigenvector=eigenvector,eigenmode=eigenmode,history_index=history_index,with_halos=with_halos,operator=operator,discontinuous=discontinuous,add_eigen_to_mesh_positions=add_eigen_to_mesh_positions)
return cache
####################
class _TextOutput(_BaseNumpyOutput):
def __init__(self,mesh:"AnySpatialMesh",*fields:str,ftrunk:str="txtout",in_subdir:bool=True,file_ext:Optional[Union[str,List[str]]]=None,eigenvector:Optional[int]=None,eigenmode:"MeshDataEigenModes"="abs",nondimensional:bool=False,hide_lagrangian:bool=True,hide_underscore:bool=True,reverse_segment_if:Optional[Callable[[List[int],NPFloatArray],bool]]=None,sort_segments_by:Optional[Callable[[List[int],NPFloatArray],float]]=None,discontinuous:bool=False,add_eigen_to_mesh_positions:bool=True,operator:Optional["MeshDataCacheOperatorBase"]=None,tesselate_tri:bool=True):
super().__init__(mesh)
self.fname_trunk=ftrunk
self._orbit_subdir=None
self.in_subdir=in_subdir
self.file_ext=file_ext
self.fields:List[str]=[*fields]
#self._additional_outs=[]
self.eigenvector=eigenvector
self.eigenvector_mode:"MeshDataEigenModes"=eigenmode
self.nondimensional=nondimensional
self.hide_lagrangian=hide_lagrangian
self.hide_underscore = hide_underscore
self.reverse_segment_if=reverse_segment_if
self.sort_segments_by=sort_segments_by
self.discontinuous=discontinuous
self.add_eigen_to_mesh_positions=add_eigen_to_mesh_positions
self.operator=operator
self.tesselate_tri=tesselate_tri
def init(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]]=None,rank:int=0):
super().init(eqtree,continue_info,rank)
if isinstance(self.mesh,str):
self.mesh=self.problem.get_mesh(self.mesh)
if self.in_subdir and rank==0:
Path(os.path.join(self.problem.get_output_directory()),self.fname_trunk).mkdir(parents=True, exist_ok=True)
if self.file_ext is None:
self.file_ext=self.problem.default_1d_file_extension
def get_filename(self, step:int):
assert self.file_ext is not None
if isinstance(self.file_ext, (list,set)):
res:List[str] = []
for e in self.file_ext:
fname = self.fname_trunk + "_{:06d}".format(step) + "." + e
if self.in_subdir:
fname = os.path.join(self.problem.get_output_directory(self._orbit_subdir), self.fname_trunk, fname)
else:
fname = os.path.join(self.problem.get_output_directory(self._orbit_subdir), fname)
res.append(fname)
return res
else:
fname = self.fname_trunk + "_{:06d}".format(step) + "." + self.file_ext
if self.in_subdir:
fname = os.path.join(self.problem.get_output_directory(self._orbit_subdir), self.fname_trunk, fname)
else:
fname = os.path.join(self.problem.get_output_directory(self._orbit_subdir), fname)
return fname
def change_output_directory(self,newdir:str,eqtree:"EquationTree"):
basedir=self.problem.get_output_directory()
if Path(basedir).samefile(newdir):
self._orbit_subdir=None
else:
self._orbit_subdir=Path.relative_to(Path(newdir),Path(basedir))
Path(os.path.join(self.problem.get_output_directory(self._orbit_subdir)),self.fname_trunk).mkdir(parents=True, exist_ok=True)
def output(self,step:int):
mesh = self.mesh
if (not mesh.is_mesh_distributed()) and self._mpi_rank > 0:
return
if self.eigenvector is not None:
if self.eigenvector >= len(self.mesh.get_problem()._last_eigenvectors): #type:ignore
return # No output hrere
cache=self.get_cached_mesh_data(self.mesh,nondimensional=self.nondimensional,tesselate_tri=self.tesselate_tri,eigenvector=self.eigenvector,eigenmode=self.eigenvector_mode,discontinuous=self.discontinuous,add_eigen_to_mesh_positions=self.add_eigen_to_mesh_positions,operator=self.operator)
if len(self.fields)==0:
self.fields=cache.get_default_output_fields(rem_lagrangian=self.hide_lagrangian,rem_underscore=self.hide_underscore)
header:List[str] = []
timeinfo = self.get_time(nondimensional=self.nondimensional)
fname = self.get_filename(step)
datag:List[NPFloatArray]=[]
for i,f in enumerate(self.fields):
d=cache.get_data(f)
if d is not None:
datag.append(d)
header.append(f + cache.get_unit(f,as_string=True))
if self.discontinuous:
numDL=cache.DL_data.shape[1]
for k,v in cache.elemental_field_inds.items():
if k in self.fields:
header.append(k + cache.get_unit(k,as_string=True))
if v>=numDL:
datag.append(cache.D0_data[:,v-numDL])
else:
datag.append(cache.DL_data[:,v])
data:NPFloatArray=numpy.array(datag).transpose() #type:ignore
if mesh.get_element_dimension()==1 and not self.discontinuous:
lsegs_in,_=cache.get_interface_line_segments()
lsegs=lsegs_in.copy()
coords:List[NPFloatArray]=[]
for c in ["x","y","z"]:
if "coordinate_"+c in self.fields:
coords.append(data[:,self.fields.index("coordinate_"+c)])
coordsA:NPFloatArray=numpy.array(coords)
if self.reverse_segment_if is not None:
for i,l in enumerate(lsegs):
if self.reverse_segment_if(l,coordsA):
lsegs[i]=list(reversed(l))
if self.sort_segments_by is not None:
lsegs=list(sorted(lsegs,key=lambda k : self.sort_segments_by(k,coordsA)))
sortdata:List[NPFloatArray]=[]
for i,ls in enumerate(lsegs):
sortdata.append(data[ls])
if i+1<len(lsegs):
sortdata.append([[numpy.NAN]*len(data[0,:])]) #type:ignore
data:NPFloatArray=numpy.vstack(sortdata) #type:ignore
params:Dict[str,str] = {}
for n in self.mesh.get_problem().get_global_parameter_names():
params[n] =str(self.mesh.get_problem().get_global_parameter(n).value)
if self.eigenvector is not None:
eigeninfostr="OUTPUT_IS_"+str(self.eigenvector_mode)+"_OF_EIGENVALUE_"+str(self.eigenvector)
if self.mesh.get_problem()._last_eigenvalues_m is not None and self.eigenvector<len(self.mesh.get_problem()._last_eigenvalues_m): #type:ignore
eigeninfostr+="_AND_ANGULAR_MODE_"+str(self.mesh.get_problem()._last_eigenvalues_m[self.eigenvector]) #type:ignore
params[eigeninfostr]=str(self.mesh.get_problem()._last_eigenvalues[self.eigenvector]) #type:ignore
if isinstance(fname, list):
for f in fname:
save_by_extension(f, data, header, timeinfo,params,cache.elem_indices if cache.discontinuous else None)
else:
save_by_extension(fname, data, header, timeinfo,params,cache.elem_indices if cache.discontinuous else None)
self.clean_up()
def delete_files_from_previous_simulation(self):
try:
step=0
while True:
fn=self.get_filename(step)
if isinstance(fn,list):
for f in fn:
os.remove(f)
else:
os.remove(fn)
step+=1
except:
pass
####################
def save_by_extension(fname:str,data:NPFloatArray,header:List[str],timeinfo:float,params:Dict[str,str],discontinuous_elem_indices:Optional[NPInt32Array]=None):
_,ext=os.path.splitext(fname)
if ext in [".mat",".MAT"]:
mdict={}
for i,fn in enumerate(header):
if "[" in fn:
fn=fn[0:fn.find("[")]
mdict[fn]=data[:,i] #type:ignore
if timeinfo is not None:
mdict["current_time"]=timeinfo
for pn,v in params.items():
mdict[pn]=v
if discontinuous_elem_indices:
raise RuntimeError("Cannot output MATLAB in discontinuous mode yet")
savemat(fname,mdict,appendmat=False)
else:
headerr="\t".join(header)
if timeinfo is not None:
headerr+="\t@time="+str(timeinfo)
elif len(params)>0:
headerr += "\t@"
for pn,v in params.items():
headerr+="\t"+pn+"="+str(v)
if discontinuous_elem_indices is None:
numpy.savetxt(fname,data,header=headerr.lstrip(),delimiter="\t") #type:ignore
else:
with open(fname,"w") as f:
f.write("#"+headerr.lstrip()+"\n")
for e in discontinuous_elem_indices:
for j in e:
if j==-1:
break
f.write("\t".join(map(str,data[j,:]))+"\n")
f.write("\n")
class _OutputTxtAlongLine(_BaseOutputter):
def __init__(self,*fields:str,coords:Optional[Union[NPFloatArray,List[Sequence[ExpressionOrNum]]]]=None,start:Optional[List[ExpressionOrNum]]=None,end:Optional[List[ExpressionOrNum]]=None,N:Optional[int]=None,isovalue:Optional[Tuple[str,ExpressionOrNum]]=None,mesh:Optional["AnySpatialMesh"]=None,ftrunk:str="along_line",in_subdir:bool=True,file_ext:Optional[Union[str,List[str]]]=None,hide_lagrangian:bool=True,hide_underscore:bool=True,eigenvector:Optional[int]=None,eigenmode:"MeshDataEigenModes"="abs",NaN_outside:bool=False):
super().__init__()
if mesh is None:
raise ValueError("Need to supply at least a mesh")
self.fname_trunk=ftrunk
self.in_subdir=in_subdir
assert mesh is not None
self.mesh=mesh
self.problem=mesh.get_problem()
self.file_ext=file_ext
self.fields:List[str]=[*fields]
self.use_tri_interpolator=True
self.hide_lagrangian=hide_lagrangian
self.hide_underscore = hide_underscore
self.eigenvector=eigenvector
self.eigenmode:"MeshDataEigenModes"=eigenmode
self.NaN_outside=NaN_outside
if isovalue is not None:
if coords is not None or start is not None or end is not None:
raise RuntimeError("Cannot specify coords, start or end if isovalue is set")
self.isovalue=isovalue
elif coords is None:
if (start is None) or (end is None) or (N is None):
raise RuntimeError("Require to specify start, end and N if no coords are given")
#ss=p.get_scaling("spatial")
meshdata = self.mesh.get_problem().get_cached_mesh_data(self.mesh, tesselate_tri=True, nondimensional=False)
ss=meshdata.get_unit("spatial")
s:NPFloatArray=numpy.array([float(x/ss) for x in start]) #type:ignore
e:NPFloatArray = numpy.array([float(x/ss) for x in end]) #type:ignore
l:NPFloatArray=numpy.linspace(0,1,N,endpoint=True) #type:ignore
self.coords:NPFloatArray=numpy.tensordot(1-l,s,axes=0)+numpy.tensordot(l,e,axes=0)
self.isovalue=None
else:
if (start is not None) or (end is not None) or (N is not None):
raise RuntimeError("Cannot specify start, end or N if coords are given")
meshdata = self.mesh.get_problem().get_cached_mesh_data(self.mesh, tesselate_tri=True, nondimensional=False)
ss=meshdata.get_unit("spatial")
coords=numpy.array(coords/ss,dtype=numpy.float64)
self.coords:NPFloatArray=numpy.array(coords) #type:ignore
self.isovalue=None
def init(self, eqtree:"EquationTree", continue_info:Optional[Dict[str,Any]]=None, rank:int=0):
super().init(eqtree, continue_info, rank)
if isinstance(self.mesh, str):
self.mesh = self.problem.get_mesh(self.mesh)
if self.in_subdir and rank == 0:
Path(os.path.join(self.problem.get_output_directory()), self.fname_trunk).mkdir(parents=True, exist_ok=True)
if self.file_ext is None:
self.file_ext=self.problem.default_1d_file_extension
def get_filename(self,step:int) -> Union[List[str] ,str]:
assert self.file_ext is not None
if isinstance(self.file_ext,list):
res:List[str]=[]
for e in self.file_ext:
fname = self.fname_trunk + "_{:06d}".format(step) + "." + e
if self.in_subdir:
fname = os.path.join(self.problem.get_output_directory(), self.fname_trunk, fname)
else:
fname = os.path.join(self.problem.get_output_directory(), fname)
res.append(fname)
return res
else:
fname = self.fname_trunk + "_{:06d}".format(step) + "." + self.file_ext
if self.in_subdir:
fname = os.path.join(self.problem.get_output_directory(), self.fname_trunk, fname)
else:
fname = os.path.join(self.problem.get_output_directory(), fname)
return fname
def after_remeshing(self,eqtree:"EquationTree"):
m=eqtree.get_mesh()
assert not isinstance(m,ODEStorageMesh)
self.mesh=m
def get_data_and_descs(self)->Tuple[NPFloatArray,List[str]]:
if self.use_tri_interpolator:
meshdata = self.mesh.get_problem().get_cached_mesh_data(self.mesh, tesselate_tri=True, nondimensional=False,eigenmode=self.eigenmode,eigenvector=self.eigenvector)
coordinates:NPFloatArray=meshdata.get_coordinates()
import matplotlib.tri as tri
triang = tri.Triangulation(coordinates[0,:], coordinates[1,:], meshdata.elem_indices)
if self.isovalue is not None:
import matplotlib.pyplot as plt
isodata=meshdata.get_data(self.isovalue[0])-float(self.isovalue[1]) # TODO: Nondimensionalize cast to float
isol=plt.tricontour(triang,isodata,[0.0])
self.coords=[]
for path in isol.allsegs[0]:
for p in path:
self.coords.append(p)
self.coords=numpy.array(self.coords)
plt.close()
del isol
fields=meshdata.get_default_output_fields(rem_lagrangian=self.hide_lagrangian,rem_underscore=self.hide_underscore)
fields=[f for f in fields if f not in meshdata.elemental_field_inds.keys()] # Does not work for elemental fields
dataL:List[NPFloatArray]=[]
for f in fields:
inter=tri.LinearTriInterpolator(triang, meshdata.get_data(f))
inter=inter(self.coords[:,0],self.coords[:,1]) #type:ignore
if self.NaN_outside:
if f not in {"coordinate_x","coordinate_y","coordinate_z"}:
inter[inter.mask] = numpy.NaN
elif f=="coordinate_x":
inter=self.coords[:,0]
elif f=="coordinate_y":
inter=self.coords[:,1]
elif f=="coordinate_z":
raise RuntimeError("Z coord")
else:
inter = inter[~inter.mask] #type:ignore
dataL.append(numpy.array(inter,dtype=numpy.float64)) #type:ignore
data:NPFloatArray=numpy.array(dataL).transpose() #type:ignore
units=meshdata.get_unit(fields,with_brackets=True,as_string=True)
descs=[fields[i]+units[i] for i in range(len(fields))]
return cast(NPFloatArray,data),descs #type:ignore
else:
raise RuntimeError("Not implemented")
data, mask, descs = self.mesh.get_values_at_zetas(self.coords, True) #type:ignore
fullmask = numpy.transpose([mask] * len(data[0])) #type:ignore
data:NPFloatArray = numpy.ma.masked_array(data, fullmask).transpose() #type:ignore
return data,descs
def output(self,step:int):
mesh=self.mesh
if (not mesh.is_mesh_distributed()) and self._mpi_rank>0:
return
if self.eigenvector is not None:
if self.eigenvector >= len(self.mesh.get_problem()._last_eigenvectors): #type:ignore
return # No output hrere
data,header=self.get_data_and_descs()
fname=self.get_filename(step)
params = {}
for n in self.mesh.get_problem().get_global_parameter_names():
params[n] = self.mesh.get_problem().get_global_parameter(n).value
if isinstance(fname,list):
for fn in fname:
save_by_extension(fn, data, header=header,timeinfo=mesh.get_problem().get_current_time(as_float=True),params=params)
else:
save_by_extension(fname,data,header=header,timeinfo=mesh.get_problem().get_current_time(as_float=True),params=params)
self.clean_up()
def delete_files_from_previous_simulation(self):
try:
step = 0
while True:
fn = self.get_filename(step)
if isinstance(fn, list):
for f in fn:
os.remove(f)
else:
os.remove(fn)
step += 1
except:
pass
class _GridFileOutput(_BaseOutputter):
def __init__(self,*fields:str,lower:List[Sequence[ExpressionOrNum]],upper:List[ExpressionOrNum],N:Optional[List[int]]=None,dx:Optional[List[ExpressionOrNum]],mesh:Optional["AnySpatialMesh"]=None,ftrunk:str="grid_out",in_subdir:bool=True,file_ext:Optional[Union[str,List[str]]]=None,hide_lagrangian:bool=True,hide_underscore:bool=True,eigenvector:Optional[int]=None,eigenmode:"MeshDataEigenModes"="abs"):
super().__init__()
if mesh is None:
raise ValueError("Need to supply at least a mesh")
self.fname_trunk=ftrunk
self.in_subdir=in_subdir
assert mesh is not None
self.mesh=mesh
self.problem=mesh.get_problem()
self.file_ext=file_ext
self.fields:List[str]=[*fields]
self.use_tri_interpolator=True
self.hide_lagrangian=hide_lagrangian
self.hide_underscore = hide_underscore
self.eigenvector=eigenvector
self.eigenmode:"MeshDataEigenModes"=eigenmode
self.lower=lower
self.upper=upper
self.dx=dx
self.N=N
pr=self.mesh.get_problem()
meshdata = pr.get_cached_mesh_data(self.mesh, tesselate_tri=True, nondimensional=False)
ss=meshdata.get_unit("spatial")
self.coords_per_dir=[]
if self.dx is not None:
for ll,uu,step in zip(self.lower,self.upper,self.dx):
ll_nd=float(ll/ss)
uu_nd=float(uu/ss)
step_nd=float(dx/ss)
self.coords_per_dir.append(numpy.arange(ll_nd,uu_nd,step_nd))
else:
for ll,uu,N in zip(self.lower,self.upper,self.N):
ll_nd=float(ll/ss)
uu_nd=float(uu/ss)
self.coords_per_dir.append(numpy.linspace(ll_nd,uu_nd,num=N,endpoint=True))
if len(self.coords_per_dir)!=2:
raise RuntimeError("Only works for 2d")
self.coords_x,self.coords_y=numpy.meshgrid(numpy.array(self.coords_per_dir[0]),numpy.array(self.coords_per_dir[1]))
self.coords_x,self.coords_y=self.coords_x.flatten(),self.coords_y.flatten()
def init(self, eqtree:"EquationTree", continue_info:Optional[Dict[str,Any]]=None, rank:int=0):
super().init(eqtree, continue_info, rank)
if isinstance(self.mesh, str):
self.mesh = self.problem.get_mesh(self.mesh)
if self.in_subdir and rank == 0:
Path(os.path.join(self.problem.get_output_directory()), self.fname_trunk).mkdir(parents=True, exist_ok=True)
if self.file_ext is None:
self.file_ext=self.problem.default_1d_file_extension
def get_filename(self,step:int) -> Union[List[str] ,str]:
assert self.file_ext is not None
if isinstance(self.file_ext,list):
res:List[str]=[]
for e in self.file_ext:
fname = self.fname_trunk + "_{:06d}".format(step) + "." + e
if self.in_subdir:
fname = os.path.join(self.problem.get_output_directory(), self.fname_trunk, fname)
else:
fname = os.path.join(self.problem.get_output_directory(), fname)
res.append(fname)
return res
else:
fname = self.fname_trunk + "_{:06d}".format(step) + "." + self.file_ext
if self.in_subdir:
fname = os.path.join(self.problem.get_output_directory(), self.fname_trunk, fname)
else:
fname = os.path.join(self.problem.get_output_directory(), fname)
return fname
def after_remeshing(self,eqtree:"EquationTree"):
m=eqtree.get_mesh()
assert not isinstance(m,ODEStorageMesh)
self.mesh=m
def get_data_and_descs(self)->Tuple[NPFloatArray,List[str]]:
if self.use_tri_interpolator:
meshdata = self.mesh.get_problem().get_cached_mesh_data(self.mesh, tesselate_tri=True, nondimensional=False,eigenmode=self.eigenmode,eigenvector=self.eigenvector)
coordinates:NPFloatArray=meshdata.get_coordinates()
import matplotlib.tri as tri
triang = tri.Triangulation(coordinates[0,:], coordinates[1,:], meshdata.elem_indices)
fields=meshdata.get_default_output_fields(rem_lagrangian=self.hide_lagrangian,rem_underscore=self.hide_underscore)
dataL:List[NPFloatArray]=[]
for f in fields:
inter=tri.LinearTriInterpolator(triang, meshdata.get_data(f))
inter=inter(self.coords_x,self.coords_y) #type:ignore
if True:
if f not in {"coordinate_x","coordinate_y","coordinate_z"}:
inter[inter.mask] = numpy.NaN
elif f=="coordinate_x":
inter=self.coords_x
elif f=="coordinate_y":
inter=self.coords_y
elif f=="coordinate_z":
raise RuntimeError("Z coord")
else:
inter = inter[~inter.mask] #type:ignore
dataL.append(numpy.array(inter,dtype=numpy.float64)) #type:ignore
data:NPFloatArray=numpy.array(dataL).transpose() #type:ignore
units=meshdata.get_unit(fields,with_brackets=True,as_string=True)
descs=[fields[i]+units[i] for i in range(len(fields))]
return cast(NPFloatArray,data),descs #type:ignore
else:
raise RuntimeError("Not implemented")
data, mask, descs = self.mesh.get_values_at_zetas(self.coords, True) #type:ignore
fullmask = numpy.transpose([mask] * len(data[0])) #type:ignore
data:NPFloatArray = numpy.ma.masked_array(data, fullmask).transpose() #type:ignore
return data,descs
def output(self,step:int):
mesh=self.mesh
if (not mesh.is_mesh_distributed()) and self._mpi_rank>0:
return
if self.eigenvector is not None:
if self.eigenvector >= len(self.mesh.get_problem()._last_eigenvectors): #type:ignore
return # No output hrere
data,header=self.get_data_and_descs()
fname=self.get_filename(step)
params = {}
for n in self.mesh.get_problem().get_global_parameter_names():
params[n] = self.mesh.get_problem().get_global_parameter(n).value
if isinstance(fname,list):
for fn in fname:
save_by_extension(fn, data, header=header,timeinfo=mesh.get_problem().get_current_time(as_float=True),params=params)
else:
save_by_extension(fname,data,header=header,timeinfo=mesh.get_problem().get_current_time(as_float=True),params=params)
self.clean_up()
def delete_files_from_previous_simulation(self):
try:
step = 0
while True:
fn = self.get_filename(step)
if isinstance(fn, list):
for f in fn:
os.remove(f)
else:
os.remove(fn)
step += 1
except:
pass
#############################
class _BaseODEOutput(_BaseOutputter):
def __init__(self):
super().__init__()
self._element:_pyoomph.BulkElementODE0d
self._odemesh:ODEStorageMesh
def init(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]]=None,rank:int=0):
self._eqtree=eqtree
self._mpi_rank=rank
self._element=self._odemesh._get_ODE("ODE")
pass
def get_ODE_values(self)->Tuple[NPFloatArray,Dict[str,int]]:
values,fieldinds=self._element.to_numpy()
return values,fieldinds
def output(self,step:int)->None:
values,_=self.get_ODE_values()
print("ODE:",values)
def get_additional_values(self):
return None,None
#######################
class _ODEFileOutput(_BaseODEOutput):
def __init__(self,odemesh:ODEStorageMesh,eqtree:"EquationTree",fname:Optional[str]=None,first_column:List[Union[str,GlobalParameter]]=["time"],continue_info:Optional[Dict[str,Any]]=None,in_units:Dict[str,ExpressionOrNum]={},hide_underscore:bool=False):
super().__init__()
self.fname=fname
self._odemesh=odemesh
self.in_units=in_units
self.hide_underscore=hide_underscore
if self.hide_underscore:
raise RecursionError("TODO: Hiding underscore")
self.file:Any=None
self._element=self._odemesh._get_ODE("ODE")
self._eqtree=eqtree
self.first_column=first_column
self.continue_info=continue_info
def change_output_directory(self,newdir:str,eqtree:"EquationTree"):
oldname=self.fname
self.fname = os.path.join(newdir, os.path.basename(self.fname))
if self.fname!=oldname:
self.file.close()
if os.path.exists(self.fname):
self.init(eqtree,{"TODO":"Fill further information here"},self._mpi_rank)
else:
self.init(eqtree,None,self._mpi_rank)
def init(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]]=None,rank:int=0):
super().init(eqtree,continue_info,rank)
assert self.fname is not None
if continue_info is None:
self.file = open(self.fname, "w")
else:
self.file=open(self.fname,"a")
values, fieldinds = self.get_ODE_values()
obs=self._eqtree.get_mesh().evaluate_all_observables()
compiled_ifuncs = self._eqtree.get_mesh().list_integral_functions()
descs=[""]*(len(values)+len(obs))
for d,ind in fieldinds.items():
descs[ind]=d
for i,n in enumerate(obs.keys(),start=len(values)):
descs[i]=n
#TODO Scales
_, indices = self._element.to_numpy()
scales:List[ExpressionOrNum] = [1.0] * (len(indices)+len(obs))
for k, i in indices.items():
s = self._eqtree.get_equations().get_scaling(k)
if not isinstance(s,Expression):
s=Expression(s)
s = self._eqtree.get_equations().get_current_code_generator().expand_placeholders(s,True)
factor, unit, rest, success = _pyoomph.GiNaC_collect_units(s)
scales[i] = float(factor)
if k in self.in_units.keys():
scales[i]*=float(unit/self.in_units[k])
descs[i] = descs[i] + "[" + str(self.in_units[k]) + "]"
else:
try:
float(unit)
except:
descs[i]=descs[i]+"["+unit_to_string(unit,estimate_prefix=False)+"]"
for (i, n),v in zip(enumerate(obs.keys(), start=len(values)),obs.values()):
if n in compiled_ifuncs:
ieunit = self._eqtree.get_mesh().get_code_gen()._get_integral_function_unit_factor(n)
_, unit, rest, _ = _pyoomph.GiNaC_collect_units(ieunit)
try:
float(unit)
except:
descs[i] = descs[i] + "[" + unit_to_string(unit,estimate_prefix=False) + "]"
scales[i]=1/unit
else:
if isinstance(v,_pyoomph.Expression):
factor, unit, rest, success = _pyoomph.GiNaC_collect_units(v)
scales[i] = unit
if factor.is_zero():
raise RuntimeError("TODO: Find a good way to detemine a unit here...")
try:
float(unit)
except:
descs[i] = descs[i] + "[" + unit_to_string(unit,estimate_prefix=False) + "]"
else:
scales[i]=1.0
self._scales = scales
firstcols:List[str]=[]
for fc in self.first_column:
if fc=="time":
tscale=self._eqtree.get_equations().get_scaling("temporal")
if not isinstance(tscale,Expression):
tscale=Expression(tscale)
factor, unit, rest, success = _pyoomph.GiNaC_collect_units(tscale)
try:
float(unit)
tscale=""
except:
tscale= "[" + unit_to_string(unit,estimate_prefix=False) + "]"
firstcols.append("time"+tscale)
elif isinstance(fc,GlobalParameter):
firstcols.append(fc.get_name())
else:
raise RuntimeError(repr(fc))
if continue_info is None:
self.file.write("#"+"\t".join(firstcols+descs)+"\n")
self.firsttime=True
def output(self,step:int):
values,_=self.get_ODE_values()
obs=self._eqtree.get_mesh().evaluate_all_observables()
_, indices = self._element.to_numpy()
self._scales:List[ExpressionOrNum] = [1.0] * (len(indices)+len(obs))
for k, i in indices.items():
s = self._eqtree.get_equations().get_scaling(k)
if not isinstance(s,Expression):
s=Expression(s)
s = self._eqtree.get_equations().get_current_code_generator().expand_placeholders(s,True)
factor, unit, rest, success = _pyoomph.GiNaC_collect_units(s)
self._scales[i] = float(factor)
values[:]=values[:]*self._scales[:len(values)] #type:ignore
obsv=numpy.array([v for v in obs.values()]) #type:ignore
obsv[:]=obsv[:]*self._scales[len(values):len(values)+len(obsv)]
obsv=numpy.array(list(map(float,obsv))) #type:ignore
#try:
# obsv=obsv.astype(numpy.float)
#except:
# pass
values=numpy.concatenate([values,obsv]) #type:ignore
addv,_=self.get_additional_values() #type:ignore
if (addv is not None) and len(addv)>0:
addstr="\t"+"\t".join(map(str,addv))
else:
addstr=""
firstcols:List[str]=[]
for fc in self.first_column:
if fc=="time":
firstcols.append(str(self.get_time()))
elif isinstance(fc,GlobalParameter):
firstcols.append(str(fc.value))
else:
raise RuntimeError(repr(fc))
self.file.write("\t".join(firstcols+list(map(str,values)))+addstr+"\n")
self.file.flush()
######################
class GenericOutput(BaseEquations):
def __init__(self):
super(GenericOutput, self).__init__()
self._outputter:Dict["EquationTree",_BaseOutputter]={} #Map from eqtree node to an outputter object
def after_remeshing(self,eqtree:"EquationTree"):
for _,out in self._outputter.items():
out.after_remeshing(eqtree)
def _construct_outputter_for_eq_tree(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]],mpirank:int)->_BaseOutputter:
raise NotImplementedError("Implement this")
def _expand_filename(self,eqtree:"EquationTree",filename:Optional[str]=None,extension:str="",add_problem_outdir:bool=True):
outdir = eqtree.get_mesh().get_problem().get_output_directory()
if filename is None:
if add_problem_outdir:
fname = os.path.join(outdir, eqtree.get_full_path(eqtree, sep="__") + extension)
else:
fname=eqtree.get_full_path(eqtree, sep="__") + extension
return fname
else:
if len(self._outputter) > 1:# or (not eqtree in self._outputter.keys()):
raise RuntimeError("There are multiple outputs written to the same file "+filename)
if add_problem_outdir:
return os.path.join(outdir, filename)
else:
return filename
def _init_output(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]]=None,rank:int=0):
super()._init_output(eqtree,continue_info,rank)
self._outputter[eqtree]=self._construct_outputter_for_eq_tree(eqtree,continue_info,rank)
self._outputter[eqtree].problem = eqtree.get_mesh().get_problem()
self._outputter[eqtree].init(eqtree,continue_info,rank)
def _do_output(self, eqtree:"EquationTree", step:int,stage:str):
self._outputter[eqtree].output(step)
def change_output_directory(self, dir:str,eqtree:"EquationTree"):
self._outputter[eqtree].change_output_directory(dir,eqtree)
[docs]
class ODEFileOutput(GenericOutput):
"""
ODEFileOutput writes the variables of all ODE unknowns at the current time to a text file.
Args:
filename: The name of the output file. Default is None, meaning that the output file will be named after the equation tree node.
first_column: The value(s) to be written in the first column of the output file. Default is "time".
in_units: A dictionary specifying the units of the variables to be written in the output file. Default is an empty dictionary, i.e. base SI units.
hide_underscore: A flag indicating whether to hide variable names starting with an underscore in the output file. Default is False.
"""
def __init__(self,filename:Optional[str]=None,first_column:Optional[Union[str,GlobalParameter,List[Union[str,GlobalParameter]]]]="time",in_units:Dict[str,ExpressionOrNum]={},hide_underscore:bool=False):
super(ODEFileOutput, self).__init__()
self.filename=filename
self.in_units=in_units
self.hide_underscore=hide_underscore
if not isinstance(first_column,list):
if first_column is None:
self.first_column=[]
else:
self.first_column=[first_column]
else:
self.first_column=first_column
def _construct_outputter_for_eq_tree(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]],mpirank:int) -> _ODEFileOutput:
fn=self._expand_filename(eqtree,self.filename,".txt")
mesh=eqtree.get_mesh()
assert isinstance(mesh,ODEStorageMesh)
return _ODEFileOutput(mesh,eqtree,fname=fn,first_column=self.first_column,continue_info=continue_info,in_units=self.in_units,hide_underscore=self.hide_underscore)
def _is_ode(self):
return True
[docs]
class TextFileOutput(GenericOutput):
"""
A class for writing the degrees of freedom at the current time step to a text file. Will be invoked whenever Problem.output is called.
Args:
filetrunk (Optional[str]): The file trunk name. If not set, it will take the filename from the domain we added this equation to.
filename (Optional[str]): Same as filetrunk, but for backwards compatibility.
nondimensional (bool): Flag indicating whether the output should be nondimensional. Default is False.
hide_underscore (bool): Flag indicating whether to hide variables starting with an underscore. Default is True.
hide_lagrangian (bool): Flag indicating whether to hide Lagrangian coordinates. Default is True.
eigenvector (Optional[int]): The eigenvector index. If set, we write the eigenvector at this index instead of the solution. Only writing output when the eigenvector at this index is calculated. Default is None.
eigenmode (MeshDataEigenModes): The eigenmode type ("abs","real","imag"). Default is "abs".
reverse_segment_if (Optional[Callable[[List[int], NPFloatArray], bool]]): A function to reverse individual segments of a segregated 1d line embedded in higher spaces. Default is None.
sort_segments_by (Optional[Callable[[List[int], NPFloatArray], float]]): A function to sort such segments based on a condition. Otherwise, the ordering is more or less random. Default is None.
discontinuous (bool): Flag indicating whether discontinuous output should be written. In that case, each node can be written multiple times, potential with different values. Default is False.
add_eigen_to_mesh_positions (bool): When outputting an eigenvector on a moving mesh, do we want to add the original mesh coordinates to the eigensolution or not. Default is True.
"""
def __init__(self,filetrunk:Optional[str]=None,filename:Optional[str]=None, nondimensional:bool=False,hide_underscore:bool=True,hide_lagrangian:bool=True,eigenvector:Optional[int]=None,eigenmode:"MeshDataEigenModes"="abs",reverse_segment_if:Optional[Callable[[List[int],NPFloatArray],bool]]=None,sort_segments_by:Optional[Callable[[List[int],NPFloatArray],float]]=None,discontinuous:bool=False,add_eigen_to_mesh_positions:bool=True,operator:Optional["MeshDataCacheOperatorBase"]=None,tesselate_tri:bool=True):
super(TextFileOutput, self).__init__()
if filetrunk is not None and filename is not None:
raise RuntimeError("Please set either filename or filetrunk - both are the same, just for backwards compatibility")
elif filetrunk is not None:
self.filename=filetrunk
else:
self.filename=filename
self.nondimensional=nondimensional
self.hide_underscore=hide_underscore
self.hide_lagrangian = hide_lagrangian
self.eigenvector=eigenvector
self.eigenmode:"MeshDataEigenModes"=eigenmode
self.sort_segments_by=sort_segments_by
self.reverse_segment_if=reverse_segment_if
self.discontinuous=discontinuous
self.add_eigen_to_mesh_positions=add_eigen_to_mesh_positions
self.operator=operator
self.tesselate_tri=tesselate_tri
def _construct_outputter_for_eq_tree(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]],mpirank:int) -> _TextOutput:
fn=self._expand_filename(eqtree,self.filename,"",add_problem_outdir=False)
mesh=eqtree.get_mesh()
assert not isinstance(mesh,ODEStorageMesh)
return _TextOutput(mesh,ftrunk=fn,nondimensional=self.nondimensional,hide_underscore=self.hide_underscore,hide_lagrangian=self.hide_lagrangian,eigenvector=self.eigenvector,eigenmode=self.eigenmode,sort_segments_by=self.sort_segments_by,reverse_segment_if=self.reverse_segment_if,discontinuous=self.discontinuous,add_eigen_to_mesh_positions=self.add_eigen_to_mesh_positions,operator=self.operator,tesselate_tri=self.tesselate_tri)
def _is_ode(self):
return False
class TextFileOutputAlongLine(GenericOutput):
def __init__(self,filename:Optional[str]=None,coords:Optional[Union[NPFloatArray,List[Sequence[ExpressionOrNum]]]]=None,start:Optional[List[ExpressionOrNum]]=None,end:Optional[List[ExpressionOrNum]]=None,N:Optional[int]=None,isovalue:Optional[Tuple[str,ExpressionOrNum]]=None,file_ext:Optional[Union[str,List[str]]]=None,eigenvector:Optional[int]=None,eigenmode:"MeshDataEigenModes"="abs",NaN_outside:bool=False):
super(TextFileOutputAlongLine, self).__init__()
self.filename=filename
self.file_ext=file_ext
self.start=start
self.end=end
self.N=N
self.coords=coords
self.eigenvector=eigenvector
self.eigenmode:"MeshDataEigenModes"=eigenmode
self.isovalue=isovalue
self.NaN_outside=NaN_outside
def _construct_outputter_for_eq_tree(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]],mpirank:int) -> _OutputTxtAlongLine:
fn=self._expand_filename(eqtree,self.filename,"",add_problem_outdir=False)
mesh=eqtree.get_mesh()
assert not isinstance(mesh,ODEStorageMesh)
return _OutputTxtAlongLine(mesh=mesh,ftrunk=fn,start=self.start,end=self.end,isovalue=self.isovalue,coords=self.coords,N=self.N,file_ext=self.file_ext,eigenvector=self.eigenvector,eigenmode=self.eigenmode,NaN_outside=self.NaN_outside)
def _is_ode(self):
return False
class GridFileOutput(GenericOutput):
def __init__(self,lower:Union[NPFloatArray,List[Sequence[ExpressionOrNum]]],upper:Optional[List[ExpressionOrNum]],N:Union[int,List[int]]=None,dx:Union[ExpressionOrNum,List[ExpressionOrNum]]=None,filename:Optional[str]=None,file_ext:Optional[Union[str,List[str]]]=None,eigenvector:Optional[int]=None,eigenmode:"MeshDataEigenModes"="abs"):
super(GridFileOutput, self).__init__()
self.lower=lower
self.upper=upper
if len(self.lower)!=len(self.upper):
raise RuntimeError("Start coordinate vector 'lower' must have the same length as the end coordinate vector 'upper'")
if N is None and dx is None:
raise RuntimeError("Must either set dx or N")
elif N is not None and dx is not None:
raise RuntimeError("Cannot set N and dx simultaneously, just set one")
elif N is not None:
if not isinstance(N,(list,tuple)):
N=[N]*len(self.lower)
self.N=N
self.dx=None
else:
if not isinstance(dx,(list,tuple)):
dx=[dx]*len(self.lower)
self.dx=dx
self.N=None
self.filename=filename
self.file_ext=file_ext
self.N=N
self.eigenvector=eigenvector
self.eigenmode:"MeshDataEigenModes"=eigenmode
def _construct_outputter_for_eq_tree(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]],mpirank:int) -> _OutputTxtAlongLine:
fn=self._expand_filename(eqtree,self.filename,"",add_problem_outdir=False)
mesh=eqtree.get_mesh()
assert not isinstance(mesh,ODEStorageMesh)
return _GridFileOutput(mesh=mesh,ftrunk=fn,lower=self.lower,upper=self.upper,N=self.N,dx=self.dx,file_ext=self.file_ext,eigenvector=self.eigenvector,eigenmode=self.eigenmode)
def _is_ode(self):
return False
class _IntegralObservableOutput(_BaseOutputter):
def __init__(self, mesh:"AnySpatialMesh", ftrunk:str,continue_info:Optional[Dict[str,Any]],file_ext:Optional[Union[str,List[str]]]=None,first_column:List[str]=["time"]):
super(_IntegralObservableOutput, self).__init__()
self._mesh=mesh
self._filetrunk=ftrunk
self._file_ext=file_ext
self._units:Dict[str,Expression]={}
self._iexprs:Optional[List[str]]=None
self._files:Dict[str,Any]={}
self._continue_info=continue_info
self.first_column=first_column
def after_remeshing(self,eqtree:"EquationTree"):
self._mesh=eqtree.get_mesh()
def _eval_all_integral_funcs(self)->Dict[str,Expression]:
res:Dict[str,Expression]={}
if self._iexprs is None:
return res
for n in self._iexprs:
try:
rs=self._mesh._evaluate_integral_function(n)
except:
print("IN INTEGRAL FUNCTION "+n)
raise
res[n]=rs
return res
def change_output_directory(self,newdir:str,eqtree:"EquationTree"):
print("TODO: Change output path in IntegralObservables")
def _eval_dependent_funcs(self,intres:Dict[str,Expression]) -> Dict[str, float]:
import pyoomph.equations.generic
deps=self._mesh.get_code_gen()._dependent_integral_funcs #type:ignore
args:Dict[str,Expression]={k:v for k,v in intres.items()}
res:Dict[str,Expression] = {k: v for k, v in intres.items()}
args["time"]=self._mesh.get_problem().get_current_time(dimensional=True,as_float=False)
remaining=set(deps.keys())
while len(remaining)>0:
torem:Set[str]=set()
for r in remaining:
#Check if we can evaluate
l=deps[r]
all_present=True
if isinstance(l,pyoomph.equations.generic.DependentIntegralObservable):
reqargs=l.argnames
func_to_call=l.func
else:
reqargs=inspect.signature(l).parameters
func_to_call=l
arglist:List[ExpressionOrNum]=[]
for a in reqargs:
if not a in args.keys():
all_present=False
else:
arglist.append(args[a])
if all_present:
torem.add(r)
depres=func_to_call(*arglist)
if not isinstance(depres,Expression):
depres=Expression(depres)
args[r]=depres
res[r]=depres
if len(torem)==0:
raise RuntimeError("Cannot evaluate the dependent integral functions, probably due to unknown or circular arguments : "+str(remaining))
remaining = remaining-torem
for n,v in res.items():
if not n in self._units.keys():
if not isinstance(v,numpy.ndarray):
if v == 0 or (isinstance(v, _pyoomph.Expression) and v.is_zero()): #type:ignore # TODO: Unit cannot be detemined that way!
if n in intres.keys():
ieunit=self._mesh.get_code_gen()._get_integral_function_unit_factor(n)
fact, unit, rest, reslt = _pyoomph.GiNaC_collect_units(ieunit)
self._units[n] = unit
else:
pass #Can't do anything right now
else:
if not isinstance(v,_pyoomph.Expression): #type:ignore
v=_pyoomph.Expression(v)
fact,unit,rest,reslt=_pyoomph.GiNaC_collect_units(v)
self._units[n]=unit
else:
for i,direct,cmp in zip([0,1,2],["x","y","z"],v):
if cmp == 0 or (isinstance(cmp,_pyoomph.Expression) and cmp.is_zero()): # TODO: Unit cannot be detemined that way!
if n in intres.keys():
ieunit = self._mesh.get_code_gen()._get_integral_function_unit_factor(n)
fact, unit, rest, reslt = _pyoomph.GiNaC_collect_units(ieunit)
self._units[n]=unit
self._units[n+"_"+direct] = unit
else:
pass # Can't do anything right now
else:
fact, unit, rest, reslt = _pyoomph.GiNaC_collect_units(cmp)
self._units[n] = unit
self._units[n+"_"+direct] = unit
#Second loop, try fo set the remainin units by calling things with the units
for n,v in res.items():
if not n in self._units.keys():
arglist=[]
if v == 0 or (isinstance(v, _pyoomph.Expression) and v.is_zero()): #type:ignore
l = deps[n]
problem=False
for a in inspect.signature(l).parameters:
if not a in self._units.keys():
#Cannot do anything here
problem=True
break
else:
arglist.append(self._units[a])
if not problem:
depres = l(*arglist)
if depres == 0 or (isinstance(depres, _pyoomph.Expression) and depres.is_zero()):
pass
else:
if not isinstance(depres,Expression):
depres=Expression(depres)
fact, unit, rest, reslt = _pyoomph.GiNaC_collect_units(depres)
self._units[n]=unit
else:
pass #TODO: Further ways to get the unit??
nondim_res:Dict[str,float]={}
for n,v in res.items():
if n in self._mesh.get_code_gen()._dependent_integral_funcs_is_vector_helper.keys():
continue
if isinstance(v,numpy.ndarray):
for i,direct,cmp in zip([0,1,2],["x","y","z"],v):
if i>=self._mesh.get_code_gen().get_nodal_dimension():
break
if not n in self._units.keys():
nondim_res[n+"_"+direct] = cmp
else:
vef = (cmp / self._units.get(n, 1)).evalf()
try:
nondim_res[n+"_"+direct] = float(vef)
except:
fact, unit, rest, reslt = _pyoomph.GiNaC_collect_units(vef)
vef = fact * unit * rest
nondim_res[n+"_"+direct] = float(vef)
else:
if not n in self._units.keys():
nondim_res[n] = float(v)
else:
vef=(v / self._units.get(n, 1)).evalf()
try:
nondim_res[n] = float(vef)
except:
fact, unit, rest, reslt = _pyoomph.GiNaC_collect_units(vef)
vef=fact*unit*rest
nondim_res[n] = float(vef)
return nondim_res
def _eval_all(self) -> Dict[str, float]:
intres=self._eval_all_integral_funcs()
all=self._eval_dependent_funcs(intres)
#self._eqtree.get_mesh().evaluate_all_observables()
return all
def output(self,step:int):
fexts=self._file_ext
assert fexts is not None
if not isinstance(fexts,list):
fexts=[fexts]
for ext in fexts:
if ext in ["mat","MAT"]:
continue
if ext not in self._files.keys():
_filename=self._filetrunk+"."+ext
if self._continue_info is None:
self._files[ext]=open(_filename,"wt")
else:
self._files[ext] = open(_filename, "at")
#TODO: Trim here the output
#print(self._continue_info)
#raise RuntimeError("TODO")
firsttime=False
if self._iexprs is None:
firsttime=True
self._iexprs=self._mesh.list_integral_functions()
all=self._eval_all()
firstcols:List[str]=[]
for fc in self.first_column:
if fc == "time":
tscale = self._eqtree.get_equations().get_scaling("temporal")
if not isinstance(tscale,Expression):
tscale=Expression(tscale)
_, unit, _, _ = _pyoomph.GiNaC_collect_units(tscale)
try:
float(unit)
tscale = ""
except:
tscale = "[" + unit_to_string(unit, estimate_prefix=False) + "]"
firstcols.append("time" + tscale)
elif isinstance(fc, _pyoomph.GiNaC_GlobalParam):
firstcols.append(fc.get_name())
elif isinstance(fc,str) and (fc in self._mesh.get_problem().get_global_parameter_names()): #type:ignore
firstcols.append(fc)
else:
raise RuntimeError(repr(fc))
desc=firstcols
for n,v in sorted(all.items()):
if n[0]!="_":
entry=n
if self._units.get(n,1)!=1:
entry+="["+unit_to_string(self._units.get(n,1),estimate_prefix=False)+"]"
desc.append(entry)
if self._continue_info is None:
for f in self._files.values():
f.write("#"+"\t".join(desc)+"\n")
self._descs=desc
else:
all = self._eval_all()
line:List[str]=[]
for fc in self.first_column:
if fc=="time":
line.append(str(self.get_time()))
elif isinstance(fc,_pyoomph.GiNaC_GlobalParam):
line.append(str(fc.value))
elif isinstance(fc, str) and (fc in self._mesh.get_problem().get_global_parameter_names()): #type:ignore
line.append(str(self._mesh.get_problem().get_global_parameter(fc).value))
else:
raise RuntimeError(repr(fc))
#line=[self.get_time()]
for n, v in sorted(all.items()):
if n[0] != "_":
line.append(str(v))
for f in self._files.values():
f.write("\t".join(map(str,line))+"\n")
f.flush()
for ext in fexts:
if ext in ["mat","MAT"]:
_filename=self._filetrunk+"."+ext
mdict={}
if os.path.exists(_filename) and not firsttime:
mdict=loadmat(_filename) #type:ignore
for i,d in enumerate(self._descs):
if "[" in d:
d = d[0:d.find("[")]
if d in mdict:
#print(d)
olddata=mdict[d] #type:ignore
if len(olddata.shape)>1: #type:ignore
olddata=olddata[0,:] #type:ignore
else:
olddata=numpy.array(olddata[:]) #type:ignore
newdata=numpy.array(line[i]) #type:ignore
if len(newdata.shape)<len(olddata.shape):
newdata=numpy.array([newdata],dtype="float64") #type:ignore
#print(olddata,newdata)
mdict[d]=numpy.concatenate([olddata,newdata]) #type:ignore
else:
mdict[d]=numpy.array([line[i]],dtype="float64") #type:ignore
savemat(_filename,mdict,appendmat=True)
def init(self,eqtree:"EquationTree",continue_info:Optional[Dict[str,Any]]=None,rank:int=0):
super().init(eqtree,continue_info,rank)
if self._file_ext is None:
self._file_ext=self.problem.default_1d_file_extension
[docs]
class IntegralObservableOutput(GenericOutput):
"""
Outputs all integral observables on this domain to a text file.
Args:
filename: The name of the output file (without extension). Default is None, meaning that the output file will be named after the domain.
file_ext: The file extension. Default is None, meaning that the default file extension from the problem will be used.
first_column: The value(s) to be written in the first column of the output file. Default is ``"time"``.
"""
def __init__(self, filename:Optional[str]=None, file_ext:Optional[Union[str,List[str]]]=None,first_column:List[str]=["time"]):
super(IntegralObservableOutput, self).__init__()
self.filename = filename
self.file_ext = file_ext
self.first_column=first_column
def _construct_outputter_for_eq_tree(self, eqtree:"EquationTree", continue_info:Optional[Dict[str,Any]], mpirank:int) -> _IntegralObservableOutput:
fn = self._expand_filename(eqtree, self.filename, "_IntObsv")
mesh=eqtree.get_mesh()
assert not isinstance(mesh,ODEStorageMesh)
return _IntegralObservableOutput(mesh=mesh, ftrunk=fn , file_ext=self.file_ext,continue_info=continue_info,first_column=self.first_column)
def _is_ode(self):
return None
#ODEObservableOutput=IntegralObservableOutput