Skip to content

feature: add critical path detection #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,3 +861,166 @@ def detect_pattern(
patterns.append(match_original)

return patterns

def critical_path_analysis(self):
    """Walk backwards through the Leave events to collect critical paths.

    The walk starts at the last ``MPI_Finalize`` Leave event (or, when the
    trace has none, at the last Leave event) and repeatedly steps to the
    latest earlier Leave event on the same process. Two kinds of events
    redirect the walk:

    * a receive call (``MPI_Recv`` / ``MPI_Irecv``) jumps to a Leave event
      of a send call on the process named by the ``"sender"`` attribute of
      a receive Instant event;
    * a collective call closes the current path and starts a new one; from
      then on (until the next send jump) the next event is chosen by name
      across all processes instead of on the current process only.

    The walk ends when it reaches one of the earliest Leave events of the
    trace or when no earlier Leave event is left; in both cases the path
    being built is kept.

    Returns:
        list of pandas.DataFrame: one frame per collected path that holds
        more than one event, rows in the order they were visited. An empty
        list is returned when the trace contains no Leave events.
    """
    self._match_events()

    instant_recv_events = ["MpiRecv", "MpiIrecv"]
    recv_events = ["MPI_Recv", "MPI_Irecv"]
    send_events = ["MPI_Send", "MPI_Isend"]
    collective_functions = [
        "MPI_Allgather",
        "MPI_Allgatherv",
        "MPI_Allreduce",
        "MPI_Alltoall",
        "MPI_Alltoallv",
        "MPI_Alltoallw",
        "MPI_Barrier",
        "MPI_Bcast",
        "MPI_Gather",
        "MPI_Gatherv",
        "MPI_Iallgather",
        "MPI_Iallreduce",
        "MPI_Ibarrier",
        "MPI_Ibcast",
        "MPI_Igather",
        "MPI_Igatherv",
        "MPI_Ireduce",
        "MPI_Iscatter",
        "MPI_Iscatterv",
        "MPI_Reduce",
        "MPI_Scatter",
        "MPI_Scatterv",
        "MPI_Exscan",
        "MPI_Op_create",
        "MPI_Op_free",
        "MPI_Reduce_local",
        "MPI_Reduce_scatter",
        "MPI_Scan",
        "MPI_File_iread_at_all",
        "MPI_File_iwrite_at_all",
        "MPI_File_iread_all",
        "MPI_File_iwrite_all",
        "MPI_File_read_all_begin",
        "MPI_File_write_all_begin",
        "MPI_File_write_all_end",
        "MPI_File_close",
    ]

    events = self.events
    leave_events = events[events["Event Type"] == "Leave"]
    # Nothing to trace back from; avoids an IndexError on ``.iloc[-1]``.
    if leave_events.empty:
        return []

    # Largest process id, i.e. (number of processes - 1) for 0-based ids.
    num_of_processes = leave_events["Process"].astype(int).max()

    # Prefer the last MPI_Finalize *Leave* event. Checking the Leave events
    # directly (rather than every event name) also covers traces that only
    # contain an MPI_Finalize Enter event.
    finalize_leaves = leave_events[leave_events["Name"] == "MPI_Finalize"]
    if not finalize_leaves.empty:
        last_event = finalize_leaves.iloc[-1]
    else:
        last_event = leave_events.iloc[-1]

    # NOTE(review): these two are refreshed only after a send jump, so the
    # receive lookup below is relative to the starting event / last jump,
    # not to the receive currently being handled -- confirm this is intended.
    last_process = last_event["Process"]
    last_timestamp = last_event["Timestamp (ns)"]

    critical_paths = []
    critical_path = [last_event]
    after_collective = False

    # Main loop to trace back.
    while True:
        # Leave events of the current process that precede the current event.
        candidate_events = leave_events[
            (leave_events["Process"] == last_event["Process"])
            & (leave_events["Timestamp (ns)"] < last_event["Timestamp (ns)"])
        ]

        # No more events to trace back from. This must be checked before
        # any ``.iloc[-1]`` below; the path built so far is kept (paths
        # with fewer than two events are filtered out at the end).
        if candidate_events.empty:
            critical_paths.append(critical_path)
            break

        # After a collective call, pick the latest earlier Leave event that
        # has the same name as the current process's previous event,
        # regardless of the process it ran on. The result is never empty:
        # it contains at least the event the name was taken from.
        if after_collective:
            candidate_name = candidate_events.iloc[-1]["Name"]
            candidate_events = leave_events[
                (leave_events["Name"] == candidate_name)
                & (leave_events["Timestamp (ns)"] < last_event["Timestamp (ns)"])
            ]

        last_event = candidate_events.iloc[-1]
        critical_path.append(last_event)

        if last_event["Name"] in recv_events:
            # Receive Instant events carry the sender in their attributes.
            instant_matches = events[
                (events["Event Type"] == "Instant")
                & (events["Timestamp (ns)"] < last_timestamp)
                & (events["Process"] == last_process)
                & (events["Name"].isin(instant_recv_events))
            ]
            # Without a receive Instant event the sender is unknown;
            # keep walking back on the current process.
            if instant_matches.empty:
                continue
            sender = instant_matches.iloc[-1]["Attributes"]["sender"]

            # Latest send call that finished on the sender process.
            send_matches = events[
                (events["Timestamp (ns)"] < last_timestamp)
                & (events["Process"] == sender)
                & events["Name"].isin(send_events)
                & (events["Event Type"] == "Leave")
            ]
            # No send to jump to: treat it like a missing Instant event
            # instead of failing on ``.iloc[-1]``.
            if send_matches.empty:
                continue

            last_event = send_matches.iloc[-1]
            last_timestamp = last_event["Timestamp (ns)"]
            last_process = last_event["Process"]
            after_collective = False
            critical_path.append(last_event)

        # Restart the detection after a collective function.
        if last_event["Name"] in collective_functions:
            critical_paths.append(critical_path)
            critical_path = []
            after_collective = True

        # Exit if we have traced back to the beginning: the current event is
        # among the first Leave events of the trace, has the same name as the
        # very first Leave event, and is one of the earliest calls with that
        # name.
        # The reset gives positional labels for the comparison below; it is
        # kept inside the loop so the row labels of the returned frames stay
        # the same as before.
        leave_events = leave_events.reset_index(drop=True)
        positions = leave_events[
            leave_events["Timestamp (ns)"] == last_event["Timestamp (ns)"]
        ].index
        # ``.any()`` keeps the check well defined when several Leave events
        # share a timestamp (a bare ``if`` on the array would raise).
        if (positions <= num_of_processes + 1).any():
            start_times = (
                leave_events[leave_events["Name"] == last_event["Name"]]
                .iloc[0 : num_of_processes + 1]["Timestamp (ns)"]
                .tolist()
            )
            if (
                last_event["Name"] == leave_events.iloc[0]["Name"]
                and last_event["Timestamp (ns)"] in start_times
            ):
                critical_paths.append(critical_path)
                break

    # A single event is not a path.
    return [pd.DataFrame(path) for path in critical_paths if len(path) > 1]