implement correct status handling and add snakemake profile

This commit is contained in:
Johannes Heuel
2022-09-17 18:55:37 +02:00
parent 6e925b731e
commit a4a84844fa
11 changed files with 833 additions and 67 deletions

320
Cargo.lock generated
View File

@@ -223,6 +223,43 @@ dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anyhow"
version = "1.0.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602"
[[package]]
name = "async-trait"
version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@@ -307,6 +344,42 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1"
dependencies = [
"iana-time-zone",
"num-integer",
"num-traits",
"winapi",
]
[[package]]
name = "clap"
version = "3.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750"
dependencies = [
"atty",
"bitflags",
"clap_lex",
"indexmap",
"strsim",
"termcolor",
"textwrap",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "convert_case"
version = "0.4.0"
@@ -455,6 +528,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.24"
@@ -462,6 +550,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@@ -470,12 +559,34 @@ version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf"
[[package]]
name = "futures-executor"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68"
[[package]]
name = "futures-macro"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.24"
@@ -494,8 +605,11 @@ version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
@@ -629,6 +743,20 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a0714f28b1ee39ccec0770ccb544eb02c9ef2c82bb096230eefcffa6468b0"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"js-sys",
"once_cell",
"wasm-bindgen",
"winapi",
]
[[package]]
name = "idna"
version = "0.3.0"
@@ -755,6 +883,16 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.5.4"
@@ -794,6 +932,25 @@ dependencies = [
"tempfile",
]
[[package]]
name = "num-integer"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.1"
@@ -864,6 +1021,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "os_str_bytes"
version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
[[package]]
name = "parking_lot"
version = "0.12.1"
@@ -1027,6 +1190,7 @@ dependencies = [
"lazy_static",
"log",
"mime",
"mime_guess",
"native-tls",
"percent-encoding",
"pin-project-lite",
@@ -1043,6 +1207,62 @@ dependencies = [
"winreg",
]
[[package]]
name = "reqwest-middleware"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69539cea4148dce683bec9dc95be3f0397a9bb2c248a49c8296a9d21659a8cdd"
dependencies = [
"anyhow",
"async-trait",
"futures",
"http",
"reqwest",
"serde",
"task-local-extensions",
"thiserror",
]
[[package]]
name = "reqwest-retry"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce246a729eaa6aff5e215aee42845bf5fed9893cc6cd51aeeb712f34e04dd9f3"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"futures",
"http",
"hyper",
"reqwest",
"reqwest-middleware",
"retry-policies",
"task-local-extensions",
"tokio",
"tracing",
]
[[package]]
name = "retry"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9166d72162de3575f950507683fac47e30f6f2c3836b71b7fbc61aa517c9c5f4"
dependencies = [
"rand",
]
[[package]]
name = "retry-policies"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47f9e19b18c6cdd796cc70aea8a9ea5ee7b813be611c6589e3624fcdbfd05f9d"
dependencies = [
"anyhow",
"chrono",
"rand",
]
[[package]]
name = "rustc_version"
version = "0.4.0"
@@ -1191,6 +1411,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "1.0.99"
@@ -1202,6 +1428,15 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "task-local-extensions"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4167afbec18ae012de40f8cf1b9bf48420abb390678c34821caa07d924941cc4"
dependencies = [
"tokio",
]
[[package]]
name = "tempfile"
version = "3.3.0"
@@ -1216,6 +1451,41 @@ dependencies = [
"winapi",
]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "949517c0cf1bf4ee812e2e07e08ab448e3ae0d23472aee8a06c985f0c8815b16"
[[package]]
name = "thiserror"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c53f98874615aea268107765aa1ed8f6116782501d18e53d08b471733bea6c85"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8b463991b4eab2d801e724172285ec4195c650e8ec79b149e6c2a8e6dd3f783"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.3.14"
@@ -1266,9 +1536,21 @@ dependencies = [
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-macros"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.0"
@@ -1308,9 +1590,21 @@ dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.29"
@@ -1332,6 +1626,15 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.8"
@@ -1484,6 +1787,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
@@ -1546,7 +1858,13 @@ dependencies = [
name = "zoidberg_client"
version = "0.1.0"
dependencies = [
"clap",
"reqwest",
"reqwest-middleware",
"reqwest-retry",
"retry",
"serde_json",
"tokio",
"zoidberg_lib",
]
@@ -1562,6 +1880,8 @@ name = "zoidberg_server"
version = "0.1.0"
dependencies = [
"actix-web",
"clap",
"serde_json",
"zoidberg_lib",
]

12
snakemake/Snakefile Normal file
View File

@@ -0,0 +1,12 @@
rule collect:
input:
[f"{i}.txt" for i in range(100)],
run:
shell("rm {input}")
rule create:
output:
"{i}.txt",
run:
shell("touch {output}")

View File

@@ -0,0 +1,9 @@
jobscript: "grid-jobscript.sh"
cluster: "grid-submit.py"
cluster-status: "grid-status.py"
max-jobs-per-second: 100
max-status-checks-per-second: 100
restart-times: 5
local-cores: 10
jobs: 5000
verbose: false

View File

@@ -0,0 +1,9 @@
#!/bin/bash
# properties = {properties}
set -e
echo "hostname:"
hostname -f
{exec_job}

View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python
import sys
import requests
resp = requests.post("http://localhost:8080/status", json=[{"id": int(sys.argv[1])}])
translation = {
"Submitted": "running",
"Completed": "success",
"Failed": "failed",
}
print(translation[resp.json()[0]["status"]])

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env python3
import sys
import requests
jobscript = sys.argv[1]
resp = requests.post(
"http://localhost:8080/submit",
json=[
{"cmd": jobscript},
],
)
assert resp.ok, "http request failed"
print(resp.json()[0]["id"])

View File

@@ -10,4 +10,11 @@ path = "../zoidberg_lib"
version = "0.1.0"
[dependencies]
reqwest = { version = "0.11", features = ["blocking", "json"] }
# reqwest = { version = "0.11", features = ["blocking", "json"] }
reqwest = { version = "0.11"}
serde_json = "1.0.85"
retry = "2.0.0"
reqwest-retry = "0.1.5"
reqwest-middleware = "0.1.6"
tokio = { version = "1", features = ["full"] }
clap = "3.2.22"

View File

@@ -1,8 +1,11 @@
use core::time::Duration;
use reqwest::blocking::Client;
use reqwest::header;
use clap::{App, Arg};
use reqwest::{header, Client, ClientBuilder};
use std::error::Error;
use std::process::Command;
use std::time::Duration;
use std::{thread, time};
use zoidberg_lib::types::Update;
use zoidberg_lib::types::{Job, RegisterResponse, Status, Update};
fn build_client(secret: &str) -> Client {
let cookie = format!("secret={}", secret);
@@ -14,53 +17,122 @@ fn build_client(secret: &str) -> Client {
.unwrap_or_else(|_| panic!("invalid header value {}", &cookie)),
);
Client::builder()
.default_headers(headers)
ClientBuilder::new()
.timeout(Duration::from_secs(15))
.default_headers(headers)
.build()
.expect("Could not create HTTP client")
.expect("Could not create client")
}
fn main() {
// test get request to index
let res = build_client("some_secret")
.get("http://localhost:8080/")
.send()
.expect("Could not send get request");
#[derive(Debug)]
struct Worker {
id: i32,
}
println!("Status: {}", res.status());
println!("Headers:\n{:#?}", res.headers());
let body = res.text().unwrap();
println!("Body:\n{}", body);
(0..10).for_each(|_| {
// test get request to /register
impl Worker {
async fn new(server: &str) -> Result<Worker, Box<dyn Error>> {
let res = build_client("some_secret")
.get("http://localhost:8080/register")
.get(format!("{}/register", server))
.send()
.expect("Could not send get request");
.await?;
let body = res.text().await?;
let r: RegisterResponse = serde_json::from_str(&body)?;
println!("registered worker with id: {}", &r.id);
Ok(Worker { id: r.id })
}
async fn update(self: &Self, jobs: &[Job]) -> Result<(), Box<dyn Error>> {
let updates: Vec<Update> = jobs
.iter()
.map(|job| Update {
worker: self.id,
job: job.id,
status: job.status.clone(),
})
.collect();
let body = build_client("some_secret")
.post("http://localhost:8080/update")
.json(&updates)
.send()
.await?
.text()
.await?;
println!("Status: {}", res.status());
println!("Headers:\n{:#?}", res.headers());
let body = res.text().unwrap();
println!("Body:\n{}", body);
});
Ok(())
}
// test post request to /update
let update = Update {
id: 99,
status: "hi".to_string(),
};
async fn fetch(self: &Self) -> Result<Vec<Job>, Box<dyn Error>> {
let res = build_client("some_secret")
.get("http://localhost:8080/fetch")
.send()
.await?;
let body = res.text().await?;
let jobs: Vec<Job> = serde_json::from_str(&body)?;
Ok(jobs)
}
let res = build_client("some_secret")
.post("http://localhost:8080/update")
.json(&update)
.send()
.expect("Could not send get request");
async fn run(self: &Self, job: &Job) -> Result<(), Box<dyn Error>> {
let output = Command::new("bash").arg("-c").arg(&job.cmd).output()?;
println!("Status: {}", res.status());
println!("Headers:\n{:#?}", res.headers());
let body = res.text().unwrap();
println!("Body:\n{}", body);
println!(
"command: {}\nstdout: {}\nstderr: {}",
&job.cmd,
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
);
match output.status.success() {
true => Ok(()),
false => Err(Box::from("Job failed")),
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
const VERSION: &str = env!("CARGO_PKG_VERSION");
let matches = App::new("Zoidberg client")
.version(VERSION)
.author("Johannes Heuel")
.arg(
Arg::with_name("server")
.takes_value(true)
.required(true)
.help("Set Zoidberg server address"),
)
.get_matches();
let server = matches.value_of("server").unwrap();
let client = Worker::new(server).await.expect("Could not create client");
let pause = time::Duration::from_secs(0);
let long_pause = time::Duration::from_secs(2);
let extra_long_pause = time::Duration::from_secs(4);
loop {
if let Ok(jobs) = client.fetch().await {
// if there are no jobs, wait a little longer
if jobs.len() == 0 {
thread::sleep(long_pause);
}
for job in jobs {
let status = match client.run(&job).await {
Ok(()) => Status::Completed,
Err(..) => Status::Failed,
};
let n = &[Job { status, ..job }];
if let Err(error) = client.update(n).await {
println!("Could not update job: {}", error);
}
}
} else {
// wait a little longer whenever job fetching fails
thread::sleep(extra_long_pause);
}
thread::sleep(pause);
}
}

View File

@@ -1,7 +1,64 @@
use std::fmt;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct Update {
pub id: i64,
pub status: String,
pub worker: i32,
pub job: i32,
pub status: Status,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Status {
Submitted,
Running,
Completed,
Failed,
}
impl Status {
fn default() -> Self {
Status::Submitted
}
}
impl fmt::Display for Status {
fn fmt(self: &Self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Status::Submitted => write!(f, "submitted"),
Status::Running => write!(f, "running"),
Status::Completed => write!(f, "completed"),
Status::Failed => write!(f, "failed"),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct StatusRequest {
pub id: i32,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Job {
#[serde(default)]
pub id: i32,
pub cmd: String,
#[serde(default = "Status::default")]
pub status: Status,
}
#[derive(Serialize, Deserialize)]
pub struct Node {
pub id: i32,
}
#[derive(Serialize, Deserialize)]
pub struct RegisterResponse {
pub id: i32,
}
#[derive(Serialize, Deserialize)]
pub struct Submit {
pub cmd: String,
}

View File

@@ -11,3 +11,5 @@ version = "0.1.0"
[dependencies]
actix-web = "4"
serde_json = "1.0.85"
clap = "3.2.22"

View File

@@ -1,49 +1,297 @@
use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder, Result};
use clap;
use std::sync::Mutex;
use zoidberg_lib::types::Update;
use zoidberg_lib::types::{Job, RegisterResponse, StatusRequest, Update};
struct State {
counter: Mutex<i32>,
}
#[get("/register")]
async fn register(data: web::Data<State>) -> impl Responder {
let mut counter = data.counter.lock().unwrap();
*counter += 1;
HttpResponse::Ok().body(format!("Worker node {} registered", *counter))
}
#[get("/fetch")]
async fn fetch() -> impl Responder {
HttpResponse::Ok().body("Here is some work")
}
#[post("/update")]
async fn update(u: web::Json<Update>) -> Result<String> {
Ok(format!("Job {} updated with status {}", u.id, u.status))
jobcounter: Mutex<i32>,
workers: Mutex<Vec<i32>>,
jobs: Mutex<Vec<Job>>,
running_jobs: Mutex<Vec<Job>>,
}
#[get("/")]
async fn index() -> impl Responder {
HttpResponse::Ok().body("Hello world!")
async fn index(data: web::Data<State>) -> impl Responder {
let workers = data.workers.lock().unwrap();
let jobs = data.jobs.lock().unwrap();
let s: String = workers
.iter()
.map(|w| w.to_string())
.collect::<Vec<String>>()
.join("\n");
let s: String = s + &jobs
.iter()
.map(|j| serde_json::to_string(&j).unwrap())
.collect::<Vec<String>>()
.join("\n");
HttpResponse::Ok().body(s)
}
#[get("/register")]
async fn register(data: web::Data<State>) -> Result<impl Responder> {
let mut counter = data.counter.lock().unwrap();
*counter += 1;
let mut workers = data.workers.lock().unwrap();
workers.push(*counter);
println!("Registered worker node with id: {}", *counter);
Ok(web::Json(RegisterResponse { id: *counter }))
}
#[get("/fetch")]
async fn fetch(data: web::Data<State>) -> Result<impl Responder> {
let mut jobs = data.jobs.lock().unwrap();
if let Some(j) = jobs.pop() {
return Ok(web::Json(vec![j]));
}
Ok(web::Json(Vec::new()))
}
#[post("/status")]
async fn status(
s: web::Json<Vec<StatusRequest>>,
data: web::Data<State>,
) -> Result<impl Responder> {
let running_jobs = data.running_jobs.lock().unwrap();
let status_updates: Vec<Job> = running_jobs
.iter()
.filter(|r| {
s.iter().filter(|i| i.id == r.id).count() > 0
})
.cloned()
.collect();
Ok(web::Json(status_updates))
}
#[post("/update")]
async fn update(updates: web::Json<Vec<Update>>, data: web::Data<State>) -> Result<String> {
let mut running_jobs = data.running_jobs.lock().unwrap();
let mut n = 0;
for update in updates.iter() {
println!(
"Worker {} updated job {} with status {}",
update.worker, update.job, update.status
);
for i in 0..running_jobs.len() {
if running_jobs[i].id == update.job {
running_jobs[i].status = update.status.clone();
}
}
n += 1;
}
Ok(format!("Worker updated {} job(s)", n))
}
#[post("/submit")]
async fn submit(data: web::Data<State>, js: web::Json<Vec<Job>>) -> Result<impl Responder> {
let mut jobs = data.jobs.lock().unwrap();
let mut running_jobs = data.running_jobs.lock().unwrap();
let mut jobcounter = data.jobcounter.lock().unwrap();
let mut new_jobs = Vec::new();
for j in js.into_inner() {
*jobcounter += 1;
let cmd = j.cmd.clone();
println!("Job submitted with id: {}, cmd: {}", *jobcounter, cmd);
new_jobs.push(Job {
id: *jobcounter,
..j
});
}
for job in new_jobs.iter() {
jobs.push(job.clone());
running_jobs.push(job.clone());
}
Ok(web::Json(new_jobs))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let counter = web::Data::new(State {
const VERSION: &str = env!("CARGO_PKG_VERSION");
let _matches = clap::App::new("Zoidberg server")
.version(VERSION)
.author("Johannes Heuel")
.get_matches();
let state = web::Data::new(State {
counter: Mutex::new(0),
jobcounter: Mutex::new(0),
workers: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
});
HttpServer::new(move || {
App::new()
.app_data(counter.clone())
.app_data(state.clone())
.service(index)
.service(register)
.service(fetch)
.service(status)
.service(update)
.service(index)
.service(submit)
})
.bind(("127.0.0.1", 8080))?
.run()
.await
}
#[cfg(test)]
mod tests {
use super::*;
use actix_web::{http, test, web, App};
use zoidberg_lib::types::Status;
#[actix_web::test]
async fn test_index() {
let app = test::init_service(
App::new()
.app_data(web::Data::new(State {
counter: Mutex::new(0),
jobcounter: Mutex::new(0),
workers: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
}))
.service(index),
)
.await;
let req = test::TestRequest::get().uri("/").to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), http::StatusCode::OK);
}
#[actix_web::test]
async fn test_register() {
let app = test::init_service(
App::new()
.app_data(web::Data::new(State {
counter: Mutex::new(0),
jobcounter: Mutex::new(0),
workers: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
}))
.service(register),
)
.await;
let req = test::TestRequest::get().uri("/register").to_request();
let resp: RegisterResponse = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp.id, 1);
}
#[actix_web::test]
async fn test_fetch() {
let cmd = String::from("hi");
let app = test::init_service(
App::new()
.app_data(web::Data::new(State {
counter: Mutex::new(0),
jobcounter: Mutex::new(0),
workers: Mutex::new(Vec::new()),
jobs: Mutex::new(vec![Job {
id: 0,
cmd: cmd.clone(),
status: Status::Submitted,
}]),
running_jobs: Mutex::new(Vec::new()),
}))
.service(fetch),
)
.await;
let req = test::TestRequest::get().uri("/fetch").to_request();
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp[0].id, 0);
assert_eq!(resp[0].cmd, cmd);
let req = test::TestRequest::get().uri("/fetch").to_request();
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp.len(), 0);
}
#[actix_web::test]
async fn test_status() {
let cmd = String::from("hi");
let app = test::init_service(
App::new()
.app_data(web::Data::new(State {
counter: Mutex::new(0),
jobcounter: Mutex::new(0),
workers: Mutex::new(Vec::new()),
jobs: Mutex::new(vec![Job {
id: 0,
cmd: cmd.clone(),
status: Status::Running,
}]),
running_jobs: Mutex::new(Vec::new()),
}))
.service(status),
)
.await;
let req = test::TestRequest::post()
.set_json(vec![StatusRequest { id: 0 }])
.uri("/status")
.to_request();
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await;
assert_eq!(resp[0].id, 0);
}
#[actix_web::test]
async fn test_update() {
let app = test::init_service(
App::new()
.app_data(web::Data::new(State {
counter: Mutex::new(0),
jobcounter: Mutex::new(0),
workers: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
}))
.service(update),
)
.await;
let req = test::TestRequest::post()
.set_json(vec![Update {
worker: 0,
job: 0,
status: Status::Running,
}])
.uri("/update")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), http::StatusCode::OK);
}
#[actix_web::test]
async fn test_submit() {
let app = test::init_service(
App::new()
.app_data(web::Data::new(State {
counter: Mutex::new(0),
jobcounter: Mutex::new(0),
workers: Mutex::new(Vec::new()),
jobs: Mutex::new(Vec::new()),
running_jobs: Mutex::new(Vec::new()),
}))
.service(submit),
)
.await;
let req = test::TestRequest::post()
.set_json(vec![Job {
id: 0,
cmd: String::from("hi"),
status: Status::Running,
}])
.uri("/submit")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), http::StatusCode::OK);
}
}