# Source code for pyoomph.utils.paramscan

#  @file
#  @author Christian Diddens <c.diddens@utwente.nl>
#  @author Duarte Rocha <d.rocha@utwente.nl>
#  
#  @section LICENSE
# 
#  pyoomph - a multi-physics finite element framework based on oomph-lib and GiNaC 
#  Copyright (C) 2021-2025  Christian Diddens & Duarte Rocha
# 
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
# 
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
# 
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>. 
#
#  The authors may be contacted at c.diddens@utwente.nl and d.rocha@utwente.nl
#
# ========================================================================
 
import functools
import io
#from concurrent.futures import *
import concurrent.futures 
import subprocess
import os
from pathlib import Path
import sys

import __main__

import operator

from ..typings import *


#Sorts an array [0,1,2,3,4] like this [0,4,2,1,3]
#Useful for parameter scans: First do the extremes and the center, then gradually refine the spaces in between
def alternate_sorting(inp:List[float])->List[float]:
    """
    Reorder a sequence so that coarse samples come first and refinements follow.

    The result starts with the first, the last and the central entry of ``inp``.
    The remaining gaps are then bisected breadth-first: the central entry of each
    gap is emitted and the gap is split into its left and right remainder,
    e.g. ``[0,1,2,3,4]`` becomes ``[0,4,2,1,3]``.

    Args:
        inp: The values to reorder.

    Returns:
        A new list in refinement order. Inputs with fewer than three entries
        are returned as they are (the very same object).
    """
    total = len(inp)
    if total < 3:
        return inp
    mid = total // 2
    ordered = [inp[0], inp[-1], inp[mid]]
    # FIFO work list of the gaps that still have to be bisected.
    pending:List[List[float]] = []
    if mid > 1:
        pending.append(inp[1:mid])
    # The right gap may be empty; empty gaps are simply skipped below.
    pending.append(inp[mid + 1:-1])
    cursor = 0
    while cursor < len(pending):
        gap = pending[cursor]
        cursor += 1
        if len(gap) == 0:
            continue
        half = len(gap) // 2
        ordered.append(gap[half])
        if half > 0:
            pending.append(gap[:half])
        pending.append(gap[half + 1:])
    return ordered


class SimulationNamespace:
    """
    A class representing a namespace for simulation parameters for the parallel parameter scan.

    If your problem has e.g. a nested property like ``problem.droplet.contact_angle``,
    you must create a SimulationNamespace object for the
    SingleParallelParameterSimulation object, i.e.

    .. code-block:: python

        sim.droplet=SimulationNamespace()
        sim.droplet.contact_angle=...

    Otherwise, nested properties cannot be set.
    """

    def _INTERNAL_add_to_arglist(self,argnames:List[str],trunk:str)->None:
        """
        Append the dotted names of all parameters stored below this namespace.

        Args:
            argnames: List that is extended in place with names of the form ``trunk.attribute``.
            trunk: Dotted path leading to this namespace.
        """
        for attr in dir(self):
            # Skip the internal machinery and all dunder attributes.
            if attr.startswith(("_INTERNAL_", "__")):
                continue
            dotted = trunk + "." + attr
            value = getattr(self, attr)
            if isinstance(value, SimulationNamespace):
                # Nested namespace: descend and let it contribute its own entries.
                value._INTERNAL_add_to_arglist(argnames, dotted)
            else:
                argnames.append(dotted)
