Compare commits

...

11 Commits

Author SHA1 Message Date
6707e7e403 fix failed pre-commit checks
All checks were successful
continuous-integration/drone/push Build is passing
2024-02-08 18:02:09 +01:00
9971b46e9e Merge branch 'main' of gitea:jheuel/zoidberg
Some checks failed
continuous-integration/drone/push Build is failing
2024-02-08 17:57:16 +01:00
302e47c342 . 2024-02-08 17:56:32 +01:00
Johannes Heuel
461cdb2cc7 add environment variable for server
Some checks failed
continuous-integration/drone/push Build is failing
2022-12-20 18:15:56 +01:00
Johannes Heuel
126145ede7 add max width to column
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-20 12:02:25 +02:00
Johannes Heuel
ea30f57ea9 add thread requirement for jobs
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-20 11:38:43 +02:00
Johannes Heuel
71098b44e6 fail jobs if workers die
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-20 10:03:15 +02:00
Johannes Heuel
949801fc4d clean up
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-09 20:21:37 +02:00
Johannes Heuel
32bd7d5b62 fix tests
All checks were successful
continuous-integration/drone/push Build is passing
2022-09-27 15:00:12 +02:00
Johannes Heuel
6fd1cda2a2 change worker id to be a uuid
Some checks failed
continuous-integration/drone/push Build is failing
2022-09-27 14:00:09 +02:00
Johannes Heuel
60df9aca1a add worker heartbeat 2022-09-21 16:28:10 +02:00
12 changed files with 420 additions and 149 deletions

41
Cargo.lock generated
View File

@@ -166,7 +166,7 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"smallvec", "smallvec",
"socket2", "socket2",
"time", "time 0.3.14",
"url", "url",
] ]
@@ -351,8 +351,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1"
dependencies = [ dependencies = [
"iana-time-zone", "iana-time-zone",
"js-sys",
"num-integer", "num-integer",
"num-traits", "num-traits",
"time 0.1.44",
"wasm-bindgen",
"winapi", "winapi",
] ]
@@ -393,7 +396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05" checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05"
dependencies = [ dependencies = [
"percent-encoding", "percent-encoding",
"time", "time 0.3.14",
"version_check", "version_check",
] ]
@@ -648,7 +651,7 @@ checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
] ]
[[package]] [[package]]
@@ -929,7 +932,7 @@ checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys", "windows-sys",
] ]
@@ -1505,6 +1508,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.14" version = "0.3.14"
@@ -1686,6 +1700,15 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "uuid"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.15" version = "0.2.15"
@@ -1708,6 +1731,12 @@ dependencies = [
"try-lock", "try-lock",
] ]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.0+wasi-snapshot-preview1" version = "0.11.0+wasi-snapshot-preview1"
@@ -1879,6 +1908,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"clap", "clap",
"env_logger", "env_logger",
"futures",
"log", "log",
"reqwest", "reqwest",
"reqwest-middleware", "reqwest-middleware",
@@ -1901,11 +1931,14 @@ name = "zoidberg_server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"actix-web", "actix-web",
"chrono",
"clap", "clap",
"env_logger", "env_logger",
"futures", "futures",
"log", "log",
"serde_json", "serde_json",
"tokio",
"uuid",
"zoidberg_lib", "zoidberg_lib",
] ]

9
backends/htcondor/run.sh Normal file → Executable file
View File

@@ -1,9 +1,8 @@
#!/usr/bin/bash #!/usr/bin/bash
# apparently this voodoo kills all processes opened in this script # apparently this voodoo kills all processes opened in this script
trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT # trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
# ssh -N -L 8080:localhost:8080 lxplus7103 &
# sleep 10
ssh -N -L 8080:localhost:8080 lxplus7103 & /home/home4/institut_1b/jheuel/repositories/zoidberg/target/release/zoidberg_client
sleep 10
/afs/cern.ch/work/j/jheuel/zoidberg/target/debug/zoidberg_client http://localhost:8080

View File

@@ -6,9 +6,10 @@ error = output/stderr.$(Process)
log = output/log.$(Process) log = output/log.$(Process)
request_cpus = 1 request_cpus = 1
#request_memory = 1024 request_memory = 4096
#request_disk = 10240 #request_disk = 10240
getenv = True
should_transfer_files = no should_transfer_files = no
queue 150 queue 20

