diff --git a/zoidberg_client/src/main.rs b/zoidberg_client/src/main.rs index 8308fe3..ba4d321 100644 --- a/zoidberg_client/src/main.rs +++ b/zoidberg_client/src/main.rs @@ -5,7 +5,7 @@ use std::process::Command; use std::time::Duration; use std::{thread, time}; -use zoidberg_lib::types::{Job, RegisterResponse, Status, Update}; +use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, Status, Update}; fn build_client(secret: &str) -> Client { let cookie = format!("secret={}", secret); @@ -64,14 +64,14 @@ impl Worker { Ok(()) } - async fn fetch(self: &Self) -> Result, Box> { + async fn fetch(self: &Self) -> Result> { let res = build_client("some_secret") .get("http://localhost:8080/fetch") .send() .await?; let body = res.text().await?; - let jobs: Vec = serde_json::from_str(&body)?; - Ok(jobs) + let resp: FetchResponse = serde_json::from_str(&body)?; + Ok(resp) } async fn run(self: &Self, job: &Job) -> Result<(), Box> { @@ -88,6 +88,22 @@ impl Worker { false => Err(Box::from("Job failed")), } } + + async fn process(self: &Self, jobs: &[Job]) { + for job in jobs { + let status = match self.run(&job).await { + Ok(()) => Status::Completed, + Err(..) => Status::Failed, + }; + let n = &[Job { + status, + ..job.clone() + }]; + if let Err(error) = self.update(n).await { + println!("Could not update job: {}", error); + } + } + } } #[tokio::main] @@ -108,31 +124,19 @@ async fn main() -> Result<(), Box> { let client = Worker::new(server).await.expect("Could not create client"); - let pause = time::Duration::from_secs(0); - let long_pause = time::Duration::from_secs(2); - let extra_long_pause = time::Duration::from_secs(4); + let pause = time::Duration::from_secs(1); + let long_pause = time::Duration::from_secs(20); loop { - if let Ok(jobs) = client.fetch().await { - // if there are no jobs, wait a little longer - if jobs.len() == 0 { - thread::sleep(long_pause); - } - - for job in jobs { - let status = match client.run(&job).await { - Ok(()) => Status::Completed, - Err(..) => Status::Failed, - }; - let n = &[Job { status, ..job }]; - if let Err(error) = client.update(n).await { - println!("Could not update job: {}", error); - } + if let Ok(fetch) = client.fetch().await { + match fetch { + FetchResponse::Nop => thread::sleep(pause), + FetchResponse::StopWorking => break, + FetchResponse::Jobs(jobs) => client.process(&jobs).await, } } else { - // wait a little longer whenever job fetching fails - thread::sleep(extra_long_pause); + thread::sleep(long_pause); } - thread::sleep(pause); } + Ok(()) } diff --git a/zoidberg_lib/src/types.rs b/zoidberg_lib/src/types.rs index 9dc63fe..cb82ea3 100644 --- a/zoidberg_lib/src/types.rs +++ b/zoidberg_lib/src/types.rs @@ -39,6 +39,13 @@ pub struct StatusRequest { pub id: i32, } +#[derive(Serialize, Deserialize)] +pub enum FetchResponse { + Jobs(Vec), + StopWorking, + Nop, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Job { #[serde(default)] diff --git a/zoidberg_server/src/main.rs b/zoidberg_server/src/main.rs index c15e710..a0a4396 100644 --- a/zoidberg_server/src/main.rs +++ b/zoidberg_server/src/main.rs @@ -2,7 +2,7 @@ use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, Result use clap; use std::sync::Mutex; -use zoidberg_lib::types::{Job, RegisterResponse, StatusRequest, Update}; +use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update}; struct State { counter: Mutex, @@ -40,7 +40,7 @@ async fn index(data: web::Data) -> impl Responder { .join("\n") + ""; - let debug_html = r#""#; - let debug_html = ""; + let _debug_html = ""; let page = format!( r#" @@ -88,7 +88,7 @@ async fn index(data: web::Data) -> impl Responder { "#, - debug_html, jobs_html, workers_html + _debug_html, jobs_html, workers_html ); HttpResponse::Ok().body(page) } @@ -109,9 +109,9 @@ async fn register(data: web::Data) -> Result { async fn fetch(data: web::Data) -> Result { let mut jobs = data.jobs.lock().unwrap(); if let Some(j) = jobs.pop() { - return Ok(web::Json(vec![j])); + return Ok(web::Json(FetchResponse::Jobs(vec![j]))); } - Ok(web::Json(Vec::new())) + Ok(web::Json(FetchResponse::Nop)) } #[post("/status")] @@ -267,13 +267,19 @@ mod tests { ) .await; let req = test::TestRequest::get().uri("/fetch").to_request(); - let resp: Vec = test::call_and_read_body_json(&app, req).await; - assert_eq!(resp[0].id, 0); - assert_eq!(resp[0].cmd, cmd); - - let req = test::TestRequest::get().uri("/fetch").to_request(); - let resp: Vec = test::call_and_read_body_json(&app, req).await; - assert_eq!(resp.len(), 0); + let resp: FetchResponse = test::call_and_read_body_json(&app, req).await; + match resp { + FetchResponse::Nop => { + panic!("did not expect FetchResponse::Nop") + } + FetchResponse::StopWorking => { + panic!("did not expect FetchResponse::NotWorking") + } + FetchResponse::Jobs(jobs) => { + assert_eq!(jobs[0].id, 0); + assert_eq!(jobs[0].cmd, cmd); + } + } } #[actix_web::test]