diff --git a/Cargo.lock b/Cargo.lock index 8af8ada..448c46d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -166,7 +166,7 @@ dependencies = [ "serde_urlencoded", "smallvec", "socket2", - "time", + "time 0.3.14", "url", ] @@ -351,8 +351,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" dependencies = [ "iana-time-zone", + "js-sys", "num-integer", "num-traits", + "time 0.1.44", + "wasm-bindgen", "winapi", ] @@ -393,7 +396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05" dependencies = [ "percent-encoding", - "time", + "time 0.3.14", "version_check", ] @@ -648,7 +651,7 @@ checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -929,7 +932,7 @@ checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -1505,6 +1508,17 @@ dependencies = [ "syn", ] +[[package]] +name = "time" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "time" version = "0.3.14" @@ -1686,6 +1700,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +dependencies = [ + "getrandom", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -1708,6 +1731,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1902,11 +1931,14 @@ name = "zoidberg_server" version = "0.1.0" dependencies = [ "actix-web", + "chrono", "clap", "env_logger", "futures", "log", "serde_json", + "tokio", + "uuid", "zoidberg_lib", ] diff --git a/zoidberg_client/src/main.rs b/zoidberg_client/src/main.rs index 2e92972..faac711 100644 --- a/zoidberg_client/src/main.rs +++ b/zoidberg_client/src/main.rs @@ -9,7 +9,9 @@ use std::sync::Arc; use std::time::Duration; use tokio::{process::Command, time}; -use zoidberg_lib::types::{FetchResponse, Heartbeat, Job, RegisterResponse, Status, Update}; +use zoidberg_lib::types::{ + FetchRequest, FetchResponse, Heartbeat, Job, RegisterResponse, Status, Update, +}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -32,7 +34,7 @@ fn build_client(secret: &str) -> Client { #[derive(Debug, Clone)] struct Worker { - id: i32, + id: String, secret: String, server: String, } @@ -48,17 +50,17 @@ impl Worker { let r: RegisterResponse = serde_json::from_str(&body)?; log::info!("registered worker with id: {}", &r.id); Ok(Worker { - id: r.id, + id: r.id.to_string(), secret: secret.to_string(), server: server.to_string(), }) } - async fn update(self: &Self, jobs: &[Job]) -> Result<(), Box> { + async fn update(&self, jobs: &[Job]) -> Result<(), Box> { let updates: Vec = jobs .iter() .map(|job| Update { - worker: self.id, + worker: self.id.clone(), job: job.id, status: job.status.clone(), }) @@ -76,9 +78,12 @@ impl Worker { Ok(()) } - async fn fetch(self: &Self) -> Result> { + async fn fetch(&self) -> Result> { let res = build_client(&self.secret) - .get(format!("{}/fetch", self.server)) + .post(format!("{}/fetch", self.server)) + .json(&FetchRequest { + worker_id: self.id.clone(), + }) .send() .await?; let body = res.text().await?; @@ -86,10 +91,12 @@ impl Worker { Ok(resp) } - async fn heartbeat(self: &Self) { + async fn heartbeat(&self) { let _ = build_client(&self.secret) .post(format!("{}/heartbeat", self.server)) - .json(&Heartbeat { id: self.id }) + .json(&Heartbeat { + id: self.id.clone(), + }) .send() .await; } @@ -143,36 +150,47 @@ async fn main() -> Result<(), Box> { ); let pause = time::Duration::from_secs(1); - let long_pause = time::Duration::from_secs(20); + let long_pause = time::Duration::from_secs(40); let heartbeat_pause = time::Duration::from_secs(30); + let (heartbeat_handle, abort_registration) = AbortHandle::new_pair(); + let c = Arc::clone(&client); + tokio::spawn(Abortable::new( + async move { + loop { + time::sleep(heartbeat_pause).await; + c.heartbeat().await; + } + }, + abort_registration, + )); + + let mut fail_counter = 0; loop { let jobs = if let Ok(fetch) = client.fetch().await { + fail_counter = 0; match fetch { FetchResponse::Nop => { time::sleep(pause).await; continue; } - FetchResponse::StopWorking => break, + FetchResponse::Terminate(m) => { + println!("Terminate worker: {}", m); + break; + } FetchResponse::Jobs(jobs) => jobs, } } else { + fail_counter += 1; + if fail_counter == 3 { + log::error!("failed to fetch three times, assume that server crashed and exit"); + std::process::exit(1); + } + log::error!("failed to fetch new jobs"); time::sleep(long_pause).await; continue; }; - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let c = Arc::clone(&client); - tokio::spawn(Abortable::new( - async move { - loop { - time::sleep(heartbeat_pause).await; - c.heartbeat().await; - } - }, - abort_registration, - )); - for job in jobs { let status = match run(&job).await { Ok(()) => Status::Completed, @@ -186,7 +204,7 @@ async fn main() -> Result<(), Box> { log::info!("Could not update job: {}", error); } } - abort_handle.abort(); } + heartbeat_handle.abort(); Ok(()) } diff --git a/zoidberg_lib/src/types.rs b/zoidberg_lib/src/types.rs index 6188b81..8fcb0db 100644 --- a/zoidberg_lib/src/types.rs +++ b/zoidberg_lib/src/types.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct Update { - pub worker: i32, + pub worker: String, pub job: i32, pub status: Status, } @@ -12,7 +12,7 @@ pub struct Update { #[derive(Serialize, Deserialize, Clone, Debug)] pub enum Status { Submitted, - Running, + Running(String), Completed, Failed, } @@ -27,7 +27,7 @@ impl fmt::Display for Status { fn fmt(self: &Self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Status::Submitted => write!(f, "submitted"), - Status::Running => write!(f, "running"), + Status::Running(w) => write!(f, "running on worker {}", w), Status::Completed => write!(f, "completed"), Status::Failed => write!(f, "failed"), } @@ -55,13 +55,18 @@ pub struct Node { #[derive(Serialize, Deserialize)] pub struct RegisterResponse { - pub id: i32, + pub id: String, +} + +#[derive(Serialize, Deserialize)] +pub struct FetchRequest { + pub worker_id: String, } #[derive(Serialize, Deserialize)] pub enum FetchResponse { Jobs(Vec), - StopWorking, + Terminate(String), Nop, } @@ -73,11 +78,13 @@ pub struct Submit { #[derive(Serialize, Deserialize)] pub struct Worker { #[serde(default)] - pub id: i32, + pub id: String, + #[serde(default)] + pub last_heartbeat: Option, } #[derive(Serialize, Deserialize)] pub struct Heartbeat { #[serde(default)] - pub id: i32, + pub id: String, } diff --git a/zoidberg_server/Cargo.toml b/zoidberg_server/Cargo.toml index c0e6220..8fab7dc 100644 --- a/zoidberg_server/Cargo.toml +++ b/zoidberg_server/Cargo.toml @@ -16,3 +16,6 @@ clap = "3.2" env_logger = "0.9" log = "0.4" futures = "0.3.24" +tokio = { version = "1", features = ["full"] } +chrono = "0.4.22" +uuid = { version = "1.1.2", features = ["v4"] } diff --git a/zoidberg_server/src/main.rs b/zoidberg_server/src/main.rs index 6571841..12e4d0c 100644 --- a/zoidberg_server/src/main.rs +++ b/zoidberg_server/src/main.rs @@ -3,13 +3,17 @@ use actix_web::{ dev, get, middleware::Logger, post, web, App, Error, FromRequest, HttpRequest, HttpResponse, HttpServer, Responder, Result, }; +use chrono::Utc; use clap; use env_logger::Env; use futures::future::{err, ok, Ready}; use log; use std::sync::Mutex; +use std::time::Duration; +use uuid::Uuid; use zoidberg_lib::types::{ - FetchResponse, Heartbeat, Job, RegisterResponse, StatusRequest, Update, Worker, + FetchRequest, FetchResponse, Heartbeat, Job, RegisterResponse, Status, StatusRequest, Update, + Worker, }; mod webpage; @@ -17,7 +21,6 @@ mod webpage; const VERSION: &str = env!("CARGO_PKG_VERSION"); struct State { - counter_workers: Mutex, counter_jobs: Mutex, workers: Mutex>, new_jobs: Mutex>, @@ -27,7 +30,6 @@ struct State { impl State { fn new() -> Self { Self { - counter_workers: Mutex::new(0), counter_jobs: Mutex::new(0), workers: Mutex::new(Vec::new()), new_jobs: Mutex::new(Vec::new()), @@ -68,24 +70,40 @@ async fn index(data: web::Data) -> impl Responder { #[get("/register")] async fn register(data: web::Data, _: Authorization) -> Result { - let mut counter_workers = data.counter_workers.lock().unwrap(); - *counter_workers += 1; - let mut workers = data.workers.lock().unwrap(); + let uuid = Uuid::new_v4().to_string(); workers.push(Worker { - id: *counter_workers, + id: uuid.clone(), + last_heartbeat: None, }); - log::info!("Registered worker node with id: {}", *counter_workers); - Ok(web::Json(RegisterResponse { - id: *counter_workers, - })) + log::info!("Registered worker node with id: {}", uuid); + Ok(web::Json(RegisterResponse { id: uuid })) } -#[get("/fetch")] -async fn fetch(data: web::Data, _: Authorization) -> Result { +#[post("/fetch")] +async fn fetch( + data: web::Data, + f: web::Json, + _: Authorization, +) -> Result { + let requesting_worker = f.into_inner().worker_id; + { + let workers = data.workers.lock().unwrap(); + if workers.iter().filter(|w| w.id == requesting_worker).count() != 1 { + return Ok(web::Json(FetchResponse::Terminate( + "Worker not found".into(), + ))); + } + } let mut new_jobs = data.new_jobs.lock().unwrap(); if let Some(j) = new_jobs.pop() { + let mut jobs = data.jobs.lock().unwrap(); + for cj in jobs.iter_mut() { + if cj.id == j.id { + cj.status = Status::Running(requesting_worker.clone()) + } + } return Ok(web::Json(FetchResponse::Jobs(vec![j]))); } Ok(web::Json(FetchResponse::Nop)) @@ -133,8 +151,18 @@ async fn update( } #[post("/heartbeat")] -async fn heartbeat(heartbeat: web::Json, _: Authorization) -> Result { +async fn heartbeat( + heartbeat: web::Json, + data: web::Data, + _: Authorization, +) -> Result { log::info!("Heartbeat from worker {}", heartbeat.id); + let mut workers = data.workers.lock().unwrap(); + for w in workers.iter_mut() { + if w.id == heartbeat.id { + w.last_heartbeat = Some(Utc::now().timestamp()); + } + } Ok(format!("Heartbeat from worker {}", heartbeat.id)) } @@ -181,6 +209,18 @@ async fn main() -> std::io::Result<()> { let state = web::Data::new(State::new()); + let s = state.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + let mut workers = s.workers.lock().unwrap(); + workers.retain(|w| match w.last_heartbeat { + None => true, + Some(t) => Utc::now().timestamp() - t < 60, + }) + } + }); + HttpServer::new(move || { App::new() .wrap(Logger::default()) @@ -264,8 +304,8 @@ mod tests { FetchResponse::Nop => { panic!("did not expect FetchResponse::Nop") } - FetchResponse::StopWorking => { - panic!("did not expect FetchResponse::NotWorking") + FetchResponse::Terminate(w) => { + panic!("did not expect FetchResponse::Terminate from worker {}", w) } FetchResponse::Jobs(new_jobs) => { assert_eq!(new_jobs[0].id, 0); @@ -288,7 +328,7 @@ mod tests { jobs: Mutex::new(vec![Job { id: 1, cmd: cmd.clone(), - status: Status::Running, + status: Status::Running(0), }]), })) .service(status), @@ -317,7 +357,7 @@ mod tests { .set_json(vec![Update { worker: 0, job: 0, - status: Status::Running, + status: Status::Running(0), }]) .uri("/update") .to_request(); @@ -339,7 +379,7 @@ mod tests { .set_json(vec![Job { id: 0, cmd: String::from("hi"), - status: Status::Running, + status: Status::Running(0), }]) .uri("/submit") .to_request(); diff --git a/zoidberg_server/src/webpage.rs b/zoidberg_server/src/webpage.rs index f74c095..136c043 100644 --- a/zoidberg_server/src/webpage.rs +++ b/zoidberg_server/src/webpage.rs @@ -1,3 +1,4 @@ +use chrono::Utc; use zoidberg_lib::types::{Job, Worker}; // TODO: write nicer frontend @@ -17,10 +18,17 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String { + ""; let workers_html: String = String::from("") - + "" + + "" + &workers .iter() - .map(|w| format!("", w.id)) + .map(|w| { + let ts = if let Some(ts) = w.last_heartbeat { + format!("{}", Utc::now().timestamp() - ts) + } else { + String::from("") + }; + format!("", w.id, ts) + }) .collect::>() .join("\n") + "
ID
IDlast heartbeat
{}
{}{}
"; @@ -43,7 +51,8 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String { - Hello Bulma! + Zoidberg + {}