change fetch to allow for killing worker nodes
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Johannes Heuel
2022-09-20 18:24:09 +02:00
parent 47c453ff82
commit 37d7177126
3 changed files with 55 additions and 38 deletions

View File

@@ -5,7 +5,7 @@ use std::process::Command;
use std::time::Duration; use std::time::Duration;
use std::{thread, time}; use std::{thread, time};
use zoidberg_lib::types::{Job, RegisterResponse, Status, Update}; use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, Status, Update};
fn build_client(secret: &str) -> Client { fn build_client(secret: &str) -> Client {
let cookie = format!("secret={}", secret); let cookie = format!("secret={}", secret);
@@ -64,14 +64,14 @@ impl Worker {
Ok(()) Ok(())
} }
async fn fetch(self: &Self) -> Result<Vec<Job>, Box<dyn Error>> { async fn fetch(self: &Self) -> Result<FetchResponse, Box<dyn Error>> {
let res = build_client("some_secret") let res = build_client("some_secret")
.get("http://localhost:8080/fetch") .get("http://localhost:8080/fetch")
.send() .send()
.await?; .await?;
let body = res.text().await?; let body = res.text().await?;
let jobs: Vec<Job> = serde_json::from_str(&body)?; let resp: FetchResponse = serde_json::from_str(&body)?;
Ok(jobs) Ok(resp)
} }
async fn run(self: &Self, job: &Job) -> Result<(), Box<dyn Error>> { async fn run(self: &Self, job: &Job) -> Result<(), Box<dyn Error>> {
@@ -88,6 +88,22 @@ impl Worker {
false => Err(Box::from("Job failed")), false => Err(Box::from("Job failed")),
} }
} }
async fn process(self: &Self, jobs: &[Job]) {
for job in jobs {
let status = match self.run(&job).await {
Ok(()) => Status::Completed,
Err(..) => Status::Failed,
};
let n = &[Job {
status,
..job.clone()
}];
if let Err(error) = self.update(n).await {
println!("Could not update job: {}", error);
}
}
}
} }
#[tokio::main] #[tokio::main]
@@ -108,31 +124,19 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client = Worker::new(server).await.expect("Could not create client"); let client = Worker::new(server).await.expect("Could not create client");
let pause = time::Duration::from_secs(0); let pause = time::Duration::from_secs(1);
let long_pause = time::Duration::from_secs(2); let long_pause = time::Duration::from_secs(20);
let extra_long_pause = time::Duration::from_secs(4);
loop { loop {
if let Ok(jobs) = client.fetch().await { if let Ok(fetch) = client.fetch().await {
// if there are no jobs, wait a little longer match fetch {
if jobs.len() == 0 { FetchResponse::Nop => thread::sleep(pause),
thread::sleep(long_pause); FetchResponse::StopWorking => break,
} FetchResponse::Jobs(jobs) => client.process(&jobs).await,
for job in jobs {
let status = match client.run(&job).await {
Ok(()) => Status::Completed,
Err(..) => Status::Failed,
};
let n = &[Job { status, ..job }];
if let Err(error) = client.update(n).await {
println!("Could not update job: {}", error);
}
} }
} else { } else {
// wait a little longer whenever job fetching fails thread::sleep(long_pause);
thread::sleep(extra_long_pause);
} }
thread::sleep(pause);
} }
Ok(())
} }

View File

@@ -39,6 +39,13 @@ pub struct StatusRequest {
pub id: i32, pub id: i32,
} }
#[derive(Serialize, Deserialize)]
pub enum FetchResponse {
Jobs(Vec<Job>),
StopWorking,
Nop,
}
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Job { pub struct Job {
#[serde(default)] #[serde(default)]

View File

@@ -2,7 +2,7 @@ use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, Result
use clap; use clap;
use std::sync::Mutex; use std::sync::Mutex;
use zoidberg_lib::types::{Job, RegisterResponse, StatusRequest, Update}; use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update};
struct State { struct State {
counter: Mutex<i32>, counter: Mutex<i32>,
@@ -40,7 +40,7 @@ async fn index(data: web::Data<State>) -> impl Responder {
.join("\n") .join("\n")
+ "</tbody></table>"; + "</tbody></table>";
let debug_html = r#"<style> let _debug_html = r#"<style>
*:not(path):not(g) {{ *:not(path):not(g) {{
color: hsla(210, 100%, 100%, 0.9) !important; color: hsla(210, 100%, 100%, 0.9) !important;
background: hsla(210, 100%, 50%, 0.5) !important; background: hsla(210, 100%, 50%, 0.5) !important;
@@ -49,7 +49,7 @@ async fn index(data: web::Data<State>) -> impl Responder {
box-shadow: none !important; box-shadow: none !important;
}} }}
</style>"#; </style>"#;
let debug_html = ""; let _debug_html = "";
let page = format!( let page = format!(
r#" r#"
@@ -88,7 +88,7 @@ async fn index(data: web::Data<State>) -> impl Responder {
</body> </body>
</html> </html>
"#, "#,
debug_html, jobs_html, workers_html _debug_html, jobs_html, workers_html
); );
HttpResponse::Ok().body(page) HttpResponse::Ok().body(page)
} }
@@ -109,9 +109,9 @@ async fn register(data: web::Data<State>) -> Result<impl Responder> {
async fn fetch(data: web::Data<State>) -> Result<impl Responder> { async fn fetch(data: web::Data<State>) -> Result<impl Responder> {
let mut jobs = data.jobs.lock().unwrap(); let mut jobs = data.jobs.lock().unwrap();
if let Some(j) = jobs.pop() { if let Some(j) = jobs.pop() {
return Ok(web::Json(vec![j])); return Ok(web::Json(FetchResponse::Jobs(vec![j])));
} }
Ok(web::Json(Vec::new())) Ok(web::Json(FetchResponse::Nop))
} }
#[post("/status")] #[post("/status")]
@@ -267,13 +267,19 @@ mod tests {
) )
.await; .await;
let req = test::TestRequest::get().uri("/fetch").to_request(); let req = test::TestRequest::get().uri("/fetch").to_request();
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await; let resp: FetchResponse = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp[0].id, 0); match resp {
assert_eq!(resp[0].cmd, cmd); FetchResponse::Nop => {
panic!("did not expect FetchResponse::Nop")
let req = test::TestRequest::get().uri("/fetch").to_request(); }
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await; FetchResponse::StopWorking => {
assert_eq!(resp.len(), 0); panic!("did not expect FetchResponse::NotWorking")
}
FetchResponse::Jobs(jobs) => {
assert_eq!(jobs[0].id, 0);
assert_eq!(jobs[0].cmd, cmd);
}
}
} }
#[actix_web::test] #[actix_web::test]