class SingleParallelParameterSimulation:
    """
    Storage class for the used parameters of a single simulation in a parallel parameter scan.

    Instances are generated by

    .. code-block:: python

        sim=ParallelParameterScan.new_sim(...)

    Once generated, the parameters can be set as attributes of the instance sim, e.g.

    .. code-block:: python

        sim.paramA=...

    For nested parameters, use the SimulationNamespace class, e.g.

    .. code-block:: python

        sim.droplet=SimulationNamespace()
        sim.droplet.contact_angle=...
    """

    def __init__(self,subdir:Optional[str],additional_args:List[str]):
        # Everything prefixed with "_" is bookkeeping; only public attributes
        # are treated as simulation parameters.
        self._INTERNAL_subdir=subdir
        self._INTERNAL_additional_args=additional_args
        self._INTERNAL_pararunner:Optional["ParallelParameterScan"]=None
        self._INTERNAL_script:Optional[str]=None

    def _INTERNAL_assemble_args(self) -> Optional[List[str]]:
        """
        Build the argument list (without the interpreter) for this simulation.

        The list has the form ``-u <script> --outdir <output_dir>/<subdir>``,
        followed by ``-P name=value ...`` if parameters were set, followed by
        the additional arguments. If no sub-directory was given, it is derived
        from the parameter names and values and stored on the instance.

        Raises:
            RuntimeError: If neither a sub-directory nor any parameter is set, or
                if the instance was not created via ParallelParameterScan.new_sim.
        """
        param_names:List[str]=[]
        for name in dir(self):
            if name[0]=="_":
                continue
            entry = getattr(self, name)
            if isinstance(entry, SimulationNamespace):
                entry._INTERNAL_add_to_arglist(param_names, name)
            else:
                param_names.append(name)

        def value_text(dotted:str)->str:
            # attrgetter resolves dotted names such as "droplet.contact_angle".
            return str(operator.attrgetter(dotted)(self))

        if self._INTERNAL_subdir is None:
            if not param_names:
                raise RuntimeError("Cannot run a ParallelParameterScan new_sim without sub-directory if there are no parameters passed")
            pieces = [dotted + "_" + value_text(dotted) for dotted in param_names]
            self._INTERNAL_subdir = "__".join(pieces)

        runner = self._INTERNAL_pararunner
        if runner is None:
            raise RuntimeError("The simulation was not correctly created with ParallelParameterScan.new_sim(...)")
        assert self._INTERNAL_script is not None

        outdir = os.path.join(runner._output_dir, self._INTERNAL_subdir) #type: ignore
        args = ["-u", self._INTERNAL_script, "--outdir", outdir]
        if param_names:
            args.append("-P")
            args.extend(dotted + "=" + value_text(dotted) for dotted in param_names)
        args.extend(self._INTERNAL_additional_args)
        return args
def _para_sim_call(args:List[str],logfilename:Optional[str],env:Dict[str,str]) -> bool:
    """
    Run a single simulation as a child process and wait for it to finish.

    Args:
        args: Full command line, starting with the interpreter.
        logfilename: File receiving both stdout and stderr of the child.
            If None, the output of the child is discarded.
        env: Environment variables for the child process.

    Returns:
        True if the child exited with return code 0, False otherwise.
    """
    print("STARTING ",args)
    logfile = subprocess.DEVNULL if logfilename is None else io.open(logfilename,"w")
    try:
        p = subprocess.Popen(args, stdout=logfile, stderr=logfile, shell=False, env=env)
        res = p.wait() == 0
    finally:
        # Close the log file handle again; one leaked handle per simulation
        # adds up quickly in a large scan.
        if logfilename is not None:
            logfile.close()
    print("DONE ",res, args)
    return res
