From 65a50f8b74017ae3e966727b3a86427505474f1c Mon Sep 17 00:00:00 2001 From: Johannes Heuel Date: Tue, 20 Sep 2022 19:15:45 +0200 Subject: [PATCH] add logging --- Cargo.lock | 23 ++++ zoidberg_client/Cargo.toml | 2 + zoidberg_client/src/main.rs | 14 ++- zoidberg_lib/src/types.rs | 20 ++-- zoidberg_server/Cargo.toml | 6 +- zoidberg_server/src/main.rs | 202 +++++++++++++----------------------- 6 files changed, 121 insertions(+), 146 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22ed4a0..50032d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,6 +473,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_logger" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c90bf5f19754d10198ccb95b70664fc925bd1fc090a0fd9a6ebc54acc8cd6272" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "fastrand" version = "1.8.0" @@ -706,6 +719,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.20" @@ -1859,6 +1878,8 @@ name = "zoidberg_client" version = "0.1.0" dependencies = [ "clap", + "env_logger", + "log", "reqwest", "reqwest-middleware", "reqwest-retry", @@ -1881,6 +1902,8 @@ version = "0.1.0" dependencies = [ "actix-web", "clap", + "env_logger", + "log", "serde_json", "zoidberg_lib", ] diff --git a/zoidberg_client/Cargo.toml b/zoidberg_client/Cargo.toml index 5d48d6e..f0116cc 100644 --- a/zoidberg_client/Cargo.toml +++ b/zoidberg_client/Cargo.toml @@ -18,3 +18,5 @@ reqwest-retry = "0.1.5" reqwest-middleware = "0.1.6" tokio = { version = "1", features = ["full"] } clap = "3.2.22" +env_logger = "0.9" +log = "0.4" diff --git a/zoidberg_client/src/main.rs b/zoidberg_client/src/main.rs index ba4d321..8026a07 100644 --- a/zoidberg_client/src/main.rs +++ b/zoidberg_client/src/main.rs @@ -1,4 +1,6 @@ use clap::{App, Arg}; +use env_logger::Env; +use log; use reqwest::{header, Client, ClientBuilder}; use std::error::Error; use std::process::Command; @@ -7,6 +9,8 @@ use std::{thread, time}; use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, Status, Update}; +const VERSION: &str = env!("CARGO_PKG_VERSION"); + fn build_client(secret: &str) -> Client { let cookie = format!("secret={}", secret); @@ -38,7 +42,7 @@ impl Worker { let body = res.text().await?; let r: RegisterResponse = serde_json::from_str(&body)?; - println!("registered worker with id: {}", &r.id); + log::info!("registered worker with id: {}", &r.id); Ok(Worker { id: r.id }) } @@ -60,7 +64,7 @@ impl Worker { .text() .await?; - println!("Body:\n{}", body); + log::info!("Body: {}", body); Ok(()) } @@ -77,7 +81,7 @@ impl Worker { async fn run(self: &Self, job: &Job) -> Result<(), Box> { let output = Command::new("bash").arg("-c").arg(&job.cmd).output()?; - println!( + log::info!( "command: {}\nstdout: {}\nstderr: {}", &job.cmd, String::from_utf8_lossy(&output.stdout), @@ -100,7 +104,7 @@ impl Worker { ..job.clone() }]; if let Err(error) = self.update(n).await { - println!("Could not update job: {}", error); + log::info!("Could not update job: {}", error); } } } @@ -108,7 +112,7 @@ impl Worker { #[tokio::main] async fn main() -> Result<(), Box> { - const VERSION: &str = env!("CARGO_PKG_VERSION"); + env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); let matches = App::new("Zoidberg client") .version(VERSION) diff --git a/zoidberg_lib/src/types.rs b/zoidberg_lib/src/types.rs index cb82ea3..94d893b 100644 --- a/zoidberg_lib/src/types.rs +++ b/zoidberg_lib/src/types.rs @@ -39,13 +39,6 @@ pub struct StatusRequest { pub id: i32, } -#[derive(Serialize, Deserialize)] -pub enum FetchResponse { - Jobs(Vec), - StopWorking, - Nop, -} - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Job { #[serde(default)] @@ -65,7 +58,20 @@ pub struct RegisterResponse { pub id: i32, } +#[derive(Serialize, Deserialize)] +pub enum FetchResponse { + Jobs(Vec), + StopWorking, + Nop, +} + #[derive(Serialize, Deserialize)] pub struct Submit { pub cmd: String, } + +#[derive(Serialize, Deserialize)] +pub struct Worker { + #[serde(default)] + pub id: i32, +} diff --git a/zoidberg_server/Cargo.toml b/zoidberg_server/Cargo.toml index b0258e4..723dab4 100644 --- a/zoidberg_server/Cargo.toml +++ b/zoidberg_server/Cargo.toml @@ -11,5 +11,7 @@ version = "0.1.0" [dependencies] actix-web = "4" -serde_json = "1.0.85" -clap = "3.2.22" +serde_json = "1.0" +clap = "3.2" +env_logger = "0.9" +log = "0.4" diff --git a/zoidberg_server/src/main.rs b/zoidberg_server/src/main.rs index a0a4396..5ad6e0c 100644 --- a/zoidberg_server/src/main.rs +++ b/zoidberg_server/src/main.rs @@ -1,114 +1,50 @@ use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, Result}; use clap; +use env_logger::Env; +use log; use std::sync::Mutex; +use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update, Worker}; -use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update}; +mod webpage; + +const VERSION: &str = env!("CARGO_PKG_VERSION"); struct State { - counter: Mutex, - jobcounter: Mutex, - workers: Mutex>, + counter_workers: Mutex, + counter_jobs: Mutex, + workers: Mutex>, + new_jobs: Mutex>, jobs: Mutex>, - running_jobs: Mutex>, } #[get("/")] async fn index(data: web::Data) -> impl Responder { let workers = data.workers.lock().unwrap(); - let jobs = data.running_jobs.lock().unwrap(); - - let jobs_html: String = String::from("") - + "" - + &jobs - .iter() - .map(|j| { - format!( - "", - j.id, j.cmd, j.status - ) - }) - .collect::>() - .join("\n") - + "
IDcommandstatus
{}{}{}
"; - - let workers_html: String = String::from("") - + "" - + &workers - .iter() - .map(|w| format!("", w)) - .collect::>() - .join("\n") - + "
ID
{}
"; - - let _debug_html = r#""#; - let _debug_html = ""; - - let page = format!( - r#" - - - - - - Hello Bulma! - - {} - - -
-
-
-
-
-

