Compare commits

..

9 Commits

Author SHA1 Message Date
6707e7e403 fix failed pre-commit checks
All checks were successful
continuous-integration/drone/push Build is passing
2024-02-08 18:02:09 +01:00
9971b46e9e Merge branch 'main' of gitea:jheuel/zoidberg
Some checks failed
continuous-integration/drone/push Build is failing
2024-02-08 17:57:16 +01:00
302e47c342 . 2024-02-08 17:56:32 +01:00
Johannes Heuel
461cdb2cc7 add environment variable for server
Some checks failed
continuous-integration/drone/push Build is failing
2022-12-20 18:15:56 +01:00
Johannes Heuel
126145ede7 add max width to column
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-20 12:02:25 +02:00
Johannes Heuel
ea30f57ea9 add thread requirement for jobs
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-20 11:38:43 +02:00
Johannes Heuel
71098b44e6 fail jobs if workers die
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-20 10:03:15 +02:00
Johannes Heuel
949801fc4d clean up
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-09 20:21:37 +02:00
Johannes Heuel
32bd7d5b62 fix tests
All checks were successful
continuous-integration/drone/push Build is passing
2022-09-27 15:00:12 +02:00
9 changed files with 193 additions and 83 deletions

9
backends/htcondor/run.sh Normal file → Executable file
View File

@@ -1,9 +1,8 @@
#!/usr/bin/bash
# apparently this vodoo kills all processes opened in this script
trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
# trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
# ssh -N -L 8080:localhost:8080 lxplus7103 &
# sleep 10
ssh -N -L 8080:localhost:8080 lxplus7103 &
sleep 10
/afs/cern.ch/work/j/jheuel/zoidberg/target/debug/zoidberg_client http://localhost:8080
/home/home4/institut_1b/jheuel/repositories/zoidberg/target/release/zoidberg_client

View File

@@ -6,9 +6,10 @@ error = output/stderr.$(Process)
log = output/log.$(Process)
request_cpus = 1
#request_memory = 1024
request_memory = 4096
#request_disk = 10240
getenv = True
should_transfer_files = no
queue 150
queue 20

View File