class ParallelParameterScan:
    """
    A class for performing parallel parameter scans.

    Args:
        script_to_call (str): The script to call for each parameter simulation.
        output_dir (Optional[str]): The output directory for the parameter simulations. If not provided, a default directory will be created based on the script's name.
        max_procs (Optional[int]): The maximum number of processes to use for parallel execution. Defaults to the number of CPUs in the system; 0 also selects the number of CPUs.
        single_threaded_childs (bool): Whether to run each child process in single-threaded mode. Defaults to True.
        interpreter (str): The path to the Python interpreter to use. Defaults to the system's default interpreter.
    """

    def __init__(self,script_to_call:str,output_dir:Optional[str]=None,max_procs:Optional[int]=os.cpu_count(),single_threaded_childs:bool=True,interpreter:str=sys.executable):
        self._script=script_to_call
        self._interpreter=interpreter
        if output_dir is None:
            # Default: a directory next to the running main script, named like
            # that script without its extension.
            parascript=__main__.__file__
            bn=os.path.splitext(parascript)[0]
            output_dir=os.path.join(os.path.dirname(bn), os.path.basename(bn))
        self._output_dir=output_dir
        self._max_procs=max_procs #0 means Nprocs of system
        self._single_threaded_childs=single_threaded_childs
        self._sims:List["SingleParallelParameterSimulation"]=[]
        # Append handle on DONE_SIMS.txt, opened lazily by mark_as_done.
        self._donefile=None

    def mark_as_done(self,sim:"SingleParallelParameterSimulation",args:List[str])->None:
        """
        Record a successfully finished simulation in ``DONE_SIMS.txt``.

        Args:
            sim: The finished simulation (currently not used).
            args: Full command line of the simulation; written as one
                tab-separated line unless already_done(args) already holds.
        """
        if self._donefile is None:
            self._donefile=open(os.path.join(self._output_dir,"DONE_SIMS.txt"),"a")
        if self.already_done(args):
            return
        self._donefile.write("\t".join(args)+"\n")
        self._donefile.flush()

    #only_by_script_and_outdir is important, since passed expressions might be differently ordered!
    def already_done(self,args:List[str],only_by_script_and_outdir:bool=True)->bool:
        """
        Check whether a simulation is listed in ``DONE_SIMS.txt``.

        Args:
            args: Full command line of the simulation, starting with the interpreter.
            only_by_script_and_outdir: If True, only the first five entries
                (interpreter, ``-u``, script, ``--outdir``, output directory)
                are compared. If False, the entire command line must match.

        Returns:
            True if a matching entry exists. False otherwise, including when
            the file does not exist or cannot be read.
        """
        wanted = list(args[:5]) if only_by_script_and_outdir else list(args)
        try:
            with open(os.path.join(self._output_dir, "DONE_SIMS.txt"), "r") as donefile:
                for line in donefile:
                    # Compare whole tab-separated fields, so that neither a
                    # parameter-less entry nor an output directory that is a
                    # mere prefix of another one is misjudged.
                    fields = line.rstrip("\n").split("\t")
                    if only_by_script_and_outdir:
                        if fields[:len(wanted)] == wanted:
                            return True
                    elif fields == wanted:
                        return True
        except (OSError, UnicodeError):
            # Missing or unreadable file: treat as nothing done yet.
            return False
        return False

    def new_sim(self, subdir: Optional[str] = None, additional_args: List[str] = []) -> "SingleParallelParameterSimulation":
        """
        Create a new simulation. You can set the parameters of the simulation by setting attributes of the returned instance.

        Args:
            subdir (str, optional): The subdirectory where the simulation will be saved. Defaults to None, will be determined automatically based on the parameters.
            additional_args (List[str], optional): Additional arguments to be passed to the simulation. Defaults to an empty list.

        Returns:
            SingleParallelParameterSimulation: The newly created simulation instance.
        """
        # The list is copied, so neither the shared default nor the caller's
        # list is ever modified through the simulation.
        res = SingleParallelParameterSimulation(subdir, additional_args.copy())
        res._INTERNAL_script = self._script # type: ignore
        res._INTERNAL_pararunner = self # type: ignore
        self._sims.append(res)
        return res

    def clear(self):
        """
        Remove all simulations from the list.
        """
        self._sims=[]

    def done_callback(self,sim): #type: ignore
        """
        Future callback: mark the attached simulation as done if it succeeded.

        Args:
            sim: The finished future; run_all attaches the simulation as ``_sim``.
        """
        if sim.result(): #type: ignore
            self.mark_as_done(sim._sim,sim._sim._args) #type: ignore

    def run_all(self, skip_done: bool = False):
        """
        Run all added simulations in parallel, by default single-CPU each, always using a maximum of max_procs processes.

        Args:
            skip_done (bool, optional): Whether to skip simulations that are already completed. Defaults to False.
        """
        Path(self._output_dir).mkdir(parents=True, exist_ok=True)
        my_env = os.environ.copy()
        if self._single_threaded_childs:
            my_env["OMP_NUM_THREADS"] = "1"
            my_env["MKL_NUM_THREADS"] = "1"
            my_env["OPENBLAS_NUM_THREADS"] = "1"
            my_env["MKL_DOMAIN_NUM_THREADS"] = "MKL_BLAS=1, MKL_DOMAIN_PARDISO=1"
        # ThreadPoolExecutor rejects max_workers=0, so map it to the CPU count.
        max_workers = self._max_procs
        if max_workers == 0:
            max_workers = os.cpu_count()
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                for s in self._sims:
                    sim_args = s._INTERNAL_assemble_args() # type: ignore
                    args: List[str] = [self._interpreter] + sim_args
                    s._args = args # type: ignore
                    if skip_done and self.already_done(args):
                        print("SKIPPING COMPLETED ", args)
                        continue
                    logfile = os.path.join(self._output_dir, "log_" + s._INTERNAL_subdir + ".txt") # type: ignore
                    future = executor.submit(functools.partial(_para_sim_call, args, logfile, my_env))
                    # Attach the simulation before registering the callback,
                    # since the callback reads it.
                    future._sim = s # type: ignore
                    future.add_done_callback(self.done_callback) # type: ignore
        finally:
            # Leaving the executor block waits for all workers, so the
            # bookkeeping file can be closed; mark_as_done reopens it on demand.
            if self._donefile is not None:
                self._donefile.close()
                self._donefile = None
#para=ParallelParameterScan("myscript.py")
#sim=para.new_sim("SimA")
#sim.paramA="bla"
#sim.paramB=100
#para.run_all()