Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/dlstbx/mimas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ def handle_characterization(
source="automatic",
displayname="align_crystal",
),
mimas.MimasISPyBJobInvocation(
DCID=scenario.DCID,
autostart=True,
recipe="strategy-estimate-transmission",
source="automatic",
displayname="estimate_transmission",
),
]


Expand Down
7 changes: 5 additions & 2 deletions src/dlstbx/services/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ def generate_strategy(
parameters = ChainMapWithReplacement(
message.get("parameters", {}) if isinstance(message, dict) else {},
recipe_params.get("ispyb_parameters", {}),
recipe_params,
substitutions=rw.environment,
)
self.log.info(f"Received parameters for strategy generation:\n{parameters}")
Expand All @@ -215,6 +214,11 @@ def generate_strategy(
if isinstance(parameters["resolution"], list)
else float(parameters["resolution"])
)
recommended_max_transmission = (
float(parameters["transmission_estimate"][0])
if isinstance(parameters["transmission_estimate"], list)
else float(parameters.get("transmission_estimate", 100))
) / 100
dc_transmission = float(parameters.get("transmission", 100)) / 100
resolution_offset = 0.5
min_resolution = 0.9
Expand All @@ -232,7 +236,6 @@ def generate_strategy(
)
beamline_config = parse_config_file(beamline_config_file)

recommended_max_transmission = parameters.get("scaled_transmission", 1.0)
base_recipe_home = Path(f"/dls_sw/{beamline}/etc/agamemnon-recipes")
agamemnon_recipe_config = base_recipe_home / "recipe_config.yaml"
agamemnon_limits: dict[str, AgamemnonLimits] = parse_agamemnon_config(
Expand Down
123 changes: 108 additions & 15 deletions src/dlstbx/services/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
ProcessingJob,
Proposal,
Protein,
Screening,
ScreeningOutput,
ScreeningStrategy,
)
from sqlalchemy import or_
from sqlalchemy.orm import Load, contains_eager, joinedload
Expand Down Expand Up @@ -292,6 +295,15 @@ class StrategyParameters(pydantic.BaseModel):
experiment_type: str
program_id: int = pydantic.Field(gt=0)
wavelength: float = pydantic.Field(gt=0)
backoff_delay: Dict[str, float] = pydantic.Field(
default={"default": 8}, alias="backoff-delay"
)
backoff_max_try: Dict[str, int] = pydantic.Field(
default={"default": 10}, alias="backoff-max-try"
)
backoff_multiplier: Dict[str, float] = pydantic.Field(
default={"default": 2}, alias="backoff-multiplier"
)


class DLSTrigger(CommonService):
Expand Down Expand Up @@ -2225,9 +2237,7 @@ def trigger_multiplex(
message = {"recipes": [], "parameters": {"ispyb_process": jobid}}
rw.transport.send("processing_recipe", message)

self.log.info(
f"xia2.multiplex trigger: Processing job {jobid} triggered"
)
self.log.info(f"xia2.multiplex trigger: Processing job {jobid} triggered")

return {"success": True, "return_value": jobids}

Expand Down Expand Up @@ -2894,6 +2904,7 @@ def trigger_align_crystal(
def trigger_strategy(
self,
rw: workflows.recipe.RecipeWrapper,
message: Dict,
*,
parameters: StrategyParameters,
session: sqlalchemy.orm.session.Session,
Expand All @@ -2911,20 +2922,39 @@ def trigger_strategy(
)
return {"success": True}

udc_strategy_previously_triggered = (
# Check that the processing program is from estimate_transmission, if not skip strategy trigger
processing_program = (
session.query(AutoProcProgram.processingPrograms)
.join(
ProcessingJob,
AutoProcProgram.processingJobId == ProcessingJob.processingJobId,
.filter(AutoProcProgram.autoProcProgramId == parameters.program_id)
.scalar()
)

if "estimate_transmission" != processing_program:
self.log.info(
f"Skipping strategy trigger: processing program {processing_program} not supported"
)
.filter(ProcessingJob.dataCollectionId == parameters.dcid)
.filter(AutoProcProgram.processingPrograms == "UDC strategy")
.all()
return {"success": True}

status = {
"ntry": 0,
}
backoff_delay = parameters.backoff_delay.get(
parameters.beamline, parameters.backoff_delay["default"]
)
backoff_multiplier = parameters.backoff_multiplier.get(
parameters.beamline, parameters.backoff_multiplier["default"]
)
backoff_max_try = parameters.backoff_max_try.get(
parameters.beamline, parameters.backoff_max_try["default"]
)

if udc_strategy_previously_triggered:
if isinstance(message, dict):
status.update(message.get("trigger-status", {}))
status["ntry"] += 1
message_delay = int(backoff_delay * backoff_multiplier ** status["ntry"])
if status["ntry"] > backoff_max_try:
self.log.info(
f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}."
f"Skipping strategy trigger: maximum number of retries exceeded for dcid={parameters.dcid}"
)
return {"success": True}

Expand All @@ -2941,17 +2971,79 @@ def trigger_strategy(
AutoProcProgram,
AutoProcProgram.autoProcProgramId == AutoProc.autoProcProgramId,
)
.filter(AutoProcProgram.autoProcProgramId == parameters.program_id)
.join(
ProcessingJob,
AutoProcProgram.processingJobId == ProcessingJob.processingJobId,
)
.filter(ProcessingJob.dataCollectionId == parameters.dcid)
.filter(AutoProcScalingStatistics.scalingStatisticsType == "overall")
.scalar()
.all()
)

self.log.info(
f"Strategy trigger: resolution estimate from ispyb for dcid={parameters.dcid} is {resolution}"
)
if resolution is None:
# Send results to myself for next round of processing
self.log.info(
f"Waiting for a transmission recommendation for dcid={parameters.dcid}"
)
rw.checkpoint(
{
"trigger-status": status,
},
delay=message_delay,
)
return {"success": True}

min_resolution = min(resolution, key=lambda x: x[0])[0]
self.log.info(
f"Strategy trigger: found minumum resolution {min_resolution} for dcid={parameters.dcid}"
)

udc_strategy_previously_triggered = (
session.query(AutoProcProgram.processingPrograms)
.join(
ProcessingJob,
AutoProcProgram.processingJobId == ProcessingJob.processingJobId,
)
.filter(ProcessingJob.dataCollectionId == parameters.dcid)
.filter(AutoProcProgram.processingPrograms == "UDC strategy")
.all()
)

if udc_strategy_previously_triggered:
self.log.info(
f"Skipping strategy trigger: UDC Strategy has already been triggered for dcid={parameters.dcid}."
)
return {"success": True}

if not resolution:
self.log.info(
f"Skipping strategy trigger: no resolution estimate found for dcid={parameters.dcid} auto_proc_program_id={parameters.program_id}"
)
return {"success": True}

# Trigger service will be triggered by estimate_transmission therefore can se autoprocId
transmission = (
session.query(ScreeningStrategy.transmission)
.join(
ScreeningOutput,
ScreeningOutput.screeningOutputId
== ScreeningStrategy.screeningOutputId,
)
.join(Screening, Screening.screeningId == ScreeningOutput.screeningId)
.filter(Screening.dataCollectionId == parameters.dcid)
.filter(ScreeningStrategy.program == "estimate_transmission")
.scalar()
)

if not transmission:
self.log.info(
f"Skipping strategy trigger: no transmission recommendation found for dcid={parameters.dcid}"
)
return {"success": True}

jp = self.ispyb.mx_processing.get_job_params()
jp["comments"] = parameters.comment
jp["datacollectionid"] = parameters.dcid
Expand All @@ -2963,8 +3055,9 @@ def trigger_strategy(

strategy_parameters = {
"beamline": parameters.beamline,
"resolution": resolution,
"resolution": min_resolution,
"wavelength": parameters.wavelength,
"transmission_estimate": transmission,
}

for key, value in strategy_parameters.items():
Expand Down
112 changes: 78 additions & 34 deletions src/dlstbx/wrapper/estimate_transmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,67 @@ class EstimateTransmissionWrapper(Wrapper):

_logger_name = "dlstbx.wrap.estimate_transmission"

def collect_ispyb_command_list(self, transmission, max_pixel_count_pct):
if max_pixel_count_pct < 0.7:
warning_level = 0
elif max_pixel_count_pct < 0.85:
warning_level = 1
else:
warning_level = 2

warning_message = {
0: "Diffraction spots are unlikely to have detector count rate issues",
1: "Some diffraction spots may have detector count rate issues",
2: "Some diffraction spots are likely to have detector count rate issues",
}.get(warning_level)

warning_description = f"The most intense pixel is {max_pixel_count_pct * 100}% of the detector's limit"
warning_severity = {0: "INFO", 1: "WARNING", 2: "ERROR"}.get(warning_level)

# Step 0: Add a program message about the count rate warning,
# store the autoproc program
ispyb_command_list = []
d = {
"ispyb_command": "add_program_message",
"program_id": "$ispyb_autoprocprogram_id",
"message": warning_message,
"description": warning_description,
"severity": warning_severity,
}
ispyb_command_list.append(d)

# Step 1: Create screeningOutput record for recipe, linked to the screeningId
# Keep the screeningOutputId
d = {
"program": "estimate_transmission",
"strategysuccess": 1,
"ispyb_command": "insert_screening_output",
"screening_id": "$ispyb_screening_id",
"store_result": "ispyb_screening_output_id",
}
ispyb_command_list.append(d)

# Step 2: Store screeningStrategy results, linked to the screeningOutputId
# Keep the screeningStrategyId
d = {
"program": "estimate_transmission",
"ispyb_command": "insert_screening_strategy",
"transmission": transmission,
"screening_output_id": "$ispyb_screening_output_id",
}
ispyb_command_list.append(d)

d = {
"ispyb_command": "update_processing_status",
"program_id": "$ispyb_autoprocprogram_id",
"message": "Processing successful",
"status": "success",
}
ispyb_command_list.append(d)

self.log.info("Sending %s", str(ispyb_command_list))
self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list})

def run(self):
"""Entrypoint for the estimate_transmission wrapper.

Expand Down Expand Up @@ -87,43 +148,11 @@ def run(self):
)
scale_factor = target_countrate_pct / pixel_countrate_pct

scaled_transmission = min(1, (transmission * scale_factor) / 100)
scaled_transmission = min(100, (transmission * scale_factor))
self.log.info(f"Scaled transmission is : {scaled_transmission}")

self.recwrap.send_to(
"strategy",
{"parameters": {"scaled_transmission": float(scaled_transmission)}},
)

max_pixel_count_pct = int(num_counts[-1]) / trusted_range
if max_pixel_count_pct < 0.7:
warning_level = 0
elif max_pixel_count_pct < 0.85:
warning_level = 1
else:
warning_level = 2

warning_message = {
0: "Diffraction spots are unlikely to have detector count rate issues",
1: "Some diffraction spots may have detector count rate issues",
2: "Some diffraction spots are likely to have detector count rate issues",
}.get(warning_level)

warning_description = f"The most intense pixel is {max_pixel_count_pct * 100}% of the detector's limit"
warning_severity = ({0: "INFO", 1: "WARNING", 2: "ERROR"}.get(warning_level),)

ispyb_command_list = [
{
"ispyb_command": "add_program_message",
"program_id": "$ispyb_autoprocprogram_id",
"message": warning_message,
"description": warning_description,
"severity": warning_severity,
}
]

self.log.info("Sending %s", str(ispyb_command_list))
self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list})
self.collect_ispyb_command_list(scaled_transmission, max_pixel_count_pct)

results_directory.mkdir(parents=True, exist_ok=True)
output_file = "dials.find_spots.log"
Expand All @@ -140,6 +169,21 @@ def run(self):
self.save_plot(num_counts, num_pixels, results_directory)
self.save_hist_to_json(counts_hist, trusted_range, results_directory)

output_files = {
"pixel_intensities.png": "result",
"pixel_counts.json": "result",
output_file: "log",
}
for file, filetype in output_files.items():
self.record_result_individual_file(
{
"file_path": str(results_directory),
"file_name": file,
"file_type": filetype,
}
)

self.recwrap.send_to("trigger", {})
self.log.info("Done.")
return True

Expand Down