diff --git a/ghmoon b/ghmoon index 61a0df3..87e711a 100755 --- a/ghmoon +++ b/ghmoon @@ -227,7 +227,7 @@ class GHCommit: else: status("error", "Internal error", gist) - return state == "success" + return state, gist class GHRepo: @@ -260,7 +260,10 @@ class GHRepo: if artifact.sha != sha: continue - self.process({ "sha": sha, "artifact": artifact.data }, interactive) + return self.process({ "sha": sha, "artifact": artifact.data }, interactive) + + sys.stderr.write(f"No matching artifact for {sha} in {str(self)}\n") + return "error", None def process(self, job, interactive=True): sha = job["sha"] @@ -271,7 +274,7 @@ class GHRepo: if not c.exists(): sys.stderr.write(f"Ignoring non-existing commit {sha} from {str(self)}\n") - return False + return "error", None c.checkout() @@ -286,12 +289,15 @@ class WorkQueue: self.publish = publish self.paths = { "todo": os.path.join(path, "todo"), + "forced": os.path.join(path, "todo", "forced"), "doing": os.path.join(path, "doing"), "done": os.path.join(path, "done"), } + self.pause_file = os.path.join(path, "paused") + self.result_path = os.path.join(path, "result") - for path in self.paths.values(): - os.makedirs(path, exist_ok=True) + for p in list(self.paths.values()) + [self.result_path]: + os.makedirs(p, exist_ok=True) if manage: for job in os.listdir(self.paths["doing"]): @@ -299,6 +305,21 @@ class WorkQueue: os.rename(os.path.join(self.paths["doing"], job), os.path.join(self.paths["todo"], job)) + def write_result(self, name, state, gist=None, report_log=None, error=None, heading=None): + result = { + "state": state, + "gist": gist, + "report_log": report_log, + "finished_at": time.time(), + "error": error, + "heading": heading, + } + path = os.path.join(self.result_path, f"{name}.json") + tmp = path + ".tmp" + with open(tmp, "w") as f: + f.write(json.dumps(result)) + os.replace(tmp, path) + def _job_paths(self, name): return { state: os.path.join(self.paths[state], f"{name}.json") @@ -329,6 +350,11 @@ class WorkQueue: os.remove(paths["todo"]) sys.stderr.write(f"Ignoring {name}: already processed\n") return False + elif os.path.exists(paths["forced"]): + os.close(fd) + os.remove(paths["todo"]) + sys.stderr.write(f"Ignoring {name}: already forced\n") + return False with os.fdopen(fd, "w") as f: f.write(json.dumps(data)) @@ -348,6 +374,7 @@ class WorkQueue: "type": "process", "repo": str(repo), "sha": artifact.sha, + "heading": repo.api(f"commits/{artifact.sha}")["commit"]["message"].splitlines()[0], "artifact": artifact.data, }) @@ -362,11 +389,11 @@ class WorkQueue: out = subprocess.check_output(["ls", "-rt", "--time=birth", path], text=True) return out.splitlines() - def _dequeue(self): + def _dequeue(self, lane): assert not os.listdir(self.paths["doing"]), "Another job is alredy in progress" - abspaths = [os.path.join(self.paths["todo"], job) - for job in self.listdir(self.paths["todo"])] + entries = [e for e in self.listdir(self.paths[lane]) if e.endswith(".json")] + abspaths = [os.path.join(self.paths[lane], e) for e in entries] for abspath in abspaths: try: with open(abspath) as f: @@ -390,43 +417,66 @@ class WorkQueue: return None - def _process_job(self, job): + def _process_job(self, job, src_lane): name = job["name"] paths = self._job_paths(name) + src = paths[src_lane] if not ("repo" in job and "sha" in job): sys.stderr.write(f"Skipping {name}: Malformed process job\n") - os.rename(paths["todo"], paths["done"]) + self.write_result(name, "error", error="Malformed process job") + os.rename(src, paths["done"]) return if not job["repo"] in repos: sys.stderr.write(f"Skipping {name}: Unknown repo {job['repo']}\n") - os.rename(paths["todo"], paths["done"]) + self.write_result(name, "error", error=f"Unknown repo {job['repo']}") + os.rename(src, paths["done"]) return try: - os.rename(paths["todo"], paths["doing"]) + os.rename(src, paths["doing"]) except: sys.stderr.write(f"Skipping {name}: Job disappeared\n") return + state, gist, error = "error", None, None try: sys.stderr.write(f"Processing {name}\n") - repos[job["repo"]].process(job, not self.publish) + state, gist = repos[job["repo"]].process(job, not self.publish) sys.stderr.write(f"Processed {name}\n") except Exception as e: sys.stderr.write(f"Aborting {name}: {str(e)}\n") traceback.print_exc() + error = str(e) + + repo_obj = repos.get(job["repo"]) + report_log = None + if repo_obj: + candidate = f"{repo_obj.path}/report-{job['sha']}.log" + if os.path.exists(candidate): + report_log = candidate + + self.write_result(name, state, gist=gist, report_log=report_log, error=error, + heading=job.get("heading")) os.rename(paths["doing"], paths["done"]) return def process_next(self): - job = self._dequeue() + paused = os.path.exists(self.pause_file) + + src_lane = "forced" + job = self._dequeue(src_lane) + if not job and not paused: + src_lane = "todo" + job = self._dequeue(src_lane) if not job: + if paused and any(e.endswith(".json") for e in os.listdir(self.paths["todo"])): + sys.stderr.write("Skipping todo, work is paused\n") return False if job["type"] == "process": - self._process_job(job) + self._process_job(job, src_lane) return True @@ -515,4 +565,11 @@ if __name__ == "__main__": elif args.cmd == "enqueue": wq.enqueue() elif args.cmd == "process": - sys.exit(0 if repos[args.repo].process_standalone(args.sha, not args.publish) else 1) + repo_obj = repos[args.repo] + state, gist = repo_obj.process_standalone(args.sha, not args.publish) + name = f"{args.repo.replace('/', '-')}-{args.sha}" + heading = repo_obj.api(f"commits/{args.sha}")["commit"]["message"].splitlines()[0] + candidate = f"{repo_obj.path}/report-{args.sha}.log" + report_log = candidate if os.path.exists(candidate) else None + wq.write_result(name, state, gist=gist, report_log=report_log, heading=heading) + sys.exit(0 if state == "success" else 1)