diff --git a/Cargo.lock b/Cargo.lock index 3f33aa5..8af8ada 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1879,6 +1879,7 @@ version = "0.1.0" dependencies = [ "clap", "env_logger", + "futures", "log", "reqwest", "reqwest-middleware", diff --git a/zoidberg_client/Cargo.toml b/zoidberg_client/Cargo.toml index f0116cc..a8a4902 100644 --- a/zoidberg_client/Cargo.toml +++ b/zoidberg_client/Cargo.toml @@ -20,3 +20,4 @@ tokio = { version = "1", features = ["full"] } clap = "3.2.22" env_logger = "0.9" log = "0.4" +futures = "0.3.24" diff --git a/zoidberg_client/src/main.rs b/zoidberg_client/src/main.rs index 1461457..2e92972 100644 --- a/zoidberg_client/src/main.rs +++ b/zoidberg_client/src/main.rs @@ -1,13 +1,15 @@ use clap::{App, Arg}; use env_logger::Env; +use futures::future::{AbortHandle, Abortable}; use log; use reqwest::{header, Client, ClientBuilder}; use std::error::Error; -use std::process::Command; +use std::process::Stdio; +use std::sync::Arc; use std::time::Duration; -use std::{thread, time}; +use tokio::{process::Command, time}; -use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, Status, Update}; +use zoidberg_lib::types::{FetchResponse, Heartbeat, Job, RegisterResponse, Status, Update}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -28,10 +30,11 @@ fn build_client(secret: &str) -> Client { .expect("Could not create client") } -#[derive(Debug)] +#[derive(Debug, Clone)] struct Worker { id: i32, secret: String, + server: String, } impl Worker { @@ -47,6 +50,7 @@ impl Worker { Ok(Worker { id: r.id, secret: secret.to_string(), + server: server.to_string(), }) } @@ -61,7 +65,7 @@ impl Worker { .collect(); let body = build_client(&self.secret) - .post("http://localhost:8080/update") + .post(format!("{}/update", self.server)) .json(&updates) .send() .await? @@ -74,7 +78,7 @@ impl Worker { async fn fetch(self: &Self) -> Result> { let res = build_client(&self.secret) - .get("http://localhost:8080/fetch") + .get(format!("{}/fetch", self.server)) .send() .await?; let body = res.text().await?; @@ -82,35 +86,33 @@ impl Worker { Ok(resp) } - async fn run(self: &Self, job: &Job) -> Result<(), Box> { - let output = Command::new("bash").arg("-c").arg(&job.cmd).output()?; - - log::info!( - "command: {}\nstdout: {}\nstderr: {}", - &job.cmd, - String::from_utf8_lossy(&output.stdout), - String::from_utf8_lossy(&output.stderr) - ); - match output.status.success() { - true => Ok(()), - false => Err(Box::from("Job failed")), - } + async fn heartbeat(self: &Self) { + let _ = build_client(&self.secret) + .post(format!("{}/heartbeat", self.server)) + .json(&Heartbeat { id: self.id }) + .send() + .await; } +} - async fn process(self: &Self, jobs: &[Job]) { - for job in jobs { - let status = match self.run(&job).await { - Ok(()) => Status::Completed, - Err(..) => Status::Failed, - }; - let n = &[Job { - status, - ..job.clone() - }]; - if let Err(error) = self.update(n).await { - log::info!("Could not update job: {}", error); - } - } +async fn run(job: &Job) -> Result<(), Box> { + let output = Command::new("bash") + .arg("-c") + .arg(&job.cmd) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output(); + let output = output.await?; + + log::info!( + "command: {}\nstdout: {}\nstderr: {}", + &job.cmd, + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + match output.status.success() { + true => Ok(()), + false => Err(Box::from("Job failed")), } } @@ -130,27 +132,61 @@ async fn main() -> Result<(), Box> { .get_matches(); let server = matches.value_of("server").unwrap(); let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| { - println!("Please set the $ZOIDBERG_SECRET environment variable"); + eprintln!("Please set the $ZOIDBERG_SECRET environment variable"); std::process::exit(1); }); - let client = Worker::new(server, &secret) - .await - .expect("Could not create client"); + let client = Arc::new( + Worker::new(server, &secret) + .await + .expect("Could not create client"), + ); let pause = time::Duration::from_secs(1); let long_pause = time::Duration::from_secs(20); + let heartbeat_pause = time::Duration::from_secs(30); loop { - if let Ok(fetch) = client.fetch().await { + let jobs = if let Ok(fetch) = client.fetch().await { match fetch { - FetchResponse::Nop => thread::sleep(pause), + FetchResponse::Nop => { + time::sleep(pause).await; + continue; + } FetchResponse::StopWorking => break, - FetchResponse::Jobs(jobs) => client.process(&jobs).await, + FetchResponse::Jobs(jobs) => jobs, } } else { - thread::sleep(long_pause); + time::sleep(long_pause).await; + continue; + }; + + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let c = Arc::clone(&client); + tokio::spawn(Abortable::new( + async move { + loop { + time::sleep(heartbeat_pause).await; + c.heartbeat().await; + } + }, + abort_registration, + )); + + for job in jobs { + let status = match run(&job).await { + Ok(()) => Status::Completed, + Err(..) => Status::Failed, + }; + let update = &[Job { + status, + ..job.clone() + }]; + if let Err(error) = client.update(update).await { + log::info!("Could not update job: {}", error); + } } + abort_handle.abort(); } Ok(()) } diff --git a/zoidberg_lib/src/types.rs b/zoidberg_lib/src/types.rs index 94d893b..6188b81 100644 --- a/zoidberg_lib/src/types.rs +++ b/zoidberg_lib/src/types.rs @@ -75,3 +75,9 @@ pub struct Worker { #[serde(default)] pub id: i32, } + +#[derive(Serialize, Deserialize)] +pub struct Heartbeat { + #[serde(default)] + pub id: i32, +} diff --git a/zoidberg_server/src/main.rs b/zoidberg_server/src/main.rs index e1e271a..6571841 100644 --- a/zoidberg_server/src/main.rs +++ b/zoidberg_server/src/main.rs @@ -8,7 +8,9 @@ use env_logger::Env; use futures::future::{err, ok, Ready}; use log; use std::sync::Mutex; -use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update, Worker}; +use zoidberg_lib::types::{ + FetchResponse, Heartbeat, Job, RegisterResponse, StatusRequest, Update, Worker, +}; mod webpage; @@ -44,7 +46,6 @@ impl FromRequest for Authorization { if let Some(head) = req.headers().get("cookie") { if let Ok(cookie) = head.to_str() { if let Some(secret) = req.app_data::() { - println!("{} == {}", secret, cookie); if secret == cookie { return ok(Authorization {}); } else { @@ -131,6 +132,12 @@ async fn update( Ok(format!("Worker updated {} job(s)", n)) } +#[post("/heartbeat")] +async fn heartbeat(heartbeat: web::Json, _: Authorization) -> Result { + log::info!("Heartbeat from worker {}", heartbeat.id); + Ok(format!("Heartbeat from worker {}", heartbeat.id)) +} + #[post("/submit")] async fn submit( data: web::Data, @@ -163,7 +170,7 @@ async fn main() -> std::io::Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("zoidberg_server=info")).init(); let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| { - println!("Please set the $ZOIDBERG_SECRET environment variable"); + eprintln!("Please set the $ZOIDBERG_SECRET environment variable"); std::process::exit(1); }); @@ -184,6 +191,7 @@ async fn main() -> std::io::Result<()> { .service(fetch) .service(status) .service(update) + .service(heartbeat) .service(submit) }) .bind(("127.0.0.1", 8080))?