add logging
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Johannes Heuel
2022-09-20 19:15:45 +02:00
parent 37d7177126
commit 65a50f8b74
6 changed files with 121 additions and 146 deletions

23
Cargo.lock generated
View File

@@ -473,6 +473,19 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "env_logger"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c90bf5f19754d10198ccb95b70664fc925bd1fc090a0fd9a6ebc54acc8cd6272"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]] [[package]]
name = "fastrand" name = "fastrand"
version = "1.8.0" version = "1.8.0"
@@ -706,6 +719,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "0.14.20" version = "0.14.20"
@@ -1859,6 +1878,8 @@ name = "zoidberg_client"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"clap", "clap",
"env_logger",
"log",
"reqwest", "reqwest",
"reqwest-middleware", "reqwest-middleware",
"reqwest-retry", "reqwest-retry",
@@ -1881,6 +1902,8 @@ version = "0.1.0"
dependencies = [ dependencies = [
"actix-web", "actix-web",
"clap", "clap",
"env_logger",
"log",
"serde_json", "serde_json",
"zoidberg_lib", "zoidberg_lib",
] ]

View File

@@ -18,3 +18,5 @@ reqwest-retry = "0.1.5"
reqwest-middleware = "0.1.6" reqwest-middleware = "0.1.6"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
clap = "3.2.22" clap = "3.2.22"
env_logger = "0.9"
log = "0.4"

View File

