diff --git a/Cargo.lock b/Cargo.lock index c8fc166..22ed4a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -223,6 +223,43 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "anyhow" +version = "1.0.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602" + +[[package]] +name = "async-trait" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -307,6 +344,42 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" +dependencies = [ + "iana-time-zone", + "num-integer", + "num-traits", + "winapi", +] + +[[package]] +name = "clap" +version = "3.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750" +dependencies = [ + "atty", + "bitflags", + "clap_lex", + "indexmap", + "strsim", + "termcolor", + "textwrap", +] + +[[package]] +name = "clap_lex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -455,6 +528,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.24" @@ -462,6 +550,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -470,12 +559,34 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" +[[package]] +name = "futures-executor" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" +[[package]] +name = "futures-macro" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.24" @@ -494,8 +605,11 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -629,6 +743,20 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "237a0714f28b1ee39ccec0770ccb544eb02c9ef2c82bb096230eefcffa6468b0" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "js-sys", + "once_cell", + "wasm-bindgen", + "winapi", +] + [[package]] name = "idna" version = "0.3.0" @@ -755,6 +883,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.5.4" @@ -794,6 +932,25 @@ dependencies = [ "tempfile", ] +[[package]] +name = "num-integer" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.1" @@ -864,6 +1021,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_str_bytes" +version = "6.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1027,6 +1190,7 @@ dependencies = [ "lazy_static", "log", "mime", + "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", @@ -1043,6 +1207,62 @@ dependencies = [ "winreg", ] +[[package]] +name = "reqwest-middleware" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69539cea4148dce683bec9dc95be3f0397a9bb2c248a49c8296a9d21659a8cdd" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "http", + "reqwest", + "serde", + "task-local-extensions", + "thiserror", +] + +[[package]] +name = "reqwest-retry" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce246a729eaa6aff5e215aee42845bf5fed9893cc6cd51aeeb712f34e04dd9f3" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "futures", + "http", + "hyper", + "reqwest", + "reqwest-middleware", + "retry-policies", + "task-local-extensions", + "tokio", + "tracing", +] + +[[package]] +name = "retry" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9166d72162de3575f950507683fac47e30f6f2c3836b71b7fbc61aa517c9c5f4" +dependencies = [ + "rand", +] + +[[package]] +name = "retry-policies" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47f9e19b18c6cdd796cc70aea8a9ea5ee7b813be611c6589e3624fcdbfd05f9d" +dependencies = [ + "anyhow", + "chrono", + "rand", +] + [[package]] name = "rustc_version" version = "0.4.0" @@ -1191,6 +1411,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "syn" version = "1.0.99" @@ -1202,6 +1428,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "task-local-extensions" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4167afbec18ae012de40f8cf1b9bf48420abb390678c34821caa07d924941cc4" +dependencies = [ + "tokio", +] + [[package]] name = "tempfile" version = "3.3.0" @@ -1216,6 +1451,41 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16" + +[[package]] +name = "thiserror" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c53f98874615aea268107765aa1ed8f6116782501d18e53d08b471733bea6c85" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8b463991b4eab2d801e724172285ec4195c650e8ec79b149e6c2a8e6dd3f783" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.3.14" @@ -1266,9 +1536,21 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "socket2", + "tokio-macros", "winapi", ] +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-native-tls" version = "0.3.0" @@ -1308,9 +1590,21 @@ dependencies = [ "cfg-if", "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.29" @@ -1332,6 +1626,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.8" @@ -1484,6 +1787,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -1546,7 +1858,13 @@ dependencies = [ name = "zoidberg_client" version = "0.1.0" dependencies = [ + "clap", "reqwest", + "reqwest-middleware", + "reqwest-retry", + "retry", + "serde_json", + "tokio", "zoidberg_lib", ] @@ -1562,6 +1880,8 @@ name = "zoidberg_server" version = "0.1.0" dependencies = [ "actix-web", + "clap", + "serde_json", "zoidberg_lib", ] diff --git a/snakemake/Snakefile b/snakemake/Snakefile new file mode 100644 index 0000000..4f584ad --- /dev/null +++ b/snakemake/Snakefile @@ -0,0 +1,12 @@ +rule collect: + input: + [f"{i}.txt" for i in range(100)], + run: + shell("rm {input}") + + +rule create: + output: + "{i}.txt", + run: + shell("touch {output}") diff --git a/snakemake/zoidberg/config.yaml b/snakemake/zoidberg/config.yaml new file mode 100644 index 0000000..8118946 --- /dev/null +++ b/snakemake/zoidberg/config.yaml @@ -0,0 +1,9 @@ +jobscript: "grid-jobscript.sh" +cluster: "grid-submit.py" +cluster-status: "grid-status.py" +max-jobs-per-second: 100 +max-status-checks-per-second: 100 +restart-times: 5 +local-cores: 10 +jobs: 5000 +verbose: false diff --git a/snakemake/zoidberg/grid-jobscript.sh b/snakemake/zoidberg/grid-jobscript.sh new file mode 100755 index 0000000..558d853 --- /dev/null +++ b/snakemake/zoidberg/grid-jobscript.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# properties = {properties} + +set -e + +echo "hostname:" +hostname -f + +{exec_job} diff --git a/snakemake/zoidberg/grid-status.py b/snakemake/zoidberg/grid-status.py new file mode 100755 index 0000000..758a544 --- /dev/null +++ b/snakemake/zoidberg/grid-status.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python + +import sys +import requests + +resp = requests.post("http://localhost:8080/status", json=[{"id": int(sys.argv[1])}]) + +translation = { + "Submitted": "running", + "Completed": "success", + "Failed": "failed", +} + +print(translation[resp.json()[0]["status"]]) diff --git a/snakemake/zoidberg/grid-submit.py b/snakemake/zoidberg/grid-submit.py new file mode 100755 index 0000000..349673f --- /dev/null +++ b/snakemake/zoidberg/grid-submit.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 + +import sys +import requests + +jobscript = sys.argv[1] + +resp = requests.post( + "http://localhost:8080/submit", + json=[ + {"cmd": jobscript}, + ], +) +assert resp.ok, "http request failed" + +print(resp.json()[0]["id"]) diff --git a/zoidberg_client/Cargo.toml b/zoidberg_client/Cargo.toml index 5dc313a..5d48d6e 100644 --- a/zoidberg_client/Cargo.toml +++ b/zoidberg_client/Cargo.toml @@ -10,4 +10,11 @@ path = "../zoidberg_lib" version = "0.1.0" [dependencies] -reqwest = { version = "0.11", features = ["blocking", "json"] } +# reqwest = { version = "0.11", features = ["blocking", "json"] } +reqwest = { version = "0.11"} +serde_json = "1.0.85" +retry = "2.0.0" +reqwest-retry = "0.1.5" +reqwest-middleware = "0.1.6" +tokio = { version = "1", features = ["full"] } +clap = "3.2.22" diff --git a/zoidberg_client/src/main.rs b/zoidberg_client/src/main.rs index 8059a1c..8308fe3 100644 --- a/zoidberg_client/src/main.rs +++ b/zoidberg_client/src/main.rs @@ -1,8 +1,11 @@ -use core::time::Duration; -use reqwest::blocking::Client; -use reqwest::header; +use clap::{App, Arg}; +use reqwest::{header, Client, ClientBuilder}; +use std::error::Error; +use std::process::Command; +use std::time::Duration; +use std::{thread, time}; -use zoidberg_lib::types::Update; +use zoidberg_lib::types::{Job, RegisterResponse, Status, Update}; fn build_client(secret: &str) -> Client { let cookie = format!("secret={}", secret); @@ -14,53 +17,122 @@ fn build_client(secret: &str) -> Client { .unwrap_or_else(|_| panic!("invalid header value {}", &cookie)), ); - Client::builder() - .default_headers(headers) + ClientBuilder::new() .timeout(Duration::from_secs(15)) + .default_headers(headers) .build() - .expect("Could not create HTTP client") + .expect("Could not create client") } -fn main() { - // test get request to index - let res = build_client("some_secret") - .get("http://localhost:8080/") - .send() - .expect("Could not send get request"); +#[derive(Debug)] +struct Worker { + id: i32, +} - println!("Status: {}", res.status()); - println!("Headers:\n{:#?}", res.headers()); - - let body = res.text().unwrap(); - println!("Body:\n{}", body); - - (0..10).for_each(|_| { - // test get request to /register +impl Worker { + async fn new(server: &str) -> Result> { let res = build_client("some_secret") - .get("http://localhost:8080/register") + .get(format!("{}/register", server)) .send() - .expect("Could not send get request"); + .await?; + + let body = res.text().await?; + let r: RegisterResponse = serde_json::from_str(&body)?; + println!("registered worker with id: {}", &r.id); + Ok(Worker { id: r.id }) + } + + async fn update(self: &Self, jobs: &[Job]) -> Result<(), Box> { + let updates: Vec = jobs + .iter() + .map(|job| Update { + worker: self.id, + job: job.id, + status: job.status.clone(), + }) + .collect(); + + let body = build_client("some_secret") + .post("http://localhost:8080/update") + .json(&updates) + .send() + .await? + .text() + .await?; - println!("Status: {}", res.status()); - println!("Headers:\n{:#?}", res.headers()); - let body = res.text().unwrap(); println!("Body:\n{}", body); - }); + Ok(()) + } - // test post request to /update - let update = Update { - id: 99, - status: "hi".to_string(), - }; + async fn fetch(self: &Self) -> Result, Box> { + let res = build_client("some_secret") + .get("http://localhost:8080/fetch") + .send() + .await?; + let body = res.text().await?; + let jobs: Vec = serde_json::from_str(&body)?; + Ok(jobs) + } - let res = build_client("some_secret") - .post("http://localhost:8080/update") - .json(&update) - .send() - .expect("Could not send get request"); + async fn run(self: &Self, job: &Job) -> Result<(), Box> { + let output = Command::new("bash").arg("-c").arg(&job.cmd).output()?; - println!("Status: {}", res.status()); - println!("Headers:\n{:#?}", res.headers()); - let body = res.text().unwrap(); - println!("Body:\n{}", body); + println!( + "command: {}\nstdout: {}\nstderr: {}", + &job.cmd, + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + match output.status.success() { + true => Ok(()), + false => Err(Box::from("Job failed")), + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + const VERSION: &str = env!("CARGO_PKG_VERSION"); + + let matches = App::new("Zoidberg client") + .version(VERSION) + .author("Johannes Heuel") + .arg( + Arg::with_name("server") + .takes_value(true) + .required(true) + .help("Set Zoidberg server address"), + ) + .get_matches(); + let server = matches.value_of("server").unwrap(); + + let client = Worker::new(server).await.expect("Could not create client"); + + let pause = time::Duration::from_secs(0); + let long_pause = time::Duration::from_secs(2); + let extra_long_pause = time::Duration::from_secs(4); + + loop { + if let Ok(jobs) = client.fetch().await { + // if there are no jobs, wait a little longer + if jobs.len() == 0 { + thread::sleep(long_pause); + } + + for job in jobs { + let status = match client.run(&job).await { + Ok(()) => Status::Completed, + Err(..) => Status::Failed, + }; + let n = &[Job { status, ..job }]; + if let Err(error) = client.update(n).await { + println!("Could not update job: {}", error); + } + } + } else { + // wait a little longer whenever job fetching fails + thread::sleep(extra_long_pause); + } + thread::sleep(pause); + } } diff --git a/zoidberg_lib/src/types.rs b/zoidberg_lib/src/types.rs index 0a930a2..9dc63fe 100644 --- a/zoidberg_lib/src/types.rs +++ b/zoidberg_lib/src/types.rs @@ -1,7 +1,64 @@ +use std::fmt; + use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct Update { - pub id: i64, - pub status: String, + pub worker: i32, + pub job: i32, + pub status: Status, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum Status { + Submitted, + Running, + Completed, + Failed, +} + +impl Status { + fn default() -> Self { + Status::Submitted + } +} + +impl fmt::Display for Status { + fn fmt(self: &Self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Status::Submitted => write!(f, "submitted"), + Status::Running => write!(f, "running"), + Status::Completed => write!(f, "completed"), + Status::Failed => write!(f, "failed"), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct StatusRequest { + pub id: i32, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Job { + #[serde(default)] + pub id: i32, + pub cmd: String, + #[serde(default = "Status::default")] + pub status: Status, +} + +#[derive(Serialize, Deserialize)] +pub struct Node { + pub id: i32, +} + +#[derive(Serialize, Deserialize)] +pub struct RegisterResponse { + pub id: i32, +} + +#[derive(Serialize, Deserialize)] +pub struct Submit { + pub cmd: String, } diff --git a/zoidberg_server/Cargo.toml b/zoidberg_server/Cargo.toml index 58ead08..b0258e4 100644 --- a/zoidberg_server/Cargo.toml +++ b/zoidberg_server/Cargo.toml @@ -11,3 +11,5 @@ version = "0.1.0" [dependencies] actix-web = "4" +serde_json = "1.0.85" +clap = "3.2.22" diff --git a/zoidberg_server/src/main.rs b/zoidberg_server/src/main.rs index 6402c26..a6a517f 100644 --- a/zoidberg_server/src/main.rs +++ b/zoidberg_server/src/main.rs @@ -1,49 +1,297 @@ use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, Result}; +use clap; use std::sync::Mutex; -use zoidberg_lib::types::Update; +use zoidberg_lib::types::{Job, RegisterResponse, StatusRequest, Update}; struct State { counter: Mutex, -} - -#[get("/register")] -async fn register(data: web::Data) -> impl Responder { - let mut counter = data.counter.lock().unwrap(); - *counter += 1; - HttpResponse::Ok().body(format!("Worker node {} registered", *counter)) -} - -#[get("/fetch")] -async fn fetch() -> impl Responder { - HttpResponse::Ok().body("Here is some work") -} - -#[post("/update")] -async fn update(u: web::Json) -> Result { - Ok(format!("Job {} updated with status {}", u.id, u.status)) + jobcounter: Mutex, + workers: Mutex>, + jobs: Mutex>, + running_jobs: Mutex>, } #[get("/")] -async fn index() -> impl Responder { - HttpResponse::Ok().body("Hello world!") +async fn index(data: web::Data) -> impl Responder { + let workers = data.workers.lock().unwrap(); + let jobs = data.jobs.lock().unwrap(); + + let s: String = workers + .iter() + .map(|w| w.to_string()) + .collect::>() + .join("\n"); + + let s: String = s + &jobs + .iter() + .map(|j| serde_json::to_string(&j).unwrap()) + .collect::>() + .join("\n"); + HttpResponse::Ok().body(s) +} + +#[get("/register")] +async fn register(data: web::Data) -> Result { + let mut counter = data.counter.lock().unwrap(); + *counter += 1; + + let mut workers = data.workers.lock().unwrap(); + workers.push(*counter); + + println!("Registered worker node with id: {}", *counter); + Ok(web::Json(RegisterResponse { id: *counter })) +} + +#[get("/fetch")] +async fn fetch(data: web::Data) -> Result { + let mut jobs = data.jobs.lock().unwrap(); + if let Some(j) = jobs.pop() { + return Ok(web::Json(vec![j])); + } + Ok(web::Json(Vec::new())) +} + +#[post("/status")] +async fn status( + s: web::Json>, + data: web::Data, +) -> Result { + let running_jobs = data.running_jobs.lock().unwrap(); + let status_updates: Vec = running_jobs + .iter() + .filter(|r| { + s.iter().filter(|i| i.id == r.id).count() > 0 + }) + .cloned() + .collect(); + + Ok(web::Json(status_updates)) +} + +#[post("/update")] +async fn update(updates: web::Json>, data: web::Data) -> Result { + let mut running_jobs = data.running_jobs.lock().unwrap(); + let mut n = 0; + for update in updates.iter() { + println!( + "Worker {} updated job {} with status {}", + update.worker, update.job, update.status + ); + for i in 0..running_jobs.len() { + if running_jobs[i].id == update.job { + running_jobs[i].status = update.status.clone(); + } + } + n += 1; + } + Ok(format!("Worker updated {} job(s)", n)) +} + +#[post("/submit")] +async fn submit(data: web::Data, js: web::Json>) -> Result { + let mut jobs = data.jobs.lock().unwrap(); + let mut running_jobs = data.running_jobs.lock().unwrap(); + let mut jobcounter = data.jobcounter.lock().unwrap(); + let mut new_jobs = Vec::new(); + for j in js.into_inner() { + *jobcounter += 1; + let cmd = j.cmd.clone(); + println!("Job submitted with id: {}, cmd: {}", *jobcounter, cmd); + + new_jobs.push(Job { + id: *jobcounter, + ..j + }); + } + for job in new_jobs.iter() { + jobs.push(job.clone()); + running_jobs.push(job.clone()); + } + Ok(web::Json(new_jobs)) } #[actix_web::main] async fn main() -> std::io::Result<()> { - let counter = web::Data::new(State { + const VERSION: &str = env!("CARGO_PKG_VERSION"); + + let _matches = clap::App::new("Zoidberg server") + .version(VERSION) + .author("Johannes Heuel") + .get_matches(); + + let state = web::Data::new(State { counter: Mutex::new(0), + jobcounter: Mutex::new(0), + workers: Mutex::new(Vec::new()), + jobs: Mutex::new(Vec::new()), + running_jobs: Mutex::new(Vec::new()), }); HttpServer::new(move || { App::new() - .app_data(counter.clone()) + .app_data(state.clone()) + .service(index) .service(register) .service(fetch) + .service(status) .service(update) - .service(index) + .service(submit) }) .bind(("127.0.0.1", 8080))? .run() .await } + +#[cfg(test)] +mod tests { + use super::*; + use actix_web::{http, test, web, App}; + use zoidberg_lib::types::Status; + + #[actix_web::test] + async fn test_index() { + let app = test::init_service( + App::new() + .app_data(web::Data::new(State { + counter: Mutex::new(0), + jobcounter: Mutex::new(0), + workers: Mutex::new(Vec::new()), + jobs: Mutex::new(Vec::new()), + running_jobs: Mutex::new(Vec::new()), + })) + .service(index), + ) + .await; + let req = test::TestRequest::get().uri("/").to_request(); + let resp = test::call_service(&app, req).await; + assert_eq!(resp.status(), http::StatusCode::OK); + } + + #[actix_web::test] + async fn test_register() { + let app = test::init_service( + App::new() + .app_data(web::Data::new(State { + counter: Mutex::new(0), + jobcounter: Mutex::new(0), + workers: Mutex::new(Vec::new()), + jobs: Mutex::new(Vec::new()), + running_jobs: Mutex::new(Vec::new()), + })) + .service(register), + ) + .await; + let req = test::TestRequest::get().uri("/register").to_request(); + let resp: RegisterResponse = test::call_and_read_body_json(&app, req).await; + assert_eq!(resp.id, 1); + } + + #[actix_web::test] + async fn test_fetch() { + let cmd = String::from("hi"); + let app = test::init_service( + App::new() + .app_data(web::Data::new(State { + counter: Mutex::new(0), + jobcounter: Mutex::new(0), + workers: Mutex::new(Vec::new()), + jobs: Mutex::new(vec![Job { + id: 0, + cmd: cmd.clone(), + status: Status::Submitted, + }]), + running_jobs: Mutex::new(Vec::new()), + })) + .service(fetch), + ) + .await; + let req = test::TestRequest::get().uri("/fetch").to_request(); + let resp: Vec = test::call_and_read_body_json(&app, req).await; + assert_eq!(resp[0].id, 0); + assert_eq!(resp[0].cmd, cmd); + + let req = test::TestRequest::get().uri("/fetch").to_request(); + let resp: Vec = test::call_and_read_body_json(&app, req).await; + assert_eq!(resp.len(), 0); + } + + #[actix_web::test] + async fn test_status() { + let cmd = String::from("hi"); + let app = test::init_service( + App::new() + .app_data(web::Data::new(State { + counter: Mutex::new(0), + jobcounter: Mutex::new(0), + workers: Mutex::new(Vec::new()), + jobs: Mutex::new(vec![Job { + id: 0, + cmd: cmd.clone(), + status: Status::Running, + }]), + running_jobs: Mutex::new(Vec::new()), + })) + .service(status), + ) + .await; + let req = test::TestRequest::post() + .set_json(vec![StatusRequest { id: 0 }]) + .uri("/status") + .to_request(); + let resp: Vec = test::call_and_read_body_json(&app, req).await; + assert_eq!(resp[0].id, 0); + } + + #[actix_web::test] + async fn test_update() { + let app = test::init_service( + App::new() + .app_data(web::Data::new(State { + counter: Mutex::new(0), + jobcounter: Mutex::new(0), + workers: Mutex::new(Vec::new()), + jobs: Mutex::new(Vec::new()), + running_jobs: Mutex::new(Vec::new()), + })) + .service(update), + ) + .await; + let req = test::TestRequest::post() + .set_json(vec![Update { + worker: 0, + job: 0, + status: Status::Running, + }]) + .uri("/update") + .to_request(); + let resp = test::call_service(&app, req).await; + assert_eq!(resp.status(), http::StatusCode::OK); + } + + #[actix_web::test] + async fn test_submit() { + let app = test::init_service( + App::new() + .app_data(web::Data::new(State { + counter: Mutex::new(0), + jobcounter: Mutex::new(0), + workers: Mutex::new(Vec::new()), + jobs: Mutex::new(Vec::new()), + running_jobs: Mutex::new(Vec::new()), + })) + .service(submit), + ) + .await; + let req = test::TestRequest::post() + .set_json(vec![Job { + id: 0, + cmd: String::from("hi"), + status: Status::Running, + }]) + .uri("/submit") + .to_request(); + let resp = test::call_service(&app, req).await; + assert_eq!(resp.status(), http::StatusCode::OK); + } +}