@@ -2,8 +2,17 @@
import sys
import requests
from os import environ
resp = requests.post("http://localhost:8080/status", json=[{"id": int(sys.argv[1])}])
def print_and_exit(s):
print(s)
exit(0)
resp = requests.post(
"http://localhost:8080/status",
json=[{"id": int(sys.argv[1])}],
headers={"cookie": environ["ZOIDBERG_SECRET"]},
)
translation = {
"Submitted": "running",
@@ -11,4 +20,12 @@ translation = {
"Failed": "failed",
}
print(translation[resp.json()[0]["status"]])
j = resp.json()
if len(j) == 0:
print_and_exit("failed")
if "Running" in j[0]["status"]:
print_and_exit("running")
print_and_exit(translation[resp.json()[0]["status"]])

View File

@@ -2,14 +2,25 @@
import sys
import requests
from os import environ
from snakemake.utils import read_job_properties
jobscript = sys.argv[1]
job_properties = read_job_properties(jobscript)
payload = {
"cmd": jobscript
}
payload["threads"] = job_properties.get("threads", 1)
resp = requests.post(
"http://localhost:8080/submit",
json=[
{"cmd": jobscript},
payload,
],
headers={"cookie": environ["ZOIDBERG_SECRET"]},
)
assert resp.ok, "http request failed"

View File

@@ -1,4 +1,4 @@
use clap::{App, Arg};
use clap::{arg, value_parser, App, Arg};
use env_logger::Env;
use futures::future::{AbortHandle, Abortable};
use log;
@@ -16,7 +16,7 @@ use zoidberg_lib::types::{
const VERSION: &str = env!("CARGO_PKG_VERSION");
fn build_client(secret: &str) -> Client {
let cookie = format!("{}", secret);
let cookie = secret.to_string();
let mut headers = header::HeaderMap::new();
headers.insert(
@@ -37,10 +37,11 @@ struct Worker {
id: String,
secret: String,
server: String,
threads: i32,
}
impl Worker {
async fn new(server: &str, secret: &str) -> Result<Worker, Box<dyn Error>> {
async fn new(server: &str, secret: &str, threads: i32) -> Result<Worker, Box<dyn Error>> {
let res = build_client(secret)
.get(format!("{}/register", server))
.send()
@@ -50,9 +51,10 @@ impl Worker {
let r: RegisterResponse = serde_json::from_str(&body)?;
log::info!("registered worker with id: {}", &r.id);
Ok(Worker {
id: r.id.to_string(),
id: r.id,
secret: secret.to_string(),
server: server.to_string(),
threads,
})
}
@@ -83,6 +85,7 @@ impl Worker {
.post(format!("{}/fetch", self.server))
.json(&FetchRequest {
worker_id: self.id.clone(),
threads: self.threads,
})
.send()
.await?;
@@ -129,22 +132,34 @@ async fn main() -> Result<(), Box<dyn Error>> {
let matches = App::new("Zoidberg client")
.version(VERSION)
.author("Johannes Heuel")
.author("by Johannes Heuel")
.arg(
Arg::with_name("server")
.takes_value(true)
.required(true)
.help("Set Zoidberg server address"),
)
.arg(
arg!(-j --threads <VALUE> "Sets number of threads")
.required(false)
.value_parser(value_parser!(i32)),
)
.get_matches();
let server = matches.value_of("server").unwrap();
let threads: i32 = if let Some(t) = matches.get_one::<i32>("threads") {
*t
} else {
1
};
let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| {
eprintln!("Please set the $ZOIDBERG_SECRET environment variable");
std::process::exit(1);
});
let server = std::env::var("ZOIDBERG_SERVER")
.unwrap_or_else(|_| String::from(matches.value_of("server").unwrap()));
let client = Arc::new(
Worker::new(server, &secret)
Worker::new(&server, &secret, threads)
.await
.expect("Could not create client"),
);

View File

@@ -46,6 +46,8 @@ pub struct Job {
pub cmd: String,
#[serde(default = "Status::default")]
pub status: Status,
#[serde(default)]
pub threads: i32,
}
#[derive(Serialize, Deserialize)]
@@ -61,6 +63,8 @@ pub struct RegisterResponse {
#[derive(Serialize, Deserialize)]
pub struct FetchRequest {
pub worker_id: String,
#[serde(default)]
pub threads: i32,
}
#[derive(Serialize, Deserialize)]

View File

@@ -0,0 +1,24 @@
use actix_web::{dev, error::ErrorBadRequest, Error, FromRequest, HttpRequest, Result};
use futures::future::{err, ok, Ready};
pub struct Authorization {}
impl FromRequest for Authorization {
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future {
if let Some(head) = req.headers().get("cookie") {
if let Ok(cookie) = head.to_str() {
if let Some(secret) = req.app_data::<String>() {
if secret == cookie {
return ok(Authorization {});
} else {
return err(ErrorBadRequest("no auth"));
}
}
}
}
err(ErrorBadRequest("no auth"))
}
}

View File

@@ -1,13 +1,9 @@
use actix_web::error::ErrorBadRequest;
use actix_web::{
dev, get, middleware::Logger, post, web, App, Error, FromRequest, HttpRequest, HttpResponse,
HttpServer, Responder, Result,
get, middleware::Logger, post, web, App, HttpResponse, HttpServer, Responder, Result,
};
use chrono::Utc;
use clap;
use env_logger::Env;
use futures::future::{err, ok, Ready};
use log;
use std::sync::Mutex;
use std::time::Duration;
use uuid::Uuid;
@@ -16,8 +12,11 @@ use zoidberg_lib::types::{
Worker,
};
mod auth;
mod webpage;
use auth::Authorization;
const VERSION: &str = env!("CARGO_PKG_VERSION");
struct State {
@@ -38,33 +37,17 @@ impl State {
}
}
struct Authorization {}
impl FromRequest for Authorization {
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future {
if let Some(head) = req.headers().get("cookie") {
if let Ok(cookie) = head.to_str() {
if let Some(secret) = req.app_data::<String>() {
if secret == cookie {
return ok(Authorization {});
} else {
return err(ErrorBadRequest("no auth"));
}
}
}
}
err(ErrorBadRequest("no auth"))
}
}
#[get("/")]
async fn index(data: web::Data<State>) -> impl Responder {
let workers = data.workers.lock().unwrap();
let new_jobs = data.new_jobs.lock().unwrap();
let page = webpage::render(&*new_jobs, &*workers);
let jobs = data.jobs.lock().unwrap();
let filtered_jobs: Vec<Job> = jobs
.to_vec()
.iter()
.filter(|x| !matches!(x.status, Status::Completed))
.cloned()
.collect();
let page = webpage::render(&filtered_jobs, &*workers);
HttpResponse::Ok().body(page)
}
@@ -87,7 +70,9 @@ async fn fetch(
f: web::Json<FetchRequest>,
_: Authorization,
) -> Result<impl Responder> {
let requesting_worker = f.into_inner().worker_id;
let f = f.into_inner();
let requesting_worker = f.worker_id;
let threads = f.threads;
{
let workers = data.workers.lock().unwrap();
if workers.iter().filter(|w| w.id == requesting_worker).count() != 1 {
@@ -97,15 +82,24 @@ async fn fetch(
}
}
let mut new_jobs = data.new_jobs.lock().unwrap();
if let Some(j) = new_jobs.pop() {
new_jobs.sort_by(|a, b| b.threads.cmp(&a.threads));
if let Some(j) = new_jobs
.iter()
.filter(|x| x.threads <= threads)
.cloned()
.collect::<Vec<Job>>()
.first()
{
let mut jobs = data.jobs.lock().unwrap();
for cj in jobs.iter_mut() {
if cj.id == j.id {
cj.status = Status::Running(requesting_worker.clone())
}
}
return Ok(web::Json(FetchResponse::Jobs(vec![j])));
}
new_jobs.retain(|x| x.id != j.id);
return Ok(web::Json(FetchResponse::Jobs(vec![j.clone()])));
};
Ok(web::Json(FetchResponse::Nop))
}
@@ -156,7 +150,7 @@ async fn heartbeat(
data: web::Data<State>,
_: Authorization,
) -> Result<String> {
log::info!("Heartbeat from worker {}", heartbeat.id);
log::debug!("Heartbeat from worker {}", heartbeat.id);
let mut workers = data.workers.lock().unwrap();
for w in workers.iter_mut() {
if w.id == heartbeat.id {
@@ -204,7 +198,7 @@ async fn main() -> std::io::Result<()> {
let _matches = clap::App::new("Zoidberg server")
.version(VERSION)
.author("Johannes Heuel")
.author("by Johannes Heuel")
.get_matches();
let state = web::Data::new(State::new());
@@ -213,11 +207,23 @@ async fn main() -> std::io::Result<()> {
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let mut workers = s.workers.lock().unwrap();
workers.retain(|w| match w.last_heartbeat {
None => true,
Some(t) => Utc::now().timestamp() - t < 60,
})
{
let mut workers = s.workers.lock().unwrap();
workers.retain(|w| match w.last_heartbeat {
None => true,
Some(t) => Utc::now().timestamp() - t < 60,
})
}
let workers = s.workers.lock().unwrap();
let mut jobs = s.jobs.lock().unwrap();
for job in jobs.iter_mut() {
if let Status::Running(w) = &job.status {
let exists = workers.iter().filter(|x| &x.id == w).count() > 0;
if !exists {
job.status = Status::Failed;
}
}
}
}
});
@@ -234,7 +240,7 @@ async fn main() -> std::io::Result<()> {
.service(heartbeat)
.service(submit)
})
.bind(("127.0.0.1", 8080))?
.bind(("0.0.0.0", 8080))?
.run()
.await
}
@@ -272,31 +278,53 @@ mod tests {
.uri("/register")
.to_request();
let resp: RegisterResponse = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp.id, 1);
assert!(!resp.id.is_empty());
}
#[actix_web::test]
async fn test_fetch() {
let cmd = String::from("hi");
let jobid = 11;
let app = test::init_service(
App::new()
.app_data(String::from("secret"))
.app_data(web::Data::new(State {
counter_workers: Mutex::new(0),
counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(vec![Job {
id: 0,
cmd: cmd.clone(),
status: Status::Submitted,
workers: Mutex::new(vec![Worker {
id: "some_worker".to_string(),
last_heartbeat: None,
}]),
new_jobs: Mutex::new(vec![
Job {
id: jobid,
cmd: cmd.clone(),
status: Status::Submitted,
threads: 1,
},
Job {
id: jobid + 1,
cmd: cmd.clone(),
status: Status::Submitted,
threads: 2,
},
Job {
id: jobid + 2,
cmd: cmd.clone(),
status: Status::Submitted,
threads: 3,
},
]),
jobs: Mutex::new(Vec::new()),
}))
.service(fetch),
)
.await;
let req = test::TestRequest::get()
let req = test::TestRequest::post()
.append_header(("cookie", "secret"))
.set_json(FetchRequest {
worker_id: "some_worker".to_string(),
threads: 1,
})
.uri("/fetch")
.to_request();
let resp: FetchResponse = test::call_and_read_body_json(&app, req).await;
@@ -308,7 +336,7 @@ mod tests {
panic!("did not expect FetchResponse::Terminate from worker {}", w)
}
FetchResponse::Jobs(new_jobs) => {
assert_eq!(new_jobs[0].id, 0);
assert_eq!(new_jobs[0].id, jobid);
assert_eq!(new_jobs[0].cmd, cmd);
}
}
@@ -317,18 +345,19 @@ mod tests {
#[actix_web::test]
async fn test_status() {
let cmd = String::from("hi");
let jobid = 1;
let app = test::init_service(
App::new()
.app_data(String::from("secret"))
.app_data(web::Data::new(State {
counter_workers: Mutex::new(0),
counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(Vec::new()),
jobs: Mutex::new(vec![Job {
id: 1,
id: jobid,
cmd: cmd.clone(),
status: Status::Running(0),
status: Status::Submitted,
threads: 1,
}]),
}))
.service(status),
@@ -336,11 +365,11 @@ mod tests {
.await;
let req = test::TestRequest::post()
.append_header(("cookie", "secret"))
.set_json(vec![StatusRequest { id: 1 }])
.set_json(vec![StatusRequest { id: jobid }])
.uri("/status")
.to_request();
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp[0].id, 1);
assert_eq!(resp[0].id, jobid);
}
#[actix_web::test]
@@ -355,9 +384,9 @@ mod tests {
let req = test::TestRequest::post()
.append_header(("cookie", "secret"))
.set_json(vec![Update {
worker: 0,
worker: "some_worker".to_string(),
job: 0,
status: Status::Running(0),
status: Status::Submitted,
}])
.uri("/update")
.to_request();
@@ -379,7 +408,8 @@ mod tests {
.set_json(vec![Job {
id: 0,
cmd: String::from("hi"),
status: Status::Running(0),
status: Status::Submitted,
threads: 1,
}])
.uri("/submit")
.to_request();

View File

@@ -4,12 +4,12 @@ use zoidberg_lib::types::{Job, Worker};
// TODO: write nicer frontend
pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
let jobs_html: String = String::from("<table class=\"table is-hoverable\">")
+ "<thead><tr><th><td>ID</td><td>command</td><td>status</td></th></tr></thead><tbody>"
+ "<thead><tr><th>ID</th><th style=\"width: 150px;\">command</th><th>status</th></tr></thead><tbody>"
+ &jobs
.iter()
.map(|j| {
format!(
"<tr><th></th><td>{}</td><td>{}</td><td>{}</td></tr>",
"<tr><td>{}</td><td>{}</td><td>{}</td></tr>",
j.id, j.cmd, j.status
)
})
@@ -18,7 +18,7 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
+ "</tbody></table>";
let workers_html: String = String::from("<table class=\"table is-hoverable\">")
+ "<thead><tr><th><td>ID</td><td>last heartbeat</td></th></tr></thead><tbody>"
+ "<thead><tr><th>ID</th><th>last heartbeat</th></tr></thead><tbody>"
+ &workers
.iter()
.map(|w| {
@@ -27,12 +27,19 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
} else {
String::from("")
};
format!("<tr><th></th><td>{}</td><td>{}</td></tr>", w.id, ts)
format!("<tr><td>{}</td><td>{}</td></tr>", w.id, ts)
})
.collect::<Vec<String>>()
.join("\n")
+ "</tbody></table>";
let style = r#"<style>
td {
max-width: 40vw;
word-wrap: break-word;
}
</style>"#;
let _debug_html = r#"<style>
*:not(path):not(g) {{
color: hsla(210, 100%, 100%, 0.9) !important;
@@ -52,9 +59,11 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Zoidberg</title>
<link rel="icon" href="data:,">
<link rel="icon" href="data:image/svg+xml,%3Csvg%20xmlns='http://www.w3.org/2000/svg'%20viewBox='0%200%2016%2016'%3E%3Ctext%20x='0'%20y='14'%3E🦀%3C/text%3E%3C/svg%3E" type="image/svg+xml" />
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
{}
{}
</head>
<body>
<section class="section">
@@ -82,7 +91,7 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
</body>
</html>
"#,
_debug_html, jobs_html, workers_html
style, _debug_html, jobs_html, workers_html
);
page
}