@@ -1,4 +1,6 @@
use clap::{App, Arg}; use clap::{App, Arg};
use env_logger::Env;
use log;
use reqwest::{header, Client, ClientBuilder}; use reqwest::{header, Client, ClientBuilder};
use std::error::Error; use std::error::Error;
use std::process::Command; use std::process::Command;
@@ -7,6 +9,8 @@ use std::{thread, time};
use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, Status, Update}; use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, Status, Update};
const VERSION: &str = env!("CARGO_PKG_VERSION");
fn build_client(secret: &str) -> Client { fn build_client(secret: &str) -> Client {
let cookie = format!("secret={}", secret); let cookie = format!("secret={}", secret);
@@ -38,7 +42,7 @@ impl Worker {
let body = res.text().await?; let body = res.text().await?;
let r: RegisterResponse = serde_json::from_str(&body)?; let r: RegisterResponse = serde_json::from_str(&body)?;
println!("registered worker with id: {}", &r.id); log::info!("registered worker with id: {}", &r.id);
Ok(Worker { id: r.id }) Ok(Worker { id: r.id })
} }
@@ -60,7 +64,7 @@ impl Worker {
.text() .text()
.await?; .await?;
println!("Body:\n{}", body); log::info!("Body: {}", body);
Ok(()) Ok(())
} }
@@ -77,7 +81,7 @@ impl Worker {
async fn run(self: &Self, job: &Job) -> Result<(), Box<dyn Error>> { async fn run(self: &Self, job: &Job) -> Result<(), Box<dyn Error>> {
let output = Command::new("bash").arg("-c").arg(&job.cmd).output()?; let output = Command::new("bash").arg("-c").arg(&job.cmd).output()?;
println!( log::info!(
"command: {}\nstdout: {}\nstderr: {}", "command: {}\nstdout: {}\nstderr: {}",
&job.cmd, &job.cmd,
String::from_utf8_lossy(&output.stdout), String::from_utf8_lossy(&output.stdout),
@@ -100,7 +104,7 @@ impl Worker {
..job.clone() ..job.clone()
}]; }];
if let Err(error) = self.update(n).await { if let Err(error) = self.update(n).await {
println!("Could not update job: {}", error); log::info!("Could not update job: {}", error);
} }
} }
} }
@@ -108,7 +112,7 @@ impl Worker {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
const VERSION: &str = env!("CARGO_PKG_VERSION"); env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let matches = App::new("Zoidberg client") let matches = App::new("Zoidberg client")
.version(VERSION) .version(VERSION)

View File

@@ -39,13 +39,6 @@ pub struct StatusRequest {
pub id: i32, pub id: i32,
} }
#[derive(Serialize, Deserialize)]
pub enum FetchResponse {
Jobs(Vec<Job>),
StopWorking,
Nop,
}
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Job { pub struct Job {
#[serde(default)] #[serde(default)]
@@ -65,7 +58,20 @@ pub struct RegisterResponse {
pub id: i32, pub id: i32,
} }
#[derive(Serialize, Deserialize)]
pub enum FetchResponse {
Jobs(Vec<Job>),
StopWorking,
Nop,
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct Submit { pub struct Submit {
pub cmd: String, pub cmd: String,
} }
#[derive(Serialize, Deserialize)]
pub struct Worker {
#[serde(default)]
pub id: i32,
}

View File

@@ -11,5 +11,7 @@ version = "0.1.0"
[dependencies] [dependencies]
actix-web = "4" actix-web = "4"
serde_json = "1.0.85" serde_json = "1.0"
clap = "3.2.22" clap = "3.2"
env_logger = "0.9"
log = "0.4"

View File

@@ -1,114 +1,50 @@
use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, Result}; use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, Result};
use clap; use clap;
use env_logger::Env;
use log;
use std::sync::Mutex; use std::sync::Mutex;
use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update, Worker};
use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update}; mod webpage;
const VERSION: &str = env!("CARGO_PKG_VERSION");
struct State { struct State {
counter: Mutex<i32>, counter_workers: Mutex<i32>,
jobcounter: Mutex<i32>, counter_jobs: Mutex<i32>,
workers: Mutex<Vec<i32>>, workers: Mutex<Vec<Worker>>,
new_jobs: Mutex<Vec<Job>>,
jobs: Mutex<Vec<Job>>, jobs: Mutex<Vec<Job>>,
running_jobs: Mutex<Vec<Job>>,
} }
#[get("/")] #[get("/")]
async fn index(data: web::Data<State>) -> impl Responder { async fn index(data: web::Data<State>) -> impl Responder {
let workers = data.workers.lock().unwrap(); let workers = data.workers.lock().unwrap();
let jobs = data.running_jobs.lock().unwrap(); let new_jobs = data.new_jobs.lock().unwrap();
let page = webpage::render(&*new_jobs, &*workers);
let jobs_html: String = String::from("<table class=\"table is-hoverable\">")
+ "<thead><tr><th><td>ID</td><td>command</td><td>status</td></th></tr></thead><tbody>"
+ &jobs
.iter()
.map(|j| {
format!(
"<tr><th></th><td>{}</td><td>{}</td><td>{}</td></tr>",
j.id, j.cmd, j.status
)
})
.collect::<Vec<String>>()
.join("\n")
+ "</tbody></table>";
let workers_html: String = String::from("<table class=\"table is-hoverable\">")
+ "<thead><tr><th><td>ID</td></th></tr></thead><tbody>"
+ &workers
.iter()
.map(|w| format!("<tr><th></th><td>{}</td></tr>", w))
.collect::<Vec<String>>()
.join("\n")
+ "</tbody></table>";
let _debug_html = r#"<style>
*:not(path):not(g) {{
color: hsla(210, 100%, 100%, 0.9) !important;
background: hsla(210, 100%, 50%, 0.5) !important;
outline: solid 0.25rem hsla(210, 100%, 100%, 0.5) !important;
box-shadow: none !important;
}}
</style>"#;
let _debug_html = "";
let page = format!(
r#"
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Hello Bulma!</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
{}
</head>
<body>
<section class="section">
<div class="container">
<div class="columns">
<div class="column">
<div class="block">
<h1 class="title">
Jobs
</h1>
{}
</div>
</div>
<div class="column">
<div class="block">
<h1 class="title">
Workers
</h1>
{}
</div>
</div>
</div>
</div>
</section>
</body>
</html>
"#,
_debug_html, jobs_html, workers_html
);
HttpResponse::Ok().body(page) HttpResponse::Ok().body(page)
} }
#[get("/register")] #[get("/register")]
async fn register(data: web::Data<State>) -> Result<impl Responder> { async fn register(data: web::Data<State>) -> Result<impl Responder> {
let mut counter = data.counter.lock().unwrap(); let mut counter_workers = data.counter_workers.lock().unwrap();
*counter += 1; *counter_workers += 1;
let mut workers = data.workers.lock().unwrap(); let mut workers = data.workers.lock().unwrap();
workers.push(*counter); workers.push(Worker {
id: *counter_workers,
});
println!("Registered worker node with id: {}", *counter); log::info!("Registered worker node with id: {}", *counter_workers);
Ok(web::Json(RegisterResponse { id: *counter })) Ok(web::Json(RegisterResponse {
id: *counter_workers,
}))
} }
#[get("/fetch")] #[get("/fetch")]
async fn fetch(data: web::Data<State>) -> Result<impl Responder> { async fn fetch(data: web::Data<State>) -> Result<impl Responder> {
let mut jobs = data.jobs.lock().unwrap(); let mut new_jobs = data.new_jobs.lock().unwrap();
if let Some(j) = jobs.pop() { if let Some(j) = new_jobs.pop() {
return Ok(web::Json(FetchResponse::Jobs(vec![j]))); return Ok(web::Json(FetchResponse::Jobs(vec![j])));
} }
Ok(web::Json(FetchResponse::Nop)) Ok(web::Json(FetchResponse::Nop))
@@ -119,8 +55,8 @@ async fn status(
s: web::Json<Vec<StatusRequest>>, s: web::Json<Vec<StatusRequest>>,
data: web::Data<State>, data: web::Data<State>,
) -> Result<impl Responder> { ) -> Result<impl Responder> {
let running_jobs = data.running_jobs.lock().unwrap(); let jobs = data.jobs.lock().unwrap();
let status_updates: Vec<Job> = running_jobs let status_updates: Vec<Job> = jobs
.iter() .iter()
.filter(|r| s.iter().filter(|i| i.id == r.id).count() > 0) .filter(|r| s.iter().filter(|i| i.id == r.id).count() > 0)
.cloned() .cloned()
@@ -131,16 +67,18 @@ async fn status(
#[post("/update")] #[post("/update")]
async fn update(updates: web::Json<Vec<Update>>, data: web::Data<State>) -> Result<String> { async fn update(updates: web::Json<Vec<Update>>, data: web::Data<State>) -> Result<String> {
let mut running_jobs = data.running_jobs.lock().unwrap(); let mut jobs = data.jobs.lock().unwrap();
let mut n = 0; let mut n = 0;
for update in updates.iter() { for update in updates.iter() {
println!( log::info!(
"Worker {} updated job {} with status {}", "Worker {} updated job {} with status {}",
update.worker, update.job, update.status update.worker,
update.job,
update.status
); );
for i in 0..running_jobs.len() { for i in 0..jobs.len() {
if running_jobs[i].id == update.job { if jobs[i].id == update.job {
running_jobs[i].status = update.status.clone(); jobs[i].status = update.status.clone();
} }
} }
n += 1; n += 1;
@@ -150,30 +88,30 @@ async fn update(updates: web::Json<Vec<Update>>, data: web::Data<State>) -> Resu
#[post("/submit")] #[post("/submit")]
async fn submit(data: web::Data<State>, js: web::Json<Vec<Job>>) -> Result<impl Responder> { async fn submit(data: web::Data<State>, js: web::Json<Vec<Job>>) -> Result<impl Responder> {
let mut new_jobs = data.new_jobs.lock().unwrap();
let mut jobs = data.jobs.lock().unwrap(); let mut jobs = data.jobs.lock().unwrap();
let mut running_jobs = data.running_jobs.lock().unwrap(); let mut counter_jobs = data.counter_jobs.lock().unwrap();
let mut jobcounter = data.jobcounter.lock().unwrap(); let mut new_new_jobs = Vec::new();
let mut new_jobs = Vec::new();
for j in js.into_inner() { for j in js.into_inner() {
*jobcounter += 1; *counter_jobs += 1;
let cmd = j.cmd.clone(); let cmd = j.cmd.clone();
println!("Job submitted with id: {}, cmd: {}", *jobcounter, cmd); log::info!("Job submitted with id: {}, cmd: {}", *counter_jobs, cmd);
new_jobs.push(Job { new_new_jobs.push(Job {
id: *jobcounter, id: *counter_jobs,
..j ..j
}); });
} }
for job in new_jobs.iter() { for job in new_new_jobs.iter() {
new_jobs.push(job.clone());
jobs.push(job.clone()); jobs.push(job.clone());
running_jobs.push(job.clone());
} }
Ok(web::Json(new_jobs)) Ok(web::Json(new_new_jobs))
} }
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
const VERSION: &str = env!("CARGO_PKG_VERSION"); env_logger::Builder::from_env(Env::default().default_filter_or("zoidberg_server=info")).init();
let _matches = clap::App::new("Zoidberg server") let _matches = clap::App::new("Zoidberg server")
.version(VERSION) .version(VERSION)
@@ -181,11 +119,11 @@ async fn main() -> std::io::Result<()> {
.get_matches(); .get_matches();
let state = web::Data::new(State { let state = web::Data::new(State {
counter: Mutex::new(0), counter_workers: Mutex::new(0),
jobcounter: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
}); });
HttpServer::new(move || { HttpServer::new(move || {
@@ -214,11 +152,11 @@ mod tests {
let app = test::init_service( let app = test::init_service(
App::new() App::new()
.app_data(web::Data::new(State { .app_data(web::Data::new(State {
counter: Mutex::new(0), counter_workers: Mutex::new(0),
jobcounter: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
})) }))
.service(index), .service(index),
) )
@@ -233,11 +171,11 @@ mod tests {
let app = test::init_service( let app = test::init_service(
App::new() App::new()
.app_data(web::Data::new(State { .app_data(web::Data::new(State {
counter: Mutex::new(0), counter_workers: Mutex::new(0),
jobcounter: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
})) }))
.service(register), .service(register),
) )
@@ -253,15 +191,15 @@ mod tests {
let app = test::init_service( let app = test::init_service(
App::new() App::new()
.app_data(web::Data::new(State { .app_data(web::Data::new(State {
counter: Mutex::new(0), counter_workers: Mutex::new(0),
jobcounter: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
jobs: Mutex::new(vec![Job { new_jobs: Mutex::new(vec![Job {
id: 0, id: 0,
cmd: cmd.clone(), cmd: cmd.clone(),
status: Status::Submitted, status: Status::Submitted,
}]), }]),
running_jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()),
})) }))
.service(fetch), .service(fetch),
) )
@@ -275,9 +213,9 @@ mod tests {
FetchResponse::StopWorking => { FetchResponse::StopWorking => {
panic!("did not expect FetchResponse::NotWorking") panic!("did not expect FetchResponse::NotWorking")
} }
FetchResponse::Jobs(jobs) => { FetchResponse::Jobs(new_jobs) => {
assert_eq!(jobs[0].id, 0); assert_eq!(new_jobs[0].id, 0);
assert_eq!(jobs[0].cmd, cmd); assert_eq!(new_jobs[0].cmd, cmd);
} }
} }
} }
@@ -288,11 +226,11 @@ mod tests {
let app = test::init_service( let app = test::init_service(
App::new() App::new()
.app_data(web::Data::new(State { .app_data(web::Data::new(State {
counter: Mutex::new(0), counter_workers: Mutex::new(0),
jobcounter: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()), new_jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(vec![Job { jobs: Mutex::new(vec![Job {
id: 1, id: 1,
cmd: cmd.clone(), cmd: cmd.clone(),
status: Status::Running, status: Status::Running,
@@ -314,11 +252,11 @@ mod tests {
let app = test::init_service( let app = test::init_service(
App::new() App::new()
.app_data(web::Data::new(State { .app_data(web::Data::new(State {
counter: Mutex::new(0), counter_workers: Mutex::new(0),
jobcounter: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
})) }))
.service(update), .service(update),
) )
@@ -340,11 +278,11 @@ mod tests {
let app = test::init_service( let app = test::init_service(
App::new() App::new()
.app_data(web::Data::new(State { .app_data(web::Data::new(State {
counter: Mutex::new(0), counter_workers: Mutex::new(0),
jobcounter: Mutex::new(0), counter_jobs: Mutex::new(0),
workers: Mutex::new(Vec::new()), workers: Mutex::new(Vec::new()),
new_jobs: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()), jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
})) }))
.service(submit), .service(submit),
) )