diff --git a/pipit/trace.py b/pipit/trace.py index 2b4f111a..6db60d94 100644 --- a/pipit/trace.py +++ b/pipit/trace.py @@ -861,3 +861,166 @@ def detect_pattern( patterns.append(match_original) return patterns + + def critical_path_analysis(self): + self._match_events() + + instant_recv_events = ["MpiRecv", "MpiIrecv"] + recv_events = ["MPI_Recv", "MPI_Irecv"] + send_events = ["MPI_Send", "MPI_Isend"] + collective_functions = [ + "MPI_Allgather", + "MPI_Allgatherv", + "MPI_Allreduce", + "MPI_Alltoall", + "MPI_Alltoallv", + "MPI_Alltoallw", + "MPI_Barrier", + "MPI_Bcast", + "MPI_Gather", + "MPI_Gatherv", + "MPI_Iallgather", + "MPI_Iallreduce", + "MPI_Ibarrier", + "MPI_Ibcast", + "MPI_Igather", + "MPI_Igatherv", + "MPI_Ireduce", + "MPI_Iscatter", + "MPI_Iscatterv", + "MPI_Reduce", + "MPI_Scatter", + "MPI_Scatterv", + "MPI_Exscan", + "MPI_Op_create", + "MPI_Op_free", + "MPI_Reduce_local", + "MPI_Reduce_scatter", + "MPI_Scan", + "MPI_File_iread_at_all", + "MPI_File_iwrite_at_all", + "MPI_File_iread_all", + "MPI_File_iwrite_all", + "MPI_File_read_all_begin", + "MPI_File_write_all_begin", + "MPI_File_write_all_end", + "MPI_File_close", + ] + + critical_paths = [] + critical_path = [] + last_event = None + leave_events = self.events[(self.events["Event Type"] == "Leave")] + num_of_processes = leave_events["Process"].astype(int).max() + + if "MPI_Finalize" in self.events["Name"].values: + last_event = self.events[ + (self.events["Event Type"] == "Leave") + & (self.events["Name"] == "MPI_Finalize") + ].iloc[-1] + else: + last_event = self.events[self.events["Event Type"] == "Leave"].iloc[-1] + last_name = last_event["Name"] + last_process = last_event["Process"] + last_timestamp = last_event["Timestamp (ns)"] + critical_path.append(last_event) + after_recieve = False + after_collective = False + # Main loop to trace back + while True: + # Filter for events from the same process before the last timestamp + candidate_events = leave_events[ + (leave_events["Process"] == last_event["Process"]) + & (leave_events["Timestamp (ns)"] < last_event["Timestamp (ns)"]) + ] + + # obtain the latest function after the collective function call. + # we basically do the something similar to starting with MPI_Finalize + # but this time we use a different function. + if after_collective: + candidate_name = candidate_events.iloc[-1]["Name"] + candidate_events = leave_events[ + (leave_events["Name"] == candidate_name) + & (leave_events["Timestamp (ns)"] < last_event["Timestamp (ns)"]) + ] + + # No more events to trace back from. + if candidate_events.empty: + break + + # Select the last event from the candidates if + # we the previous event is not a receive. + if not after_recieve: + last_event = candidate_events.iloc[-1] + critical_path.append(last_event) + + # If we continue after a receive function. + if last_event["Name"] in recv_events: + # Get the corresponding instant event for the recv function. + last_instant_event = self.events[ + (self.events["Event Type"] == "Instant") + & (self.events["Timestamp (ns)"] < last_timestamp) + & (self.events["Process"] == last_process) + & (self.events["Name"].isin(instant_recv_events)) + ] + # Sometimes recv function have some instant events which + # do not include the sender information. We ignore them. + if last_instant_event.empty: + continue + else: + last_instant_event = last_instant_event.iloc[-1] + + # Get the corresponding send event. + last_event = self.events[ + (self.events["Timestamp (ns)"] < last_timestamp) + & ( + self.events["Process"] + == last_instant_event["Attributes"]["sender"] + ) + & ( + self.events["Name"].isin(send_events) + & (self.events["Event Type"] == "Leave") + ) + ].iloc[-1] + last_timestamp = last_event["Timestamp (ns)"] + last_process = last_event["Process"] + + after_receive = True + after_collective = False + critical_path.append(last_event) + pass + + # Restart the detection after a collective function. + if last_event["Name"] in collective_functions: + critical_paths.append(critical_path) + critical_path = [] + after_collective = True + after_receive = False + + start_times = [] + check_if_done = leave_events[leave_events["Name"] == last_event["Name"]] + for start_time in check_if_done.iloc[0 : num_of_processes + 1][ + "Timestamp (ns)" + ]: + start_times.append(start_time) + + # Exit if we have traced back to the beginning + leave_events = leave_events.reset_index(drop=True) + if ( + leave_events[ + (leave_events["Timestamp (ns)"] == last_event["Timestamp (ns)"]) + ].index + <= num_of_processes + 1 + ): + if ( + last_event["Name"] == leave_events.iloc[0]["Name"] + and last_event["Timestamp (ns)"] in start_times + ): + critical_paths.append(critical_path) + break + + critical_dfs = [] + for critical_path in critical_paths: + if len(critical_path) > 1: + critical_dfs.append(pd.DataFrame(critical_path)) + return critical_dfs