diff --git a/src/dlstbx/mimas/core.py b/src/dlstbx/mimas/core.py index 10e9ecf60..9710a3640 100644 --- a/src/dlstbx/mimas/core.py +++ b/src/dlstbx/mimas/core.py @@ -193,6 +193,13 @@ def handle_characterization( source="automatic", displayname="align_crystal", ), + mimas.MimasISPyBJobInvocation( + DCID=scenario.DCID, + autostart=True, + recipe="strategy-estimate-transmission", + source="automatic", + displayname="estimate_transmission", + ), ] diff --git a/src/dlstbx/services/strategy.py b/src/dlstbx/services/strategy.py index aa759f66e..fd046f8d6 100644 --- a/src/dlstbx/services/strategy.py +++ b/src/dlstbx/services/strategy.py @@ -192,7 +192,6 @@ def generate_strategy( parameters = ChainMapWithReplacement( message.get("parameters", {}) if isinstance(message, dict) else {}, recipe_params.get("ispyb_parameters", {}), - recipe_params, substitutions=rw.environment, ) self.log.info(f"Received parameters for strategy generation:\n{parameters}") @@ -215,6 +214,11 @@ def generate_strategy( if isinstance(parameters["resolution"], list) else float(parameters["resolution"]) ) + recommended_max_transmission = ( + float(parameters["transmission_estimate"][0]) + if isinstance(parameters["transmission_estimate"], list) + else float(parameters.get("transmission_estimate", 100)) + ) / 100 dc_transmission = float(parameters.get("transmission", 100)) / 100 resolution_offset = 0.5 min_resolution = 0.9 @@ -232,7 +236,6 @@ def generate_strategy( ) beamline_config = parse_config_file(beamline_config_file) - recommended_max_transmission = parameters.get("scaled_transmission", 1.0) base_recipe_home = Path(f"/dls_sw/{beamline}/etc/agamemnon-recipes") agamemnon_recipe_config = base_recipe_home / "recipe_config.yaml" agamemnon_limits: dict[str, AgamemnonLimits] = parse_agamemnon_config( diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 6aa4f5514..a09ad0498 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -27,6 +27,9 @@ ProcessingJob, Proposal, Protein, + Screening, + ScreeningOutput, + ScreeningStrategy, ) from sqlalchemy import or_ from sqlalchemy.orm import Load, contains_eager, joinedload @@ -292,6 +295,15 @@ class StrategyParameters(pydantic.BaseModel): experiment_type: str program_id: int = pydantic.Field(gt=0) wavelength: float = pydantic.Field(gt=0) + backoff_delay: Dict[str, float] = pydantic.Field( + default={"default": 8}, alias="backoff-delay" + ) + backoff_max_try: Dict[str, int] = pydantic.Field( + default={"default": 10}, alias="backoff-max-try" + ) + backoff_multiplier: Dict[str, float] = pydantic.Field( + default={"default": 2}, alias="backoff-multiplier" + ) class DLSTrigger(CommonService): @@ -2225,9 +2237,7 @@ def trigger_multiplex( message = {"recipes": [], "parameters": {"ispyb_process": jobid}} rw.transport.send("processing_recipe", message) - self.log.info( - f"xia2.multiplex trigger: Processing job {jobid} triggered" - ) + self.log.info(f"xia2.multiplex trigger: Processing job {jobid} triggered") return {"success": True, "return_value": jobids} @@ -2894,6 +2904,7 @@ def trigger_align_crystal( def trigger_strategy( self, rw: workflows.recipe.RecipeWrapper, + message: Dict, *, parameters: StrategyParameters, session: sqlalchemy.orm.session.Session, @@ -2911,20 +2922,39 @@ def trigger_strategy( ) return {"success": True} - udc_strategy_previously_triggered = ( + # Check that the processing program is from estimate_transmission, if not skip strategy trigger + processing_program = ( session.query(AutoProcProgram.processingPrograms) - .join( - ProcessingJob, - AutoProcProgram.processingJobId == ProcessingJob.processingJobId, + .filter(AutoProcProgram.autoProcProgramId == parameters.program_id) + .scalar() + ) + + if "estimate_transmission" != processing_program: + self.log.info( + f"Skipping strategy trigger: processing program {processing_program} not supported" ) - .filter(ProcessingJob.dataCollectionId == parameters.dcid) - .filter(AutoProcProgram.processingPrograms == "UDC strategy") - .all() + return {"success": True} + + status = { + "ntry": 0, + } + backoff_delay = parameters.backoff_delay.get( + parameters.beamline, parameters.backoff_delay["default"] + ) + backoff_multiplier = parameters.backoff_multiplier.get( + parameters.beamline, parameters.backoff_multiplier["default"] + ) + backoff_max_try = parameters.backoff_max_try.get( + parameters.beamline, parameters.backoff_max_try["default"] ) - if udc_strategy_previously_triggered: + if isinstance(message, dict): + status.update(message.get("trigger-status", {})) + status["ntry"] += 1 + message_delay = int(backoff_delay * backoff_multiplier ** status["ntry"]) + if status["ntry"] > backoff_max_try: self.log.info( - f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}." + f"Skipping strategy trigger: maximum number of retries exceeded for dcid={parameters.dcid}" ) return {"success": True} @@ -2941,17 +2971,79 @@ def trigger_strategy( AutoProcProgram, AutoProcProgram.autoProcProgramId == AutoProc.autoProcProgramId, ) - .filter(AutoProcProgram.autoProcProgramId == parameters.program_id) + .join( + ProcessingJob, + AutoProcProgram.processingJobId == ProcessingJob.processingJobId, + ) + .filter(ProcessingJob.dataCollectionId == parameters.dcid) .filter(AutoProcScalingStatistics.scalingStatisticsType == "overall") - .scalar() + .all() + ) + + self.log.info( + f"Strategy trigger: resolution estimate from ispyb for dcid={parameters.dcid} is {resolution}" + ) + if resolution is None: + # Send results to myself for next round of processing + self.log.info( + f"Waiting for a transmission recommendation for dcid={parameters.dcid}" + ) + rw.checkpoint( + { + "trigger-status": status, + }, + delay=message_delay, + ) + return {"success": True} + + min_resolution = min(resolution, key=lambda x: x[0])[0] + self.log.info( + f"Strategy trigger: found minumum resolution {min_resolution} for dcid={parameters.dcid}" + ) + + udc_strategy_previously_triggered = ( + session.query(AutoProcProgram.processingPrograms) + .join( + ProcessingJob, + AutoProcProgram.processingJobId == ProcessingJob.processingJobId, + ) + .filter(ProcessingJob.dataCollectionId == parameters.dcid) + .filter(AutoProcProgram.processingPrograms == "UDC strategy") + .all() ) + if udc_strategy_previously_triggered: + self.log.info( + f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}." + ) + return {"success": True} + if not resolution: self.log.info( f"Skipping strategy trigger: no resolution estimate found for dcid={parameters.dcid} auto_proc_program_id={parameters.program_id}" ) return {"success": True} + # Trigger service will be triggered by estimate_transmission therefore can se autoprocId + transmission = ( + session.query(ScreeningStrategy.transmission) + .join( + ScreeningOutput, + ScreeningOutput.screeningOutputId + == ScreeningStrategy.screeningOutputId, + ) + .join(Screening, Screening.screeningId == ScreeningOutput.screeningId) + .filter(Screening.dataCollectionId == parameters.dcid) + .filter(ScreeningStrategy.program == "estimate_transmission") + .scalar() + ) + + if not transmission: + self.log.info( + f"Skipping strategy trigger: no transmission recommendation found for dcid={parameters.dcid}" + ) + return {"success": True} + jp = self.ispyb.mx_processing.get_job_params() jp["comments"] = parameters.comment jp["datacollectionid"] = parameters.dcid @@ -2963,8 +3055,9 @@ def trigger_strategy( strategy_parameters = { "beamline": parameters.beamline, - "resolution": resolution, + "resolution": min_resolution, "wavelength": parameters.wavelength, + "transmission_estimate": transmission, } for key, value in strategy_parameters.items(): diff --git a/src/dlstbx/wrapper/estimate_transmission.py b/src/dlstbx/wrapper/estimate_transmission.py index 6ff52a6d7..31b820193 100644 --- a/src/dlstbx/wrapper/estimate_transmission.py +++ b/src/dlstbx/wrapper/estimate_transmission.py @@ -20,6 +20,67 @@ class EstimateTransmissionWrapper(Wrapper): _logger_name = "dlstbx.wrap.estimate_transmission" + def collect_ispyb_command_list(self, transmission, max_pixel_count_pct): + if max_pixel_count_pct < 0.7: + warning_level = 0 + elif max_pixel_count_pct < 0.85: + warning_level = 1 + else: + warning_level = 2 + + warning_message = { + 0: "Diffraction spots are unlikely to have detector count rate issues", + 1: "Some diffraction spots may have detector count rate issues", + 2: "Some diffraction spots are likely to have detector count rate issues", + }.get(warning_level) + + warning_description = f"The most intense pixel is {max_pixel_count_pct * 100}% of the detector's limit" + warning_severity = {0: "INFO", 1: "WARNING", 2: "ERROR"}.get(warning_level) + + # Step 0: Add a program message about the count rate warning, + # store the autoproc program + ispyb_command_list = [] + d = { + "ispyb_command": "add_program_message", + "program_id": "$ispyb_autoprocprogram_id", + "message": warning_message, + "description": warning_description, + "severity": warning_severity, + } + ispyb_command_list.append(d) + + # Step 1: Create screeningOutput record for recipe, linked to the screeningId + # Keep the screeningOutputId + d = { + "program": "estimate_transmission", + "strategysuccess": 1, + "ispyb_command": "insert_screening_output", + "screening_id": "$ispyb_screening_id", + "store_result": "ispyb_screening_output_id", + } + ispyb_command_list.append(d) + + # Step 2: Store screeningStrategy results, linked to the screeningOutputId + # Keep the screeningStrategyId + d = { + "program": "estimate_transmission", + "ispyb_command": "insert_screening_strategy", + "transmission": transmission, + "screening_output_id": "$ispyb_screening_output_id", + } + ispyb_command_list.append(d) + + d = { + "ispyb_command": "update_processing_status", + "program_id": "$ispyb_autoprocprogram_id", + "message": "Processing successful", + "status": "success", + } + ispyb_command_list.append(d) + + self.log.info("Sending %s", str(ispyb_command_list)) + self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + def run(self): """Entrypoint for the estimate_transmission wrapper. @@ -87,43 +148,11 @@ def run(self): ) scale_factor = target_countrate_pct / pixel_countrate_pct - scaled_transmission = min(1, (transmission * scale_factor) / 100) + scaled_transmission = min(100, (transmission * scale_factor)) self.log.info(f"Scaled transmission is : {scaled_transmission}") - self.recwrap.send_to( - "strategy", - {"parameters": {"scaled_transmission": float(scaled_transmission)}}, - ) - max_pixel_count_pct = int(num_counts[-1]) / trusted_range - if max_pixel_count_pct < 0.7: - warning_level = 0 - elif max_pixel_count_pct < 0.85: - warning_level = 1 - else: - warning_level = 2 - - warning_message = { - 0: "Diffraction spots are unlikely to have detector count rate issues", - 1: "Some diffraction spots may have detector count rate issues", - 2: "Some diffraction spots are likely to have detector count rate issues", - }.get(warning_level) - - warning_description = f"The most intense pixel is {max_pixel_count_pct * 100}% of the detector's limit" - warning_severity = ({0: "INFO", 1: "WARNING", 2: "ERROR"}.get(warning_level),) - - ispyb_command_list = [ - { - "ispyb_command": "add_program_message", - "program_id": "$ispyb_autoprocprogram_id", - "message": warning_message, - "description": warning_description, - "severity": warning_severity, - } - ] - - self.log.info("Sending %s", str(ispyb_command_list)) - self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + self.collect_ispyb_command_list(scaled_transmission, max_pixel_count_pct) results_directory.mkdir(parents=True, exist_ok=True) output_file = "dials.find_spots.log" @@ -140,6 +169,21 @@ def run(self): self.save_plot(num_counts, num_pixels, results_directory) self.save_hist_to_json(counts_hist, trusted_range, results_directory) + output_files = { + "pixel_intensities.png": "result", + "pixel_counts.json": "result", + output_file: "log", + } + for file, filetype in output_files.items(): + self.record_result_individual_file( + { + "file_path": str(results_directory), + "file_name": file, + "file_type": filetype, + } + ) + + self.recwrap.send_to("trigger", {}) self.log.info("Done.") return True