From 040098d2ccf24d8a54ec661e7656fe58acb817a3 Mon Sep 17 00:00:00 2001 From: james <81617086+je-cook@users.noreply.github.com> Date: Mon, 8 Jun 2026 10:08:55 +0100 Subject: [PATCH 1/6] scan cleanup --- process/core/scan.py | 791 +++++++++++++------------------------------ 1 file changed, 233 insertions(+), 558 deletions(-) diff --git a/process/core/scan.py b/process/core/scan.py index b2da1471e0..6e96e0ebbd 100644 --- a/process/core/scan.py +++ b/process/core/scan.py @@ -2,8 +2,9 @@ import logging import time -from dataclasses import astuple, dataclass +from dataclasses import dataclass from enum import Enum +from types import DynamicClassAttribute from typing import TYPE_CHECKING import numpy as np @@ -16,7 +17,7 @@ from process.core.solver import constraints from process.core.solver.solver_handler import SolverHandler from process.data_structure.numerics import FiguresOfMerit, PROCESSRunMode -from process.data_structure.scan_variables import IPNSCNS, NOUTVARS, ScanData +from process.data_structure.scan_variables import IPNSCNS, NOUTVARS if TYPE_CHECKING: from process.core.model import DataStructure, Model @@ -26,162 +27,141 @@ @dataclass class ScanVariable: - variable_name: str - variable_description: str + variable_area: SVE variable_num: int - def __iter__(self): - return iter(astuple(self)[:2]) + +class SVE(Enum): + P = "physics" + D = "divertor" + C = "constraints" + T = "tfcoil" + TR = "rebco" + CD = "current_drive" + NUM = "numerics" + CST = "costs" + IR = "impurity_radiation" + B = "build" + HT = "heat_transport" + PF = "pf_coil" + CS = "cs_fatigue" + FWBS = "fwbs" class ScanVariables(Enum): @classmethod - def _missing_(cls, var): - if isinstance(var, int): + def _missing_(cls, value): + if isinstance(value, int): for sv in cls: - if sv.value.variable_num == var: + if sv.number == value: return sv - return super()._missing_(var) - - aspect = ScanVariable("aspect", "Aspect_ratio", 1) - pflux_div_heat_load_max_mw = ScanVariable( - "pflux_div_heat_load_max_mw", "Div_heat_limit_(MW/m2)", 2 - ) - p_plant_electric_net_required_mw = ScanVariable( - "p_plant_electric_net_required_mw", "Net_electric_power_(MW)", 3 - ) - hfact = ScanVariable("hfact", "Confinement_H_factor", 4) - j_tf_coil_full_area = ScanVariable( - "j_tf_coil_full_area", "TF_inboard_leg_J_(MA/m2)", 5 - ) - pflux_fw_neutron_max_mw = ScanVariable( - "pflux_fw_neutron_max_mw", "Allow._wall_load_(MW/m2)", 6 - ) - beamfus0 = ScanVariable("beamfus0", "Beam_bkgrd_multiplier", 7) - temp_plasma_electron_vol_avg_kev = ScanVariable( - "temp_plasma_electron_vol_avg_kev", "Electron_temperature_keV", 9 - ) - boundu15 = ScanVariable("boundu(15)", "Volt-second_upper_bound", 10) - beta_norm_max = ScanVariable("beta_norm_max", "Beta_coefficient", 11) - f_c_plasma_bootstrap_max = ScanVariable( - "f_c_plasma_bootstrap_max", "Bootstrap_fraction", 12 - ) - boundu10 = ScanVariable("boundu(10)", "H_factor_upper_bound", 13) - fiooic = ScanVariable("fiooic", "TFC_Iop_/_Icrit_margin", 14) - rmajor = ScanVariable("rmajor", "Plasma_major_radius_(m)", 16) - b_tf_inboard_max = ScanVariable("b_tf_inboard_max", "Max_toroidal_field_(T)", 17) - eta_cd_norm_hcd_primary_max = ScanVariable( - "eta_cd_norm_hcd_primary_max", "Maximum_CD_gamma", 18 - ) - boundl16 = ScanVariable("boundl(16)", "CS_thickness_lower_bound", 19) - t_burn_min = ScanVariable("t_burn_min", "Minimum_burn_time_(s)", 20) - f_t_plant_available = ScanVariable( - "f_t_plant_available", "Plant_availability_factor", 22 - ) - p_fusion_total_max_mw = ScanVariable( - "p_fusion_total_max_mw", "Fusion_power_limit_(MW)", 24 - ) - kappa = ScanVariable("kappa", "Plasma_elongation", 25) - triang = ScanVariable("triang", "Plasma_triangularity", 26) - tbrmin = ScanVariable("tbrmin", "Min_tritium_breed._ratio", 27) - b_plasma_toroidal_on_axis = ScanVariable( - "b_plasma_toroidal_on_axis", "Tor._field_on_axis_(T)", 28 - ) - coreradius = ScanVariable("coreradius", "Core_radius", 29) - f_alpha_energy_confinement_min = ScanVariable( - "f_alpha_energy_confinement_min", "t_alpha_confinement/taueff_lower_limit", 31 - ) - epsvmc = ScanVariable("epsvmc", "VMCON error tolerance", 32) - boundu129 = ScanVariable("boundu(129)", " Neon upper limit", 38) - boundu131 = ScanVariable("boundu(131)", " Argon upper limit", 39) - boundu135 = ScanVariable("boundu(135)", " Xenon upper limit", 40) - dr_blkt_outboard = ScanVariable("dr_blkt_outboard", "Outboard blanket thick.", 41) - f_nd_impurity_electrons9 = ScanVariable( - "f_nd_impurity_electrons(9)", "Argon fraction", 42 - ) - sig_tf_case_max = ScanVariable( - "sig_tf_case_max", "Allowable_stress_in_tf_coil_case_Tresca_(pa)", 44 - ) - temp_tf_superconductor_margin_min = ScanVariable( - "temp_tf_superconductor_margin_min", "Minimum_allowable_temperature_margin", 45 - ) - boundu152 = ScanVariable( - "boundu(152)", "Max allowable f_nd_plasma_separatrix_greenwald", 46 - ) - n_tf_wp_pancakes = ScanVariable("n_tf_wp_pancakes", "TF Coil - n_tf_wp_pancakes", 48) - n_tf_wp_layers = ScanVariable("n_tf_wp_layers", "TF Coil - n_tf_wp_layers", 49) - f_nd_impurity_electrons13 = ScanVariable( - "f_nd_impurity_electrons(13)", "Xenon fraction", 50 - ) - f_p_div_lower = ScanVariable("f_p_div_lower", "lower_divertor_power_fraction", 51) - rad_fraction_sol = ScanVariable("rad_fraction_sol", "SoL radiation fraction", 52) - boundu157 = ScanVariable("boundu(157)", "Max allowable fvssu", 53) - Bc2_0K = ScanVariable("Bc2(0K)", "GL_NbTi Bc2(0K)", 54) - dr_shld_inboard = ScanVariable("dr_shld_inboard", "Inboard neutronic shield", 55) - p_cryo_plant_electric_max_mw = ScanVariable( - "p_cryo_plant_electric_max_mw", "max allowable p_cryo_plant_electric_mw", 56 - ) - boundl2 = ScanVariable("boundl(2)", "b_plasma_toroidal_on_axis minimum", 57) - dr_fw_plasma_gap_inboard = ScanVariable( - "dr_fw_plasma_gap_inboard", "Inboard FW-plasma sep gap", 58 - ) - dr_fw_plasma_gap_outboard = ScanVariable( - "dr_fw_plasma_gap_outboard", "Outboard FW-plasma sep gap", 59 - ) - sig_tf_wp_max = ScanVariable( - "sig_tf_wp_max", "Allowable_stress_in_tf_coil_conduit_Tresca_(pa)", 60 - ) - copperaoh_m2_max = ScanVariable( - "copperaoh_m2_max", "Max CS coil current / copper area", 61 - ) - coheof = ScanVariable("coheof", "CS coil current density at EOF (A/m2)", 62) - dr_cs = ScanVariable("dr_cs", "CS coil thickness (m)", 63) - ohhghf = ScanVariable("ohhghf", "CS height (m)", 64) - n_cycle_min = ScanVariable("n_cycle_min", "CS stress cycles min", 65) - oh_steel_frac = ScanVariable("oh_steel_frac", "CS steel fraction", 66) - t_crack_vertical = ScanVariable( - "t_crack_vertical", "Initial crack vertical size (m)", 67 - ) - inlet_temp_liq = ScanVariable( - "inlet_temp_liq", "Inlet Temperature Liquid Metal Breeder/Coolant (K)", 68 - ) - outlet_temp_liq = ScanVariable( - "outlet_temp_liq", "Outlet Temperature Liquid Metal Breeder/Coolant (K)", 69 - ) - blpressure_liq = ScanVariable( - "blpressure_liq", "Blanket liquid metal breeder/coolant pressure (Pa)", 70 - ) - n_liq_recirc = ScanVariable( - "n_liq_recirc", - "Selected number of liquid metal breeder recirculations per day", - 71, - ) - bz_channel_conduct_liq = ScanVariable( - "bz_channel_conduct_liq", - "Conductance of liquid metal breeder duct walls (A V-1 m-1)", - 72, - ) - pnuc_fw_ratio_dcll = ScanVariable( - "pnuc_fw_ratio_dcll", - "Ratio of FW nuclear power as fraction of total (FW+BB)", - 73, - ) - f_nuc_pow_bz_struct = ScanVariable( - "f_nuc_pow_bz_struct", - "Fraction of BZ power cooled by primary coolant for dual-coolant blanket", - 74, - ) - dx_fw_module = ScanVariable( - "dx_fw_module", "dx_fw_module of first wall cooling channels (m)", 75 - ) - eta_turbine = ScanVariable("eta_turbine", "Thermal conversion eff.", 76) - startupratio = ScanVariable("startupratio", "Gyrotron redundancy", 77) - fkind = ScanVariable("fkind", "Multiplier for Nth of a kind costs", 78) - eta_ecrh_injector_wall_plug = ScanVariable( - "eta_ecrh_injector_wall_plug", "ECH wall plug to injector efficiency", 79 - ) - fcoolcp = ScanVariable("fcoolcp", "Coolant fraction of TF", 80) - n_tf_coil_turns = ScanVariable("n_tf_coil_turns", "Number of turns in TF", 81) + raise ProcessValueError("Illegal scan variable number", nwp=value) + + def full_name(self): + if "__" in self.name: + return self.name.replace("__", "(") + ")" + return self.name + + def set(self, data, sweep): + var_area = getattr(data, self.area.value) + + if self.value.variable_num == 22 and var_area.i_plant_availability == 1: + raise ProcessValueError( + "Do not scan f_t_plant_available if i_plant_availability=1" + ) + + if "__" in self.name: + name, index = self.name.split("__") + getattr(var_area, name)[index] = sweep + if name == "f_nd_impurity_electrons": + var_area.f_nd_impurity_electron_array[int(index - 1)] = sweep + else: + setattr(var_area, self.name, sweep) + + self._data_ = getattr(var_area, name) + + @DynamicClassAttribute + def area(self): + return self.value.variable_area + + @DynamicClassAttribute + def number(self): + return self.value.variable_num + + @DynamicClassAttribute + def data(self): + if hasattr(self, "_data_"): + return self._data_ + raise ValueError("Data not available") + + aspect = ScanVariable(SVE.P, 1) + pflux_div_heat_load_max_mw = ScanVariable(SVE.D, 2) + p_plant_electric_net_required_mw = ScanVariable(SVE.C, 3) + hfact = ScanVariable(SVE.P, 4) + j_tf_coil_full_area = ScanVariable(SVE.T, 5) + pflux_fw_neutron_max_mw = ScanVariable(SVE.C, 6) + beamfus0 = ScanVariable(SVE.P, 7) + temp_plasma_electron_vol_avg_kev = ScanVariable(SVE.P, 9) + boundu__14 = ScanVariable(SVE.NUM, 10) + beta_norm_max = ScanVariable(SVE.P, 11) + f_c_plasma_bootstrap_max = ScanVariable(SVE.CD, 12) + boundu__10 = ScanVariable(SVE.NUM, 13) + rmajor = ScanVariable(SVE.P, 16) + b_tf_inboard_max = ScanVariable(SVE.C, 17) + eta_cd_norm_hcd_primary_max = ScanVariable(SVE.C, 18) + boundl__16 = ScanVariable(SVE.NUM, 19) + t_burn_min = ScanVariable(SVE.C, 20) + f_t_plant_available = ScanVariable(SVE.CST, 22) + p_fusion_total_max_mw = ScanVariable(SVE.C, 24) + kappa = ScanVariable(SVE.P, 25) + triang = ScanVariable(SVE.P, 26) + tbrmin = ScanVariable(SVE.C, 27) + b_plasma_toroidal_on_axis = ScanVariable(SVE.P, 28) + coreradius = ScanVariable(SVE.IR, 29) + f_alpha_energy_confinement_min = ScanVariable(SVE.C, 31) + epsvmc = ScanVariable(SVE.NUM, 32) + boundu__129 = ScanVariable(SVE.NUM, 38) + boundu__131 = ScanVariable(SVE.NUM, 39) + boundu__135 = ScanVariable(SVE.NUM, 40) + dr_blkt_outboard = ScanVariable(SVE.B, 41) + f_nd_impurity_electrons__9 = ScanVariable(SVE.IR, 42) + sig_tf_case_max = ScanVariable(SVE.T, 44) + temp_tf_superconductor_margin_min = ScanVariable(SVE.T, 45) + boundu__152 = ScanVariable(SVE.NUM, 46) + n_tf_wp_pancakes = ScanVariable(SVE.T, 48) + n_tf_wp_layers = ScanVariable(SVE.T, 49) + f_nd_impurity_electrons__13 = ScanVariable(SVE.IR, 50) + f_p_div_lower = ScanVariable(SVE.P, 51) + rad_fraction_sol = ScanVariable(SVE.P, 52) + boundu__157 = ScanVariable(SVE.NUM, 53) + b_crit_upper_nbti = ScanVariable(SVE.T, 54) + dr_shld_inboard = ScanVariable(SVE.B, 55) + p_cryo_plant_electric_max_mw = ScanVariable(SVE.HT, 56) + boundl__2 = ScanVariable(SVE.NUM, 57) + dr_fw_plasma_gap_inboard = ScanVariable(SVE.B, 58) + dr_fw_plasma_gap_outboard = ScanVariable(SVE.B, 59) + sig_tf_wp_max = ScanVariable(SVE.T, 60) + copperaoh_m2_max = ScanVariable(SVE.TR, 61) + coheof = ScanVariable(SVE.PF, 62) + dr_cs = ScanVariable(SVE.B, 63) + ohhghf = ScanVariable(SVE.PF, 64) + n_cycle_min = ScanVariable(SVE.CS, 65) + oh_steel_frac = ScanVariable(SVE.PF, 66) + t_crack_vertical = ScanVariable(SVE.CS, 67) + inlet_temp_liq = ScanVariable(SVE.FWBS, 68) + outlet_temp_liq = ScanVariable(SVE.FWBS, 69) + blpressure_liq = ScanVariable(SVE.FWBS, 70) + n_liq_recirc = ScanVariable(SVE.FWBS, 71) + bz_channel_conduct_liq = ScanVariable(SVE.FWBS, 72) + pnuc_fw_ratio_dcll = ScanVariable(SVE.FWBS, 73) + f_nuc_pow_bz_struct = ScanVariable(SVE.FWBS, 74) + dx_fw_module = ScanVariable(SVE.FWBS, 75) + eta_turbine = ScanVariable(SVE.HT, 76) + startupratio = ScanVariable(SVE.CST, 77) + fkind = ScanVariable(SVE.CST, 78) + eta_ecrh_injector_wall_plug = ScanVariable(SVE.CD, 79) + fcoolcp = ScanVariable(SVE.T, 80) + n_tf_coil_turns = ScanVariable(SVE.T, 81) class Scan: @@ -261,20 +241,16 @@ def post_optimise(self, ifail: int): ) process_output.oheadr(constants.NOUT, "Numerics") - if self.solver == "fsolve": - process_output.ocmmnt( - constants.NOUT, "PROCESS has performed an fsolve (evaluation) run." - ) - else: - process_output.ocmmnt( - constants.NOUT, "PROCESS has performed a VMCON (optimisation) run." - ) + process_output.ocmmnt( + constants.NOUT, + f"PROCESS has performed a {'fsolve' if self.solver == 'fsolve' else 'VMCON'} (optimisation) run.", + ) if ifail != 1: process_output.ovarin(constants.NOUT, "Error flag", "(ifail)", ifail) process_output.oheadr( constants.IOTTY, "PROCESS COULD NOT FIND A FEASIBLE SOLUTION" ) - process_output.oblnkl(constants.IOTTY) + print() logger.critical("Solver returns with ifail /= 1. %s", ifail) @@ -282,7 +258,7 @@ def post_optimise(self, ifail: int): if self.solver == "vmcon": self.verror(ifail) process_output.oblnkl(constants.NOUT) - process_output.oblnkl(constants.IOTTY) + print() else: # Solution found if self.solver != "fsolve": @@ -303,33 +279,13 @@ def post_optimise(self, ifail: int): process_output.ovarin(constants.NOUT, "Error flag", "(ifail)", ifail) if self.data.numerics.sqsumsq >= 1.0e-2: - process_output.oblnkl(constants.NOUT) - process_output.ocmmnt( - constants.NOUT, - "WARNING: Constraint residues are HIGH; consider re-running", - ) - process_output.ocmmnt( - constants.NOUT, - " with lower values of EPSVMC to confirm convergence...", - ) - process_output.ocmmnt( - constants.NOUT, - " (should be able to get down to about 1.0E-8 okay)", - ) - process_output.oblnkl(constants.NOUT) - process_output.ocmmnt( - constants.IOTTY, - "WARNING: Constraint residues are HIGH; consider re-running", - ) - process_output.ocmmnt( - constants.IOTTY, - " with lower values of EPSVMC to confirm convergence...", - ) - process_output.ocmmnt( - constants.IOTTY, - " (should be able to get down to about 1.0E-8 okay)", + string = ( + "WARNING: Constraint residues are HIGH; consider re-running\n" + " with lower values of EPSVMC to confirm convergence...\n" + " (should be able to get down to about 1.0E-8 okay)\n" ) - process_output.oblnkl(constants.IOTTY) + process_output.ocmmnt(constants.NOUT, ("\n" + string)) + print(string) logger.warning( f"High final constraint residues. {self.data.numerics.sqsumsq=}" @@ -703,147 +659,55 @@ def verror(ifail: int): """ if ifail == -1: - process_output.ocmmnt(constants.NOUT, "User-terminated execution of VMCON.") - process_output.ocmmnt(constants.IOTTY, "User-terminated execution of VMCON.") + strings = ("User-terminated execution of VMCON.",) elif ifail == 0: - process_output.ocmmnt( - constants.NOUT, "Improper input parameters to the VMCON routine." - ) - process_output.ocmmnt(constants.NOUT, "PROCESS coding must be checked.") - - process_output.ocmmnt( - constants.IOTTY, "Improper input parameters to the VMCON routine." + strings = ( + "Improper input parameters to the VMCON routine.", + "PROCESS coding must be checked.", ) - process_output.ocmmnt(constants.IOTTY, "PROCESS coding must be checked.") elif ifail == 2: - process_output.ocmmnt( - constants.NOUT, - "The maximum number of calls has been reached without solution.", - ) - process_output.ocmmnt( - constants.NOUT, - "The code may be stuck in a minimum in the residual space that is significantly above zero.", - ) - process_output.oblnkl(constants.NOUT) - process_output.ocmmnt( - constants.NOUT, "There is either no solution possible, or the code" - ) - process_output.ocmmnt( - constants.NOUT, "is failing to escape from a deep local minimum." - ) - process_output.ocmmnt( - constants.NOUT, - "Try changing the variables in IXC, or modify their initial values.", - ) - - process_output.ocmmnt( - constants.IOTTY, + strings = ( "The maximum number of calls has been reached without solution.", - ) - process_output.ocmmnt( - constants.IOTTY, "The code may be stuck in a minimum in the residual space that is significantly above zero.", - ) - process_output.oblnkl(constants.NOUT) - process_output.oblnkl(constants.IOTTY) - process_output.ocmmnt( - constants.IOTTY, "There is either no solution possible, or the code" - ) - process_output.ocmmnt( - constants.IOTTY, "is failing to escape from a deep local minimum." - ) - process_output.ocmmnt( - constants.IOTTY, + "", + "There is either no solution possible, or the code", + "is failing to escape from a deep local minimum.", "Try changing the variables in IXC, or modify their initial values.", ) elif ifail == 3: - process_output.ocmmnt( - constants.NOUT, "The line search required the maximum of 10 calls." - ) - process_output.ocmmnt( - constants.NOUT, "A feasible solution may be difficult to achieve." - ) - process_output.ocmmnt( - constants.NOUT, "Try changing or adding variables to IXC." - ) - - process_output.ocmmnt( - constants.IOTTY, "The line search required the maximum of 10 calls." - ) - process_output.ocmmnt( - constants.IOTTY, "A feasible solution may be difficult to achieve." - ) - process_output.ocmmnt( - constants.IOTTY, "Try changing or adding variables to IXC." + strings = ( + "The line search required the maximum of 10 calls.", + "A feasible solution may be difficult to achieve.", + "Try changing or adding variables to IXC.", ) elif ifail == 4: - process_output.ocmmnt( - constants.NOUT, "An uphill search direction was found." - ) - process_output.ocmmnt( - constants.NOUT, "Try changing the equations in ICC, or" + strings = ( + "An uphill search direction was found.", + "Try changing the equations in ICC, or", + "adding new variables to IXC.", ) - process_output.ocmmnt(constants.NOUT, "adding new variables to IXC.") - - process_output.ocmmnt( - constants.IOTTY, "An uphill search direction was found." - ) - process_output.ocmmnt( - constants.IOTTY, "Try changing the equations in ICC, or" - ) - process_output.ocmmnt(constants.IOTTY, "adding new variables to IXC.") elif ifail == 5: - process_output.ocmmnt( - constants.NOUT, "The quadratic programming technique was unable to" - ) - process_output.ocmmnt(constants.NOUT, "find a feasible point.") - process_output.oblnkl(constants.NOUT) - process_output.ocmmnt( - constants.NOUT, "Try changing or adding variables to IXC, or modify" - ) - process_output.ocmmnt( - constants.NOUT, + strings = ( + "The quadratic programming technique was unable to", + "find a feasible point.", + "", + "Try changing or adding variables to IXC, or modify", "their initial values (especially if only 1 optimisation", + "iteration was performed).", ) - process_output.ocmmnt(constants.NOUT, "iteration was performed).") - process_output.ocmmnt( - constants.IOTTY, "The quadratic programming technique was unable to" - ) - process_output.ocmmnt(constants.IOTTY, "find a feasible point.") - process_output.oblnkl(constants.IOTTY) - process_output.ocmmnt( - constants.IOTTY, "Try changing or adding variables to IXC, or modify" - ) - process_output.ocmmnt( - constants.IOTTY, - "their initial values (especially if only 1 optimisation", - ) - process_output.ocmmnt(constants.IOTTY, "iteration was performed).") elif ifail == 6: - process_output.ocmmnt( - constants.NOUT, "The quadratic programming technique was restricted" + strings = ( + "The quadratic programming technique was restricted", + "by an artificial bound, or failed due to a singular", + "matrix.", + "Try changing the equations in ICC, or", + "adding new variables to IXC.", ) - process_output.ocmmnt( - constants.NOUT, "by an artificial bound, or failed due to a singular" - ) - process_output.ocmmnt(constants.NOUT, "matrix.") - process_output.ocmmnt( - constants.NOUT, "Try changing the equations in ICC, or" - ) - process_output.ocmmnt(constants.NOUT, "adding new variables to IXC.") - process_output.ocmmnt( - constants.IOTTY, "The quadratic programming technique was restricted" - ) - process_output.ocmmnt( - constants.IOTTY, "by an artificial bound, or failed due to a singular" - ) - process_output.ocmmnt(constants.IOTTY, "matrix.") - process_output.ocmmnt( - constants.IOTTY, "Try changing the equations in ICC, or" - ) - process_output.ocmmnt(constants.IOTTY, "adding new variables to IXC.") + strings = "\n".join(strings) + process_output.ocmmnt(constants.NOUT, strings) + print(strings) def scan_1d(self): """Run a 1-D scan.""" @@ -871,7 +735,7 @@ def scan_1d(self): " ****************************************** Scan Convergence Summary ****************************************** \n" ) sweep_values = self.data.scan.sweep[: self.data.scan.isweep] - nsweep_var_name, _ = self.scan_select( + nsweep_var = self.scan_select( self.data.scan.nsweep, self.data.scan.sweep, self.data.scan.isweep ) converged_count = 0 @@ -881,20 +745,18 @@ def scan_1d(self): max_sweep_value_length - len(str(sweep_val).replace(".", "")) for sweep_val in sweep_values ] - for iscan in range(1, self.data.scan.isweep + 1): - if scan_1d_ifail_dict[iscan] == 1: + for iscan in range(self.data.scan.isweep): + pstring = ( + f"Scan {iscan:02d}: {nsweep_var.name} = {sweep_values[iscan]} " + + " " * offsets[iscan] + + "\u001b[32m{}CONVERGED \u001b[0m" + ) + if scan_1d_ifail_dict[iscan + 1] == 1: converged_count += 1 - print( - f"Scan {iscan:02d}: {nsweep_var_name} = {sweep_values[iscan - 1]} " - + " " * offsets[iscan - 1] - + "\u001b[32mCONVERGED \u001b[0m" - ) + pstring.format("") else: - print( - f"Scan {iscan:02d}: {nsweep_var_name} = {sweep_values[iscan - 1]} " - + " " * offsets[iscan - 1] - + "\u001b[31mUNCONVERGED \u001b[0m" - ) + pstring.format("UN") + print(pstring) converged_percentage = converged_count / self.data.scan.isweep * 100 print(f"\nConvergence Percentage: {converged_percentage:.2f}%") @@ -932,10 +794,10 @@ def scan_2d(self): ) sweep_1_values = self.data.scan.sweep[: self.data.scan.isweep] sweep_2_values = self.data.scan.sweep_2[: self.data.scan.isweep_2] - nsweep_var_name, _ = self.scan_select( + nsweep_var = self.scan_select( self.data.scan.nsweep, self.data.scan.sweep, self.data.scan.isweep ) - nsweep_2_var_name, _ = self.scan_select( + nsweep_2_var = self.scan_select( self.data.scan.nsweep_2, self.data.scan.sweep_2, self.data.scan.isweep_2 ) converged_count = 0 @@ -957,82 +819,56 @@ def scan_2d(self): for iscan_1 in range(1, self.data.scan.isweep + 1): for iscan_2 in range(1, self.data.scan.isweep_2 + 1): + string = ( + f"Scan {scan_point:02d}: ({nsweep_var.name} = {sweep_1_values[iscan_1 - 1]}," + f" {nsweep_2_var.name} = {sweep_2_values[iscan_2 - 1]}) " + + " " * offsets[iscan_1 - 1][iscan_2 - 1] + + "\u001b[32m{}CONVERGED \u001b[0m" + ) if scan_2d_ifail_list[iscan_1][iscan_2] == 1: converged_count += 1 - print( - f"Scan {scan_point:02d}: ({nsweep_var_name} = {sweep_1_values[iscan_1 - 1]}, {nsweep_2_var_name} = {sweep_2_values[iscan_2 - 1]}) " - + " " * offsets[iscan_1 - 1][iscan_2 - 1] - + "\u001b[32mCONVERGED \u001b[0m" - ) - scan_point += 1 + print(string.format()) else: - print( - f"Scan {scan_point:02d}: ({nsweep_var_name} = {sweep_1_values[iscan_1 - 1]}, {nsweep_2_var_name} = {sweep_2_values[iscan_2 - 1]}) " - + " " * offsets[iscan_1 - 1][iscan_2 - 1] - + "\u001b[31mUNCONVERGED \u001b[0m" - ) - scan_point += 1 + print(string.format("UN")) + scan_point += 1 converged_percentage = ( converged_count / (self.data.scan.isweep * self.data.scan.isweep_2) * 100 ) print(f"\nConvergence Percentage: {converged_percentage:.2f}%") - @staticmethod - def scan_2d_init(scan_data: ScanData): - process_output.ovarin( - constants.MFILE, - "Number of first variable scan points", - "(isweep)", - scan_data.isweep, - ) - process_output.ovarin( - constants.MFILE, - "Number of second variable scan points", - "(isweep_2)", - scan_data.isweep_2, - ) - process_output.ovarin( - constants.MFILE, - "Scanning first variable number", - "(nsweep)", - scan_data.nsweep, - ) - process_output.ovarin( - constants.MFILE, - "Scanning second variable number", - "(nsweep_2)", - scan_data.nsweep_2, - ) - process_output.ovarin( - constants.MFILE, - "Scanning second variable number", - "(nsweep_2)", - scan_data.nsweep_2, - ) - process_output.ovarin( - constants.MFILE, - "Scanning second variable number", - "(nsweep_2)", - scan_data.nsweep_2, - ) + def scan_2d_init(self): + sv = self.data.scan + for d, n, v in ( + ("Number of first variable scan points", "(isweep)", sv.isweep), + ("Number of second variable scan points", "(isweep_2)", sv.isweep_2), + ("Scanning first variable number", "(nsweep)", sv.nsweep), + ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), + ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), + ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), + ): + process_output.ovarin(constants.MFILE, d, n, v) + + def _set_v_x_label(self, iscan, twod=False): + if twod: + sv = self.scan_select(self.data.scan.nsweep_2, self.data.scan.sweep_2, iscan) + else: + sv = self.scan_select(self.data.scan.nsweep, self.data.scan.sweep, iscan) + self.data.globals.vlabel.vlabel = sv.name + self.data.globals.xlabel = sv.data.description def scan_1d_write_point_header(self, iscan: int): self.data.globals.iscan_global = iscan - self.data.globals.vlabel, self.data.globals.xlabel = self.scan_select( - self.data.scan.nsweep, self.data.scan.sweep, iscan - ) + self._set_v_x_label(iscan) process_output.oblnkl(constants.NOUT) - process_output.ostars(constants.NOUT, 110) + process_output.oblnkl(constants.MFILE) process_output.write( constants.NOUT, - f"***** Scan point {iscan} of {self.data.scan.isweep} : {self.data.globals.xlabel}" + f"Scan point {iscan} of {self.data.scan.isweep} : {self.data.globals.xlabel}" f", {self.data.globals.vlabel} = {self.data.scan.sweep[iscan - 1]} " - "*****", + "", ) - process_output.ostars(constants.NOUT, 110) - process_output.oblnkl(constants.MFILE) process_output.ovarin(constants.MFILE, "Scan point number", "(iscan)", iscan) print( @@ -1047,15 +883,11 @@ def scan_2d_write_point_header(self, iscan, iscan_1, iscan_2): # Makes iscan available globally (read-only) self.data.globals.iscan_global = iscan - self.data.globals.vlabel, self.data.globals.xlabel = self.scan_select( - self.data.scan.nsweep, self.data.scan.sweep, iscan_1 - ) - self.data.globals.vlabel_2, self.data.globals.xlabel_2 = self.scan_select( - self.data.scan.nsweep_2, self.data.scan.sweep_2, iscan_r - ) + self._set_v_x_label(iscan_1) + self._set_v_x_label(iscan_r) process_output.oblnkl(constants.NOUT) - process_output.ostars(constants.NOUT, 110) + process_output.oblnkl(constants.MFILE) process_output.write( constants.NOUT, @@ -1064,8 +896,6 @@ def scan_2d_write_point_header(self, iscan, iscan_1, iscan_2): f" {self.data.globals.vlabel_2} = {self.data.scan.sweep_2[iscan_r - 1]} " "*****", ) - process_output.ostars(constants.NOUT, 110) - process_output.oblnkl(constants.MFILE) process_output.ovarin(constants.MFILE, "Scan point number", "(iscan)", iscan) print( @@ -1077,173 +907,18 @@ def scan_2d_write_point_header(self, iscan, iscan_1, iscan_2): return iscan_r - @staticmethod - def scan_1d_write_plot(scan_data: ScanData): - if scan_data.first_call_1d: - process_output.ovarin( - constants.MFILE, - "Number of scan points", - "(isweep)", - scan_data.isweep, - ) - process_output.ovarin( - constants.MFILE, - "Scanning variable number", - "(nsweep)", - scan_data.nsweep, - ) + def scan_1d_write_plot(self): + if self.data.scan.first_call_1d: + for d, n, v in ( + ("Number of scan points", "(isweep)", self.data.scan.isweep), + ("Scanning variable number", "(nsweep)", self.data.scan.nsweep), + ): + process_output.ovarin(constants.MFILE, d, n, v) - scan_data.first_call_1d = False + self.data.scan.first_call_1d = False def scan_select(self, nwp, swp, iscn): - match nwp: - case 1: - self.data.physics.aspect = swp[iscn - 1] - case 2: - self.data.divertor.pflux_div_heat_load_max_mw = swp[iscn - 1] - case 3: - self.data.constraints.p_plant_electric_net_required_mw = swp[iscn - 1] - case 4: - self.data.physics.hfact = swp[iscn - 1] - case 5: - self.data.tfcoil.j_tf_coil_full_area = swp[iscn - 1] - case 6: - self.data.constraints.pflux_fw_neutron_max_mw = swp[iscn - 1] - case 7: - self.data.physics.beamfus0 = swp[iscn - 1] - case 9: - self.data.physics.temp_plasma_electron_vol_avg_kev = swp[iscn - 1] - case 10: - self.data.numerics.boundu[14] = swp[iscn - 1] - case 11: - self.data.physics.beta_norm_max = swp[iscn - 1] - case 12: - self.data.current_drive.f_c_plasma_bootstrap_max = swp[iscn - 1] - case 13: - self.data.numerics.boundu[9] = swp[iscn - 1] - case 16: - self.data.physics.rmajor = swp[iscn - 1] - case 17: - self.data.constraints.b_tf_inboard_max = swp[iscn - 1] - case 18: - self.data.constraints.eta_cd_norm_hcd_primary_max = swp[iscn - 1] - case 19: - self.data.numerics.boundl[15] = swp[iscn - 1] - case 20: - self.data.constraints.t_burn_min = swp[iscn - 1] - case 22: - if self.data.costs.i_plant_availability == 1: - raise ProcessValueError( - "Do not scan f_t_plant_available if i_plant_availability=1" - ) - self.data.costs.f_t_plant_available = swp[iscn - 1] - case 24: - self.data.constraints.p_fusion_total_max_mw = swp[iscn - 1] - case 25: - self.data.physics.kappa = swp[iscn - 1] - case 26: - self.data.physics.triang = swp[iscn - 1] - case 27: - self.data.constraints.tbrmin = swp[iscn - 1] - case 28: - self.data.physics.b_plasma_toroidal_on_axis = swp[iscn - 1] - case 29: - self.data.impurity_radiation.radius_plasma_core_norm = swp[iscn - 1] - case 31: - self.data.constraints.f_alpha_energy_confinement_min = swp[iscn - 1] - case 32: - self.data.numerics.epsvmc = swp[iscn - 1] - case 38: - self.data.numerics.boundu[128] = swp[iscn - 1] - case 39: - self.data.numerics.boundu[130] = swp[iscn - 1] - case 40: - self.data.numerics.boundu[134] = swp[iscn - 1] - case 41: - self.data.build.dr_blkt_outboard = swp[iscn - 1] - case 42: - self.data.impurity_radiation.f_nd_impurity_electrons[8] = swp[iscn - 1] - self.data.impurity_radiation.f_nd_impurity_electron_array[8] = ( - self.data.impurity_radiation.f_nd_impurity_electrons[8] - ) - case 44: - self.data.tfcoil.sig_tf_case_max = swp[iscn - 1] - case 45: - self.data.tfcoil.temp_tf_superconductor_margin_min = swp[iscn - 1] - case 46: - self.data.numerics.boundu[151] = swp[iscn - 1] - case 48: - self.data.tfcoil.n_tf_wp_pancakes = int(swp[iscn - 1]) - case 49: - self.data.tfcoil.n_tf_wp_layers = int(swp[iscn - 1]) - case 50: - self.data.impurity_radiation.f_nd_impurity_electrons[12] = swp[iscn - 1] - self.data.impurity_radiation.f_nd_impurity_electron_array[12] = ( - self.data.impurity_radiation.f_nd_impurity_electrons[12] - ) - case 51: - self.data.physics.f_p_div_lower = swp[iscn - 1] - case 52: - self.data.physics.rad_fraction_sol = swp[iscn - 1] - case 53: - self.data.numerics.boundu[156] = swp[iscn - 1] - case 54: - self.data.tfcoil.b_crit_upper_nbti = swp[iscn - 1] - case 55: - self.data.build.dr_shld_inboard = swp[iscn - 1] - case 56: - self.data.heat_transport.p_cryo_plant_electric_max_mw = swp[iscn - 1] - case 57: - self.data.numerics.boundl[1] = swp[iscn - 1] - case 58: - self.data.build.dr_fw_plasma_gap_inboard = swp[iscn - 1] - case 59: - self.data.build.dr_fw_plasma_gap_outboard = swp[iscn - 1] - case 60: - self.data.tfcoil.sig_tf_wp_max = swp[iscn - 1] - case 61: - self.data.rebco.copperaoh_m2_max = swp[iscn - 1] - case 62: - self.data.pf_coil.j_cs_flat_top_end = swp[iscn - 1] - case 63: - self.data.build.dr_cs = swp[iscn - 1] - case 64: - self.data.pf_coil.f_z_cs_tf_internal = swp[iscn - 1] - case 65: - self.data.cs_fatigue.n_cycle_min = swp[iscn - 1] - case 66: - self.data.pf_coil.f_a_cs_turn_steel = swp[iscn - 1] - case 67: - self.data.cs_fatigue.t_crack_vertical = swp[iscn - 1] - case 68: - self.data.fwbs.inlet_temp_liq = swp[iscn - 1] - case 69: - self.data.fwbs.outlet_temp_liq = swp[iscn - 1] - case 70: - self.data.fwbs.blpressure_liq = swp[iscn - 1] - case 71: - self.data.fwbs.n_liq_recirc = swp[iscn - 1] - case 72: - self.data.fwbs.bz_channel_conduct_liq = swp[iscn - 1] - case 73: - self.data.fwbs.pnuc_fw_ratio_dcll = swp[iscn - 1] - case 74: - self.data.fwbs.f_nuc_pow_bz_struct = swp[iscn - 1] - case 75: - self.data.fwbs.dx_fw_module = swp[iscn - 1] - case 76: - self.data.heat_transport.eta_turbine = swp[iscn - 1] - case 77: - self.data.costs.startupratio = swp[iscn - 1] - case 78: - self.data.costs.fkind = swp[iscn - 1] - case 79: - self.data.current_drive.eta_ecrh_injector_wall_plug = swp[iscn - 1] - case 80: - self.data.tfcoil.fcoolcp = swp[iscn - 1] - case 81: - self.data.tfcoil.n_tf_coil_turns = swp[iscn - 1] - case _: - raise ProcessValueError("Illegal scan variable number", nwp=nwp) - - return ScanVariables(int(nwp)).value + sv = ScanVariables(int(nwp)) + sweep = swp[iscn - 1] + sv.set(self.data, sweep) + return sv From 7151b9e9c553bff59cbac604b999ecd4f9a55ab6 Mon Sep 17 00:00:00 2001 From: james <81617086+je-cook@users.noreply.github.com> Date: Sat, 13 Jun 2026 10:13:21 +0100 Subject: [PATCH 2/6] scan variable cleanup --- process/core/scan.py | 364 ++++++++++++++++++++----------------------- 1 file changed, 172 insertions(+), 192 deletions(-) diff --git a/process/core/scan.py b/process/core/scan.py index 6e96e0ebbd..16f1dcc954 100644 --- a/process/core/scan.py +++ b/process/core/scan.py @@ -27,8 +27,8 @@ @dataclass class ScanVariable: - variable_area: SVE - variable_num: int + number: int + area: SVE class SVE(Enum): @@ -48,7 +48,7 @@ class SVE(Enum): FWBS = "fwbs" -class ScanVariables(Enum): +class ScanVariables(ScanVariable, Enum): @classmethod def _missing_(cls, value): if isinstance(value, int): @@ -65,7 +65,7 @@ def full_name(self): def set(self, data, sweep): var_area = getattr(data, self.area.value) - if self.value.variable_num == 22 and var_area.i_plant_availability == 1: + if self.number == 22 and var_area.i_plant_availability == 1: raise ProcessValueError( "Do not scan f_t_plant_available if i_plant_availability=1" ) @@ -80,88 +80,80 @@ def set(self, data, sweep): self._data_ = getattr(var_area, name) - @DynamicClassAttribute - def area(self): - return self.value.variable_area - - @DynamicClassAttribute - def number(self): - return self.value.variable_num - @DynamicClassAttribute def data(self): if hasattr(self, "_data_"): return self._data_ raise ValueError("Data not available") - aspect = ScanVariable(SVE.P, 1) - pflux_div_heat_load_max_mw = ScanVariable(SVE.D, 2) - p_plant_electric_net_required_mw = ScanVariable(SVE.C, 3) - hfact = ScanVariable(SVE.P, 4) - j_tf_coil_full_area = ScanVariable(SVE.T, 5) - pflux_fw_neutron_max_mw = ScanVariable(SVE.C, 6) - beamfus0 = ScanVariable(SVE.P, 7) - temp_plasma_electron_vol_avg_kev = ScanVariable(SVE.P, 9) - boundu__14 = ScanVariable(SVE.NUM, 10) - beta_norm_max = ScanVariable(SVE.P, 11) - f_c_plasma_bootstrap_max = ScanVariable(SVE.CD, 12) - boundu__10 = ScanVariable(SVE.NUM, 13) - rmajor = ScanVariable(SVE.P, 16) - b_tf_inboard_max = ScanVariable(SVE.C, 17) - eta_cd_norm_hcd_primary_max = ScanVariable(SVE.C, 18) - boundl__16 = ScanVariable(SVE.NUM, 19) - t_burn_min = ScanVariable(SVE.C, 20) - f_t_plant_available = ScanVariable(SVE.CST, 22) - p_fusion_total_max_mw = ScanVariable(SVE.C, 24) - kappa = ScanVariable(SVE.P, 25) - triang = ScanVariable(SVE.P, 26) - tbrmin = ScanVariable(SVE.C, 27) - b_plasma_toroidal_on_axis = ScanVariable(SVE.P, 28) - coreradius = ScanVariable(SVE.IR, 29) - f_alpha_energy_confinement_min = ScanVariable(SVE.C, 31) - epsvmc = ScanVariable(SVE.NUM, 32) - boundu__129 = ScanVariable(SVE.NUM, 38) - boundu__131 = ScanVariable(SVE.NUM, 39) - boundu__135 = ScanVariable(SVE.NUM, 40) - dr_blkt_outboard = ScanVariable(SVE.B, 41) - f_nd_impurity_electrons__9 = ScanVariable(SVE.IR, 42) - sig_tf_case_max = ScanVariable(SVE.T, 44) - temp_tf_superconductor_margin_min = ScanVariable(SVE.T, 45) - boundu__152 = ScanVariable(SVE.NUM, 46) - n_tf_wp_pancakes = ScanVariable(SVE.T, 48) - n_tf_wp_layers = ScanVariable(SVE.T, 49) - f_nd_impurity_electrons__13 = ScanVariable(SVE.IR, 50) - f_p_div_lower = ScanVariable(SVE.P, 51) - rad_fraction_sol = ScanVariable(SVE.P, 52) - boundu__157 = ScanVariable(SVE.NUM, 53) - b_crit_upper_nbti = ScanVariable(SVE.T, 54) - dr_shld_inboard = ScanVariable(SVE.B, 55) - p_cryo_plant_electric_max_mw = ScanVariable(SVE.HT, 56) - boundl__2 = ScanVariable(SVE.NUM, 57) - dr_fw_plasma_gap_inboard = ScanVariable(SVE.B, 58) - dr_fw_plasma_gap_outboard = ScanVariable(SVE.B, 59) - sig_tf_wp_max = ScanVariable(SVE.T, 60) - copperaoh_m2_max = ScanVariable(SVE.TR, 61) - coheof = ScanVariable(SVE.PF, 62) - dr_cs = ScanVariable(SVE.B, 63) - ohhghf = ScanVariable(SVE.PF, 64) - n_cycle_min = ScanVariable(SVE.CS, 65) - oh_steel_frac = ScanVariable(SVE.PF, 66) - t_crack_vertical = ScanVariable(SVE.CS, 67) - inlet_temp_liq = ScanVariable(SVE.FWBS, 68) - outlet_temp_liq = ScanVariable(SVE.FWBS, 69) - blpressure_liq = ScanVariable(SVE.FWBS, 70) - n_liq_recirc = ScanVariable(SVE.FWBS, 71) - bz_channel_conduct_liq = ScanVariable(SVE.FWBS, 72) - pnuc_fw_ratio_dcll = ScanVariable(SVE.FWBS, 73) - f_nuc_pow_bz_struct = ScanVariable(SVE.FWBS, 74) - dx_fw_module = ScanVariable(SVE.FWBS, 75) - eta_turbine = ScanVariable(SVE.HT, 76) - startupratio = ScanVariable(SVE.CST, 77) - fkind = ScanVariable(SVE.CST, 78) - eta_ecrh_injector_wall_plug = ScanVariable(SVE.CD, 79) - fcoolcp = ScanVariable(SVE.T, 80) - n_tf_coil_turns = ScanVariable(SVE.T, 81) + aspect = (1, SVE.P) + pflux_div_heat_load_max_mw = (2, SVE.D) + p_plant_electric_net_required_mw = (3, SVE.C) + hfact = (4, SVE.P) + j_tf_coil_full_area = (5, SVE.T) + pflux_fw_neutron_max_mw = (6, SVE.C) + beamfus0 = (7, SVE.P) + temp_plasma_electron_vol_avg_kev = (9, SVE.P) + boundu__14 = (10, SVE.NUM) + beta_norm_max = (11, SVE.P) + f_c_plasma_bootstrap_max = (12, SVE.CD) + boundu__10 = (13, SVE.NUM) + rmajor = (16, SVE.P) + b_tf_inboard_max = (17, SVE.C) + eta_cd_norm_hcd_primary_max = (18, SVE.C) + boundl__16 = (19, SVE.NUM) + t_burn_min = (20, SVE.C) + f_t_plant_available = (22, SVE.CST) + p_fusion_total_max_mw = (24, SVE.C) + kappa = (25, SVE.P) + triang = (26, SVE.P) + tbrmin = (27, SVE.C) + b_plasma_toroidal_on_axis = (28, SVE.P) + coreradius = (29, SVE.IR) + f_alpha_energy_confinement_min = (31, SVE.C) + epsvmc = (32, SVE.NUM) + boundu__129 = (38, SVE.NUM) + boundu__131 = (39, SVE.NUM) + boundu__135 = (40, SVE.NUM) + dr_blkt_outboard = (41, SVE.B) + f_nd_impurity_electrons__9 = (42, SVE.IR) + sig_tf_case_max = (44, SVE.T) + temp_tf_superconductor_margin_min = (45, SVE.T) + boundu__152 = (46, SVE.NUM) + n_tf_wp_pancakes = (48, SVE.T) + n_tf_wp_layers = (49, SVE.T) + f_nd_impurity_electrons__13 = (50, SVE.IR) + f_p_div_lower = (51, SVE.P) + rad_fraction_sol = (52, SVE.P) + boundu__157 = (53, SVE.NUM) + b_crit_upper_nbti = (54, SVE.T) + dr_shld_inboard = (55, SVE.B) + p_cryo_plant_electric_max_mw = (56, SVE.HT) + boundl__2 = (57, SVE.NUM) + dr_fw_plasma_gap_inboard = (58, SVE.B) + dr_fw_plasma_gap_outboard = (59, SVE.B) + sig_tf_wp_max = (60, SVE.T) + copperaoh_m2_max = (61, SVE.TR) + coheof = (62, SVE.PF) + dr_cs = (63, SVE.B) + ohhghf = (64, SVE.PF) + n_cycle_min = (65, SVE.CS) + oh_steel_frac = (66, SVE.PF) + t_crack_vertical = (67, SVE.CS) + inlet_temp_liq = (68, SVE.FWBS) + outlet_temp_liq = (69, SVE.FWBS) + blpressure_liq = (70, SVE.FWBS) + n_liq_recirc = (71, SVE.FWBS) + bz_channel_conduct_liq = (72, SVE.FWBS) + pnuc_fw_ratio_dcll = (73, SVE.FWBS) + f_nuc_pow_bz_struct = (74, SVE.FWBS) + dx_fw_module = (75, SVE.FWBS) + eta_turbine = (76, SVE.HT) + startupratio = (77, SVE.CST) + fkind = (78, SVE.CST) + eta_ecrh_injector_wall_plug = (79, SVE.CD) + fcoolcp = (80, SVE.T) + n_tf_coil_turns = (81, SVE.T) class Scan: @@ -261,20 +253,11 @@ def post_optimise(self, ifail: int): print() else: # Solution found - if self.solver != "fsolve": - process_output.ocmmnt( - constants.NOUT, "and found a feasible set of parameters." - ) - process_output.oheadr( - constants.IOTTY, "PROCESS found a feasible solution" - ) - else: - process_output.ocmmnt( - constants.NOUT, "and found a consistent set of parameters." - ) - process_output.oheadr( - constants.IOTTY, "PROCESS found a consistent solution" - ) + descr = "consistent" if self.solver == "fsolve" else "feasible" + process_output.ocmmnt( + constants.NOUT, f"and found a {descr} set of parameters." + ) + process_output.oheadr(constants.IOTTY, f"PROCESS found a {descr} solution") process_output.oblnkl(constants.NOUT) process_output.ovarin(constants.NOUT, "Error flag", "(ifail)", ifail) @@ -291,28 +274,22 @@ def post_optimise(self, ifail: int): f"High final constraint residues. {self.data.numerics.sqsumsq=}" ) - process_output.ovarin( - constants.NOUT, - "Number of iteration variables", - "(nvar)", - self.data.numerics.nvar, - ) - process_output.ovarin( - constants.NOUT, - "Number of constraints (total)", - "(neqns+nineqns)", - self.data.numerics.neqns + self.data.numerics.nineqns, - ) - process_output.ovarin( - constants.NOUT, - "Optimisation switch", - "(ioptimz)", - self.data.numerics.ioptimz, - ) + for d, var, v in ( + ("Number of iteration variables", "(nvar)", self.data.numerics.nvar), + ( + "Number of constraints (total)", + "(neqns+nineqns)", + self.data.numerics.neqns + self.data.numerics.nineqns, + ), + ("Optimisation switch", "(ioptimz)", self.data.numerics.ioptimz), + ): + process_output.ovarin(constants.NOUT, d, var, v) + process_output.ocmmnt( constants.NOUT, f" {PROCESSRunMode(self.data.numerics.ioptimz).description}", ) + # Objective function output: none for fsolve if self.solver != "fsolve": process_output.ovarin( @@ -322,68 +299,66 @@ def post_optimise(self, ifail: int): self.data.numerics.minmax, ) - objf_name = f'"{FiguresOfMerit(abs(self.data.numerics.minmax)).description}"' - - self.data.numerics.objf_name = objf_name - - process_output.ovarst( - constants.NOUT, - "Objective function name", - "(objf_name)", - self.data.numerics.objf_name, - ) - process_output.ovarre( - constants.NOUT, - "Normalised objective function", - "(norm_objf)", - self.data.numerics.norm_objf, - "OP ", + self.data.numerics.objf_name = ( + f'"{FiguresOfMerit(abs(self.data.numerics.minmax)).description}"' ) - process_output.ovarre( - constants.NOUT, - "Square root of the sum of squares of the constraint residuals", - "(sqsumsq)", - self.data.numerics.sqsumsq, - "OP ", - ) - if self.solver != "fsolve": - process_output.ovarre( - constants.NOUT, - "VMCON convergence parameter", - "(convergence_parameter)", - self.data.globals.convergence_parameter, - "OP ", - ) - process_output.ovarin( - constants.NOUT, - "Number of optimising solver iterations", - "(nviter)", - self.data.numerics.nviter, - "OP ", - ) + for d, var, v, o in ( + ( + "Objective function name", + "(objf_name)", + self.data.numerics.objf_name, + "", + ), + ( + "Normalised objective function", + "(norm_objf)", + self.data.numerics.norm_objf, + "OP ", + ), + ( + "VMCON convergence parameter", + "(convergence_parameter)", + self.data.globals.convergence_parameter, + "OP ", + ), + ( + "Number of optimising solver iterations", + "(nviter)", + self.data.numerics.nviter, + "OP ", + ), + ( + "Square root of the sum of squares of the constraint residuals", + "(sqsumsq)", + self.data.numerics.sqsumsq, + "OP ", + ), + ): + process_output.ovarre(constants.NOUT, d, var, v, o) + process_output.oblnkl(constants.NOUT) if self.solver == "fsolve": - if ifail == 1: - msg = "PROCESS has solved using fsolve." - else: - msg = "PROCESS failed to solve using fsolve." process_output.write( constants.NOUT, - f"{msg}\n", + "PROCESS has solved using fsolve.\n" + if ifail == 1 + else "PROCESS failed to solve using fsolve.\n", ) else: - if ifail == 1: - string1 = "PROCESS has successfully optimised" - else: - string1 = "PROCESS has failed to optimise" - - string2 = "minimise" if self.data.numerics.minmax > 0 else "maximise" - process_output.write( constants.NOUT, - f"{string1} the optimisation parameters to {string2} the objective function: {objf_name}\n", + ( + ( + "PROCESS has successfully optimised" + if ifail == 1 + else "PROCESS has failed to optimise" + ) + + " the optimisation parameters to" + + ("minimise" if self.data.numerics.minmax > 0 else "maximise") + + f" the objective function: {self.data.numerics.objf_name}\n" + ), ) written_warning = False @@ -440,10 +415,10 @@ def post_optimise(self, ifail: int): self.data.numerics.xcs[i], ) - if self.data.numerics.boundu[i] == self.data.numerics.boundl[i]: - xnorm = 1.0 - else: - xnorm = min( + xnorm = ( + 1.0 + if self.data.numerics.boundu[i] == self.data.numerics.boundl[i] + else min( max( ( self.data.numerics.xcm[i] @@ -457,6 +432,7 @@ def post_optimise(self, ifail: int): ), 1.0, ) + ) process_output.ovarre( constants.MFILE, @@ -749,13 +725,13 @@ def scan_1d(self): pstring = ( f"Scan {iscan:02d}: {nsweep_var.name} = {sweep_values[iscan]} " + " " * offsets[iscan] - + "\u001b[32m{}CONVERGED \u001b[0m" + + "\u001b[3{}CONVERGED \u001b[0m" ) if scan_1d_ifail_dict[iscan + 1] == 1: converged_count += 1 - pstring.format("") + pstring.format("2m") else: - pstring.format("UN") + pstring.format("1mUN") print(pstring) converged_percentage = converged_count / self.data.scan.isweep * 100 print(f"\nConvergence Percentage: {converged_percentage:.2f}%") @@ -789,6 +765,21 @@ def scan_2d(self): scan_2d_ifail_list[iscan_1][iscan_2] = ifail iscan += 1 + self.output_2d_summary(scan_2d_ifail_list) + + def scan_2d_init(self): + sv = self.data.scan + for d, n, v in ( + ("Number of first variable scan points", "(isweep)", sv.isweep), + ("Number of second variable scan points", "(isweep_2)", sv.isweep_2), + ("Scanning first variable number", "(nsweep)", sv.nsweep), + ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), + ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), + ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), + ): + process_output.ovarin(constants.MFILE, d, n, v) + + def output_2d_summary(self, scan_2d_ifail_list): print( " ****************************************** Scan Convergence Summary ****************************************** \n" ) @@ -823,37 +814,26 @@ def scan_2d(self): f"Scan {scan_point:02d}: ({nsweep_var.name} = {sweep_1_values[iscan_1 - 1]}," f" {nsweep_2_var.name} = {sweep_2_values[iscan_2 - 1]}) " + " " * offsets[iscan_1 - 1][iscan_2 - 1] - + "\u001b[32m{}CONVERGED \u001b[0m" + + "\u001b[3{}CONVERGED \u001b[0m" ) if scan_2d_ifail_list[iscan_1][iscan_2] == 1: converged_count += 1 - print(string.format()) + print(string.format("2m")) else: - print(string.format("UN")) + print(string.format("1mUN")) scan_point += 1 converged_percentage = ( converged_count / (self.data.scan.isweep * self.data.scan.isweep_2) * 100 ) print(f"\nConvergence Percentage: {converged_percentage:.2f}%") - def scan_2d_init(self): - sv = self.data.scan - for d, n, v in ( - ("Number of first variable scan points", "(isweep)", sv.isweep), - ("Number of second variable scan points", "(isweep_2)", sv.isweep_2), - ("Scanning first variable number", "(nsweep)", sv.nsweep), - ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), - ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), - ("Scanning second variable number", "(nsweep_2)", sv.nsweep_2), - ): - process_output.ovarin(constants.MFILE, d, n, v) - def _set_v_x_label(self, iscan, twod=False): - if twod: - sv = self.scan_select(self.data.scan.nsweep_2, self.data.scan.sweep_2, iscan) - else: - sv = self.scan_select(self.data.scan.nsweep, self.data.scan.sweep, iscan) - self.data.globals.vlabel.vlabel = sv.name + sv = ( + self.scan_select(self.data.scan.nsweep_2, self.data.scan.sweep_2, iscan) + if twod + else self.scan_select(self.data.scan.nsweep, self.data.scan.sweep, iscan) + ) + self.data.globals.vlabel = sv.name self.data.globals.xlabel = sv.data.description def scan_1d_write_point_header(self, iscan: int): @@ -891,10 +871,10 @@ def scan_2d_write_point_header(self, iscan, iscan_1, iscan_2): process_output.write( constants.NOUT, - f"***** 2D Scan point {iscan} of {self.data.scan.isweep * self.data.scan.isweep_2} : " + f"2D Scan point {iscan} of {self.data.scan.isweep * self.data.scan.isweep_2} : " f"{self.data.globals.vlabel} = {self.data.scan.sweep[iscan_1 - 1]} and" f" {self.data.globals.vlabel_2} = {self.data.scan.sweep_2[iscan_r - 1]} " - "*****", + "", ) process_output.ovarin(constants.MFILE, "Scan point number", "(iscan)", iscan) From f56ac4bcec4dbb9b6c187af62b9ab898ec5beb61 Mon Sep 17 00:00:00 2001 From: james <81617086+je-cook@users.noreply.github.com> Date: Sat, 13 Jun 2026 10:14:07 +0100 Subject: [PATCH 3/6] solver_handler cleanup --- process/core/solver/solver_handler.py | 52 +++++++++++++-------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/process/core/solver/solver_handler.py b/process/core/solver/solver_handler.py index b457e25ca9..d60f6573c3 100644 --- a/process/core/solver/solver_handler.py +++ b/process/core/solver/solver_handler.py @@ -1,3 +1,5 @@ +from contextlib import contextmanager + from process.core.solver.evaluators import Evaluators from process.core.solver.iteration_variables import ( load_iteration_variables, @@ -36,14 +38,9 @@ def run(self): # Initialise iteration variables and bounds in Python: relies on Fortran # iteration variables being defined above # Trim maximum size arrays down to actually used size - n = self.data.numerics.nvar - x = self.data.numerics.xcm[:n] - bndl = self.data.numerics.itv_scaled_lower_bounds[:n] - bndu = self.data.numerics.itv_scaled_upper_bounds[:n] - - # Define total number of constraints and equality constraints - m = self.data.numerics.neqns + self.data.numerics.nineqns - meq = self.data.numerics.neqns + x = self.data.numerics.xcm[: self.data.numerics.nvar] + bndl = self.data.numerics.itv_scaled_lower_bounds[: self.data.numerics.nvar] + bndu = self.data.numerics.itv_scaled_upper_bounds[: self.data.numerics.nvar] # Evaluators() calculates the objective and constraint functions and # their gradients for a given vector x @@ -54,30 +51,17 @@ def run(self): self.solver.set_evaluators(evaluators) self.solver.set_bounds(bndl, bndu) self.solver.set_opt_params(x) - self.solver.set_constraints(m, meq) + # Define total number of constraints and equality constraints + self.solver.set_constraints( + m=self.data.numerics.neqns + self.data.numerics.nineqns, meq=self.data.numerics.neqns + ) ifail = self.solver.solve() # If VMCON optimisation has failed then try altering value of epsfcn if self.solver_name == "vmcon": if ifail != 1: - print("Trying again with new epsfcn") - # epsfcn is only used in evaluators.Evaluators() - # TODO epsfcn could be set in Evaluators instance now, don't need to - # set/unset in self.data.numerics module - self.data.numerics.epsfcn *= 10 # try new larger value - print("new epsfcn = ", self.data.numerics.epsfcn) - - ifail = self.solver.solve() - # First solution attempt failed (ifail != 1): supply ifail value - # to next attempt - self.data.numerics.epsfcn /= 10 # reset value - - if ifail != 1: - print("Trying again with new epsfcn") - self.data.numerics.epsfcn /= 10 # try new smaller value - print("new epsfcn = ", self.data.numerics.epsfcn) - ifail = self.solver.solve() - self.data.numerics.epsfcn *= 10 # reset value + with epsfcn_context(self.data.numerics): + self.solver.solve() # If VMCON has exited with error code 5 try another run using a multiple # of the identity matrix as input for the Hessian b(n,n) @@ -104,3 +88,17 @@ def output(self): # than required, size self.data.numerics.xcm[: self.solver.x.shape[0]] = self.solver.x self.data.numerics.rcm[: self.solver.conf.shape[0]] = self.solver.conf + + +@contextmanager +def epsfcn_context(numerics): + print("Trying again with new epsfcn") + # epsfcn is only used in evaluators.Evaluators() + # TODO epsfcn could be set in Evaluators instance now, don't need to + # set/unset in numerics module + numerics.epsfcn *= 10 # try new larger value + print("new epsfcn = ", numerics.epsfcn) + try: + yield + finally: + numerics.epsfcn /= 10 # reset value From 824d51cc0185ace1fe073ca9006354f9ddf1f7f5 Mon Sep 17 00:00:00 2001 From: james <81617086+je-cook@users.noreply.github.com> Date: Sat, 13 Jun 2026 10:14:53 +0100 Subject: [PATCH 4/6] OH MY GOD THE DUPLICATION IT BURNS --- process/core/io/plot/cli.py | 4 +- process/core/io/plot/scans.py | 1374 ++++++++++++--------------------- process/core/scan.py | 6 +- 3 files changed, 518 insertions(+), 866 deletions(-) diff --git a/process/core/io/plot/cli.py b/process/core/io/plot/cli.py index 9820a6b967..6251f5c08e 100644 --- a/process/core/io/plot/cli.py +++ b/process/core/io/plot/cli.py @@ -238,10 +238,10 @@ def plot_scans_cli( list(map(float, filter(None, x_axis_max))), list(map(float, filter(None, x_axis_range))), y_axis_percent, - y_axis_percent2, list(map(float, filter(None, y_axis_max))), - list(map(float, filter(None, y_axis2_max))), list(map(float, filter(None, y_axis_range))), + y_axis_percent2, + list(map(float, filter(None, y_axis2_max))), list(map(float, filter(None, y_axis_range2))), label_name, twod_contour, diff --git a/process/core/io/plot/scans.py b/process/core/io/plot/scans.py index 09d7ea0d62..2bae68f1a1 100644 --- a/process/core/io/plot/scans.py +++ b/process/core/io/plot/scans.py @@ -22,17 +22,127 @@ - If the file is a folder, the contained MFILE is used as an input. """ +from __future__ import annotations + import math -import sys from collections.abc import Iterable, Sequence +from dataclasses import dataclass +from enum import Enum, auto from pathlib import Path +from typing import TYPE_CHECKING import matplotlib.pyplot as plt -import matplotlib.ticker as mtick import numpy as np +from matplotlib.ticker import MultipleLocator, PercentFormatter from process.core.io.mfile import MFile +from process.core.io.mfile.cli import mfile from process.core.io.variable_metadata import var_dicts as meta +from process.core.scan import ScanVariables + +if TYPE_CHECKING: + from matproplib.axes import Axes + + +@dataclass +class AxisData: + name: str + percent: bool + max_: Sequence[float] + range_: Sequence[float] + tick_size: float + font_size: float + legend_size: float = 12 + + +class AxisChoice(Enum): + X = auto() + Y = auto() + + def axis(self, ax): + return getattr(ax, f"{self.name.lower()}axis") + + def set_lim(self, ax, lower, upper): + getattr(ax, f"set_{self.name.lower()}lim")(lower, upper) + + +def get_list_padded(inp, names): + target_len = len(names) + inp_array = np.array(inp, dtype=float) + if (i_len := len(inp_array)) < target_len: + if i_len == 0: + return [None] * target_len + + return np.concatenate(( + inp_array, + np.full(target_len - i_len, inp_array[-1], dtype=float), + )) + return inp_array[:target_len] + + +def value_checks(scan_var, scan_2_var, m_file, is_2D_scan, input_files): + ve_string = ( + "`{}` does not exist in PROCESS dicts\n" + " The scan variable is probably an upper/lower boundary\n" + " Please modify 'nsweep_dict' dict with the constrained var" + ) + # Check if the scan variable is present + if scan_var.name not in m_file.data: + raise ValueError(ve_string.format(scan_var.name)) + + # Check if the second scan variable is present + if is_2D_scan and (scan_2_var.name not in m_file.data): + raise ValueError(ve_string.format(scan_2_var.name)) + + if is_2D_scan and len(input_files) > 1: + raise ValueError("Only one input file can be used for 2D scans") + + +def array_check(output_name: str, m_file: MFile) -> bool: + # Check if the output variable exists in the MFILE + if output_name not in m_file.data: + print( + f"Warning : `{output_name}` does not exist in PROCESS dicts\n" + f"Warning : `{output_name}` will not be output" + ) + return False + return True + + +def create_o_array( + n_scan: int, m_file: MFile, output_name: str, conv_i: list[int] +) -> np.ndarray: + return np.array([m_file.get(output_name, scan=conv_i[ii]) for ii in range(n_scan)]) + + +def get_label(name: str) -> str: + return meta[name].latex if name in meta else f"{name}" + + +def axis_manipulation(ax: Axes, axis: AxisData, index: int, contour: np.ndarray): + + an = AxisChoice[axis.name.upper()] + + if len(axis.range_) > 0: + divisions = (axis.range_[1] - axis.range_[0]) / 10 + if axis.percent: + if axis.max_[index] is None: + axis.max_[index] = max(np.abs(contour)) + ticks = PercentFormatter(axis.max_[index]) + if len(axis.range_) > 0: + scale = axis.max_[index] / 100 + divisions = 5 * math.ceil(divisions / 5) * scale + range_ = (axis.range_[0] * scale, axis.range_[1] * scale) + an.axis(ax).set_major_formatter(ticks) + + if len(axis.range_) > 0: + if axis.percent is False: + range_ = axis.range_ + an.set_lim(ax, range_[0], range_[1]) + an.axis(ax).set_major_locator(MultipleLocator(divisions)) + + ax.figure.tight_layout() + ax.tick_params(axis=an.name.lower(), labelsize=axis.tick_size) def plot_scan( @@ -48,10 +158,10 @@ def plot_scan( x_axis_max: Sequence[float] = (), x_axis_range: Sequence[float] = (), y_axis_percent: bool = False, - y_axis_percent2: bool = False, y_axis_max: Sequence[float] = (), - y_axis2_max: Sequence[float] = (), y_axis_range: Sequence[float] = (), + y_axis_percent2: bool = False, + y_axis2_max: Sequence[float] = (), y_axis_range2: Sequence[float] = (), label_name: Sequence[str] = (), twod_contour: bool = False, @@ -60,904 +170,444 @@ def plot_scan( """Main plot scans script.""" outputdir = outputdir or Path.cwd() input_files = mfiles if isinstance(mfiles, Iterable) else [mfiles] - x_max_input = x_axis_max - - y_max_input = y_axis_max - y_max2_input = y_axis2_max + is_2D_scan = False # If the input file is a directory, add MFILE.DAT for ii, if_ in enumerate(input_files): if if_.is_dir(): input_files[ii] = if_ / "MFILE.DAT" - # nsweep varible dict - # ------------------- - # TODO WOULD BE GREAT TO HAVE IT AUTOMATICALLY GENERATED ON THE PROCESS CMAKE! - # THE SAME WAY THE DICTS ARE - # This needs to be kept in sync automatically; this will break frequently - # otherwise - # Rem : Some variables are not in the MFILE, making the defintion rather tricky... - nsweep_dict = { - 1: "aspect", - 2: "pflux_div_heat_load_max_mw", - 3: "p_plant_electric_net_mw", - 4: "hfact", - 5: "j_tf_coil_full_area", - 6: "pflux_fw_neutron_max_mw", - 7: "beamfus0", - 8: "Obsolete", # OBSOLETE - 9: "temp_plasma_electron_vol_avg_kev", - 10: "boundu(15)", - 11: "beta_norm_max", - 12: "f_c_plasma_bootstrap_max", - 13: "boundu(10)", - 14: "fiooic", - 16: "rmajor", - 17: "b_tf_inboard_peak_symmetric", # b_tf_inboard_max the maximum T field upper limit is the scan variable - 18: "eta_cd_norm_hcd_primary_max", - 19: "boundl(16)", - 20: "cnstv.t_burn_min", - 21: "", - 22: "f_t_plant_available", - 23: "boundu(72)", - 24: "p_fusion_total_max_mw", - 25: "kappa", - 26: "triang", - 27: "tbrmin", - 28: "b_plasma_toroidal_on_axis", - 29: "radius_plasma_core_norm", - 30: "", # OBSOLETE - 31: "f_alpha_energy_confinement_min", - 32: "epsvmc", - 33: "ttarget", - 34: "qtargettotal", - 35: "lambda_q_omp", - 36: "lambda_target", - 37: "lcon_factor", - 38: "boundu(129)", - 39: "boundu(131)", - 40: "boundu(135)", - 41: "dr_blkt_outboard", - 42: "f_nd_impurity_electrons(9)", - 43: "Obsolete", # OBSOLETE - 44: "alstrtf", - 45: "temp_tf_superconductor_margin_min", - 46: "boundu(152)", - 47: "impurity_enrichment(9)", - 48: "n_tf_wp_pancakes", - 49: "n_tf_wp_layers", - 50: "f_nd_impurity_electrons(13)", - 51: "f_p_div_lower", - 52: "rad_fraction_sol", - 53: "Obsolete", # OBSOLETE - 54: "b_crit_upper_nbti", - 55: "dr_shld_inboard", - 56: "p_cryo_plant_electric_max_mw", - 57: "b_plasma_toroidal_on_axis", # Genuinly b_plasma_toroidal_on_axis lower bound - 58: "dr_fw_plasma_gap_inboard", - 59: "dr_fw_plasma_gap_outboard", - 60: "sig_tf_wp_max", - 61: "copperaoh_m2_max", - 62: "j_cs_flat_top_end", - 63: "dr_cs", - 64: "f_z_cs_tf_internal", - 65: "n_cycle_min", - 66: "f_a_cs_turn_steel", - 67: "t_crack_vertical", - 68: "inlet_temp_liq", - 69: "outlet_temp_liq", - 70: "blpressure_liq", - 71: "n_liq_recirc", - 72: "bz_channel_conduct_liq", - 73: "pnuc_fw_ratio_dcll", - 74: "f_nuc_pow_bz_struct", - 75: "dx_fw_module", - 76: "eta_turbine", - 77: "startupratio", - 78: "fkind", - 79: "eta_ecrh_injector_wall_plug", - 80: "fcoolcp", - 81: "n_tf_coil_turns", - } - # ------------------- - # Getting the scanned variable name m_file = MFile(filename=input_files[-1]) - nsweep_ref = int(m_file.data["nsweep"].get_scan(-1)) - scan_var_name = nsweep_dict[nsweep_ref] + nsweep_ref = int(m_file.get("nsweep", scan=-1)) + scan_var = ScanVariables(nsweep_ref) + # Get the eventual second scan variable - nsweep_2_ref = 0 - is_2D_scan = False - scan_2_var_name = "" if "nsweep_2" in m_file.data: is_2D_scan = True - nsweep_2_ref = int(m_file.data["nsweep_2"].get_scan(-1)) - scan_2_var_name = nsweep_dict[nsweep_2_ref] + scan_2_var = ScanVariables(int(m_file.get("nsweep_2", scan=-1))) - # Checks - # ------ - # Check if the nsweep dict has been updated - if nsweep_ref > len(nsweep_dict) + 1: - print( - f"ERROR : nsweep = {nsweep_ref} not supported by the utility\n" - "ERROR : Please update the 'nsweep_dict' dict" - ) - sys.exit() + value_checks(scan_var, scan_2_var, m_file, is_2D_scan, input_files) - # Check if the scan variable is present in the - if scan_var_name not in m_file.data: - print( - f"ERROR : `{scan_var_name}` does not exist in PROCESS dicts\n" - "ERROR : The scan variable is probably an upper/lower boundary\n" - "ERROR : Please modify 'nsweep_dict' dict with the constrained var" - ) - sys.exit() + x_max = get_list_padded(x_axis_max, output_names) + x_axis = AxisData( + "x", x_axis_percent, x_max, x_axis_range, axis_tick_size, axis_font_size + ) - # Check if the second scan variable is present in the - if is_2D_scan and (scan_2_var_name not in m_file.data): - print( - f"ERROR : `{scan_2_var_name}` does not exist in PROCESS dicts\n" - "ERROR : The scan variable is probably an upper/lower boundary\n" - "ERROR : Please modify 'nsweep_dict' dict with the constrained var" + y_max = get_list_padded(y_axis_max, output_names) + y_axis = AxisData( + "y", y_axis_percent, y_max, y_axis_range, axis_tick_size, axis_font_size + ) + if is_2D_scan: + twod_scan( + input_files, + scan_var, + scan_2_var, + output_names, + outputdir, + save_format, + x_axis, + y_axis, + twod_contour=twod_contour, + ) + else: + y_axis2 = AxisData( + "y", + y_axis_percent2, + ( + get_list_padded(y_axis2_max, output_names) + if len(output_names2) > 0 + else y_axis2_max + ), + y_axis_range2, + axis_tick_size, + axis_font_size, ) - sys.exit() - # Only one imput must be used for a 2D scan - if is_2D_scan and len(input_files) > 1: - print("ERROR : Only one input file can be used for 2D scans\nERROR : Exiting") - sys.exit() - # ------ - - # Plot settings - # ------------- - # Plot cosmetic settings - def _format_lists(inp, output_names): - x_max = [] - if len(inp) > 0: - for i in range(len(output_names)): - j = 0 - try: - x_max += [float(inp[i])] - j += 1 - except IndexError: - x_max += [float(inp[j])] - else: - x_max = [None] * len(output_names) + scan_var_array, output_arrays, output_arrays2 = oned_scan( + input_files, + nsweep_ref, + scan_var, + output_names, + output_names2, + term_output=term_output, + ) + plot_1d_scan( + input_files, + m_file, + output_names, + output_names2, + scan_var, + outputdir, + save_format, + label_name, + scan_var_array, + output_arrays, + output_arrays2, + x_axis, + y_axis, + y_axis2, + stack_plots=stack_plots, + ) - return x_max - legend_size = 12 - x_max = ( - _format_lists(x_max_input, output_names) - if len(x_max_input) != len(output_names) - else np.float64(x_max_input) - ) - y_max = ( - _format_lists(y_max_input, output_names) - if len(y_max_input) != len(output_names) - else np.float64(y_max_input) - ) - if len(output_names2) > 0: - y_max2 = ( - _format_lists(y_max2_input, output_names) - if len(y_max2_input) != len(output_names) - else np.float64(y_max2_input) - ) - else: - y_max2 = y_max2_input - # ------------- - - # Case of a set of 1D scans - # ---------------------------------------------------------------------------------------------- - if not is_2D_scan: - # Loop over the MFILEs - output_arrays = {} - output_arrays2 = {} - scan_var_array = {} - for input_file in input_files: - # Opening the MFILE.DAT - m_file = MFile(filename=input_file) - - # Check if the the scan variable is the same for all inputs - # --- - # Same scan var - nsweep = int(m_file.data["nsweep"].get_scan(-1)) - if nsweep != nsweep_ref: +def oned_scan( + input_files: Sequence[Path], + nsweep_ref: int, + scan_var: ScanVariables, + output_names: Sequence[str], + output_names2: Sequence[str], + *, + term_output: bool, +) -> tuple[np.ndarray, ...]: + # input file, output_name, scan + output_arrays = {} + # input file, output_name2, scan + output_arrays2 = {} + # input_file, scan + scan_var_array = {} + for input_file in input_files: + # Opening the MFILE.DAT + m_file = MFile(filename=input_file) + n_scan = int(m_file.get("isweep", scan=-1)) + + # Check if the the scan variable is the same for all inputs + # Same scan var + nsweep = int(m_file.get("nsweep", scan=-1)) + if nsweep != nsweep_ref: + raise ValueError("You must use inputs files with the same scan variables\n") + + if "nsweep_2" in m_file.data: + raise ValueError("You cannot mix 1D with 2D scans\nERROR : Exiting") + + # Only selecting the scans that has converged + # Converged indexes + conv_i = [] + for ii in range(n_scan): + ifail = m_file.get("ifail", scan=ii + 1) + if ifail == 1: + conv_i.append(ii + 1) + else: + failed_value = scan_var.get_val(m_file, scan=ii + 1) print( - "ERROR : You must use inputs files with the same scan variables\n" - "ERROR : Exiting" + f"Warning : Non-convergent scan point : {scan_var.name} = {failed_value}\n" + "Warning : This point will not be shown." ) - sys.exit() - - # No D scans - if "nsweep_2" in m_file.data: - print("ERROR : You cannot mix 1D with 2D scans\nERROR : Exiting") - sys.exit() - # --- - - # Only selecting the scans that has converged - # --- - # Number of scan points - n_scan = int(m_file.data["isweep"].get_scan(-1)) - - # Converged indexes - conv_i = [] - for ii in range(n_scan): - ifail = m_file.data["ifail"].get_scan(ii + 1) - if ifail == 1: - conv_i.append(ii + 1) - else: - failed_value = m_file.data[scan_var_name].get_scan(ii + 1) - print( - f"Warning : Non-convergent scan point : {scan_var_name} = {failed_value}\n" - "Warning : This point will not be shown." - ) - - # Updating the number of scans - n_scan = len(conv_i) - # --- - # Scanned variable - scan_var_array[input_file] = np.zeros(n_scan) - for ii in range(n_scan): - scan_var_array[input_file][ii] = m_file.data[scan_var_name].get_scan( - conv_i[ii] + # Updating the number of scans + n_scan = len(conv_i) + scan_var_array[input_file] = np.array([ + scan_var.get_val(mfile, scan=conv_i[ii]) for ii in range(n_scan) + ]) + output_arrays[input_file] = { + output_name: create_o_array(n_scan, m_file, output_name, conv_i) + for output_name in output_names + if array_check(output_name, m_file) + } + output_arrays2[input_file] = { + output_name2: create_o_array(n_scan, m_file, output_name2, conv_i) + for output_name2 in output_names2 + if array_check(output_name2, m_file) + } + # Terminal output + if term_output: + print( + f"\n{input_file} scan output\n\nX-axis:\n" + f"scan var {scan_var.name} : {scan_var_array[input_file]}\n\nY-axis:" + + "\n".join( + f"{output_name} : {output_arrays[input_file][output_name]}" + for output_name in output_names + if output_name in m_file.data ) - # output list declaration - output_arrays[input_file] = {} - output_arrays2[input_file] = {} - # First variable scan - for output_name in output_names: - ouput_array = np.zeros(n_scan) - - # Check if the output variable exists in the MFILE - if output_name not in m_file.data: - print( - f"Warning : `{output_name}` does not exist in PROCESS dicts\n" - f"Warning : `{output_name}` will not be output" - ) - continue - - for ii in range(n_scan): - ouput_array[ii] = m_file.data[output_name].get_scan(conv_i[ii]) - output_arrays[input_file][output_name] = ouput_array - # Second variable scan - for output_name2 in output_names2: - ouput_array2 = np.zeros(n_scan) - - # Check if the output variable exists in the MFILE - if output_name2 not in m_file.data: - print( - f"Warning : `{output_name2}` does not exist in PROCESS dicts\n" - f"Warning : `{output_name2}` will not be output" - ) - continue - - for ii in range(n_scan): - ouput_array2[ii] = m_file.data[output_name2].get_scan(conv_i[ii]) - output_arrays2[input_file][output_name2] = ouput_array2 - # Terminal output - if term_output: + + "\n" + ) + if len(output_names2) > 0: + last_name = output_names2[-1] print( - f"\n{input_file} scan output\n\nX-axis:\n" - f"scan var {scan_var_name} : {scan_var_array[input_file]}\n\nY-axis:" + f"Y2-Axis\n {last_name} : {output_arrays2[input_file][last_name]}\n" ) - for output_name in output_names: - # Check if the output variable exists in the MFILE - if output_name not in m_file.data: - continue - - print(f"{output_name} : {output_arrays[input_file][output_name]}") - print() - if len(output_names2) > 0: - print( - f"Y2-Axis\n {output_name2} : {output_arrays2[input_file][output_name2]}\n" - ) - # Plot section - # ----------- - for index, output_name in enumerate(output_names): - if not stack_plots: - fig, ax = plt.subplots() - if len(output_names2) > 0: - ax2 = ax.twinx() - # reset counter for label_name - kk = 0 - - # Check if the output variable exists in the MFILE - if output_name not in m_file.data: - continue - - # Loop over inputs - for input_file in input_files: - # Legend label formating - if len(label_name) == 0: - labl = input_file.name - else: - labl = label_name[kk] - kk += 1 - - # Plot the graph - if len(output_names2) > 0 and not stack_plots: - ax.plot( - scan_var_array[input_file], - output_arrays[input_file][output_name], - "--o", - color="blue" if len(input_files) == 1 else None, - label=labl, - ) - if len(y_axis_range) > 0: - y_divisions = (y_axis_range[1] - y_axis_range[0]) / 10 - if y_axis_percent: - if y_max[index] is None: - y_max[index] = max( - np.abs(output_arrays[input_file][output_name]) - ) - yticks = mtick.PercentFormatter(y_max[index]) - if len(y_axis_range) > 0: - y_divisions = ( - 5 * math.ceil(y_divisions / 5) * y_max[index] / 100 - ) - y_range = ( - y_axis_range[0] * y_max[index] / 100, - y_axis_range[1] * y_max[index] / 100, - ) - ax.yaxis.set_major_formatter(yticks) - if len(y_axis_range) > 0: - if y_axis_percent is False: - y_range = y_axis_range - ax.set_ylim(y_range[0], y_range[1]) - ax.yaxis.set_major_locator(mtick.MultipleLocator(y_divisions)) - if len(x_axis_range) > 0: - x_divisions = (x_axis_range[1] - x_axis_range[0]) / 10 - if x_axis_percent: - if x_max[index] is None: - x_max[index] = max(np.abs(scan_var_array[input_file])) - xticks = mtick.PercentFormatter(x_max[index]) - ax.xaxis.set_major_formatter(xticks) - if len(x_axis_range) > 0: - x_divisions = ( - 5 * math.ceil(x_divisions / 5) * x_max[index] / 100 - ) - x_range = ( - x_axis_range[0] * x_max[index] / 100, - x_axis_range[1] * x_max[index] / 100, - ) - plt.rc("xtick", labelsize=axis_tick_size) - plt.rc("ytick", labelsize=axis_tick_size) - if len(x_axis_range) > 0: - if x_axis_percent is False: - x_range = x_axis_range - plt.xlim(x_range[0], x_range[1]) - ax.xaxis.set_major_locator(mtick.MultipleLocator(x_divisions)) - plt.tight_layout() - elif stack_plots: - # check stack plots will work - if len(output_names) <= 1: - raise ValueError( - "stack_plots requires at least two output variables" - ) - # Create subplots only once for the first output - if index == 0: - fig, axs = plt.subplots( - len(output_names), - 1, - figsize=(8.0, (3.5 + (1 * len(output_names)))), - sharex=True, - ) - fig.subplots_adjust(hspace=0.0) - - axs[index].plot( - scan_var_array[input_file], - output_arrays[input_file][output_name], - "--o", - color="blue" if len(output_names2) > 0 else None, - label=labl, - ) - if len(y_axis_range) > 0: - y_divisions = (y_axis_range[1] - y_axis_range[0]) / 10 - if y_axis_percent: - if y_max[index] is None: - y_max[index] = max( - np.abs(output_arrays[input_file][output_name]) - ) - yticks = mtick.PercentFormatter(y_max[index]) - if len(y_axis_range) > 0: - y_divisions = ( - 5 * math.ceil(y_divisions / 5) * y_max[index] / 100 - ) - y_range = ( - y_axis_range[0] * y_max[index] / 100, - y_axis_range[1] * y_max[index] / 100, - ) - axs[output_names.index(output_name)].yaxis.set_major_formatter( - yticks - ) - if len(y_axis_range) > 0: - if y_axis_percent is False: - y_range = y_axis_range - axs[output_names.index(output_name)].set_ylim( - y_range[0], y_range[1] - ) - axs[output_names.index(output_name)].yaxis.set_major_locator( - mtick.MultipleLocator(y_divisions) - ) - if len(x_axis_range) > 0: - x_divisions = (x_axis_range[1] - x_axis_range[0]) / 10 - if x_axis_percent: - if x_max[index] is None: - x_max[index] = max(np.abs(scan_var_array[input_file])) - xticks = mtick.PercentFormatter(x_max[index]) - if len(x_axis_range) > 0: - x_divisions = ( - 5 * math.ceil(x_divisions / 5) * x_max[index] / 100 - ) - x_range = ( - x_axis_range[0] * x_max[index] / 100, - x_axis_range[1] * x_max[index] / 100, - ) - axs[output_names.index(output_name)].xaxis.set_major_formatter( - xticks - ) - if len(x_axis_range) > 0: - if x_axis_percent is False: - x_range = x_axis_range - plt.xlim(x_range[0], x_range[1]) - axs[output_names.index(output_name)].xaxis.set_major_locator( - mtick.MultipleLocator(x_divisions) - ) - plt.rc("xtick", labelsize=axis_tick_size) - plt.rc("ytick", labelsize=axis_tick_size) - plt.tight_layout() - else: - ax.plot( - scan_var_array[input_file], - output_arrays[input_file][output_name], - "--o", - color="blue" if len(output_names2) > 0 else None, - label=labl, - ) - if len(y_axis_range) > 0: - y_divisions = (y_axis_range[1] - y_axis_range[0]) / 10 - if y_axis_percent: - if y_max[index] is None: - y_max[index] = max( - np.abs(output_arrays[input_file][output_name]) - ) - yticks = mtick.PercentFormatter(y_max[index]) - if len(y_axis_range) > 0: - y_divisions = ( - 5 * math.ceil(y_divisions / 5) * y_max[index] / 100 - ) - y_range = ( - y_axis_range[0] * y_max[index] / 100, - y_axis_range[1] * y_max[index] / 100, - ) - ax.yaxis.set_major_formatter(yticks) - if len(y_axis_range) > 0: - if y_axis_percent is False: - y_range = y_axis_range - ax.set_ylim(y_range[0], y_range[1]) - ax.yaxis.set_major_locator(mtick.MultipleLocator(y_divisions)) - if len(x_axis_range) > 0: - x_divisions = (x_axis_range[1] - x_axis_range[0]) / 10 - if x_axis_percent: - if x_max[index] is None: - x_max[index] = max(np.abs(scan_var_array[input_file])) - xticks = mtick.PercentFormatter(x_max[index]) - if len(x_axis_range) > 0: - x_divisions = ( - 5 * math.ceil(x_divisions / 5) * x_max[index] / 100 - ) - x_range = ( - x_axis_range[0] * x_max[index] / 100, - x_axis_range[1] * x_max[index] / 100, - ) - ax.xaxis.set_major_formatter(xticks) - if len(x_axis_range) > 0: - if x_axis_percent is False: - x_range = x_axis_range - plt.xlim(x_range[0], x_range[1]) - ax.xaxis.set_major_locator(mtick.MultipleLocator(x_divisions)) - plt.rc("xtick", labelsize=axis_tick_size) - plt.rc("ytick", labelsize=axis_tick_size) - plt.tight_layout() - if len(output_names2) > 0: + return scan_var_array, output_arrays, output_arrays2 + + +def plot_1d_scan( + input_files: Sequence[Path], + m_file: MFile, + output_names: Sequence[str], + output_names2: Sequence[str], + scan_var: ScanVariables, + outputdir: Path, + save_format: str, + label_name: Sequence[str], + scan_var_array: np.ndarray, + output_arrays: np.ndarray, + output_arrays2: np.ndarray, + x_axis: AxisData, + y_axis: AxisData, + y_axis2: AxisData, + *, + stack_plots: bool, +): + + if stack_plots: + # check stack plots will work + if len(output_names) <= 1: + raise ValueError("stack_plots requires at least two output variables") + # Create subplots only once for the first output + fig, axs = plt.subplots( + len(output_names), + 1, + figsize=(8.0, (3.5 + (1 * len(output_names)))), + sharex=True, + ) + fig.subplots_adjust(hspace=0.0) + + colour = ( # be careful changing this + ("blue" if len(output_names2) > 0 else None) + if len(output_names2) <= 0 or stack_plots + else ("blue" if len(input_files) == 1 else None) + ) + + for index, output_name in enumerate(output_names): + # reset counter for label_name + kk = 0 + + if output_name not in m_file.data: + continue + + if stack_plots: + ax_ = axs[index] + ax = axs[output_names.index(output_name)] + else: + fig, ax = plt.subplots() + if len(output_names2) > 0: + ax2 = ax.twinx() + ax_ = ax + + for input_file in input_files: + if len(label_name) == 0: + labl = input_file.name + else: + labl = label_name[kk] + kk += 1 + + ax_.plot( + scan_var_array[input_file], + output_arrays[input_file][output_name], + "--o", + color=colour, + label=labl, + ) + + axis_manipulation( + ax, axis=x_axis, index=index, contour=scan_var_array[input_file] + ) + axis_manipulation( + ax, + axis=y_axis, + index=index, + contour=output_arrays[input_file][output_name], + ) + + if len(output_names2) > 0: + for output_name2 in output_names2: + yval = output_arrays2[input_file][output_name2] + colour = "red" if len(input_files) == 1 else None ax2.plot( scan_var_array[input_file], - output_arrays2[input_file][output_name2], + yval, "--o", - color="red" if len(input_files) == 1 else None, + color=colour, label=labl, ) ax2.set_ylabel( - ( - meta[output_name2].latex - if output_name2 in meta - else f"{output_name2}" - ), - fontsize=axis_font_size, - color="red" if len(input_files) == 1 else "black", + get_label(output_name2), + fontsize=y_axis.font_size, + color=colour or "black", ) - if len(y_axis_range2) > 0: - y_divisions2 = (y_axis_range2[1] - y_axis_range2[0]) / 10 - if y_axis_percent2: - if y_max2[index] is None: - y_max2[index] = max( - np.abs(output_arrays2[input_file][output_name2]) - ) - yticks2 = mtick.PercentFormatter(y_max2[index]) - if len(y_axis_range2) > 0: - y_divisions2 = ( - 5 * math.ceil(y_divisions2 / 5) * y_max2[index] / 100 - ) - y_range2 = ( - y_axis_range2[0] * y_max2[index] / 100, - y_axis_range2[1] * y_max2[index] / 100, - ) - ax2.yaxis.set_major_formatter(yticks2) - if len(y_axis_range2) > 0: - if y_axis_percent2 is False: - y_range2 = y_axis_range2 - ax2.set_ylim(y_range2[0], y_range2[1]) - ax2.yaxis.set_major_locator(mtick.MultipleLocator(y_divisions2)) - plt.rc("xtick", labelsize=axis_tick_size) - plt.rc("ytick", labelsize=axis_tick_size) - plt.tight_layout() - if len(output_names2) > 0: - ax2.yaxis.grid(True) - ax.xaxis.grid(True) - ax.set_ylabel( - ( - meta[output_name].latex - if output_name in meta - else f"{output_name}" - ), - fontsize=axis_font_size, - color="blue" if len(input_files) == 1 else "black", - ) - ax.set_xlabel( - ( - meta[scan_var_name].latex - if scan_var_name in meta - else f"{scan_var_name}" - ), - fontsize=axis_font_size, - ) - plt.rc("xtick", labelsize=axis_tick_size) - plt.rc("ytick", labelsize=axis_tick_size) - if len(input_files) != 1: - plt.legend(loc="best", fontsize=legend_size) - plt.tight_layout() - elif stack_plots: - axs[output_names.index(output_name)].minorticks_on() - axs[output_names.index(output_name)].grid(True) - axs[output_names.index(output_name)].set_ylabel( - ( - meta[output_name].latex - if output_name in meta - else f"{output_name}" - ), + axis_manipulation(ax2, axis=y_axis2, index=index, contour=yval) + if len(output_names2) > 0: + ax2.yaxis.grid(True) + ax.xaxis.grid(True) + ax.set_ylabel( + get_label(output_name), + fontsize=y_axis.font_size, + color="blue" if len(input_files) == 1 else "black", + ) + ax.set_xlabel( + get_label(scan_var.name), + fontsize=x_axis.font_size, + ) + if len(input_files) != 1: + fig.legend(loc="best", fontsize=x_axis.legend_size) + elif stack_plots: + ax.minorticks_on() + ax.grid(True) + ax.set_ylabel(get_label(output_name)) + ax.set_xlabel(get_label(scan_var.name), fontsize=x_axis.font_size) + + ymin, ymax = ax.get_ylim() + if ymin < 0 and ymax > 0: + mod_min = ymin * 1.1 + mod_max = ymax * 1.1 + elif ymin >= 0: + mod_min = ymin * 0.9 + mod_max = ymax * 1.1 + else: + mod_min = ymin * 1.1 + mod_max = ymax * 0.9 + ax.set_ylim(mod_min, mod_max) + + if len(input_files) > 1: + fig.legend( + loc="lower center", + fontsize=x_axis.legend_size, + bbox_to_anchor=(0.5, -0.5 - (0.1 * len(output_names))), + fancybox=True, + shadow=False, + ncol=len(input_files), + columnspacing=0.8, ) - plt.xlabel( - ( - meta[scan_var_name].latex - if scan_var_name in meta - else f"{scan_var_name}" - ), - fontsize=axis_font_size, - ) - plt.rc("xtick", labelsize=axis_tick_size) - plt.rc("ytick", labelsize=axis_tick_size) - if len(input_files) > 1: - plt.legend( - loc="lower center", - fontsize=legend_size, - bbox_to_anchor=(0.5, -0.5 - (0.1 * len(output_names))), - fancybox=True, - shadow=False, - ncol=len(input_files), - columnspacing=0.8, - ) - plt.tight_layout() - ymin, ymax = axs[output_names.index(output_name)].get_ylim() - if ymin < 0 and ymax > 0: - axs[output_names.index(output_name)].set_ylim(ymin * 1.1, ymax * 1.1) - elif ymin >= 0: - axs[output_names.index(output_name)].set_ylim(ymin * 0.9, ymax * 1.1) - else: - axs[output_names.index(output_name)].set_ylim(ymin * 1.1, ymax * 0.9) + else: + ax.grid(True) + ax.set_ylabel( + get_label(output_name), + fontsize=x_axis.font_size, + color="red" if len(output_names2) > 0 else "black", + ) + ax.set_xlabel(get_label(scan_var.name), fontsize=x_axis.font_size) + + fig.title( + f"{get_label(output_name)} vs {get_label(scan_var.name)}", + fontsize=x_axis.font_size, + ) + if len(input_files) != 1: + fig.legend(loc="best", fontsize=x_axis.legend_size) + + ax.tick_params(axis=x_axis.name.lower(), labelsize=x_axis.tick_size) + ax.tick_params(axis=y_axis.name.lower(), labelsize=y_axis.tick_size) + fig.tight_layout() + + # Output file naming + # This uses exclusively the last output_name2 defined in an earlier loop ignoring all other output_name2s... + if output_name == "plasma_current_MA": + extra_str = f"plasma_current{f'_vs_{output_name2}' if len(output_names2) > 0 else ''}" + elif stack_plots and output_names[-1] == output_name: + extra_str = f"{output_name}{f'_vs_{output_name2}' if len(output_names2) > 0 else '_vs_'.join(output_names)}" + else: + extra_str = ( + f"{output_name}{f'_vs_{output_name2}' if len(output_names2) > 0 else ''}" + ) + + if (not stack_plots) or (stack_plots and output_names[-1] == output_name): + fig.savefig( + f"{outputdir}/scan_{scan_var.name}_vs_{extra_str}.{save_format}", + dpi=300, + ) + plt.show() + + +def twod_scan( + input_files: Sequence[Path], + scan_var: ScanVariables, + scan_2_var: ScanVariables, + output_names: Sequence[str], + outputdir: Path, + save_format: str, + x_axis: AxisData, + y_axis: AxisData, + *, + twod_contour: bool, +): + m_file = MFile(filename=input_files[0]) + + # Number of scan points + n_scan_1 = int(m_file.get("isweep", scan=-1)) + n_scan_2 = int(m_file.get("isweep_2", scan=-1)) + # Selecting the converged runs only + contour_conv_ij = [] # List of non-converged scan point numbers + conv_ij = [] # 2D array of converged scan point numbers (sweep = rows, sweep_2 = columns) + ii_jj = 0 + for ii in range(n_scan_1): + conv_ij.append([]) + for _jj in range(n_scan_2): + ii_jj += 1 # Represents the scan point number in the MFILE + ifail = m_file.get("ifail", scan=ii_jj) + if ifail == 1: + conv_ij[ii].append(ii_jj) # Only appends scan number if scan converged + contour_conv_ij.append(ii_jj) else: - plt.grid(True) - plt.ylabel( - ( - meta[output_name].latex - if output_name in meta - else f"{output_name}" - ), - fontsize=axis_font_size, - color="red" if len(output_names2) > 0 else "black", - ) - plt.xlabel( - ( - meta[scan_var_name].latex - if scan_var_name in meta - else f"{scan_var_name}" - ), - fontsize=axis_font_size, - ) - plt.rc("xtick", labelsize=axis_tick_size) - plt.rc("ytick", labelsize=axis_tick_size) - plt.title( - f"{meta[output_name].latex if output_name in meta else {output_name}} vs " - f"{meta[scan_var_name].latex if scan_var_name in meta else {scan_var_name}}", - fontsize=axis_font_size, + failed_value_1 = scan_var.get_val(m_file, scan=ii_jj) + failed_value_2 = scan_2_var.get_val(m_file, scan=ii_jj) + print( + f"Warning : Non-convergent scan point : ({scan_var.name},{scan_2_var.name}) " + f"= ({failed_value_1},{failed_value_2})\n" + "Warning : This point will not be shown." ) - plt.tight_layout() - if len(input_files) != 1: - plt.legend(loc="best", fontsize=legend_size) - plt.tight_layout() - - # Output file naming - if output_name == "plasma_current_MA": - extra_str = f"plasma_current{f'_vs_{output_name2}' if len(output_names2) > 0 else ''}" - elif stack_plots and output_names[-1] == output_name: - extra_str = f"{output_name}{f'_vs_{output_name2}' if len(output_names2) > 0 else '_vs_'.join(output_names)}" - else: - extra_str = f"{output_name}{f'_vs_{output_name2}' if len(output_names2) > 0 else ''}" - if (not stack_plots) or (stack_plots and output_names[-1] == output_name): - plt.savefig( - f"{outputdir}/scan_{scan_var_name}_vs_{extra_str}.{save_format}", - dpi=300, + for index, output_name in enumerate(output_names): + if output_name not in m_file.data: + print( + f"Warning : `{output_name}` does not exist in PROCESS dicts\n" + f"Warning : `{output_name}` will not be output" + ) + continue + + fig, ax = plt.subplots() + x_contour = [scan_2_var.get_val(m_file, scan=i + 1) for i in range(n_scan_2)] + + if twod_contour: + y_contour = [ + scan_var.get_val(m_file, scan=i + 1) + for i in range(1, n_scan_1 * n_scan_2, n_scan_2) + ] + + output_contour_z = np.zeros((n_scan_1, n_scan_2)) + for i in contour_conv_ij: + ind1 = (i - 1) // n_scan_2 + ind2 = (i - 1) % n_scan_2 + output_contour_z[ind1][(ind2 if ind1 % 2 == 0 else (-ind2 - 1))] = ( + m_file.get(output_name, scan=i) ) - plt.show() - plt.clf() - plt.close() - # ------------ + flat_output_z = output_contour_z.flatten() + flat_output_z.sort() - # In case of a 2D scan - # ---------------------------------------------------------------------------------------------- - else: - # Opening the MFILE.DAT - m_file = MFile(filename=input_files[0]) - - # Number of scan points - n_scan_1 = int(m_file.data["isweep"].get_scan(-1)) - n_scan_2 = int(m_file.data["isweep_2"].get_scan(-1)) - # Selecting the converged runs only - contour_conv_ij = [] # List of non-converged scan point numbers - conv_ij = [] # 2D array of converged scan point numbers (sweep = rows, sweep_2 = columns) - ii_jj = 0 - for ii in range(n_scan_1): - conv_ij.append([]) - for _jj in range(n_scan_2): - ii_jj += 1 # Represents the scan point number in the MFILE - ifail = m_file.data["ifail"].get_scan(ii_jj) - if ifail == 1: - conv_ij[ii].append( - ii_jj - ) # Only appends scan number if scan converged - contour_conv_ij.append(ii_jj) - else: - failed_value_1 = m_file.data[scan_var_name].get_scan(ii_jj) - failed_value_2 = m_file.data[scan_2_var_name].get_scan(ii_jj) - print( - f"Warning : Non-convergent scan point : ({scan_var_name},{scan_2_var_name}) " - f"= ({failed_value_1},{failed_value_2})\n" - "Warning : This point will not be shown." - ) + levels = np.linspace( + next(filter(lambda i: i > 0.0, flat_output_z)), flat_output_z.max(), 50 + ) + contour = ax.contourf(x_contour, y_contour, output_contour_z, levels=levels) - # Looping over requested outputs - for index, output_name in enumerate(output_names): - # Check if the output variable exists in the MFILE - if output_name not in m_file.data: - print( - f"Warning : `{output_name}` does not exist in PROCESS dicts\n" - f"Warning : `{output_name}` will not be output" - ) - continue - - # Declaring the outputs - output_arrays = [] - - if twod_contour: - output_contour_z = np.zeros((n_scan_1, n_scan_2)) - x_contour = [ - m_file.data[scan_2_var_name].get_scan(i + 1) for i in range(n_scan_2) - ] - y_contour = [ - m_file.data[scan_var_name].get_scan(i + 1) - for i in range(1, n_scan_1 * n_scan_2, n_scan_2) - ] - for i in contour_conv_ij: - output_contour_z[((i - 1) // n_scan_2)][ - ( - ((i - 1) % n_scan_2) - if ((i - 1) // n_scan_2) % 2 == 0 - else (-((i - 1) % n_scan_2) - 1) - ) - ] = m_file.data[output_name].get_scan(i) - - flat_output_z = output_contour_z.flatten() - flat_output_z.sort() - fig, ax = plt.subplots() - levels = np.linspace( - next(filter(lambda i: i > 0.0, flat_output_z)), - flat_output_z.max(), - 50, - ) - contour = ax.contourf( - x_contour, - y_contour, - output_contour_z, - levels=levels, - ) + fig.colorbar(contour).set_label( + label=get_label(output_name), size=y_axis.font_size + ) + ax.set_ylabel(get_label(scan_var.name), fontsize=y_axis.font_size) - fig.colorbar(contour).set_label( - label=( - meta[output_name].latex - if output_name in meta - else f"{output_name}" - ), - size=axis_font_size, - ) - plt.ylabel( - ( - meta[scan_var_name].latex - if scan_var_name in meta - else f"{scan_var_name}" - ), - fontsize=axis_font_size, - ) - plt.xlabel( - ( - meta[scan_2_var_name].latex - if scan_2_var_name in meta - else f"{scan_2_var_name}" - ), - fontsize=axis_font_size, - ) - if len(y_axis_range) > 0: - y_divisions = (y_axis_range[1] - y_axis_range[0]) / 10 - if y_axis_percent: - if y_max[index] is None: - y_max[index] = max(np.abs(y_contour)) - yticks = mtick.PercentFormatter(y_max[index]) - if len(y_axis_range) > 0: - y_divisions = 5 * math.ceil(y_divisions / 5) * y_max[index] / 100 - y_range = ( - y_axis_range[0] * y_max[index] / 100, - y_axis_range[1] * y_max[index] / 100, - ) - ax.yaxis.set_major_formatter(yticks) - if len(y_axis_range) > 0: - if y_axis_percent is False: - y_range = y_axis_range - ax.set_ylim(y_range[0], y_range[1]) - ax.yaxis.set_major_locator(mtick.MultipleLocator(y_divisions)) - if len(x_axis_range) > 0: - x_divisions = (x_axis_range[1] - x_axis_range[0]) / 10 - if x_axis_percent: - if x_max[index] is None: - x_max[index] = max(np.abs(x_contour)) - xticks = mtick.PercentFormatter(x_max[index]) - if len(x_axis_range) > 0: - x_divisions = 5 * math.ceil(x_divisions / 5) * x_max[index] / 100 - x_range = ( - x_axis_range[0] * x_max[index] / 100, - x_axis_range[1] * x_max[index] / 100, - ) - ax.xaxis.set_major_formatter(xticks) - if len(x_axis_range) > 0: - if x_axis_percent is False: - x_range = x_axis_range - plt.xlim(x_range[0], x_range[1]) - ax.xaxis.set_major_locator(mtick.MultipleLocator(x_divisions)) - plt.rc("xtick", labelsize=axis_tick_size) - plt.rc("ytick", labelsize=axis_tick_size) - plt.tight_layout() - plt.savefig( - outputdir - / f"scan_{output_name}_vs_{scan_var_name}_{scan_2_var_name}.{save_format}" - ) - plt.grid(True) - plt.show() - plt.clf() + else: + y_contour = [m_file.get(output_name, scan=i + 1) for i in range(n_scan_2)] - else: - # Converged indexes, for normal 2D line plot - fig, ax = plt.subplots() - for conv_j in ( - conv_ij - ): # conv_j is an array element containing the converged scan numbers - # Scanned variables - scan_1_var_array = np.zeros(len(conv_j)) - scan_2_var_array = np.zeros(len(conv_j)) - output_array = np.zeros(len(conv_j)) - for jj in range(len(conv_j)): - scan_1_var_array[jj] = m_file.data[scan_var_name].get_scan( - conv_j[jj] - ) - scan_2_var_array[jj] = m_file.data[scan_2_var_name].get_scan( - conv_j[jj] - ) - output_array[jj] = m_file.data[output_name].get_scan(conv_j[jj]) - - # Label formating - labl = f"{meta[scan_var_name].latex if scan_var_name in meta else {scan_var_name}} = {scan_1_var_array[0]}" - - # Plot the graph - ax.plot(scan_2_var_array, output_array, "--o", label=labl) - - plt.grid(True) - plt.ylabel( + # conv_j is an array element containing the converged scan numbers + for conv_j in conv_ij: + # Scanned variables + scan_1_var_array, scan_2_var_array, output_array = np.array([ ( - meta[output_name].latex - if output_name in meta - else f"{output_name}" - ), - fontsize=axis_font_size, - ) - plt.xlabel( - ( - meta[scan_2_var_name].latex - if scan_2_var_name in meta - else f"{scan_2_var_name}" - ), - fontsize=axis_font_size, - ) - plt.legend(loc="best", fontsize=legend_size) - y_data = [ - m_file.data[output_name].get_scan(i + 1) for i in range(n_scan_2) - ] - if len(y_axis_range) > 0: - y_divisions = (y_axis_range[1] - y_axis_range[0]) / 10 - if y_axis_percent: - if y_max[index] is None: - y_max[index] = max(np.abs(y_data)) - yticks = mtick.PercentFormatter(y_max[index]) - if len(y_axis_range) > 0: - y_divisions = 5 * math.ceil(y_divisions / 5) * y_max[index] / 100 - y_range = ( - y_axis_range[0] * y_max[index] / 100, - y_axis_range[1] * y_max[index] / 100, - ) - ax.yaxis.set_major_formatter(yticks) - if len(y_axis_range) > 0: - if y_axis_percent is False: - y_range = y_axis_range - ax.set_ylim(y_range[0], y_range[1]) - ax.yaxis.set_major_locator(mtick.MultipleLocator(y_divisions)) - x_data = [ - m_file.data[scan_2_var_name].get_scan(i + 1) for i in range(n_scan_2) - ] - if len(x_axis_range) > 0: - x_divisions = (x_axis_range[1] - x_axis_range[0]) / 10 - if x_axis_percent: - if x_max[index] is None: - x_max[index] = max(np.abs(x_data)) - xticks = mtick.PercentFormatter(x_max[index]) - if len(x_axis_range) > 0: - x_divisions = 5 * math.ceil(x_divisions / 5) * x_max[index] / 100 - x_range = ( - x_axis_range[0] * x_max[index] / 100, - x_axis_range[1] * x_max[index] / 100, - ) - ax.xaxis.set_major_formatter(xticks) - if len(x_axis_range) > 0: - if x_axis_percent is False: - x_range = x_axis_range - plt.xlim(x_range[0], x_range[1]) - ax.xaxis.set_major_locator(mtick.MultipleLocator(x_divisions)) - plt.rc("xtick", labelsize=8) - plt.rc("ytick", labelsize=8) - plt.tight_layout() - plt.savefig( - outputdir - / f"scan_{output_name}_vs_{scan_var_name}_{scan_2_var_name}.{save_format}" - ) + scan_var.get_val(m_file, scan=conv_j[jj]), + scan_2_var.get_val(m_file, scan=conv_j[jj]), + m_file.get(output_name, scan=conv_j[jj]), + ) + for jj in range(len(conv_j)) + ]).T + + label = f"{get_label(scan_var.name)} = {scan_1_var_array[0]}" + ax.plot(scan_2_var_array, output_array, "--o", label=label) + + ax.set_ylabel(get_label(output_name), fontsize=y_axis.font_size) + fig.legend(loc="best", fontsize=x_axis.legend_size) + + ax.set_xlabel(get_label(scan_2_var.name), fontsize=x_axis.font_size) - # Display plot (used in Jupyter notebooks) - plt.show() - plt.clf() + axis_manipulation(ax, axis=x_axis, index=index, contour=x_contour) + axis_manipulation(ax, axis=y_axis, index=index, contour=y_contour) + ax.grid(True) + fname = f"scan_{output_name}_vs_{scan_var.name}_{scan_2_var.name}.{save_format}" + fig.savefig(outputdir / fname) + plt.show() diff --git a/process/core/scan.py b/process/core/scan.py index 16f1dcc954..b79937c117 100644 --- a/process/core/scan.py +++ b/process/core/scan.py @@ -86,6 +86,9 @@ def data(self): return self._data_ raise ValueError("Data not available") + def get_val(self, mfile, scan): + return mfile.get(self.name, scan=scan) + aspect = (1, SVE.P) pflux_div_heat_load_max_mw = (2, SVE.D) p_plant_electric_net_required_mw = (3, SVE.C) @@ -899,6 +902,5 @@ def scan_1d_write_plot(self): def scan_select(self, nwp, swp, iscn): sv = ScanVariables(int(nwp)) - sweep = swp[iscn - 1] - sv.set(self.data, sweep) + sv.set(self.data, swp[iscn - 1]) return sv From 3fb8220f55eb07422f6e3f346d5252e9ad25e6db Mon Sep 17 00:00:00 2001 From: james <81617086+je-cook@users.noreply.github.com> Date: Mon, 15 Jun 2026 09:21:55 +0100 Subject: [PATCH 5/6] WIP reorganise --- process/core/scan.py | 336 +++++++++++++++++++++---------------------- 1 file changed, 161 insertions(+), 175 deletions(-) diff --git a/process/core/scan.py b/process/core/scan.py index b79937c117..8163f1ee18 100644 --- a/process/core/scan.py +++ b/process/core/scan.py @@ -57,7 +57,7 @@ def _missing_(cls, value): return sv raise ProcessValueError("Illegal scan variable number", nwp=value) - def full_name(self): + def fname(self): if "__" in self.name: return self.name.replace("__", "(") + ")" return self.name @@ -160,25 +160,23 @@ def get_val(self, mfile, scan): class Scan: - """Perform a parameter scan using the Fortran scan module.""" + """Perform a parameter scan + + Parameters + ---------- + models : + Physics and engineering model objects + solver : + Which solver to use, as specified in solver.py + data : + Data structure object + """ def __init__(self, models: Model, solver: str, data: DataStructure): - """Immediately run the run_scan() method. - - Parameters - ---------- - models : - Physics and engineering model objects - solver : - Which solver to use, as specified in solver.py - data : - Data structure object - """ self.models = models self.solver = solver self.data = data self.solver_handler = SolverHandler(models, solver, data) - self.run_scan() def run_scan(self): """Call a solver over a range of values of one of the variables. @@ -188,38 +186,87 @@ def run_scan(self): number of output variable values are written to the MFILE.DAT file at each scan point, for plotting or other post-processing purposes. """ + if self.data.scan.isweep > 0 and self.data.scan.isweep > IPNSCNS: + raise ProcessValueError( + "Illegal value of isweep", + isweep=self.data.scan.isweep, + IPNSCNS=IPNSCNS, + ) + isw = self.data.scan.isweep or 1 + self.scan_array = np.arange(isw * (self.data.scan.isweep_2 or 1)).reshape( + isw, -1 + ) if self.data.scan.isweep == 0: # Solve single problem, rather than an array of problems (scan) # doopt() can also run just an evaluation + self._start_time = time.time() + self._ifail = self.solver_handler.run() + self._finish_time = time.time() + return + + if self.data.scan.scan_dim == 2: + self.scan_2d() + else: + self.scan_1d() + + def write_outputs(self): + write_output_files( + models=self.models, + data=self.data, + ifail=self._ifail, + runtime=self._finish_time - self._start_time, + ) + show_errors(constants.NOUT) + + def setup_scan(self, scan_index): ... + + def scan_1d(self): + for iscan in range(1, self.data.scan.isweep + 1): + self.scan_1d_write_point_header(iscan) start_time = time.time() ifail = self.doopt() + scan_1d_ifail_dict[iscan] = ifail write_output_files( models=self.models, data=self.data, ifail=ifail, runtime=time.time() - start_time, ) + show_errors(constants.NOUT) - return + logging_model_handler.clear_logs() - if self.data.scan.isweep > IPNSCNS: - raise ProcessValueError( - "Illegal value of isweep", - isweep=self.data.scan.isweep, - IPNSCNS=IPNSCNS, - ) + def scan_2d(self): + iscan = 1 - if self.data.scan.scan_dim == 2: - self.scan_2d() - else: - self.scan_1d() + # initialise array which will contain ifail values for each scan point + scan_2d_ifail_list = np.zeros( + (NOUTVARS, IPNSCNS), + dtype=np.float64, + order="F", + ) + for iscan_1 in range(1, self.data.scan.isweep + 1): + for iscan_2 in range(1, self.data.scan.isweep_2 + 1): + self.scan_2d_write_point_header(iscan, iscan_1, iscan_2) + start_time = time.time() + ifail = self.doopt() + write_output_files( + models=self.models, + data=self.data, + ifail=ifail, + runtime=time.time() - start_time, + ) - def doopt(self): - """Run the optimiser or solver.""" - ifail = self.solver_handler.run() - self.post_optimise(ifail) + show_errors(constants.NOUT) + logging_model_handler.clear_logs() + scan_2d_ifail_list[iscan_1][iscan_2] = ifail + iscan += 1 - return ifail + +def scan_summary(scan): + + def write_outputs(self): + self.post_optimise(self._ifail) def post_optimise(self, ifail: int): """Called after calling the optimising equation solver from Python. @@ -410,14 +457,6 @@ def post_optimise(self, ifail: int): f" {bounds[i] * self.data.numerics.scafc[i]}", ) - # Write optimisation parameters to mfile - process_output.ovarre( - constants.MFILE, - self.data.numerics.lablxc[self.data.numerics.ixc[i] - 1], - f"(itvar{i + 1:03d})", - self.data.numerics.xcs[i], - ) - xnorm = ( 1.0 if self.data.numerics.boundu[i] == self.data.numerics.boundl[i] @@ -437,32 +476,33 @@ def post_optimise(self, ifail: int): ) ) - process_output.ovarre( - constants.MFILE, - f"{name} (final value/initial value)", - f"(xcm{i + 1:03d})", - self.data.numerics.xcm[i], - ) - process_output.ovarre( - constants.MFILE, - f"{name} (range normalised)", - f"(nitvar{i + 1:03d})", - xnorm, - ) - process_output.ovarre( - constants.MFILE, - f"{name} (upper bound)", - f"(boundu{i + 1:03d})", - self.data.numerics.itv_scaled_upper_bounds[i] - * self.data.numerics.scafc[i], - ) - process_output.ovarre( - constants.MFILE, - f"{name} (lower bound)", - f"(boundl{i + 1:03d})", - self.data.numerics.itv_scaled_lower_bounds[i] - * self.data.numerics.scafc[i], - ) + # Write optimisation parameters to mfile + for d, var, v in ( + ( + self.data.numerics.lablxc[self.data.numerics.ixc[i] - 1], + f"(itvar{i + 1:03d})", + self.data.numerics.xcs[i], + ), + ( + f"{name} (final value/initial value)", + f"(xcm{i + 1:03d})", + self.data.numerics.xcm[i], + ), + (f"{name} (range normalised)", f"(nitvar{i + 1:03d})", xnorm), + ( + f"{name} (upper bound)", + f"(boundu{i + 1:03d})", + self.data.numerics.itv_scaled_upper_bounds[i] + * self.data.numerics.scafc[i], + ), + ( + f"{name} (lower bound)", + f"(boundl{i + 1:03d})", + self.data.numerics.itv_scaled_lower_bounds[i] + * self.data.numerics.scafc[i], + ), + ): + process_output.ovarre(constants.MFILE, d, var, v) # Write optimisation parameter headings to output file process_output.osubhd( @@ -498,32 +538,30 @@ def post_optimise(self, ifail: int): f"{err[i]} {lab[i]}", con1[i], ]) - process_output.ovarre( - constants.MFILE, - f"{name:<33} normalised residue", - f"(eq_con{self.data.numerics.icc[i]:03d})", - con1[i], - ) - - process_output.ovarre( - constants.MFILE, - f"{name:<33} residual", - f"(res_eq_con{self.data.numerics.icc[i]:03d})", - err[i], - ) - process_output.ovarre( - constants.MFILE, - f"{name} constraint value", - f"(val_eq_con{self.data.numerics.icc[i]:03d})", - con2[i], - ) - process_output.ovarre( - constants.MFILE, - f"{name} units", - f"(eq_units_con{self.data.numerics.icc[i]:03d})", - f"'{lab[i]}'", - ) + for d, var, v in ( + ( + f"{name:<33} normalised residue", + f"(eq_con{self.data.numerics.icc[i]:03d})", + con1[i], + ), + ( + f"{name:<33} residual", + f"(res_eq_con{self.data.numerics.icc[i]:03d})", + err[i], + ), + ( + f"{name} constraint value", + f"(val_eq_con{self.data.numerics.icc[i]:03d})", + con2[i], + ), + ( + f"{name} units", + f"(eq_units_con{self.data.numerics.icc[i]:03d})", + f"'{lab[i]}'", + ), + ): + process_output.ovarre(constants.MFILE, d, var, v) # Write equality constraints to output file process_output.write( @@ -573,39 +611,35 @@ def post_optimise(self, ifail: int): f"{constraint.residual} {constraint.units}", f"{constraint.normalised_residual}", ]) - process_output.ovarre( - constants.MFILE, - f"{name} normalised residue", - f"(ineq_con{self.data.numerics.icc[i]:03d})", - -constraint.normalised_residual, - ) - process_output.ovarre( - constants.MFILE, - f"{name} physical value", - f"(ineq_value_con{self.data.numerics.icc[i]:03d})", - constraint.constraint_value, - ) - process_output.ovarre( - constants.MFILE, - f"{name} symbol", - f"(ineq_symbol_con{self.data.numerics.icc[i]:03d})", - f"'{constraint.symbol}'", - ) - - process_output.ovarre( - constants.MFILE, - f"{name} units", - f"(ineq_units_con{self.data.numerics.icc[i]:03d})", - f"'{constraint.units}'", - ) - - process_output.ovarre( - constants.MFILE, - f"{name} physical bound", - f"(ineq_bound_con{self.data.numerics.icc[i]:03d})", - constraint.constraint_bound, - ) + for d, var, v in ( + ( + "normalised residue", + f"(ineq_con{self.data.numerics.icc[i]:03d})", + -constraint.normalised_residual, + ), + ( + "physical value", + f"(ineq_value_con{self.data.numerics.icc[i]:03d})", + constraint.constraint_value, + ), + ( + "symbol", + f"(ineq_symbol_con{self.data.numerics.icc[i]:03d})", + f"'{constraint.symbol}'", + ), + ( + "units", + f"(ineq_units_con{self.data.numerics.icc[i]:03d})", + f"'{constraint.units}'", + ), + ( + "physical bound", + f"(ineq_bound_con{self.data.numerics.icc[i]:03d})", + constraint.constraint_bound, + ), + ): + process_output.ovarre(constants.MFILE, f"{name} {d}", var, v) process_output.write( constants.NOUT, @@ -693,21 +727,6 @@ def scan_1d(self): # initialise dict which will contain ifail values for each scan point scan_1d_ifail_dict = {} - for iscan in range(1, self.data.scan.isweep + 1): - self.scan_1d_write_point_header(iscan) - start_time = time.time() - ifail = self.doopt() - scan_1d_ifail_dict[iscan] = ifail - write_output_files( - models=self.models, - data=self.data, - ifail=ifail, - runtime=time.time() - start_time, - ) - - show_errors(constants.NOUT) - logging_model_handler.clear_logs() - # outvar now contains results self.scan_1d_write_plot(self.data.scan) print( @@ -726,7 +745,7 @@ def scan_1d(self): ] for iscan in range(self.data.scan.isweep): pstring = ( - f"Scan {iscan:02d}: {nsweep_var.name} = {sweep_values[iscan]} " + f"Scan {iscan:02d}: {nsweep_var.fname} = {sweep_values[iscan]} " + " " * offsets[iscan] + "\u001b[3{}CONVERGED \u001b[0m" ) @@ -739,37 +758,6 @@ def scan_1d(self): converged_percentage = converged_count / self.data.scan.isweep * 100 print(f"\nConvergence Percentage: {converged_percentage:.2f}%") - def scan_2d(self): - """Run a 2-D scan.""" - # Initialise intent(out) arrays - self.scan_2d_init(self.data.scan) - iscan = 1 - - # initialise array which will contain ifail values for each scan point - scan_2d_ifail_list = np.zeros( - (NOUTVARS, IPNSCNS), - dtype=np.float64, - order="F", - ) - for iscan_1 in range(1, self.data.scan.isweep + 1): - for iscan_2 in range(1, self.data.scan.isweep_2 + 1): - self.scan_2d_write_point_header(iscan, iscan_1, iscan_2) - start_time = time.time() - ifail = self.doopt() - write_output_files( - models=self.models, - data=self.data, - ifail=ifail, - runtime=time.time() - start_time, - ) - - show_errors(constants.NOUT) - logging_model_handler.clear_logs() - scan_2d_ifail_list[iscan_1][iscan_2] = ifail - iscan += 1 - - self.output_2d_summary(scan_2d_ifail_list) - def scan_2d_init(self): sv = self.data.scan for d, n, v in ( @@ -814,8 +802,8 @@ def output_2d_summary(self, scan_2d_ifail_list): for iscan_1 in range(1, self.data.scan.isweep + 1): for iscan_2 in range(1, self.data.scan.isweep_2 + 1): string = ( - f"Scan {scan_point:02d}: ({nsweep_var.name} = {sweep_1_values[iscan_1 - 1]}," - f" {nsweep_2_var.name} = {sweep_2_values[iscan_2 - 1]}) " + f"Scan {scan_point:02d}: ({nsweep_var.fname} = {sweep_1_values[iscan_1 - 1]}," + f" {nsweep_2_var.fname} = {sweep_2_values[iscan_2 - 1]}) " + " " * offsets[iscan_1 - 1][iscan_2 - 1] + "\u001b[3{}CONVERGED \u001b[0m" ) @@ -836,7 +824,7 @@ def _set_v_x_label(self, iscan, twod=False): if twod else self.scan_select(self.data.scan.nsweep, self.data.scan.sweep, iscan) ) - self.data.globals.vlabel = sv.name + self.data.globals.vlabel = sv.fname self.data.globals.xlabel = sv.data.description def scan_1d_write_point_header(self, iscan: int): @@ -849,8 +837,7 @@ def scan_1d_write_point_header(self, iscan: int): process_output.write( constants.NOUT, f"Scan point {iscan} of {self.data.scan.isweep} : {self.data.globals.xlabel}" - f", {self.data.globals.vlabel} = {self.data.scan.sweep[iscan - 1]} " - "", + f", {self.data.globals.vlabel} = {self.data.scan.sweep[iscan - 1]} ", ) process_output.ovarin(constants.MFILE, "Scan point number", "(iscan)", iscan) @@ -876,8 +863,7 @@ def scan_2d_write_point_header(self, iscan, iscan_1, iscan_2): constants.NOUT, f"2D Scan point {iscan} of {self.data.scan.isweep * self.data.scan.isweep_2} : " f"{self.data.globals.vlabel} = {self.data.scan.sweep[iscan_1 - 1]} and" - f" {self.data.globals.vlabel_2} = {self.data.scan.sweep_2[iscan_r - 1]} " - "", + f" {self.data.globals.vlabel_2} = {self.data.scan.sweep_2[iscan_r - 1]} ", ) process_output.ovarin(constants.MFILE, "Scan point number", "(iscan)", iscan) From 1529a059b5f93ee73687722b06fe302ba658c903 Mon Sep 17 00:00:00 2001 From: james <81617086+je-cook@users.noreply.github.com> Date: Mon, 15 Jun 2026 09:28:37 +0100 Subject: [PATCH 6/6] remove unnecessary variable --- process/core/io/plot/scans.py | 53 ++++++++++++++++++++--------------- process/core/scan.py | 7 +++++ 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/process/core/io/plot/scans.py b/process/core/io/plot/scans.py index 2bae68f1a1..0c906bb734 100644 --- a/process/core/io/plot/scans.py +++ b/process/core/io/plot/scans.py @@ -80,7 +80,12 @@ def get_list_padded(inp, names): return inp_array[:target_len] -def value_checks(scan_var, scan_2_var, m_file, is_2D_scan, input_files): +def value_checks( + scan_var: ScanVariables, + scan_2_var: ScanVariables | None, + m_file: MFile, + input_files: Sequence[Path], +): ve_string = ( "`{}` does not exist in PROCESS dicts\n" " The scan variable is probably an upper/lower boundary\n" @@ -91,11 +96,12 @@ def value_checks(scan_var, scan_2_var, m_file, is_2D_scan, input_files): raise ValueError(ve_string.format(scan_var.name)) # Check if the second scan variable is present - if is_2D_scan and (scan_2_var.name not in m_file.data): - raise ValueError(ve_string.format(scan_2_var.name)) + if scan_2_var is not None: + if scan_2_var.name not in m_file.data: + raise ValueError(ve_string.format(scan_2_var.name)) - if is_2D_scan and len(input_files) > 1: - raise ValueError("Only one input file can be used for 2D scans") + if len(input_files) > 1: + raise ValueError("Only one input file can be used for 2D scans") def array_check(output_name: str, m_file: MFile) -> bool: @@ -170,7 +176,6 @@ def plot_scan( """Main plot scans script.""" outputdir = outputdir or Path.cwd() input_files = mfiles if isinstance(mfiles, Iterable) else [mfiles] - is_2D_scan = False # If the input file is a directory, add MFILE.DAT for ii, if_ in enumerate(input_files): @@ -183,11 +188,13 @@ def plot_scan( scan_var = ScanVariables(nsweep_ref) # Get the eventual second scan variable - if "nsweep_2" in m_file.data: - is_2D_scan = True - scan_2_var = ScanVariables(int(m_file.get("nsweep_2", scan=-1))) + scan_2_var = ( + ScanVariables(int(m_file.get("nsweep_2", scan=-1))) + if "nsweep_2" in m_file.data + else None + ) - value_checks(scan_var, scan_2_var, m_file, is_2D_scan, input_files) + value_checks(scan_var, scan_2_var, m_file, input_files) x_max = get_list_padded(x_axis_max, output_names) x_axis = AxisData( @@ -198,19 +205,7 @@ def plot_scan( y_axis = AxisData( "y", y_axis_percent, y_max, y_axis_range, axis_tick_size, axis_font_size ) - if is_2D_scan: - twod_scan( - input_files, - scan_var, - scan_2_var, - output_names, - outputdir, - save_format, - x_axis, - y_axis, - twod_contour=twod_contour, - ) - else: + if scan_2_var is None: y_axis2 = AxisData( "y", y_axis_percent2, @@ -249,6 +244,18 @@ def plot_scan( y_axis2, stack_plots=stack_plots, ) + else: + twod_scan( + input_files, + scan_var, + scan_2_var, + output_names, + outputdir, + save_format, + x_axis, + y_axis, + twod_contour=twod_contour, + ) def oned_scan( diff --git a/process/core/scan.py b/process/core/scan.py index 8163f1ee18..c45c6569e9 100644 --- a/process/core/scan.py +++ b/process/core/scan.py @@ -758,6 +758,13 @@ def scan_1d(self): converged_percentage = converged_count / self.data.scan.isweep * 100 print(f"\nConvergence Percentage: {converged_percentage:.2f}%") + def scan_2d(self): + """Run a 2-D scan.""" + # Initialise intent(out) arrays + self.scan_2d_init() + + self.output_2d_summary(scan_2d_ifail_list) + def scan_2d_init(self): sv = self.data.scan for d, n, v in (