Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 10 additions & 98 deletions src/apps/competitions/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import json
import os
import re
import traceback
Expand All @@ -9,13 +8,14 @@
from io import BytesIO
from tempfile import TemporaryDirectory, NamedTemporaryFile

import urllib
# import json
# import urllib

import oyaml as yaml
import requests
from celery._state import app_or_default
from django.conf import settings
from django_redis import get_redis_connection
# from django_redis import get_redis_connection
from django.core.exceptions import ObjectDoesNotExist
from django.core.files.base import ContentFile
from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F
Expand All @@ -24,10 +24,10 @@
from django.utils.timezone import now
from rest_framework.exceptions import ValidationError

from celery_config import app, app_for_vhost
from celery_config import app # , app_for_vhost
from competitions.models import Submission, CompetitionCreationTaskStatus, SubmissionDetails, Competition, \
CompetitionDump, Phase
from queues.models import Queue
# from queues.models import Queue
from competitions.unpackers.utils import CompetitionUnpackingException
from competitions.unpackers.v1 import V15Unpacker
from competitions.unpackers.v2 import V2Unpacker
Expand All @@ -41,7 +41,7 @@

import logging

from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY, extract_queue_names, is_compute_worker, known_compute_queue_names
# from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY, extract_queue_names, is_compute_worker, known_compute_queue_names
logger = logging.getLogger(__name__)

COMPETITION_FIELDS = [
Expand Down Expand Up @@ -187,6 +187,10 @@ def _send_to_compute_worker(submission, is_scoring):
if is_scoring:
run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name)

if not submission.data:
logger.error("Submission %s has no data file; marking as failed.", submission.pk)
submission.cancel(status=Submission.FAILED)
return
run_args['submission_data'] = make_url_sassy(path=submission.data.data_file.name)

if not is_scoring:
Expand Down Expand Up @@ -815,95 +819,3 @@ def _broadcast_worker_state(payload):
"worker": payload,
},
)


@app.task(queue="site-worker", soft_time_limit=120)
def refresh_compute_worker_health():
    """Inspect Celery workers on every known broker and publish their state.

    The default broker is always inspected. In addition, every ``Queue`` that
    is attached to at least one competition, has a non-empty name and a
    ``broker_url`` contributes its own vhost (via ``app_for_vhost``).

    For each worker reported by ``inspect().stats()`` that
    ``is_compute_worker`` accepts, this task:

    * stores a JSON heartbeat under ``worker:<source>:<hostname>:heartbeat``
      with an expiry of ``WORKER_HEARTBEAT_TTL``;
    * stores a JSON entry in the ``WORKERS_REGISTRY_KEY`` hash under the field
      ``<source>:<hostname>``;
    * forwards the heartbeat payload to ``_broadcast_worker_state``.

    A broker that cannot be inspected is logged and skipped; the remaining
    brokers are still processed.
    """
    # Function-scope import: a bare ``import urllib`` does not bind the
    # ``parse`` submodule, so ``urllib.parse.*`` only worked if some other
    # module had already imported it.
    from urllib.parse import urljoin, urlparse

    celery_app = app
    r = get_redis_connection("default")
    known_queue_names = known_compute_queue_names()

    # Each entry is (source name, broker URL used for de-duplication, app).
    broker_sources = [("default", celery_app.conf.broker_url, celery_app)]

    private_queues = (
        Queue.objects.filter(competitions__isnull=False)
        .exclude(name__isnull=True)
        .exclude(name="")
        .distinct()
    )
    for queue in private_queues:
        if not queue.broker_url:
            continue
        vhost = urlparse(queue.broker_url).path
        # NOTE(review): urljoin only resolves relative references for URL
        # schemes it knows about; for amqp-style broker URLs it may return
        # just the vhost path. The value is only used as a de-duplication
        # key below -- confirm this is the intended key.
        broker_url = urljoin(celery_app.conf.broker_url, vhost)
        broker_sources.append((queue.name, broker_url, app_for_vhost(vhost)))

    inspected_brokers = set()
    for source_name, broker_url, broker_app in broker_sources:
        # Several queues may share a broker; inspect each one only once.
        if broker_url in inspected_brokers:
            continue
        inspected_brokers.add(broker_url)

        try:
            # timeout=5: worst case is 4 inspect calls x 5s x N brokers.
            inspector = broker_app.control.inspect(timeout=5)
            if inspector is None:
                logger.warning(
                    "Celery inspect returned None for broker=%s", source_name
                )
                continue
            # Each inspect call returns None when no worker replied.
            stats = inspector.stats() or {}
            active = inspector.active() or {}
            reserved = inspector.reserved() or {}
            active_queues = inspector.active_queues() or {}
        except Exception:
            # Best effort: one unreachable broker must not block the others.
            logger.exception(
                "Unable to inspect Celery workers for broker %s", source_name
            )
            continue

        for worker_name in stats:
            queues = active_queues.get(worker_name, []) or []
            queue_names = extract_queue_names(queues)
            if not is_compute_worker(worker_name, queue_names, known_queue_names):
                continue

            # Both executing and prefetched (reserved) tasks count as load.
            running_jobs = len(active.get(worker_name, [])) + len(
                reserved.get(worker_name, [])
            )
            status = "busy" if running_jobs > 0 else "available"
            sorted_queue_names = sorted(queue_names)

            payload = {
                "hostname": worker_name,
                "status": status,
                "running_jobs": running_jobs,
                "timestamp": now().timestamp(),
                "queue_source": source_name,
                "queue_names": sorted_queue_names,
            }
            # Same data as the heartbeat, but the time is stored as "last_seen".
            registry_entry = {
                "hostname": worker_name,
                "status": status,
                "running_jobs": running_jobs,
                "last_seen": payload["timestamp"],
                "queue_source": source_name,
                "queue_names": sorted_queue_names,
            }

            heartbeat_key = f"worker:{source_name}:{worker_name}:heartbeat"
            r.set(heartbeat_key, json.dumps(payload), ex=WORKER_HEARTBEAT_TTL)
            r.hset(
                WORKERS_REGISTRY_KEY,
                f"{source_name}:{worker_name}",
                json.dumps(registry_entry),
            )
            _broadcast_worker_state(payload)
2 changes: 2 additions & 0 deletions src/static/riot/competitions/detail/_header.tag
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
Migrate
</button>

<!--
<worker-monitor-toggle
if="{competition.admin}"
can_view_workers_panel="true"
competition_id="{ competition.id }">
</worker-monitor-toggle>
-->

</div>
<div class="row">
Expand Down
6 changes: 3 additions & 3 deletions version.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"tag_name": "v1.27",
"release_name": "v1.27",
"html_url": "https://github.com/codalab/codabench/releases/tag/v1.27"
"tag_name": "v1.27.1",
"release_name": "v1.27.1",
"html_url": "https://github.com/codalab/codabench/releases/tag/v1.27.1"
}
Loading