From 71098b44e66c87b6bf7fb8f37c894ac59fa1cd64 Mon Sep 17 00:00:00 2001 From: Johannes Heuel Date: Thu, 20 Oct 2022 10:03:15 +0200 Subject: [PATCH] fail jobs if workers die --- submitters/snakemake/zoidberg/grid-status.py | 21 ++++++++++++-- submitters/snakemake/zoidberg/grid-submit.py | 2 ++ zoidberg_server/src/main.rs | 30 ++++++++++++++------ zoidberg_server/src/webpage.rs | 3 +- 4 files changed, 44 insertions(+), 12 deletions(-) diff --git a/submitters/snakemake/zoidberg/grid-status.py b/submitters/snakemake/zoidberg/grid-status.py index 758a544..025fd94 100755 --- a/submitters/snakemake/zoidberg/grid-status.py +++ b/submitters/snakemake/zoidberg/grid-status.py @@ -2,8 +2,17 @@ import sys import requests +from os import environ -resp = requests.post("http://localhost:8080/status", json=[{"id": int(sys.argv[1])}]) +def print_and_exit(s): + print(s) + exit(0) + +resp = requests.post( + "http://localhost:8080/status", + json=[{"id": int(sys.argv[1])}], + headers={"cookie": environ["ZOIDBERG_SECRET"]}, +) translation = { "Submitted": "running", @@ -11,4 +20,12 @@ translation = { "Failed": "failed", } -print(translation[resp.json()[0]["status"]]) +j = resp.json() + +if len(j) == 0: + print_and_exit("failed") + +if "Running" in j[0]["status"]: + print_and_exit("running") + +print_and_exit(translation[resp.json()[0]["status"]]) diff --git a/submitters/snakemake/zoidberg/grid-submit.py b/submitters/snakemake/zoidberg/grid-submit.py index 349673f..19da41c 100755 --- a/submitters/snakemake/zoidberg/grid-submit.py +++ b/submitters/snakemake/zoidberg/grid-submit.py @@ -2,6 +2,7 @@ import sys import requests +from os import environ jobscript = sys.argv[1] @@ -10,6 +11,7 @@ resp = requests.post( json=[ {"cmd": jobscript}, ], + headers={"cookie": environ["ZOIDBERG_SECRET"]}, ) assert resp.ok, "http request failed" diff --git a/zoidberg_server/src/main.rs b/zoidberg_server/src/main.rs index f4b6561..1c83231 100644 --- a/zoidberg_server/src/main.rs +++ b/zoidberg_server/src/main.rs @@ -40,8 +40,8 @@ impl State { #[get("/")] async fn index(data: web::Data) -> impl Responder { let workers = data.workers.lock().unwrap(); - let new_jobs = data.new_jobs.lock().unwrap(); - let page = webpage::render(&*new_jobs, &*workers); + let jobs = data.jobs.lock().unwrap(); + let page = webpage::render(&*jobs, &*workers); HttpResponse::Ok().body(page) } @@ -133,7 +133,7 @@ async fn heartbeat( data: web::Data, _: Authorization, ) -> Result { - log::info!("Heartbeat from worker {}", heartbeat.id); + log::debug!("Heartbeat from worker {}", heartbeat.id); let mut workers = data.workers.lock().unwrap(); for w in workers.iter_mut() { if w.id == heartbeat.id { @@ -190,11 +190,23 @@ async fn main() -> std::io::Result<()> { tokio::spawn(async move { loop { tokio::time::sleep(Duration::from_secs(10)).await; - let mut workers = s.workers.lock().unwrap(); - workers.retain(|w| match w.last_heartbeat { - None => true, - Some(t) => Utc::now().timestamp() - t < 60, - }) + { + let mut workers = s.workers.lock().unwrap(); + workers.retain(|w| match w.last_heartbeat { + None => true, + Some(t) => Utc::now().timestamp() - t < 60, + }) + } + let workers = s.workers.lock().unwrap(); + let mut jobs = s.jobs.lock().unwrap(); + for job in jobs.iter_mut() { + if let Status::Running(w) = &job.status { + let exists = workers.iter().filter(|x| &x.id == w).count() > 0; + if !exists { + job.status = Status::Failed; + } + } + } } }); @@ -211,7 +223,7 @@ async fn main() -> std::io::Result<()> { .service(heartbeat) .service(submit) }) - .bind(("127.0.0.1", 8080))? + .bind(("0.0.0.0", 8080))? .run() .await } diff --git a/zoidberg_server/src/webpage.rs b/zoidberg_server/src/webpage.rs index 136c043..e1d1a71 100644 --- a/zoidberg_server/src/webpage.rs +++ b/zoidberg_server/src/webpage.rs @@ -52,7 +52,8 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String { Zoidberg - + + {}