View File

@@ -2,8 +2,17 @@
import sys import sys
import requests import requests
from os import environ
resp = requests.post("http://localhost:8080/status", json=[{"id": int(sys.argv[1])}]) def print_and_exit(s):
print(s)
exit(0)
resp = requests.post(
"http://localhost:8080/status",
json=[{"id": int(sys.argv[1])}],
headers={"cookie": environ["ZOIDBERG_SECRET"]},
)
translation = { translation = {
"Submitted": "running", "Submitted": "running",
@@ -11,4 +20,12 @@ translation = {
"Failed": "failed", "Failed": "failed",
} }
print(translation[resp.json()[0]["status"]]) j = resp.json()
if len(j) == 0:
print_and_exit("failed")
if "Running" in j[0]["status"]:
print_and_exit("running")
print_and_exit(translation[resp.json()[0]["status"]])

View File

@@ -2,14 +2,25 @@
import sys import sys
import requests import requests
from os import environ
from snakemake.utils import read_job_properties
jobscript = sys.argv[1] jobscript = sys.argv[1]
job_properties = read_job_properties(jobscript)
payload = {
"cmd": jobscript
}
payload["threads"] = job_properties.get("threads", 1)
resp = requests.post( resp = requests.post(
"http://localhost:8080/submit", "http://localhost:8080/submit",
json=[ json=[
{"cmd": jobscript}, payload,
], ],
headers={"cookie": environ["ZOIDBERG_SECRET"]},
) )
assert resp.ok, "http request failed" assert resp.ok, "http request failed"

View File

@@ -20,3 +20,4 @@ tokio = { version = "1", features = ["full"] }
clap = "3.2.22" clap = "3.2.22"
env_logger = "0.9" env_logger = "0.9"
log = "0.4" log = "0.4"
futures = "0.3.24"

View File

@@ -1,18 +1,22 @@
use clap::{App, Arg}; use clap::{arg, value_parser, App, Arg};
use env_logger::Env; use env_logger::Env;
use futures::future::{AbortHandle, Abortable};
use log; use log;
use reqwest::{header, Client, ClientBuilder}; use reqwest::{header, Client, ClientBuilder};
use std::error::Error; use std::error::Error;
use std::process::Command; use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{thread, time}; use tokio::{process::Command, time};
use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, Status, Update}; use zoidberg_lib::types::{
FetchRequest, FetchResponse, Heartbeat, Job, RegisterResponse, Status, Update,
};
const VERSION: &str = env!("CARGO_PKG_VERSION"); const VERSION: &str = env!("CARGO_PKG_VERSION");
fn build_client(secret: &str) -> Client { fn build_client(secret: &str) -> Client {
let cookie = format!("{}", secret); let cookie = secret.to_string();
let mut headers = header::HeaderMap::new(); let mut headers = header::HeaderMap::new();
headers.insert( headers.insert(
@@ -28,14 +32,16 @@ fn build_client(secret: &str) -> Client {
.expect("Could not create client") .expect("Could not create client")
} }
#[derive(Debug)] #[derive(Debug, Clone)]
struct Worker { struct Worker {
id: i32, id: String,
secret: String, secret: String,
server: String,
threads: i32,
} }
impl Worker { impl Worker {
async fn new(server: &str, secret: &str) -> Result<Worker, Box<dyn Error>> { async fn new(server: &str, secret: &str, threads: i32) -> Result<Worker, Box<dyn Error>> {
let res = build_client(secret) let res = build_client(secret)
.get(format!("{}/register", server)) .get(format!("{}/register", server))
.send() .send()
@@ -47,21 +53,23 @@ impl Worker {
Ok(Worker { Ok(Worker {
id: r.id, id: r.id,
secret: secret.to_string(), secret: secret.to_string(),
server: server.to_string(),
threads,
}) })
} }
async fn update(self: &Self, jobs: &[Job]) -> Result<(), Box<dyn Error>> { async fn update(&self, jobs: &[Job]) -> Result<(), Box<dyn Error>> {
let updates: Vec<Update> = jobs let updates: Vec<Update> = jobs
.iter() .iter()
.map(|job| Update { .map(|job| Update {
worker: self.id, worker: self.id.clone(),
job: job.id, job: job.id,
status: job.status.clone(), status: job.status.clone(),
}) })
.collect(); .collect();
let body = build_client(&self.secret) let body = build_client(&self.secret)
.post("http://localhost:8080/update") .post(format!("{}/update", self.server))
.json(&updates) .json(&updates)
.send() .send()
.await? .await?
@@ -72,9 +80,13 @@ impl Worker {
Ok(()) Ok(())
} }
async fn fetch(self: &Self) -> Result<FetchResponse, Box<dyn Error>> { async fn fetch(&self) -> Result<FetchResponse, Box<dyn Error>> {
let res = build_client(&self.secret) let res = build_client(&self.secret)
.get("http://localhost:8080/fetch") .post(format!("{}/fetch", self.server))
.json(&FetchRequest {
worker_id: self.id.clone(),
threads: self.threads,
})
.send() .send()
.await?; .await?;
let body = res.text().await?; let body = res.text().await?;
@@ -82,8 +94,25 @@ impl Worker {
Ok(resp) Ok(resp)
} }
async fn run(self: &Self, job: &Job) -> Result<(), Box<dyn Error>> { async fn heartbeat(&self) {
let output = Command::new("bash").arg("-c").arg(&job.cmd).output()?; let _ = build_client(&self.secret)
.post(format!("{}/heartbeat", self.server))
.json(&Heartbeat {
id: self.id.clone(),
})
.send()
.await;
}
}
async fn run(job: &Job) -> Result<(), Box<dyn Error>> {
let output = Command::new("bash")
.arg("-c")
.arg(&job.cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output();
let output = output.await?;
log::info!( log::info!(
"command: {}\nstdout: {}\nstderr: {}", "command: {}\nstdout: {}\nstderr: {}",
@@ -95,23 +124,6 @@ impl Worker {
true => Ok(()), true => Ok(()),
false => Err(Box::from("Job failed")), false => Err(Box::from("Job failed")),
} }
}
async fn process(self: &Self, jobs: &[Job]) {
for job in jobs {
let status = match self.run(&job).await {
Ok(()) => Status::Completed,
Err(..) => Status::Failed,
};
let n = &[Job {
status,
..job.clone()
}];
if let Err(error) = self.update(n).await {
log::info!("Could not update job: {}", error);
}
}
}
} }
#[tokio::main] #[tokio::main]
@@ -120,37 +132,94 @@ async fn main() -> Result<(), Box<dyn Error>> {
let matches = App::new("Zoidberg client") let matches = App::new("Zoidberg client")
.version(VERSION) .version(VERSION)
.author("Johannes Heuel") .author("by Johannes Heuel")
.arg( .arg(
Arg::with_name("server") Arg::with_name("server")
.takes_value(true) .takes_value(true)
.required(true)
.help("Set Zoidberg server address"), .help("Set Zoidberg server address"),
) )
.arg(
arg!(-j --threads <VALUE> "Sets number of threads")
.required(false)
.value_parser(value_parser!(i32)),
)
.get_matches(); .get_matches();
let server = matches.value_of("server").unwrap(); let threads: i32 = if let Some(t) = matches.get_one::<i32>("threads") {
*t
} else {
1
};
let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| { let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| {
println!("Please set the $ZOIDBERG_SECRET environment variable"); eprintln!("Please set the $ZOIDBERG_SECRET environment variable");
std::process::exit(1); std::process::exit(1);
}); });
let client = Worker::new(server, &secret) let server = std::env::var("ZOIDBERG_SERVER")
.unwrap_or_else(|_| String::from(matches.value_of("server").unwrap()));
let client = Arc::new(
Worker::new(&server, &secret, threads)
.await .await
.expect("Could not create client"); .expect("Could not create client"),
);
let pause = time::Duration::from_secs(1); let pause = time::Duration::from_secs(1);
let long_pause = time::Duration::from_secs(20); let long_pause = time::Duration::from_secs(40);
let heartbeat_pause = time::Duration::from_secs(30);
let (heartbeat_handle, abort_registration) = AbortHandle::new_pair();
let c = Arc::clone(&client);
tokio::spawn(Abortable::new(
async move {
loop { loop {
if let Ok(fetch) = client.fetch().await { time::sleep(heartbeat_pause).await;
c.heartbeat().await;
}
},
abort_registration,
));
let mut fail_counter = 0;
loop {
let jobs = if let Ok(fetch) = client.fetch().await {
fail_counter = 0;
match fetch { match fetch {
FetchResponse::Nop => thread::sleep(pause), FetchResponse::Nop => {
FetchResponse::StopWorking => break, time::sleep(pause).await;
FetchResponse::Jobs(jobs) => client.process(&jobs).await, continue;
}
FetchResponse::Terminate(m) => {
println!("Terminate worker: {}", m);
break;
}
FetchResponse::Jobs(jobs) => jobs,
} }
} else { } else {
thread::sleep(long_pause); fail_counter += 1;
if fail_counter == 3 {
log::error!("failed to fetch three times, assume that server crashed and exit");
std::process::exit(1);
}
log::error!("failed to fetch new jobs");
time::sleep(long_pause).await;
continue;
};
for job in jobs {
let status = match run(&job).await {
Ok(()) => Status::Completed,
Err(..) => Status::Failed,
};
let update = &[Job {
status,
..job.clone()
}];
if let Err(error) = client.update(update).await {
log::info!("Could not update job: {}", error);
} }
} }
}
heartbeat_handle.abort();
Ok(()) Ok(())
} }

View File

@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct Update { pub struct Update {
pub worker: i32, pub worker: String,
pub job: i32, pub job: i32,
pub status: Status, pub status: Status,
} }
@@ -12,7 +12,7 @@ pub struct Update {
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Status { pub enum Status {
Submitted, Submitted,
Running, Running(String),
Completed, Completed,
Failed, Failed,
} }
@@ -27,7 +27,7 @@ impl fmt::Display for Status {
fn fmt(self: &Self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(self: &Self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
Status::Submitted => write!(f, "submitted"), Status::Submitted => write!(f, "submitted"),
Status::Running => write!(f, "running"), Status::Running(w) => write!(f, "running on worker {}", w),
Status::Completed => write!(f, "completed"), Status::Completed => write!(f, "completed"),
Status::Failed => write!(f, "failed"), Status::Failed => write!(f, "failed"),
} }
@@ -46,6 +46,8 @@ pub struct Job {
pub cmd: String, pub cmd: String,
#[serde(default = "Status::default")] #[serde(default = "Status::default")]
pub status: Status, pub status: Status,
#[serde(default)]
pub threads: i32,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@@ -55,13 +57,20 @@ pub struct Node {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct RegisterResponse { pub struct RegisterResponse {
pub id: i32, pub id: String,
}
/// Body of a `POST /fetch` request: identifies the asking worker and states
/// how many threads it offers for the next job.
#[derive(Serialize, Deserialize)]
pub struct FetchRequest {
/// Id the worker received in its `RegisterResponse`.
pub worker_id: String,
/// Thread budget of the worker; falls back to `i32::default()` (0) when the
/// field is missing from the payload.
#[serde(default)]
pub threads: i32,
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub enum FetchResponse { pub enum FetchResponse {
Jobs(Vec<Job>), Jobs(Vec<Job>),
StopWorking, Terminate(String),
Nop, Nop,
} }
@@ -73,5 +82,13 @@ pub struct Submit {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct Worker { pub struct Worker {
#[serde(default)] #[serde(default)]
pub id: i32, pub id: String,
#[serde(default)]
pub last_heartbeat: Option<i64>,
}
/// Body of a `POST /heartbeat` request, sent by a worker to signal that it
/// is still alive.
#[derive(Serialize, Deserialize)]
pub struct Heartbeat {
/// Id of the worker sending the heartbeat; deserializes to an empty string
/// when the field is missing from the payload.
#[serde(default)]
pub id: String,
}

View File

@@ -16,3 +16,6 @@ clap = "3.2"
env_logger = "0.9" env_logger = "0.9"
log = "0.4" log = "0.4"
futures = "0.3.24" futures = "0.3.24"
tokio = { version = "1", features = ["full"] }
chrono = "0.4.22"
uuid = { version = "1.1.2", features = ["v4"] }

View File

@@ -0,0 +1,24 @@
use actix_web::{dev, error::ErrorBadRequest, Error, FromRequest, HttpRequest, Result};
use futures::future::{err, ok, Ready};
/// Zero-sized request guard: a handler that lists `Authorization` among its
/// arguments only runs when the request passed the shared-secret check.
pub struct Authorization {}

impl FromRequest for Authorization {
    type Error = Error;
    type Future = Ready<Result<Self, Self::Error>>;

    /// Accepts the request when the raw `cookie` header equals the `String`
    /// registered as application data.
    ///
    /// Every other outcome (header missing, header not representable as a
    /// string, no secret configured, or a mismatch) is rejected with a
    /// `400 Bad Request` carrying the message `no auth`.
    fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future {
        let presented = req
            .headers()
            .get("cookie")
            .and_then(|value| value.to_str().ok());
        let expected = req.app_data::<String>();

        match (presented, expected) {
            (Some(cookie), Some(secret)) if secret == cookie => ok(Authorization {}),
            _ => err(ErrorBadRequest("no auth")),
        }
    }
}

View File

@@ -1,21 +1,25 @@
use actix_web::error::ErrorBadRequest;
use actix_web::{ use actix_web::{
dev, get, middleware::Logger, post, web, App, Error, FromRequest, HttpRequest, HttpResponse, get, middleware::Logger, post, web, App, HttpResponse, HttpServer, Responder, Result,
HttpServer, Responder, Result,
}; };
use clap; use chrono::Utc;
use env_logger::Env; use env_logger::Env;
use futures::future::{err, ok, Ready};
use log;
use std::sync::Mutex;
use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update, Worker};
use std::sync::Mutex;
use std::time::Duration;
use uuid::Uuid;
use zoidberg_lib::types::{
FetchRequest, FetchResponse, Heartbeat, Job, RegisterResponse, Status, StatusRequest, Update,
Worker,
};
mod auth;
mod webpage; mod webpage;
use auth::Authorization;
const VERSION: &str = env!("CARGO_PKG_VERSION"); const VERSION: &str = env!("CARGO_PKG_VERSION");
struct State { struct State {
counter_workers: Mutex<i32>,
counter_jobs: Mutex<i32>, counter_jobs: Mutex<i32>,
workers: Mutex<Vec<Worker>>, workers: Mutex<Vec<Worker>>,
new_jobs: Mutex<Vec<Job>>, new_jobs: Mutex<Vec<Job>>,
@@ -25,7 +29,6 @@ struct State {
impl State { impl State {
fn new() -> Self { fn new() -> Self {
Self { Self {
counter_workers: Mutex::new(0),
counter_jobs: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(Vec::new()), new_jobs: Mutex::new(Vec::new()),
@@ -34,59 +37,69 @@ impl State {
} }
} }
struct Authorization {}
impl FromRequest for Authorization {
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future {
if let Some(head) = req.headers().get("cookie") {
if let Ok(cookie) = head.to_str() {
if let Some(secret) = req.app_data::<String>() {
println!("{} == {}", secret, cookie);
if secret == cookie {
return ok(Authorization {});
} else {
return err(ErrorBadRequest("no auth"));
}
}
}
}
err(ErrorBadRequest("no auth"))
}
}
#[get("/")] #[get("/")]
async fn index(data: web::Data<State>) -> impl Responder { async fn index(data: web::Data<State>) -> impl Responder {
let workers = data.workers.lock().unwrap(); let workers = data.workers.lock().unwrap();
let new_jobs = data.new_jobs.lock().unwrap(); let jobs = data.jobs.lock().unwrap();
let page = webpage::render(&*new_jobs, &*workers); let filtered_jobs: Vec<Job> = jobs
.to_vec()
.iter()
.filter(|x| !matches!(x.status, Status::Completed))
.cloned()
.collect();
let page = webpage::render(&filtered_jobs, &*workers);
HttpResponse::Ok().body(page) HttpResponse::Ok().body(page)
} }
#[get("/register")] #[get("/register")]
async fn register(data: web::Data<State>, _: Authorization) -> Result<impl Responder> { async fn register(data: web::Data<State>, _: Authorization) -> Result<impl Responder> {
let mut counter_workers = data.counter_workers.lock().unwrap();
*counter_workers += 1;
let mut workers = data.workers.lock().unwrap(); let mut workers = data.workers.lock().unwrap();
let uuid = Uuid::new_v4().to_string();
workers.push(Worker { workers.push(Worker {
id: *counter_workers, id: uuid.clone(),
last_heartbeat: None,
}); });
log::info!("Registered worker node with id: {}", *counter_workers); log::info!("Registered worker node with id: {}", uuid);
Ok(web::Json(RegisterResponse { Ok(web::Json(RegisterResponse { id: uuid }))
id: *counter_workers,
}))
} }
#[get("/fetch")] #[post("/fetch")]
async fn fetch(data: web::Data<State>, _: Authorization) -> Result<impl Responder> { async fn fetch(
let mut new_jobs = data.new_jobs.lock().unwrap(); data: web::Data<State>,
if let Some(j) = new_jobs.pop() { f: web::Json<FetchRequest>,
return Ok(web::Json(FetchResponse::Jobs(vec![j]))); _: Authorization,
) -> Result<impl Responder> {
let f = f.into_inner();
let requesting_worker = f.worker_id;
let threads = f.threads;
{
let workers = data.workers.lock().unwrap();
if workers.iter().filter(|w| w.id == requesting_worker).count() != 1 {
return Ok(web::Json(FetchResponse::Terminate(
"Worker not found".into(),
)));
} }
}
let mut new_jobs = data.new_jobs.lock().unwrap();
new_jobs.sort_by(|a, b| b.threads.cmp(&a.threads));
if let Some(j) = new_jobs
.iter()
.filter(|x| x.threads <= threads)
.cloned()
.collect::<Vec<Job>>()
.first()
{
let mut jobs = data.jobs.lock().unwrap();
for cj in jobs.iter_mut() {
if cj.id == j.id {
cj.status = Status::Running(requesting_worker.clone())
}
}
new_jobs.retain(|x| x.id != j.id);
return Ok(web::Json(FetchResponse::Jobs(vec![j.clone()])));
};
Ok(web::Json(FetchResponse::Nop)) Ok(web::Json(FetchResponse::Nop))
} }
@@ -131,6 +144,22 @@ async fn update(
Ok(format!("Worker updated {} job(s)", n)) Ok(format!("Worker updated {} job(s)", n))
} }
/// Records a liveness signal from a worker.
///
/// Every registered worker whose id equals the one in the request body gets
/// its `last_heartbeat` set to the current UTC Unix timestamp. An unknown id
/// changes nothing; the textual acknowledgement is returned either way.
#[post("/heartbeat")]
async fn heartbeat(
    heartbeat: web::Json<Heartbeat>,
    data: web::Data<State>,
    _: Authorization,
) -> Result<String> {
    let worker_id = &heartbeat.id;
    log::debug!("Heartbeat from worker {}", worker_id);

    data.workers
        .lock()
        .unwrap()
        .iter_mut()
        .filter(|worker| worker.id == *worker_id)
        .for_each(|worker| worker.last_heartbeat = Some(Utc::now().timestamp()));

    Ok(format!("Heartbeat from worker {}", worker_id))
}
#[post("/submit")] #[post("/submit")]
async fn submit( async fn submit(
data: web::Data<State>, data: web::Data<State>,
@@ -163,17 +192,41 @@ async fn main() -> std::io::Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("zoidberg_server=info")).init(); env_logger::Builder::from_env(Env::default().default_filter_or("zoidberg_server=info")).init();
let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| { let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| {
println!("Please set the $ZOIDBERG_SECRET environment variable"); eprintln!("Please set the $ZOIDBERG_SECRET environment variable");
std::process::exit(1); std::process::exit(1);
}); });
let _matches = clap::App::new("Zoidberg server") let _matches = clap::App::new("Zoidberg server")
.version(VERSION) .version(VERSION)
.author("Johannes Heuel") .author("by Johannes Heuel")
.get_matches(); .get_matches();
let state = web::Data::new(State::new()); let state = web::Data::new(State::new());
let s = state.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
{
let mut workers = s.workers.lock().unwrap();
workers.retain(|w| match w.last_heartbeat {
None => true,
Some(t) => Utc::now().timestamp() - t < 60,
})
}
let workers = s.workers.lock().unwrap();
let mut jobs = s.jobs.lock().unwrap();
for job in jobs.iter_mut() {
if let Status::Running(w) = &job.status {
let exists = workers.iter().filter(|x| &x.id == w).count() > 0;
if !exists {
job.status = Status::Failed;
}
}
}
}
});
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.wrap(Logger::default()) .wrap(Logger::default())
@@ -184,9 +237,10 @@ async fn main() -> std::io::Result<()> {
.service(fetch) .service(fetch)
.service(status) .service(status)
.service(update) .service(update)
.service(heartbeat)
.service(submit) .service(submit)
}) })
.bind(("127.0.0.1", 8080))? .bind(("0.0.0.0", 8080))?
.run() .run()
.await .await
} }
@@ -224,31 +278,53 @@ mod tests {
.uri("/register") .uri("/register")
.to_request(); .to_request();
let resp: RegisterResponse = test::call_and_read_body_json(&app, req).await; let resp: RegisterResponse = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp.id, 1); assert!(!resp.id.is_empty());
} }
#[actix_web::test] #[actix_web::test]
async fn test_fetch() { async fn test_fetch() {
let cmd = String::from("hi"); let cmd = String::from("hi");
let jobid = 11;
let app = test::init_service( let app = test::init_service(
App::new() App::new()
.app_data(String::from("secret")) .app_data(String::from("secret"))
.app_data(web::Data::new(State { .app_data(web::Data::new(State {
counter_workers: Mutex::new(0),
counter_jobs: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(vec![Worker {
new_jobs: Mutex::new(vec![Job { id: "some_worker".to_string(),
id: 0, last_heartbeat: None,
}]),
new_jobs: Mutex::new(vec![
Job {
id: jobid,
cmd: cmd.clone(), cmd: cmd.clone(),
status: Status::Submitted, status: Status::Submitted,
}]), threads: 1,
},
Job {
id: jobid + 1,
cmd: cmd.clone(),
status: Status::Submitted,
threads: 2,
},
Job {
id: jobid + 2,
cmd: cmd.clone(),
status: Status::Submitted,
threads: 3,
},
]),
jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()),
})) }))
.service(fetch), .service(fetch),
) )
.await; .await;
let req = test::TestRequest::get() let req = test::TestRequest::post()
.append_header(("cookie", "secret")) .append_header(("cookie", "secret"))
.set_json(FetchRequest {
worker_id: "some_worker".to_string(),
threads: 1,
})
.uri("/fetch") .uri("/fetch")
.to_request(); .to_request();
let resp: FetchResponse = test::call_and_read_body_json(&app, req).await; let resp: FetchResponse = test::call_and_read_body_json(&app, req).await;
@@ -256,11 +332,11 @@ mod tests {
FetchResponse::Nop => { FetchResponse::Nop => {
panic!("did not expect FetchResponse::Nop") panic!("did not expect FetchResponse::Nop")
} }
FetchResponse::StopWorking => { FetchResponse::Terminate(w) => {
panic!("did not expect FetchResponse::NotWorking") panic!("did not expect FetchResponse::Terminate from worker {}", w)
} }
FetchResponse::Jobs(new_jobs) => { FetchResponse::Jobs(new_jobs) => {
assert_eq!(new_jobs[0].id, 0); assert_eq!(new_jobs[0].id, jobid);
assert_eq!(new_jobs[0].cmd, cmd); assert_eq!(new_jobs[0].cmd, cmd);
} }
} }
@@ -269,18 +345,19 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_status() { async fn test_status() {
let cmd = String::from("hi"); let cmd = String::from("hi");
let jobid = 1;
let app = test::init_service( let app = test::init_service(
App::new() App::new()
.app_data(String::from("secret")) .app_data(String::from("secret"))
.app_data(web::Data::new(State { .app_data(web::Data::new(State {
counter_workers: Mutex::new(0),
counter_jobs: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(Vec::new()), new_jobs: Mutex::new(Vec::new()),
jobs: Mutex::new(vec![Job { jobs: Mutex::new(vec![Job {
id: 1, id: jobid,
cmd: cmd.clone(), cmd: cmd.clone(),
status: Status::Running, status: Status::Submitted,
threads: 1,
}]), }]),
})) }))
.service(status), .service(status),
@@ -288,11 +365,11 @@ mod tests {
.await; .await;
let req = test::TestRequest::post() let req = test::TestRequest::post()
.append_header(("cookie", "secret")) .append_header(("cookie", "secret"))
.set_json(vec![StatusRequest { id: 1 }]) .set_json(vec![StatusRequest { id: jobid }])
.uri("/status") .uri("/status")
.to_request(); .to_request();
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await; let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp[0].id, 1); assert_eq!(resp[0].id, jobid);
} }
#[actix_web::test] #[actix_web::test]
@@ -307,9 +384,9 @@ mod tests {
let req = test::TestRequest::post() let req = test::TestRequest::post()
.append_header(("cookie", "secret")) .append_header(("cookie", "secret"))
.set_json(vec![Update { .set_json(vec![Update {
worker: 0, worker: "some_worker".to_string(),
job: 0, job: 0,
status: Status::Running, status: Status::Submitted,
}]) }])
.uri("/update") .uri("/update")
.to_request(); .to_request();
@@ -331,7 +408,8 @@ mod tests {
.set_json(vec![Job { .set_json(vec![Job {
id: 0, id: 0,
cmd: String::from("hi"), cmd: String::from("hi"),
status: Status::Running, status: Status::Submitted,
threads: 1,
}]) }])
.uri("/submit") .uri("/submit")
.to_request(); .to_request();

View File

@@ -1,14 +1,15 @@
use chrono::Utc;
use zoidberg_lib::types::{Job, Worker}; use zoidberg_lib::types::{Job, Worker};
// TODO: write nicer frontend // TODO: write nicer frontend
pub fn render(jobs: &[Job], workers: &[Worker]) -> String { pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
let jobs_html: String = String::from("<table class=\"table is-hoverable\">") let jobs_html: String = String::from("<table class=\"table is-hoverable\">")
+ "<thead><tr><th><td>ID</td><td>command</td><td>status</td></th></tr></thead><tbody>" + "<thead><tr><th>ID</th><th style=\"width: 150px;\">command</th><th>status</th></tr></thead><tbody>"
+ &jobs + &jobs
.iter() .iter()
.map(|j| { .map(|j| {
format!( format!(
"<tr><th></th><td>{}</td><td>{}</td><td>{}</td></tr>", "<tr><td>{}</td><td>{}</td><td>{}</td></tr>",
j.id, j.cmd, j.status j.id, j.cmd, j.status
) )
}) })
@@ -17,14 +18,28 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
+ "</tbody></table>"; + "</tbody></table>";
let workers_html: String = String::from("<table class=\"table is-hoverable\">") let workers_html: String = String::from("<table class=\"table is-hoverable\">")
+ "<thead><tr><th><td>ID</td></th></tr></thead><tbody>" + "<thead><tr><th>ID</th><th>last heartbeat</th></tr></thead><tbody>"
+ &workers + &workers
.iter() .iter()
.map(|w| format!("<tr><th></th><td>{}</td></tr>", w.id)) .map(|w| {
let ts = if let Some(ts) = w.last_heartbeat {
format!("{}", Utc::now().timestamp() - ts)
} else {
String::from("")
};
format!("<tr><td>{}</td><td>{}</td></tr>", w.id, ts)
})
.collect::<Vec<String>>() .collect::<Vec<String>>()
.join("\n") .join("\n")
+ "</tbody></table>"; + "</tbody></table>";
let style = r#"<style>
td {
max-width: 40vw;
word-wrap: break-word;
}
</style>"#;
let _debug_html = r#"<style> let _debug_html = r#"<style>
*:not(path):not(g) {{ *:not(path):not(g) {{
color: hsla(210, 100%, 100%, 0.9) !important; color: hsla(210, 100%, 100%, 0.9) !important;
@@ -43,9 +58,12 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
<head> <head>
<meta charset="utf-8"> <meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="viewport" content="width=device-width, initial-scale=1">
<title>Hello Bulma!</title> <title>Zoidberg</title>
<link rel="icon" href="data:image/svg+xml,%3Csvg%20xmlns='http://www.w3.org/2000/svg'%20viewBox='0%200%2016%2016'%3E%3Ctext%20x='0'%20y='14'%3E🦀%3C/text%3E%3C/svg%3E" type="image/svg+xml" />
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css"> <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
{} {}
{}
</head> </head>
<body> <body>
<section class="section"> <section class="section">
@@ -73,7 +91,7 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
</body> </body>
</html> </html>
"#, "#,
_debug_html, jobs_html, workers_html style, _debug_html, jobs_html, workers_html
); );
page page
} }