def test_match_charm_messages(ping_pong_projections_trace):
    """Check that matched Charm++ receive events point back at their sends.

    Loads the ping-pong Projections trace, runs ``_match_charm_messages`` and
    verifies, for every matched "Processing" event, that the referenced send
    event refers back to (approximately) the same receive event.
    """
    trace = Trace.from_projections(str(ping_pong_projections_trace))
    trace._match_charm_messages()

    events = trace.events

    def is_processing(attributes):
        # Rows without attributes can never be "Processing" entries.
        if not attributes:
            return False
        return ("Entry Type" in attributes) and (
            attributes["Entry Type"] == "Processing"
        )

    receives = events[events["Attributes"].apply(is_processing)]

    # Keep only the receive events that were actually matched to a send.
    receives = receives.loc[receives["_matching_message"].notnull()]

    # Receive events known to share a process with their send.
    exceptions = [541, 549, 553, 1120, 1124, 1305, 1306, 1320, 1882, 1893, 1894]

    for receive_index, send_index, receive_process in zip(
        receives.index, receives["_matching_message"], receives["Process"]
    ):
        # NOTE(review): send index 200 is deliberately skipped; the reason is
        # not visible in this file - confirm against the trace fixture.
        if send_index == 200:
            continue

        corresponding_send = events.iloc[send_index]

        # The send must point back at this receive. A distance of up to 17
        # rows is tolerated because some message receives in the trace are
        # divided into several enter events; most distances are 0.
        assert abs(corresponding_send["_matching_message"] - receive_index) <= 17

        # With a few exceptions, messages send and receive on opposite
        # processes.
        if receive_index >= 360 and receive_index not in exceptions:
            assert receive_process != corresponding_send["Process"]
def _match_charm_messages(self):
    """
    Matches Charm++ message creation ("Create") entries with the
    "Processing" entries that share the same "Event ID" attribute.
    Creates new columns _matching_message and _matching_message_timestamp.

    For every matched pair, each row stores the dataframe index and the
    timestamp of its counterpart. "Processing" entries whose "Event ID" has
    not been seen on an earlier "Create" entry are left unmatched.

    Raises:
        KeyError: if a "Create"/"Processing" entry has no "Event ID"
            attribute.
    """
    if "_matching_message" not in self.events.columns:
        self.events["_matching_message"] = None

    # Check the column that is actually created. Testing a different name
    # ("_matching_timestamp") made this guard always true, which reset any
    # previously matched timestamps to NaN on every call.
    if "_matching_message_timestamp" not in self.events.columns:
        self.events["_matching_message_timestamp"] = np.nan

    # NOTE(review): these lists are indexed below with dataframe index
    # labels, which assumes the default 0..n-1 index - confirm with callers.
    matching_events = list(self.events["_matching_message"])
    matching_times = list(self.events["_matching_message_timestamp"])

    def is_charm_message(attributes):
        # Rows without attributes cannot be message entries.
        if not attributes:
            return False
        return "Entry Type" in attributes and attributes["Entry Type"] in (
            "Processing",
            "Create",
        )

    charm_events = self.events[self.events["Attributes"].apply(is_charm_message)]

    # Maps an event ID to the (dataframe index, timestamp) of the most
    # recent "Create" entry seen with that ID.
    pending_sends = {}

    for curr_df_index, curr_timestamp, curr_attrs in zip(
        charm_events.index,
        charm_events["Timestamp (ns)"],
        charm_events["Attributes"],
    ):
        curr_event_type = curr_attrs["Entry Type"]
        curr_event_id = curr_attrs["Event ID"]

        if curr_event_type == "Create":
            pending_sends[curr_event_id] = (curr_df_index, curr_timestamp)
        elif curr_event_type == "Processing" and curr_event_id in pending_sends:
            send_df_index, send_timestamp = pending_sends[curr_event_id]

            # Cross-reference the two events in both directions.
            matching_events[send_df_index] = curr_df_index
            matching_events[curr_df_index] = send_df_index

            matching_times[send_df_index] = curr_timestamp
            matching_times[curr_df_index] = send_timestamp

    self.events["_matching_message"] = matching_events
    self.events["_matching_message_timestamp"] = matching_times

    # Nullable integer dtype so unmatched rows can hold a missing value.
    self.events = self.events.astype({"_matching_message": "Int32"})
def _match_messages(self):
    """
    Matches corresponding MpiSend/MpiRecv and MpiIsend/MpiIrecv instant events.
    Creates new columns _matching_message and _matching_message_timestamp.

    Sends are paired with receives in first-in, first-out order for each
    (receiving process, sending process) pair. For every matched pair, each
    row stores the dataframe index and the timestamp of its counterpart.
    Receives with no pending send from the named sender are left unmatched.
    """
    from collections import defaultdict, deque

    if "_matching_message" not in self.events.columns:
        self.events["_matching_message"] = None

    if "_matching_message_timestamp" not in self.events.columns:
        self.events["_matching_message_timestamp"] = np.nan

    # NOTE(review): these lists are indexed below with dataframe index
    # labels, which assumes the default 0..n-1 index - confirm with callers.
    matching_events = list(self.events["_matching_message"])
    matching_times = list(self.events["_matching_message_timestamp"])

    # Filter by send/receive events. The non-blocking send is "MpiIsend"
    # (as in the docstring and in "MpiIrecv"); the previous spelling
    # "MpiISend" never matched any event.
    send_events_names = ["MpiSend", "MpiIsend"]
    send_events = self.events[self.events["Name"].isin(send_events_names)]

    receive_events_names = ["MpiRecv", "MpiIrecv"]
    receive_events = self.events[self.events["Name"].isin(receive_events_names)]

    # Maps (receiving process, sending process) to a FIFO queue of
    # (dataframe index, timestamp) tuples, one per not-yet-matched send.
    pending_sends = defaultdict(deque)

    # First iterate over send events, queueing each one for its receiver.
    for curr_df_index, curr_timestamp, curr_attrs, curr_process in zip(
        send_events.index,
        send_events["Timestamp (ns)"],
        send_events["Attributes"],
        send_events["Process"],
    ):
        # Sends that do not name a receiver cannot be matched.
        if not curr_attrs or "receiver" not in curr_attrs:
            continue

        pending_sends[(curr_attrs["receiver"], curr_process)].append(
            (curr_df_index, curr_timestamp)
        )

    # Now iterate over receive events, consuming the oldest pending send.
    for curr_df_index, curr_timestamp, curr_attrs, curr_process in zip(
        receive_events.index,
        receive_events["Timestamp (ns)"],
        receive_events["Attributes"],
        receive_events["Process"],
    ):
        if not curr_attrs or "sender" not in curr_attrs:
            continue

        # .get() avoids a KeyError (and avoids creating an empty queue) when
        # no send was recorded for this receiver/sender pair.
        sends = pending_sends.get((curr_process, curr_attrs["sender"]))
        if not sends:
            continue

        # Get the corresponding "send" event
        send_df_index, send_timestamp = sends.popleft()

        # Fill in the lists with the matching values
        matching_events[send_df_index] = curr_df_index
        matching_events[curr_df_index] = send_df_index

        matching_times[send_df_index] = curr_timestamp
        matching_times[curr_df_index] = send_timestamp

    self.events["_matching_message"] = matching_events
    self.events["_matching_message_timestamp"] = matching_times

    # Nullable integer dtype so unmatched rows can hold a missing value.
    self.events = self.events.astype({"_matching_message": "Int32"})