Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions src/dvsim/launcher/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self, deploy) -> None:

# Popen object when launching the job.
self.process = None
self._log_file = None
self.slurm_log_file = f"{self.job_spec.log_path}.slurm"

def _do_launch(self) -> None:
Expand All @@ -57,34 +58,35 @@ def _do_launch(self) -> None:
slurm_setup_cmd += ";"

# Encapsulate the run command with the slurm invocation
full_cmd = f"{slurm_setup_cmd} {self.job_spec.cmd}".strip()
slurm_cmd = (
f"srun -p {SLURM_QUEUE} --mem={SLURM_MEM} --mincpus={SLURM_MINCPUS} "
f"srun -p {SLURM_QUEUE} --mem={SLURM_MEM} "
f"--mincpus={SLURM_MINCPUS} "
f"--time={SLURM_TIMEOUT} --cpus-per-task={SLURM_CPUS_PER_TASK} "
f'bash -c "{slurm_setup_cmd} {self.job_spec.cmd}"'
f"bash -c {shlex.quote(full_cmd)}"
)

try:
with pathlib.Path(self.slurm_log_file).open("w") as out_file:
out_file.write(f"[Executing]:\n{self.job_spec.cmd}\n\n")
out_file.flush()

log.info(f"Executing slurm command: {slurm_cmd}")
self.process = subprocess.Popen(
shlex.split(slurm_cmd),
bufsize=4096,
universal_newlines=True,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I'm missing something - do we not still need universal_newlines=True (or text=True) since we're trying to write to a log file opened in w mode and not wb?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review @AlexJones0.
Slurm works after applying the changes in this PR (at least for me). So I guess anyway it is better then what we have now that fails on compilation.
As for the note: honestly, AI claimed it is not needed (see explanation below) and indeed it worked fine without it. But I added it again just in case.
"When Popen redirects stdout/stderr to a file object, it uses dup2 on the underlying file descriptor — the child process writes directly to the fd, bypassing the Python file object entirely. So text vs binary mode on the Python side doesn't matter".

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, thanks. I think you are right that in this case it doesn't matter, albeit the subprocess docs aren't the most clear about this detail 😅

stdout=out_file,
stderr=out_file,
env=exports,
)
self._log_file = pathlib.Path(self.slurm_log_file).open("w")
self._log_file.write(f"[Executing]:\n{self.job_spec.cmd}\n\n")
self._log_file.flush()

log.info(f"Executing slurm command: {slurm_cmd}")
self.process = subprocess.Popen(
shlex.split(slurm_cmd),
text=True,
stdout=self._log_file,
stderr=self._log_file,
env=exports,
)
except OSError as e:
self._close_log_file()
msg = f"File Error: {e}\nError while handling {self.slurm_log_file}"
raise LauncherError(msg)
except subprocess.SubprocessError as e:
self._close_log_file()
msg = f"IO Error: {e}\nSee {self.job_spec.log_path}"
raise LauncherError(msg)
finally:
self._close_process()

def poll(self) -> JobStatus:
"""Check status of the running process.
Expand Down Expand Up @@ -139,19 +141,24 @@ def kill(self) -> None:
except subprocess.TimeoutExpired:
self.process.kill()
self._post_finish(
JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[])
JobStatus.KILLED,
ErrorMessage(
line_number=None,
message="Job killed!",
context=[],
),
)

def _post_finish(self, status, err_msg) -> None:
self._close_log_file()
super()._post_finish(status, err_msg)
self._close_process()
self.process = None

def _close_process(self) -> None:
"""Close the file descriptors associated with the process."""
assert self.process
if self.process.stdout:
self.process.stdout.close()
def _close_log_file(self) -> None:
"""Close the log file if it is open."""
if self._log_file:
self._log_file.close()
self._log_file = None

@staticmethod
def prepare_workspace(cfg: "WorkspaceConfig") -> None:
Expand Down
Loading