- Jobs -

- {} -
-
-
-
-

- Workers -

- {} -
-
-
-
-
- - -"#, - _debug_html, jobs_html, workers_html - ); + let new_jobs = data.new_jobs.lock().unwrap(); + let page = webpage::render(&*new_jobs, &*workers); HttpResponse::Ok().body(page) } #[get("/register")] async fn register(data: web::Data) -> Result { - let mut counter = data.counter.lock().unwrap(); - *counter += 1; + let mut counter_workers = data.counter_workers.lock().unwrap(); + *counter_workers += 1; let mut workers = data.workers.lock().unwrap(); - workers.push(*counter); + workers.push(Worker { + id: *counter_workers, + }); - println!("Registered worker node with id: {}", *counter); - Ok(web::Json(RegisterResponse { id: *counter })) + log::info!("Registered worker node with id: {}", *counter_workers); + Ok(web::Json(RegisterResponse { + id: *counter_workers, + })) } #[get("/fetch")] async fn fetch(data: web::Data) -> Result { - let mut jobs = data.jobs.lock().unwrap(); - if let Some(j) = jobs.pop() { + let mut new_jobs = data.new_jobs.lock().unwrap(); + if let Some(j) = new_jobs.pop() { return Ok(web::Json(FetchResponse::Jobs(vec![j]))); } Ok(web::Json(FetchResponse::Nop)) @@ -119,8 +55,8 @@ async fn status( s: web::Json>, data: web::Data, ) -> Result { - let running_jobs = data.running_jobs.lock().unwrap(); - let status_updates: Vec = running_jobs + let jobs = data.jobs.lock().unwrap(); + let status_updates: Vec = jobs .iter() .filter(|r| s.iter().filter(|i| i.id == r.id).count() > 0) .cloned() @@ -131,16 +67,18 @@ async fn status( #[post("/update")] async fn update(updates: web::Json>, data: web::Data) -> Result { - let mut running_jobs = data.running_jobs.lock().unwrap(); + let mut jobs = data.jobs.lock().unwrap(); let mut n = 0; for update in updates.iter() { - println!( + log::info!( "Worker {} updated job {} with status {}", - update.worker, update.job, update.status + update.worker, + update.job, + update.status ); - for i in 0..running_jobs.len() { - if running_jobs[i].id == update.job { - running_jobs[i].status = update.status.clone(); + for i in 0..jobs.len() { + if jobs[i].id == update.job { + jobs[i].status = update.status.clone(); } } n += 1; @@ -150,30 +88,30 @@ async fn update(updates: web::Json>, data: web::Data) -> Resu #[post("/submit")] async fn submit(data: web::Data, js: web::Json>) -> Result { + let mut new_jobs = data.new_jobs.lock().unwrap(); let mut jobs = data.jobs.lock().unwrap(); - let mut running_jobs = data.running_jobs.lock().unwrap(); - let mut jobcounter = data.jobcounter.lock().unwrap(); - let mut new_jobs = Vec::new(); + let mut counter_jobs = data.counter_jobs.lock().unwrap(); + let mut new_new_jobs = Vec::new(); for j in js.into_inner() { - *jobcounter += 1; + *counter_jobs += 1; let cmd = j.cmd.clone(); - println!("Job submitted with id: {}, cmd: {}", *jobcounter, cmd); + log::info!("Job submitted with id: {}, cmd: {}", *counter_jobs, cmd); - new_jobs.push(Job { - id: *jobcounter, + new_new_jobs.push(Job { + id: *counter_jobs, ..j }); } - for job in new_jobs.iter() { + for job in new_new_jobs.iter() { + new_jobs.push(job.clone()); jobs.push(job.clone()); - running_jobs.push(job.clone()); } - Ok(web::Json(new_jobs)) + Ok(web::Json(new_new_jobs)) } #[actix_web::main] async fn main() -> std::io::Result<()> { - const VERSION: &str = env!("CARGO_PKG_VERSION"); + env_logger::Builder::from_env(Env::default().default_filter_or("zoidberg_server=info")).init(); let _matches = clap::App::new("Zoidberg server") .version(VERSION) @@ -181,11 +119,11 @@ async fn main() -> std::io::Result<()> { .get_matches(); let state = web::Data::new(State { - counter: Mutex::new(0), - jobcounter: Mutex::new(0), + counter_workers: Mutex::new(0), + counter_jobs: Mutex::new(0), workers: Mutex::new(Vec::new()), + new_jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()), - running_jobs: Mutex::new(Vec::new()), }); HttpServer::new(move || { @@ -214,11 +152,11 @@ mod tests { let app = test::init_service( App::new() .app_data(web::Data::new(State { - counter: Mutex::new(0), - jobcounter: Mutex::new(0), + counter_workers: Mutex::new(0), + counter_jobs: Mutex::new(0), workers: Mutex::new(Vec::new()), + new_jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()), - running_jobs: Mutex::new(Vec::new()), })) .service(index), ) @@ -233,11 +171,11 @@ mod tests { let app = test::init_service( App::new() .app_data(web::Data::new(State { - counter: Mutex::new(0), - jobcounter: Mutex::new(0), + counter_workers: Mutex::new(0), + counter_jobs: Mutex::new(0), workers: Mutex::new(Vec::new()), + new_jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()), - running_jobs: Mutex::new(Vec::new()), })) .service(register), ) @@ -253,15 +191,15 @@ mod tests { let app = test::init_service( App::new() .app_data(web::Data::new(State { - counter: Mutex::new(0), - jobcounter: Mutex::new(0), + counter_workers: Mutex::new(0), + counter_jobs: Mutex::new(0), workers: Mutex::new(Vec::new()), - jobs: Mutex::new(vec![Job { + new_jobs: Mutex::new(vec![Job { id: 0, cmd: cmd.clone(), status: Status::Submitted, }]), - running_jobs: Mutex::new(Vec::new()), + jobs: Mutex::new(Vec::new()), })) .service(fetch), ) @@ -275,9 +213,9 @@ mod tests { FetchResponse::StopWorking => { panic!("did not expect FetchResponse::NotWorking") } - FetchResponse::Jobs(jobs) => { - assert_eq!(jobs[0].id, 0); - assert_eq!(jobs[0].cmd, cmd); + FetchResponse::Jobs(new_jobs) => { + assert_eq!(new_jobs[0].id, 0); + assert_eq!(new_jobs[0].cmd, cmd); } } } @@ -288,11 +226,11 @@ mod tests { let app = test::init_service( App::new() .app_data(web::Data::new(State { - counter: Mutex::new(0), - jobcounter: Mutex::new(0), + counter_workers: Mutex::new(0), + counter_jobs: Mutex::new(0), workers: Mutex::new(Vec::new()), - jobs: Mutex::new(Vec::new()), - running_jobs: Mutex::new(vec![Job { + new_jobs: Mutex::new(Vec::new()), + jobs: Mutex::new(vec![Job { id: 1, cmd: cmd.clone(), status: Status::Running, @@ -314,11 +252,11 @@ mod tests { let app = test::init_service( App::new() .app_data(web::Data::new(State { - counter: Mutex::new(0), - jobcounter: Mutex::new(0), + counter_workers: Mutex::new(0), + counter_jobs: Mutex::new(0), workers: Mutex::new(Vec::new()), + new_jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()), - running_jobs: Mutex::new(Vec::new()), })) .service(update), ) @@ -340,11 +278,11 @@ mod tests { let app = test::init_service( App::new() .app_data(web::Data::new(State { - counter: Mutex::new(0), - jobcounter: Mutex::new(0), + counter_workers: Mutex::new(0), + counter_jobs: Mutex::new(0), workers: Mutex::new(Vec::new()), + new_jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()), - running_jobs: Mutex::new(Vec::new()), })) .service(submit), )