Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions app/s3df/compute_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
from slurmrestd_client.models.slurm_v0041_post_job_submit_request_jobs_inner_time_limit import (
SlurmV0041PostJobSubmitRequestJobsInnerTimeLimit,
)
from slurmrestd_client.models.slurm_v0041_post_job_submit_request_jobs_inner_memory_per_cpu import (
SlurmV0041PostJobSubmitRequestJobsInnerMemoryPerCpu,
)
from fastapi import HTTPException, Response
from pydantic import ConfigDict, ValidationError

Expand Down Expand Up @@ -86,6 +89,14 @@ class SlurmV0041PostJobSubmitRequestJobStrict(SlurmV0041PostJobSubmitRequestJob)
"STOPPED": JobState.CANCELED,
}

# Map from Slurm partition name → GPU type string for GRES
PARTITION_GPU_TYPE: dict[str, str] = {
"ampere": "a100",
"turing": "geforce_rtx_2080_ti",
"ada": "l40s",
"hopper": "h200",
}


# ---------------------------------------------------------------------------
# JWT minting — IRI signs tokens using the shared key
Expand Down Expand Up @@ -257,44 +268,77 @@ async def submit_job(

# --- resource fields with safe defaults ---
node_count = 1
tasks = None
tasks_per_node = None
cpus_per_task = None
tres_per_task = None
exclusive = ["true"]
memory_per_node = None
duration_mins = 60
partition = None
account = None
reservation = None
environment = ["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"]

name = job_spec.name
executable = job_spec.executable
argv = job_spec.arguments or None
cwd = str(job_spec.directory) if job_spec.directory else None
stdin = job_spec.stdin_path
stdout = job_spec.stdout_path
stderr = job_spec.stderr_path

if job_spec.environment:
environment = [f"{k}={v}" for k, v in job_spec.environment.items()]

if job_spec.resources:
node_count = job_spec.resources.node_count or 1

if job_spec.attributes:
if job_spec.attributes.duration is not None:
duration_mins = max(1, int(job_spec.attributes.duration // 60))
partition = job_spec.attributes.queue_name
account = job_spec.attributes.account
reservation = job_spec.attributes.reservation_id

partition = partition or os.environ.get("SLURM_DEFAULT_PARTITION")
account = account or os.environ.get("SLURM_DEFAULT_ACCOUNT")

if job_spec.resources:
node_count = job_spec.resources.node_count or 1
tasks = job_spec.resources.process_count
tasks_per_node = job_spec.resources.processes_per_node
cpus_per_task = job_spec.resources.cpu_cores_per_process
if job_spec.resources.gpu_cores_per_process:
gpu_type = PARTITION_GPU_TYPE.get(partition or "")
if gpu_type:
tres_per_task = f"gres/gpu:{gpu_type}:{job_spec.resources.gpu_cores_per_process}"
else:
tres_per_task = f"gres/gpu:{job_spec.resources.gpu_cores_per_process}"
if not job_spec.resources.exclusive_node_use:
exclusive = ["false"]
if job_spec.resources.memory:
memory_mb = max(1, job_spec.resources.memory // (1024 * 1024))
Comment thread
swelborn marked this conversation as resolved.
memory_per_node = SlurmV0041PostJobSubmitRequestJobsInnerMemoryPerCpu(set=True, number=memory_mb)

custom_attributes = job_spec.attributes.custom_attributes if job_spec.attributes else {}

try:
slurm_job = SlurmV0041PostJobSubmitRequestJobStrict(
nodes=str(node_count),
tasks=tasks,
tasks_per_node=tasks_per_node,
cpus_per_task=cpus_per_task,
tres_per_task=tres_per_task,
exclusive=exclusive,
memory_per_node=memory_per_node,
time_limit=SlurmV0041PostJobSubmitRequestJobsInnerTimeLimit(set=True, number=duration_mins),
name=name,
script=executable,
argv=argv,
partition=partition,
account=account,
reservation=reservation,
environment=environment,
current_working_directory=cwd,
standard_input=stdin,
standard_output=stdout,
standard_error=stderr,
**custom_attributes
Comment thread
swelborn marked this conversation as resolved.
Expand Down
Loading