From bf37fb101e4722f497f47d8d46884eea0ffdb7a2 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Thu, 28 May 2026 12:04:42 +0100 Subject: [PATCH 1/4] Making estimate_transmission its own pipeline, trigger service looks for transmission in ispyb and mimas invokes the recipe --- src/dlstbx/mimas/core.py | 7 ++ src/dlstbx/services/trigger.py | 27 ++++- src/dlstbx/wrapper/estimate_transmission.py | 108 ++++++++++++++------ 3 files changed, 106 insertions(+), 36 deletions(-) diff --git a/src/dlstbx/mimas/core.py b/src/dlstbx/mimas/core.py index 10e9ecf60..9710a3640 100644 --- a/src/dlstbx/mimas/core.py +++ b/src/dlstbx/mimas/core.py @@ -193,6 +193,13 @@ def handle_characterization( source="automatic", displayname="align_crystal", ), + mimas.MimasISPyBJobInvocation( + DCID=scenario.DCID, + autostart=True, + recipe="strategy-estimate-transmission", + source="automatic", + displayname="estimate_transmission", + ), ] diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 6aa4f5514..00d8c5344 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -27,6 +27,9 @@ ProcessingJob, Proposal, Protein, + Screening, + ScreeningOutput, + ScreeningStrategy, ) from sqlalchemy import or_ from sqlalchemy.orm import Load, contains_eager, joinedload @@ -2225,9 +2228,7 @@ def trigger_multiplex( message = {"recipes": [], "parameters": {"ispyb_process": jobid}} rw.transport.send("processing_recipe", message) - self.log.info( - f"xia2.multiplex trigger: Processing job {jobid} triggered" - ) + self.log.info(f"xia2.multiplex trigger: Processing job {jobid} triggered") return {"success": True, "return_value": jobids} @@ -2952,6 +2953,25 @@ def trigger_strategy( ) return {"success": True} + transmission = ( + session.query(ScreeningStrategy.transmission) + .join( + ScreeningOutput, + ScreeningOutput.screeningOutputId + == ScreeningStrategy.screeningOutputId, + ) + .join(Screening, Screening.screeningId == ScreeningOutput.screeningId) + .filter(Screening.dataCollectionId == parameters.dcid) + .filter(ScreeningStrategy.program == "estimate_transmission") + .scalar() + ) + + if not transmission: + self.log.info( + f"Skipping strategy trigger: no transmission recommendation found for dcid={parameters.dcid}" + ) + return {"success": True} + jp = self.ispyb.mx_processing.get_job_params() jp["comments"] = parameters.comment jp["datacollectionid"] = parameters.dcid @@ -2965,6 +2985,7 @@ def trigger_strategy( "beamline": parameters.beamline, "resolution": resolution, "wavelength": parameters.wavelength, + "transmission": transmission, } for key, value in strategy_parameters.items(): diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index 6ff52a6d7..4f99b658f 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -20,6 +20,67 @@ class EstimateTransmissionWrapper(Wrapper): _logger_name = "dlstbx.wrap.estimate_transmission" + def collect_ispyb_command_list(self, transmission, max_pixel_count_pct): + if max_pixel_count_pct < 0.7: + warning_level = 0 + elif max_pixel_count_pct < 0.85: + warning_level = 1 + else: + warning_level = 2 + + warning_message = { + 0: "Diffraction spots are unlikely to have detector count rate issues", + 1: "Some diffraction spots may have detector count rate issues", + 2: "Some diffraction spots are likely to have detector count rate issues", + }.get(warning_level) + + warning_description = f"The most intense pixel is {max_pixel_count_pct * 100}% of the detector's limit" + warning_severity = {0: "INFO", 1: "WARNING", 2: "ERROR"}.get(warning_level) + + # Step 0: Add a program message about the count rate warning, + # store the autoproc program + ispyb_command_list = [] + d = { + "ispyb_command": "add_program_message", + "program_id": "$ispyb_autoprocprogram_id", + "message": warning_message, + "description": warning_description, + "severity": warning_severity, + } + ispyb_command_list.append(d) + + # Step 1: Create screeningOutput record for recipe, linked to the screeningId + # Keep the screeningOutputId + d = { + "program": "estimate_transmission", + "strategysuccess": 1, + "ispyb_command": "insert_screening_output", + "screening_id": "$ispyb_screening_id", + "store_result": "ispyb_screening_output_id", + } + ispyb_command_list.append(d) + + # Step 2: Store screeningStrategy results, linked to the screeningOutputId + # Keep the screeningStrategyId + d = { + "program": "estimate_transmission", + "ispyb_command": "insert_screening_strategy", + "transmission": transmission, + "screening_output_id": "$ispyb_screening_output_id", + } + ispyb_command_list.append(d) + + d = { + "ispyb_command": "update_processing_status", + "program_id": "$ispyb_autoprocprogram_id", + "message": "Processing successful", + "status": "success", + } + ispyb_command_list.append(d) + + self.log.info("Sending %s", str(ispyb_command_list)) + self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + def run(self): """Entrypoint for the estimate_transmission wrapper. @@ -90,40 +151,8 @@ def run(self): scaled_transmission = min(1, (transmission * scale_factor) / 100) self.log.info(f"Scaled transmission is : {scaled_transmission}") - self.recwrap.send_to( - "strategy", - {"parameters": {"scaled_transmission": float(scaled_transmission)}}, - ) - max_pixel_count_pct = int(num_counts[-1]) / trusted_range - if max_pixel_count_pct < 0.7: - warning_level = 0 - elif max_pixel_count_pct < 0.85: - warning_level = 1 - else: - warning_level = 2 - - warning_message = { - 0: "Diffraction spots are unlikely to have detector count rate issues", - 1: "Some diffraction spots may have detector count rate issues", - 2: "Some diffraction spots are likely to have detector count rate issues", - }.get(warning_level) - - warning_description = f"The most intense pixel is {max_pixel_count_pct * 100}% of the detector's limit" - warning_severity = ({0: "INFO", 1: "WARNING", 2: "ERROR"}.get(warning_level),) - - ispyb_command_list = [ - { - "ispyb_command": "add_program_message", - "program_id": "$ispyb_autoprocprogram_id", - "message": warning_message, - "description": warning_description, - "severity": warning_severity, - } - ] - - self.log.info("Sending %s", str(ispyb_command_list)) - self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + self.collect_ispyb_command_list(transmission, max_pixel_count_pct) results_directory.mkdir(parents=True, exist_ok=True) output_file = "dials.find_spots.log" @@ -140,6 +169,19 @@ def run(self): self.save_plot(num_counts, num_pixels, results_directory) self.save_hist_to_json(counts_hist, trusted_range, results_directory) + output_files = { + "pixel_intensities.png": "result", + "pixel_counts.json": "result", + output_file: "log", + } + for file, filetype in output_files.items(): + self.record_result_individual_file( + { + "file_path": str(results_directory), + "file_name": file, + "file_type": filetype, + } + ) self.log.info("Done.") return True From 4671c6408dd216bf2f32396a585ddcf968f33842 Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Thu, 28 May 2026 16:18:02 +0100 Subject: [PATCH 2/4] Making trigger service run off estimate_transmission and waiting for a auto processing step to complete --- src/dlstbx/services/trigger.py | 94 ++++++++++++++++++--- src/dlstbx/wrapper/estimate_transmission.py | 2 + 2 files changed, 84 insertions(+), 12 deletions(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 00d8c5344..489957ca4 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -295,6 +295,15 @@ class StrategyParameters(pydantic.BaseModel): experiment_type: str program_id: int = pydantic.Field(gt=0) wavelength: float = pydantic.Field(gt=0) + backoff_delay: Dict[str, float] = pydantic.Field( + default={"default": 8}, alias="backoff-delay" + ) + backoff_max_try: Dict[str, int] = pydantic.Field( + default={"default": 10}, alias="backoff-max-try" + ) + backoff_multiplier: Dict[str, float] = pydantic.Field( + default={"default": 2}, alias="backoff-multiplier" + ) class DLSTrigger(CommonService): @@ -2895,6 +2904,7 @@ def trigger_align_crystal( def trigger_strategy( self, rw: workflows.recipe.RecipeWrapper, + message: Dict, *, parameters: StrategyParameters, session: sqlalchemy.orm.session.Session, @@ -2912,24 +2922,44 @@ def trigger_strategy( ) return {"success": True} - udc_strategy_previously_triggered = ( + # Check that the processing program is from estimate_transmission, if not skip strategy trigger + processing_program = ( session.query(AutoProcProgram.processingPrograms) - .join( - ProcessingJob, - AutoProcProgram.processingJobId == ProcessingJob.processingJobId, + .filter(AutoProcProgram.autoProcProgramId == parameters.program_id) + .scalar() + ) + + if "estimate_transmission" != processing_program: + self.log.info( + f"Skipping strategy trigger: processing program {processing_program[0]} does not contain estimate_transmission" ) - .filter(ProcessingJob.dataCollectionId == parameters.dcid) - .filter(AutoProcProgram.processingPrograms == "UDC strategy") - .all() + return {"success": True} + + status = { + "ntry": 0, + } + backoff_delay = parameters.backoff_delay.get( + parameters.beamline, parameters.backoff_delay["default"] + ) + backoff_multiplier = parameters.backoff_multiplier.get( + parameters.beamline, parameters.backoff_multiplier["default"] + ) + backoff_max_try = parameters.backoff_max_try.get( + parameters.beamline, parameters.backoff_max_try["default"] ) - if udc_strategy_previously_triggered: + if isinstance(message, dict): + status.update(message.get("trigger-status", {})) + status["ntry"] += 1 + message_delay = int(backoff_delay * backoff_multiplier ** status["ntry"]) + if status["ntry"] > backoff_max_try: self.log.info( - f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}." + f"Skipping strategy trigger: maximum number of retries exceeded for dcid={parameters.dcid}" ) return {"success": True} # Get resolution estimate from ispyb records for upstream pipeline - returns None if not found. + # Need to change to use the dcid and find the minimum resolution = ( session.query(AutoProcScalingStatistics.resolutionLimitHigh) .join( @@ -2942,17 +2972,57 @@ def trigger_strategy( AutoProcProgram, AutoProcProgram.autoProcProgramId == AutoProc.autoProcProgramId, ) - .filter(AutoProcProgram.autoProcProgramId == parameters.program_id) + .join( + ProcessingJob, + AutoProcProgram.processingJobId == ProcessingJob.processingJobId, + ) + .filter(ProcessingJob.dataCollectionId == parameters.dcid) .filter(AutoProcScalingStatistics.scalingStatisticsType == "overall") - .scalar() + .all() + ) + + if resolution is None: + # Send results to myself for next round of processing + self.log.info( + f"Waiting for a transmission recommendation for dcid={parameters.dcid}" + ) + rw.checkpoint( + { + "trigger-status": status, + }, + delay=message_delay, + ) + return {"success": True} + + min_resolution = min(resolution, key=lambda x: x[0])[0] + self.log.info( + f"Strategy trigger: found minumum resolution {min_resolution} for dcid={parameters.dcid}" ) + udc_strategy_previously_triggered = ( + session.query(AutoProcProgram.processingPrograms) + .join( + ProcessingJob, + AutoProcProgram.processingJobId == ProcessingJob.processingJobId, + ) + .filter(ProcessingJob.dataCollectionId == parameters.dcid) + .filter(AutoProcProgram.processingPrograms == "UDC strategy") + .all() + ) + + if udc_strategy_previously_triggered: + self.log.info( + f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}." + ) + return {"success": True} + if not resolution: self.log.info( f"Skipping strategy trigger: no resolution estimate found for dcid={parameters.dcid} auto_proc_program_id={parameters.program_id}" ) return {"success": True} + # Trigger service will be triggered by estimate_transmission therefore can se autoprocId transmission = ( session.query(ScreeningStrategy.transmission) .join( @@ -2983,7 +3053,7 @@ def trigger_strategy( strategy_parameters = { "beamline": parameters.beamline, - "resolution": resolution, + "resolution": min_resolution, "wavelength": parameters.wavelength, "transmission": transmission, } diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index 4f99b658f..8e69c5cab 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -182,6 +182,8 @@ def run(self): "file_type": filetype, } ) + + self.recwrap.send_to("trigger", {}) self.log.info("Done.") return True From e64fb606cb9e8efedef9a5d2affb9e18bda29ddf Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Thu, 28 May 2026 17:06:25 +0100 Subject: [PATCH 3/4] Integrating standalone into strategy --- src/dlstbx/services/strategy.py | 7 +++++-- src/dlstbx/services/trigger.py | 8 +++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index aa759f66e..5749de737 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -192,7 +192,6 @@ def generate_strategy( parameters = ChainMapWithReplacement( message.get("parameters", {}) if isinstance(message, dict) else {}, recipe_params.get("ispyb_parameters", {}), - recipe_params, substitutions=rw.environment, ) self.log.info(f"Received parameters for strategy generation:\n{parameters}") @@ -215,6 +214,11 @@ def generate_strategy( if isinstance(parameters["resolution"], list) else float(parameters["resolution"]) ) + recommended_max_transmission = ( + float(parameters["transmission_estimate"][0]) + if isinstance(parameters["transmission_estimate"], list) + else float(parameters.get("transmission_estimate", 100)) + ) dc_transmission = float(parameters.get("transmission", 100)) / 100 resolution_offset = 0.5 min_resolution = 0.9 @@ -232,7 +236,6 @@ def generate_strategy( ) beamline_config = parse_config_file(beamline_config_file) - recommended_max_transmission = parameters.get("scaled_transmission", 1.0) base_recipe_home = Path(f"/dls_sw/{beamline}/etc/agamemnon-recipes") agamemnon_recipe_config = base_recipe_home / "recipe_config.yaml" agamemnon_limits: dict[str, AgamemnonLimits] = parse_agamemnon_config( diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 489957ca4..a09ad0498 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -2931,7 +2931,7 @@ def trigger_strategy( if "estimate_transmission" != processing_program: self.log.info( - f"Skipping strategy trigger: processing program {processing_program[0]} does not contain estimate_transmission" + f"Skipping strategy trigger: processing program {processing_program} not supported" ) return {"success": True} @@ -2959,7 +2959,6 @@ def trigger_strategy( return {"success": True} # Get resolution estimate from ispyb records for upstream pipeline - returns None if not found. - # Need to change to use the dcid and find the minimum resolution = ( session.query(AutoProcScalingStatistics.resolutionLimitHigh) .join( @@ -2981,6 +2980,9 @@ def trigger_strategy( .all() ) + self.log.info( + f"Strategy trigger: resolution estimate from ispyb for dcid={parameters.dcid} is {resolution}" + ) if resolution is None: # Send results to myself for next round of processing self.log.info( @@ -3055,7 +3057,7 @@ def trigger_strategy( "beamline": parameters.beamline, "resolution": min_resolution, "wavelength": parameters.wavelength, - "transmission": transmission, + "transmission_estimate": transmission, } for key, value in strategy_parameters.items(): From 6c526d0cccc0559dd805d643851052643d3c7f1a Mon Sep 17 00:00:00 2001 From: Paul Naughton Date: Fri, 29 May 2026 12:06:55 +0100 Subject: [PATCH 4/4] Fixing bugs --- src/dlstbx/services/strategy.py | 2 +- src/dlstbx/wrapper/estimate_transmission.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index 5749de737..fd046f8d6 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -218,7 +218,7 @@ def generate_strategy( float(parameters["transmission_estimate"][0]) if isinstance(parameters["transmission_estimate"], list) else float(parameters.get("transmission_estimate", 100)) - ) + ) / 100 dc_transmission = float(parameters.get("transmission", 100)) / 100 resolution_offset = 0.5 min_resolution = 0.9 diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index 8e69c5cab..31b820193 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -148,11 +148,11 @@ def run(self): ) scale_factor = target_countrate_pct / pixel_countrate_pct - scaled_transmission = min(1, (transmission * scale_factor) / 100) + scaled_transmission = min(100, (transmission * scale_factor)) self.log.info(f"Scaled transmission is : {scaled_transmission}") max_pixel_count_pct = int(num_counts[-1]) / trusted_range - self.collect_ispyb_command_list(transmission, max_pixel_count_pct) + self.collect_ispyb_command_list(scaled_transmission, max_pixel_count_pct) results_directory.mkdir(parents=True, exist_ok=True) output_file = "dials.find_spots.log"