Compare commits
11 Commits
9f2d06b1bb
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
6707e7e403
|
|||
|
9971b46e9e
|
|||
|
302e47c342
|
|||
|
|
461cdb2cc7
|
||
|
|
126145ede7
|
||
|
|
ea30f57ea9
|
||
|
|
71098b44e6
|
||
|
|
949801fc4d
|
||
|
|
32bd7d5b62
|
||
|
|
6fd1cda2a2
|
||
|
|
60df9aca1a
|
41
Cargo.lock
generated
41
Cargo.lock
generated
@@ -166,7 +166,7 @@ dependencies = [
|
|||||||
"serde_urlencoded",
|
"serde_urlencoded",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"socket2",
|
"socket2",
|
||||||
"time",
|
"time 0.3.14",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -351,8 +351,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1"
|
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"iana-time-zone",
|
"iana-time-zone",
|
||||||
|
"js-sys",
|
||||||
"num-integer",
|
"num-integer",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
|
"time 0.1.44",
|
||||||
|
"wasm-bindgen",
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -393,7 +396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05"
|
checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"time",
|
"time 0.3.14",
|
||||||
"version_check",
|
"version_check",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -648,7 +651,7 @@ checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"libc",
|
"libc",
|
||||||
"wasi",
|
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -929,7 +932,7 @@ checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
"wasi",
|
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||||
"windows-sys",
|
"windows-sys",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -1505,6 +1508,17 @@ dependencies = [
|
|||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "time"
|
||||||
|
version = "0.1.44"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"wasi 0.10.0+wasi-snapshot-preview1",
|
||||||
|
"winapi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "time"
|
name = "time"
|
||||||
version = "0.3.14"
|
version = "0.3.14"
|
||||||
@@ -1686,6 +1700,15 @@ dependencies = [
|
|||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "uuid"
|
||||||
|
version = "1.1.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
|
||||||
|
dependencies = [
|
||||||
|
"getrandom",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "vcpkg"
|
name = "vcpkg"
|
||||||
version = "0.2.15"
|
version = "0.2.15"
|
||||||
@@ -1708,6 +1731,12 @@ dependencies = [
|
|||||||
"try-lock",
|
"try-lock",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasi"
|
||||||
|
version = "0.10.0+wasi-snapshot-preview1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wasi"
|
name = "wasi"
|
||||||
version = "0.11.0+wasi-snapshot-preview1"
|
version = "0.11.0+wasi-snapshot-preview1"
|
||||||
@@ -1879,6 +1908,7 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"clap",
|
"clap",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
|
"futures",
|
||||||
"log",
|
"log",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"reqwest-middleware",
|
"reqwest-middleware",
|
||||||
@@ -1901,11 +1931,14 @@ name = "zoidberg_server"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-web",
|
"actix-web",
|
||||||
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
"futures",
|
"futures",
|
||||||
"log",
|
"log",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"tokio",
|
||||||
|
"uuid",
|
||||||
"zoidberg_lib",
|
"zoidberg_lib",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
9
backends/htcondor/run.sh
Normal file → Executable file
9
backends/htcondor/run.sh
Normal file → Executable file
@@ -1,9 +1,8 @@
|
|||||||
#!/usr/bin/bash
|
#!/usr/bin/bash
|
||||||
|
|
||||||
# apparently this vodoo kills all processes opened in this script
|
# apparently this vodoo kills all processes opened in this script
|
||||||
trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
|
# trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
|
||||||
|
# ssh -N -L 8080:localhost:8080 lxplus7103 &
|
||||||
|
# sleep 10
|
||||||
|
|
||||||
ssh -N -L 8080:localhost:8080 lxplus7103 &
|
/home/home4/institut_1b/jheuel/repositories/zoidberg/target/release/zoidberg_client
|
||||||
|
|
||||||
sleep 10
|
|
||||||
/afs/cern.ch/work/j/jheuel/zoidberg/target/debug/zoidberg_client http://localhost:8080
|
|
||||||
|
|||||||
@@ -6,9 +6,10 @@ error = output/stderr.$(Process)
|
|||||||
log = output/log.$(Process)
|
log = output/log.$(Process)
|
||||||
|
|
||||||
request_cpus = 1
|
request_cpus = 1
|
||||||
#request_memory = 1024
|
request_memory = 4096
|
||||||
#request_disk = 10240
|
#request_disk = 10240
|
||||||
|
|
||||||
|
getenv = True
|
||||||
should_transfer_files = no
|
should_transfer_files = no
|
||||||
|
|
||||||
queue 150
|
queue 20
|
||||||
|
|||||||
@@ -2,8 +2,17 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
import requests
|
import requests
|
||||||
|
from os import environ
|
||||||
|
|
||||||
resp = requests.post("http://localhost:8080/status", json=[{"id": int(sys.argv[1])}])
|
def print_and_exit(s):
|
||||||
|
print(s)
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
resp = requests.post(
|
||||||
|
"http://localhost:8080/status",
|
||||||
|
json=[{"id": int(sys.argv[1])}],
|
||||||
|
headers={"cookie": environ["ZOIDBERG_SECRET"]},
|
||||||
|
)
|
||||||
|
|
||||||
translation = {
|
translation = {
|
||||||
"Submitted": "running",
|
"Submitted": "running",
|
||||||
@@ -11,4 +20,12 @@ translation = {
|
|||||||
"Failed": "failed",
|
"Failed": "failed",
|
||||||
}
|
}
|
||||||
|
|
||||||
print(translation[resp.json()[0]["status"]])
|
j = resp.json()
|
||||||
|
|
||||||
|
if len(j) == 0:
|
||||||
|
print_and_exit("failed")
|
||||||
|
|
||||||
|
if "Running" in j[0]["status"]:
|
||||||
|
print_and_exit("running")
|
||||||
|
|
||||||
|
print_and_exit(translation[resp.json()[0]["status"]])
|
||||||
|
|||||||
@@ -2,14 +2,25 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
import requests
|
import requests
|
||||||
|
from os import environ
|
||||||
|
|
||||||
|
from snakemake.utils import read_job_properties
|
||||||
|
|
||||||
jobscript = sys.argv[1]
|
jobscript = sys.argv[1]
|
||||||
|
job_properties = read_job_properties(jobscript)
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"cmd": jobscript
|
||||||
|
}
|
||||||
|
|
||||||
|
payload["threads"] = job_properties.get("threads", 1)
|
||||||
|
|
||||||
resp = requests.post(
|
resp = requests.post(
|
||||||
"http://localhost:8080/submit",
|
"http://localhost:8080/submit",
|
||||||
json=[
|
json=[
|
||||||
{"cmd": jobscript},
|
payload,
|
||||||
],
|
],
|
||||||
|
headers={"cookie": environ["ZOIDBERG_SECRET"]},
|
||||||
)
|
)
|
||||||
assert resp.ok, "http request failed"
|
assert resp.ok, "http request failed"
|
||||||
|
|
||||||
|
|||||||
@@ -20,3 +20,4 @@ tokio = { version = "1", features = ["full"] }
|
|||||||
clap = "3.2.22"
|
clap = "3.2.22"
|
||||||
env_logger = "0.9"
|
env_logger = "0.9"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
futures = "0.3.24"
|
||||||
|
|||||||
@@ -1,18 +1,22 @@
|
|||||||
use clap::{App, Arg};
|
use clap::{arg, value_parser, App, Arg};
|
||||||
use env_logger::Env;
|
use env_logger::Env;
|
||||||
|
use futures::future::{AbortHandle, Abortable};
|
||||||
use log;
|
use log;
|
||||||
use reqwest::{header, Client, ClientBuilder};
|
use reqwest::{header, Client, ClientBuilder};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::process::Command;
|
use std::process::Stdio;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::{thread, time};
|
use tokio::{process::Command, time};
|
||||||
|
|
||||||
use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, Status, Update};
|
use zoidberg_lib::types::{
|
||||||
|
FetchRequest, FetchResponse, Heartbeat, Job, RegisterResponse, Status, Update,
|
||||||
|
};
|
||||||
|
|
||||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||||
|
|
||||||
fn build_client(secret: &str) -> Client {
|
fn build_client(secret: &str) -> Client {
|
||||||
let cookie = format!("{}", secret);
|
let cookie = secret.to_string();
|
||||||
|
|
||||||
let mut headers = header::HeaderMap::new();
|
let mut headers = header::HeaderMap::new();
|
||||||
headers.insert(
|
headers.insert(
|
||||||
@@ -28,14 +32,16 @@ fn build_client(secret: &str) -> Client {
|
|||||||
.expect("Could not create client")
|
.expect("Could not create client")
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
struct Worker {
|
struct Worker {
|
||||||
id: i32,
|
id: String,
|
||||||
secret: String,
|
secret: String,
|
||||||
|
server: String,
|
||||||
|
threads: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Worker {
|
impl Worker {
|
||||||
async fn new(server: &str, secret: &str) -> Result<Worker, Box<dyn Error>> {
|
async fn new(server: &str, secret: &str, threads: i32) -> Result<Worker, Box<dyn Error>> {
|
||||||
let res = build_client(secret)
|
let res = build_client(secret)
|
||||||
.get(format!("{}/register", server))
|
.get(format!("{}/register", server))
|
||||||
.send()
|
.send()
|
||||||
@@ -47,21 +53,23 @@ impl Worker {
|
|||||||
Ok(Worker {
|
Ok(Worker {
|
||||||
id: r.id,
|
id: r.id,
|
||||||
secret: secret.to_string(),
|
secret: secret.to_string(),
|
||||||
|
server: server.to_string(),
|
||||||
|
threads,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update(self: &Self, jobs: &[Job]) -> Result<(), Box<dyn Error>> {
|
async fn update(&self, jobs: &[Job]) -> Result<(), Box<dyn Error>> {
|
||||||
let updates: Vec<Update> = jobs
|
let updates: Vec<Update> = jobs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|job| Update {
|
.map(|job| Update {
|
||||||
worker: self.id,
|
worker: self.id.clone(),
|
||||||
job: job.id,
|
job: job.id,
|
||||||
status: job.status.clone(),
|
status: job.status.clone(),
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let body = build_client(&self.secret)
|
let body = build_client(&self.secret)
|
||||||
.post("http://localhost:8080/update")
|
.post(format!("{}/update", self.server))
|
||||||
.json(&updates)
|
.json(&updates)
|
||||||
.send()
|
.send()
|
||||||
.await?
|
.await?
|
||||||
@@ -72,9 +80,13 @@ impl Worker {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch(self: &Self) -> Result<FetchResponse, Box<dyn Error>> {
|
async fn fetch(&self) -> Result<FetchResponse, Box<dyn Error>> {
|
||||||
let res = build_client(&self.secret)
|
let res = build_client(&self.secret)
|
||||||
.get("http://localhost:8080/fetch")
|
.post(format!("{}/fetch", self.server))
|
||||||
|
.json(&FetchRequest {
|
||||||
|
worker_id: self.id.clone(),
|
||||||
|
threads: self.threads,
|
||||||
|
})
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
let body = res.text().await?;
|
let body = res.text().await?;
|
||||||
@@ -82,35 +94,35 @@ impl Worker {
|
|||||||
Ok(resp)
|
Ok(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(self: &Self, job: &Job) -> Result<(), Box<dyn Error>> {
|
async fn heartbeat(&self) {
|
||||||
let output = Command::new("bash").arg("-c").arg(&job.cmd).output()?;
|
let _ = build_client(&self.secret)
|
||||||
|
.post(format!("{}/heartbeat", self.server))
|
||||||
log::info!(
|
.json(&Heartbeat {
|
||||||
"command: {}\nstdout: {}\nstderr: {}",
|
id: self.id.clone(),
|
||||||
&job.cmd,
|
})
|
||||||
String::from_utf8_lossy(&output.stdout),
|
.send()
|
||||||
String::from_utf8_lossy(&output.stderr)
|
.await;
|
||||||
);
|
|
||||||
match output.status.success() {
|
|
||||||
true => Ok(()),
|
|
||||||
false => Err(Box::from("Job failed")),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn process(self: &Self, jobs: &[Job]) {
|
async fn run(job: &Job) -> Result<(), Box<dyn Error>> {
|
||||||
for job in jobs {
|
let output = Command::new("bash")
|
||||||
let status = match self.run(&job).await {
|
.arg("-c")
|
||||||
Ok(()) => Status::Completed,
|
.arg(&job.cmd)
|
||||||
Err(..) => Status::Failed,
|
.stdout(Stdio::piped())
|
||||||
};
|
.stderr(Stdio::piped())
|
||||||
let n = &[Job {
|
.output();
|
||||||
status,
|
let output = output.await?;
|
||||||
..job.clone()
|
|
||||||
}];
|
log::info!(
|
||||||
if let Err(error) = self.update(n).await {
|
"command: {}\nstdout: {}\nstderr: {}",
|
||||||
log::info!("Could not update job: {}", error);
|
&job.cmd,
|
||||||
}
|
String::from_utf8_lossy(&output.stdout),
|
||||||
}
|
String::from_utf8_lossy(&output.stderr)
|
||||||
|
);
|
||||||
|
match output.status.success() {
|
||||||
|
true => Ok(()),
|
||||||
|
false => Err(Box::from("Job failed")),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,37 +132,94 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
|
|
||||||
let matches = App::new("Zoidberg client")
|
let matches = App::new("Zoidberg client")
|
||||||
.version(VERSION)
|
.version(VERSION)
|
||||||
.author("Johannes Heuel")
|
.author("by Johannes Heuel")
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("server")
|
Arg::with_name("server")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.required(true)
|
|
||||||
.help("Set Zoidberg server address"),
|
.help("Set Zoidberg server address"),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
arg!(-j --threads <VALUE> "Sets number of threads")
|
||||||
|
.required(false)
|
||||||
|
.value_parser(value_parser!(i32)),
|
||||||
|
)
|
||||||
.get_matches();
|
.get_matches();
|
||||||
let server = matches.value_of("server").unwrap();
|
let threads: i32 = if let Some(t) = matches.get_one::<i32>("threads") {
|
||||||
|
*t
|
||||||
|
} else {
|
||||||
|
1
|
||||||
|
};
|
||||||
|
|
||||||
let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| {
|
let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| {
|
||||||
println!("Please set the $ZOIDBERG_SECRET environment variable");
|
eprintln!("Please set the $ZOIDBERG_SECRET environment variable");
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
let client = Worker::new(server, &secret)
|
let server = std::env::var("ZOIDBERG_SERVER")
|
||||||
.await
|
.unwrap_or_else(|_| String::from(matches.value_of("server").unwrap()));
|
||||||
.expect("Could not create client");
|
|
||||||
|
let client = Arc::new(
|
||||||
|
Worker::new(&server, &secret, threads)
|
||||||
|
.await
|
||||||
|
.expect("Could not create client"),
|
||||||
|
);
|
||||||
|
|
||||||
let pause = time::Duration::from_secs(1);
|
let pause = time::Duration::from_secs(1);
|
||||||
let long_pause = time::Duration::from_secs(20);
|
let long_pause = time::Duration::from_secs(40);
|
||||||
|
let heartbeat_pause = time::Duration::from_secs(30);
|
||||||
|
|
||||||
|
let (heartbeat_handle, abort_registration) = AbortHandle::new_pair();
|
||||||
|
let c = Arc::clone(&client);
|
||||||
|
tokio::spawn(Abortable::new(
|
||||||
|
async move {
|
||||||
|
loop {
|
||||||
|
time::sleep(heartbeat_pause).await;
|
||||||
|
c.heartbeat().await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
abort_registration,
|
||||||
|
));
|
||||||
|
|
||||||
|
let mut fail_counter = 0;
|
||||||
loop {
|
loop {
|
||||||
if let Ok(fetch) = client.fetch().await {
|
let jobs = if let Ok(fetch) = client.fetch().await {
|
||||||
|
fail_counter = 0;
|
||||||
match fetch {
|
match fetch {
|
||||||
FetchResponse::Nop => thread::sleep(pause),
|
FetchResponse::Nop => {
|
||||||
FetchResponse::StopWorking => break,
|
time::sleep(pause).await;
|
||||||
FetchResponse::Jobs(jobs) => client.process(&jobs).await,
|
continue;
|
||||||
|
}
|
||||||
|
FetchResponse::Terminate(m) => {
|
||||||
|
println!("Terminate worker: {}", m);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
FetchResponse::Jobs(jobs) => jobs,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
thread::sleep(long_pause);
|
fail_counter += 1;
|
||||||
|
if fail_counter == 3 {
|
||||||
|
log::error!("failed to fetch three times, assume that server crashed and exit");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
log::error!("failed to fetch new jobs");
|
||||||
|
time::sleep(long_pause).await;
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
for job in jobs {
|
||||||
|
let status = match run(&job).await {
|
||||||
|
Ok(()) => Status::Completed,
|
||||||
|
Err(..) => Status::Failed,
|
||||||
|
};
|
||||||
|
let update = &[Job {
|
||||||
|
status,
|
||||||
|
..job.clone()
|
||||||
|
}];
|
||||||
|
if let Err(error) = client.update(update).await {
|
||||||
|
log::info!("Could not update job: {}", error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
heartbeat_handle.abort();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub struct Update {
|
pub struct Update {
|
||||||
pub worker: i32,
|
pub worker: String,
|
||||||
pub job: i32,
|
pub job: i32,
|
||||||
pub status: Status,
|
pub status: Status,
|
||||||
}
|
}
|
||||||
@@ -12,7 +12,7 @@ pub struct Update {
|
|||||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub enum Status {
|
pub enum Status {
|
||||||
Submitted,
|
Submitted,
|
||||||
Running,
|
Running(String),
|
||||||
Completed,
|
Completed,
|
||||||
Failed,
|
Failed,
|
||||||
}
|
}
|
||||||
@@ -27,7 +27,7 @@ impl fmt::Display for Status {
|
|||||||
fn fmt(self: &Self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(self: &Self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
Status::Submitted => write!(f, "submitted"),
|
Status::Submitted => write!(f, "submitted"),
|
||||||
Status::Running => write!(f, "running"),
|
Status::Running(w) => write!(f, "running on worker {}", w),
|
||||||
Status::Completed => write!(f, "completed"),
|
Status::Completed => write!(f, "completed"),
|
||||||
Status::Failed => write!(f, "failed"),
|
Status::Failed => write!(f, "failed"),
|
||||||
}
|
}
|
||||||
@@ -46,6 +46,8 @@ pub struct Job {
|
|||||||
pub cmd: String,
|
pub cmd: String,
|
||||||
#[serde(default = "Status::default")]
|
#[serde(default = "Status::default")]
|
||||||
pub status: Status,
|
pub status: Status,
|
||||||
|
#[serde(default)]
|
||||||
|
pub threads: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
@@ -55,13 +57,20 @@ pub struct Node {
|
|||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub struct RegisterResponse {
|
pub struct RegisterResponse {
|
||||||
pub id: i32,
|
pub id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct FetchRequest {
|
||||||
|
pub worker_id: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub threads: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub enum FetchResponse {
|
pub enum FetchResponse {
|
||||||
Jobs(Vec<Job>),
|
Jobs(Vec<Job>),
|
||||||
StopWorking,
|
Terminate(String),
|
||||||
Nop,
|
Nop,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -73,5 +82,13 @@ pub struct Submit {
|
|||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub struct Worker {
|
pub struct Worker {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub id: i32,
|
pub id: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub last_heartbeat: Option<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct Heartbeat {
|
||||||
|
#[serde(default)]
|
||||||
|
pub id: String,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,3 +16,6 @@ clap = "3.2"
|
|||||||
env_logger = "0.9"
|
env_logger = "0.9"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
futures = "0.3.24"
|
futures = "0.3.24"
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
chrono = "0.4.22"
|
||||||
|
uuid = { version = "1.1.2", features = ["v4"] }
|
||||||
|
|||||||
24
zoidberg_server/src/auth.rs
Normal file
24
zoidberg_server/src/auth.rs
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
use actix_web::{dev, error::ErrorBadRequest, Error, FromRequest, HttpRequest, Result};
|
||||||
|
use futures::future::{err, ok, Ready};
|
||||||
|
|
||||||
|
pub struct Authorization {}
|
||||||
|
|
||||||
|
impl FromRequest for Authorization {
|
||||||
|
type Error = Error;
|
||||||
|
type Future = Ready<Result<Self, Self::Error>>;
|
||||||
|
|
||||||
|
fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future {
|
||||||
|
if let Some(head) = req.headers().get("cookie") {
|
||||||
|
if let Ok(cookie) = head.to_str() {
|
||||||
|
if let Some(secret) = req.app_data::<String>() {
|
||||||
|
if secret == cookie {
|
||||||
|
return ok(Authorization {});
|
||||||
|
} else {
|
||||||
|
return err(ErrorBadRequest("no auth"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err(ErrorBadRequest("no auth"))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,21 +1,25 @@
|
|||||||
use actix_web::error::ErrorBadRequest;
|
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
dev, get, middleware::Logger, post, web, App, Error, FromRequest, HttpRequest, HttpResponse,
|
get, middleware::Logger, post, web, App, HttpResponse, HttpServer, Responder, Result,
|
||||||
HttpServer, Responder, Result,
|
|
||||||
};
|
};
|
||||||
use clap;
|
use chrono::Utc;
|
||||||
use env_logger::Env;
|
use env_logger::Env;
|
||||||
use futures::future::{err, ok, Ready};
|
|
||||||
use log;
|
|
||||||
use std::sync::Mutex;
|
|
||||||
use zoidberg_lib::types::{FetchResponse, Job, RegisterResponse, StatusRequest, Update, Worker};
|
|
||||||
|
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::time::Duration;
|
||||||
|
use uuid::Uuid;
|
||||||
|
use zoidberg_lib::types::{
|
||||||
|
FetchRequest, FetchResponse, Heartbeat, Job, RegisterResponse, Status, StatusRequest, Update,
|
||||||
|
Worker,
|
||||||
|
};
|
||||||
|
|
||||||
|
mod auth;
|
||||||
mod webpage;
|
mod webpage;
|
||||||
|
|
||||||
|
use auth::Authorization;
|
||||||
|
|
||||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||||
|
|
||||||
struct State {
|
struct State {
|
||||||
counter_workers: Mutex<i32>,
|
|
||||||
counter_jobs: Mutex<i32>,
|
counter_jobs: Mutex<i32>,
|
||||||
workers: Mutex<Vec<Worker>>,
|
workers: Mutex<Vec<Worker>>,
|
||||||
new_jobs: Mutex<Vec<Job>>,
|
new_jobs: Mutex<Vec<Job>>,
|
||||||
@@ -25,7 +29,6 @@ struct State {
|
|||||||
impl State {
|
impl State {
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
counter_workers: Mutex::new(0),
|
|
||||||
counter_jobs: Mutex::new(0),
|
counter_jobs: Mutex::new(0),
|
||||||
workers: Mutex::new(Vec::new()),
|
workers: Mutex::new(Vec::new()),
|
||||||
new_jobs: Mutex::new(Vec::new()),
|
new_jobs: Mutex::new(Vec::new()),
|
||||||
@@ -34,59 +37,69 @@ impl State {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Authorization {}
|
|
||||||
|
|
||||||
impl FromRequest for Authorization {
|
|
||||||
type Error = Error;
|
|
||||||
type Future = Ready<Result<Self, Self::Error>>;
|
|
||||||
|
|
||||||
fn from_request(req: &HttpRequest, _payload: &mut dev::Payload) -> Self::Future {
|
|
||||||
if let Some(head) = req.headers().get("cookie") {
|
|
||||||
if let Ok(cookie) = head.to_str() {
|
|
||||||
if let Some(secret) = req.app_data::<String>() {
|
|
||||||
println!("{} == {}", secret, cookie);
|
|
||||||
if secret == cookie {
|
|
||||||
return ok(Authorization {});
|
|
||||||
} else {
|
|
||||||
return err(ErrorBadRequest("no auth"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err(ErrorBadRequest("no auth"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[get("/")]
|
#[get("/")]
|
||||||
async fn index(data: web::Data<State>) -> impl Responder {
|
async fn index(data: web::Data<State>) -> impl Responder {
|
||||||
let workers = data.workers.lock().unwrap();
|
let workers = data.workers.lock().unwrap();
|
||||||
let new_jobs = data.new_jobs.lock().unwrap();
|
let jobs = data.jobs.lock().unwrap();
|
||||||
let page = webpage::render(&*new_jobs, &*workers);
|
let filtered_jobs: Vec<Job> = jobs
|
||||||
|
.to_vec()
|
||||||
|
.iter()
|
||||||
|
.filter(|x| !matches!(x.status, Status::Completed))
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
let page = webpage::render(&filtered_jobs, &*workers);
|
||||||
HttpResponse::Ok().body(page)
|
HttpResponse::Ok().body(page)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/register")]
|
#[get("/register")]
|
||||||
async fn register(data: web::Data<State>, _: Authorization) -> Result<impl Responder> {
|
async fn register(data: web::Data<State>, _: Authorization) -> Result<impl Responder> {
|
||||||
let mut counter_workers = data.counter_workers.lock().unwrap();
|
|
||||||
*counter_workers += 1;
|
|
||||||
|
|
||||||
let mut workers = data.workers.lock().unwrap();
|
let mut workers = data.workers.lock().unwrap();
|
||||||
|
let uuid = Uuid::new_v4().to_string();
|
||||||
workers.push(Worker {
|
workers.push(Worker {
|
||||||
id: *counter_workers,
|
id: uuid.clone(),
|
||||||
|
last_heartbeat: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
log::info!("Registered worker node with id: {}", *counter_workers);
|
log::info!("Registered worker node with id: {}", uuid);
|
||||||
Ok(web::Json(RegisterResponse {
|
Ok(web::Json(RegisterResponse { id: uuid }))
|
||||||
id: *counter_workers,
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/fetch")]
|
#[post("/fetch")]
|
||||||
async fn fetch(data: web::Data<State>, _: Authorization) -> Result<impl Responder> {
|
async fn fetch(
|
||||||
let mut new_jobs = data.new_jobs.lock().unwrap();
|
data: web::Data<State>,
|
||||||
if let Some(j) = new_jobs.pop() {
|
f: web::Json<FetchRequest>,
|
||||||
return Ok(web::Json(FetchResponse::Jobs(vec![j])));
|
_: Authorization,
|
||||||
|
) -> Result<impl Responder> {
|
||||||
|
let f = f.into_inner();
|
||||||
|
let requesting_worker = f.worker_id;
|
||||||
|
let threads = f.threads;
|
||||||
|
{
|
||||||
|
let workers = data.workers.lock().unwrap();
|
||||||
|
if workers.iter().filter(|w| w.id == requesting_worker).count() != 1 {
|
||||||
|
return Ok(web::Json(FetchResponse::Terminate(
|
||||||
|
"Worker not found".into(),
|
||||||
|
)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
let mut new_jobs = data.new_jobs.lock().unwrap();
|
||||||
|
new_jobs.sort_by(|a, b| b.threads.cmp(&a.threads));
|
||||||
|
|
||||||
|
if let Some(j) = new_jobs
|
||||||
|
.iter()
|
||||||
|
.filter(|x| x.threads <= threads)
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<Job>>()
|
||||||
|
.first()
|
||||||
|
{
|
||||||
|
let mut jobs = data.jobs.lock().unwrap();
|
||||||
|
for cj in jobs.iter_mut() {
|
||||||
|
if cj.id == j.id {
|
||||||
|
cj.status = Status::Running(requesting_worker.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
new_jobs.retain(|x| x.id != j.id);
|
||||||
|
return Ok(web::Json(FetchResponse::Jobs(vec![j.clone()])));
|
||||||
|
};
|
||||||
Ok(web::Json(FetchResponse::Nop))
|
Ok(web::Json(FetchResponse::Nop))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -131,6 +144,22 @@ async fn update(
|
|||||||
Ok(format!("Worker updated {} job(s)", n))
|
Ok(format!("Worker updated {} job(s)", n))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[post("/heartbeat")]
|
||||||
|
async fn heartbeat(
|
||||||
|
heartbeat: web::Json<Heartbeat>,
|
||||||
|
data: web::Data<State>,
|
||||||
|
_: Authorization,
|
||||||
|
) -> Result<String> {
|
||||||
|
log::debug!("Heartbeat from worker {}", heartbeat.id);
|
||||||
|
let mut workers = data.workers.lock().unwrap();
|
||||||
|
for w in workers.iter_mut() {
|
||||||
|
if w.id == heartbeat.id {
|
||||||
|
w.last_heartbeat = Some(Utc::now().timestamp());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(format!("Heartbeat from worker {}", heartbeat.id))
|
||||||
|
}
|
||||||
|
|
||||||
#[post("/submit")]
|
#[post("/submit")]
|
||||||
async fn submit(
|
async fn submit(
|
||||||
data: web::Data<State>,
|
data: web::Data<State>,
|
||||||
@@ -163,17 +192,41 @@ async fn main() -> std::io::Result<()> {
|
|||||||
env_logger::Builder::from_env(Env::default().default_filter_or("zoidberg_server=info")).init();
|
env_logger::Builder::from_env(Env::default().default_filter_or("zoidberg_server=info")).init();
|
||||||
|
|
||||||
let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| {
|
let secret = std::env::var("ZOIDBERG_SECRET").unwrap_or_else(|_| {
|
||||||
println!("Please set the $ZOIDBERG_SECRET environment variable");
|
eprintln!("Please set the $ZOIDBERG_SECRET environment variable");
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
let _matches = clap::App::new("Zoidberg server")
|
let _matches = clap::App::new("Zoidberg server")
|
||||||
.version(VERSION)
|
.version(VERSION)
|
||||||
.author("Johannes Heuel")
|
.author("by Johannes Heuel")
|
||||||
.get_matches();
|
.get_matches();
|
||||||
|
|
||||||
let state = web::Data::new(State::new());
|
let state = web::Data::new(State::new());
|
||||||
|
|
||||||
|
let s = state.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||||
|
{
|
||||||
|
let mut workers = s.workers.lock().unwrap();
|
||||||
|
workers.retain(|w| match w.last_heartbeat {
|
||||||
|
None => true,
|
||||||
|
Some(t) => Utc::now().timestamp() - t < 60,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
let workers = s.workers.lock().unwrap();
|
||||||
|
let mut jobs = s.jobs.lock().unwrap();
|
||||||
|
for job in jobs.iter_mut() {
|
||||||
|
if let Status::Running(w) = &job.status {
|
||||||
|
let exists = workers.iter().filter(|x| &x.id == w).count() > 0;
|
||||||
|
if !exists {
|
||||||
|
job.status = Status::Failed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
App::new()
|
App::new()
|
||||||
.wrap(Logger::default())
|
.wrap(Logger::default())
|
||||||
@@ -184,9 +237,10 @@ async fn main() -> std::io::Result<()> {
|
|||||||
.service(fetch)
|
.service(fetch)
|
||||||
.service(status)
|
.service(status)
|
||||||
.service(update)
|
.service(update)
|
||||||
|
.service(heartbeat)
|
||||||
.service(submit)
|
.service(submit)
|
||||||
})
|
})
|
||||||
.bind(("127.0.0.1", 8080))?
|
.bind(("0.0.0.0", 8080))?
|
||||||
.run()
|
.run()
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -224,31 +278,53 @@ mod tests {
|
|||||||
.uri("/register")
|
.uri("/register")
|
||||||
.to_request();
|
.to_request();
|
||||||
let resp: RegisterResponse = test::call_and_read_body_json(&app, req).await;
|
let resp: RegisterResponse = test::call_and_read_body_json(&app, req).await;
|
||||||
assert_eq!(resp.id, 1);
|
assert!(!resp.id.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_web::test]
|
#[actix_web::test]
|
||||||
async fn test_fetch() {
|
async fn test_fetch() {
|
||||||
let cmd = String::from("hi");
|
let cmd = String::from("hi");
|
||||||
|
let jobid = 11;
|
||||||
let app = test::init_service(
|
let app = test::init_service(
|
||||||
App::new()
|
App::new()
|
||||||
.app_data(String::from("secret"))
|
.app_data(String::from("secret"))
|
||||||
.app_data(web::Data::new(State {
|
.app_data(web::Data::new(State {
|
||||||
counter_workers: Mutex::new(0),
|
|
||||||
counter_jobs: Mutex::new(0),
|
counter_jobs: Mutex::new(0),
|
||||||
workers: Mutex::new(Vec::new()),
|
workers: Mutex::new(vec![Worker {
|
||||||
new_jobs: Mutex::new(vec![Job {
|
id: "some_worker".to_string(),
|
||||||
id: 0,
|
last_heartbeat: None,
|
||||||
cmd: cmd.clone(),
|
|
||||||
status: Status::Submitted,
|
|
||||||
}]),
|
}]),
|
||||||
|
new_jobs: Mutex::new(vec![
|
||||||
|
Job {
|
||||||
|
id: jobid,
|
||||||
|
cmd: cmd.clone(),
|
||||||
|
status: Status::Submitted,
|
||||||
|
threads: 1,
|
||||||
|
},
|
||||||
|
Job {
|
||||||
|
id: jobid + 1,
|
||||||
|
cmd: cmd.clone(),
|
||||||
|
status: Status::Submitted,
|
||||||
|
threads: 2,
|
||||||
|
},
|
||||||
|
Job {
|
||||||
|
id: jobid + 2,
|
||||||
|
cmd: cmd.clone(),
|
||||||
|
status: Status::Submitted,
|
||||||
|
threads: 3,
|
||||||
|
},
|
||||||
|
]),
|
||||||
jobs: Mutex::new(Vec::new()),
|
jobs: Mutex::new(Vec::new()),
|
||||||
}))
|
}))
|
||||||
.service(fetch),
|
.service(fetch),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let req = test::TestRequest::get()
|
let req = test::TestRequest::post()
|
||||||
.append_header(("cookie", "secret"))
|
.append_header(("cookie", "secret"))
|
||||||
|
.set_json(FetchRequest {
|
||||||
|
worker_id: "some_worker".to_string(),
|
||||||
|
threads: 1,
|
||||||
|
})
|
||||||
.uri("/fetch")
|
.uri("/fetch")
|
||||||
.to_request();
|
.to_request();
|
||||||
let resp: FetchResponse = test::call_and_read_body_json(&app, req).await;
|
let resp: FetchResponse = test::call_and_read_body_json(&app, req).await;
|
||||||
@@ -256,11 +332,11 @@ mod tests {
|
|||||||
FetchResponse::Nop => {
|
FetchResponse::Nop => {
|
||||||
panic!("did not expect FetchResponse::Nop")
|
panic!("did not expect FetchResponse::Nop")
|
||||||
}
|
}
|
||||||
FetchResponse::StopWorking => {
|
FetchResponse::Terminate(w) => {
|
||||||
panic!("did not expect FetchResponse::NotWorking")
|
panic!("did not expect FetchResponse::Terminate from worker {}", w)
|
||||||
}
|
}
|
||||||
FetchResponse::Jobs(new_jobs) => {
|
FetchResponse::Jobs(new_jobs) => {
|
||||||
assert_eq!(new_jobs[0].id, 0);
|
assert_eq!(new_jobs[0].id, jobid);
|
||||||
assert_eq!(new_jobs[0].cmd, cmd);
|
assert_eq!(new_jobs[0].cmd, cmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -269,18 +345,19 @@ mod tests {
|
|||||||
#[actix_web::test]
|
#[actix_web::test]
|
||||||
async fn test_status() {
|
async fn test_status() {
|
||||||
let cmd = String::from("hi");
|
let cmd = String::from("hi");
|
||||||
|
let jobid = 1;
|
||||||
let app = test::init_service(
|
let app = test::init_service(
|
||||||
App::new()
|
App::new()
|
||||||
.app_data(String::from("secret"))
|
.app_data(String::from("secret"))
|
||||||
.app_data(web::Data::new(State {
|
.app_data(web::Data::new(State {
|
||||||
counter_workers: Mutex::new(0),
|
|
||||||
counter_jobs: Mutex::new(0),
|
counter_jobs: Mutex::new(0),
|
||||||
workers: Mutex::new(Vec::new()),
|
workers: Mutex::new(Vec::new()),
|
||||||
new_jobs: Mutex::new(Vec::new()),
|
new_jobs: Mutex::new(Vec::new()),
|
||||||
jobs: Mutex::new(vec![Job {
|
jobs: Mutex::new(vec![Job {
|
||||||
id: 1,
|
id: jobid,
|
||||||
cmd: cmd.clone(),
|
cmd: cmd.clone(),
|
||||||
status: Status::Running,
|
status: Status::Submitted,
|
||||||
|
threads: 1,
|
||||||
}]),
|
}]),
|
||||||
}))
|
}))
|
||||||
.service(status),
|
.service(status),
|
||||||
@@ -288,11 +365,11 @@ mod tests {
|
|||||||
.await;
|
.await;
|
||||||
let req = test::TestRequest::post()
|
let req = test::TestRequest::post()
|
||||||
.append_header(("cookie", "secret"))
|
.append_header(("cookie", "secret"))
|
||||||
.set_json(vec![StatusRequest { id: 1 }])
|
.set_json(vec![StatusRequest { id: jobid }])
|
||||||
.uri("/status")
|
.uri("/status")
|
||||||
.to_request();
|
.to_request();
|
||||||
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await;
|
let resp: Vec<Job> = test::call_and_read_body_json(&app, req).await;
|
||||||
assert_eq!(resp[0].id, 1);
|
assert_eq!(resp[0].id, jobid);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_web::test]
|
#[actix_web::test]
|
||||||
@@ -307,9 +384,9 @@ mod tests {
|
|||||||
let req = test::TestRequest::post()
|
let req = test::TestRequest::post()
|
||||||
.append_header(("cookie", "secret"))
|
.append_header(("cookie", "secret"))
|
||||||
.set_json(vec![Update {
|
.set_json(vec![Update {
|
||||||
worker: 0,
|
worker: "some_worker".to_string(),
|
||||||
job: 0,
|
job: 0,
|
||||||
status: Status::Running,
|
status: Status::Submitted,
|
||||||
}])
|
}])
|
||||||
.uri("/update")
|
.uri("/update")
|
||||||
.to_request();
|
.to_request();
|
||||||
@@ -331,7 +408,8 @@ mod tests {
|
|||||||
.set_json(vec![Job {
|
.set_json(vec![Job {
|
||||||
id: 0,
|
id: 0,
|
||||||
cmd: String::from("hi"),
|
cmd: String::from("hi"),
|
||||||
status: Status::Running,
|
status: Status::Submitted,
|
||||||
|
threads: 1,
|
||||||
}])
|
}])
|
||||||
.uri("/submit")
|
.uri("/submit")
|
||||||
.to_request();
|
.to_request();
|
||||||
|
|||||||
@@ -1,14 +1,15 @@
|
|||||||
|
use chrono::Utc;
|
||||||
use zoidberg_lib::types::{Job, Worker};
|
use zoidberg_lib::types::{Job, Worker};
|
||||||
|
|
||||||
// TODO: write nicer frontend
|
// TODO: write nicer frontend
|
||||||
pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
|
pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
|
||||||
let jobs_html: String = String::from("<table class=\"table is-hoverable\">")
|
let jobs_html: String = String::from("<table class=\"table is-hoverable\">")
|
||||||
+ "<thead><tr><th><td>ID</td><td>command</td><td>status</td></th></tr></thead><tbody>"
|
+ "<thead><tr><th>ID</th><th style=\"width: 150px;\">command</th><th>status</th></tr></thead><tbody>"
|
||||||
+ &jobs
|
+ &jobs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|j| {
|
.map(|j| {
|
||||||
format!(
|
format!(
|
||||||
"<tr><th></th><td>{}</td><td>{}</td><td>{}</td></tr>",
|
"<tr><td>{}</td><td>{}</td><td>{}</td></tr>",
|
||||||
j.id, j.cmd, j.status
|
j.id, j.cmd, j.status
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
@@ -17,14 +18,28 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
|
|||||||
+ "</tbody></table>";
|
+ "</tbody></table>";
|
||||||
|
|
||||||
let workers_html: String = String::from("<table class=\"table is-hoverable\">")
|
let workers_html: String = String::from("<table class=\"table is-hoverable\">")
|
||||||
+ "<thead><tr><th><td>ID</td></th></tr></thead><tbody>"
|
+ "<thead><tr><th>ID</th><th>last heartbeat</th></tr></thead><tbody>"
|
||||||
+ &workers
|
+ &workers
|
||||||
.iter()
|
.iter()
|
||||||
.map(|w| format!("<tr><th></th><td>{}</td></tr>", w.id))
|
.map(|w| {
|
||||||
|
let ts = if let Some(ts) = w.last_heartbeat {
|
||||||
|
format!("{}", Utc::now().timestamp() - ts)
|
||||||
|
} else {
|
||||||
|
String::from("")
|
||||||
|
};
|
||||||
|
format!("<tr><td>{}</td><td>{}</td></tr>", w.id, ts)
|
||||||
|
})
|
||||||
.collect::<Vec<String>>()
|
.collect::<Vec<String>>()
|
||||||
.join("\n")
|
.join("\n")
|
||||||
+ "</tbody></table>";
|
+ "</tbody></table>";
|
||||||
|
|
||||||
|
let style = r#"<style>
|
||||||
|
td {
|
||||||
|
max-width: 40vw;
|
||||||
|
word-wrap: break-word;
|
||||||
|
}
|
||||||
|
</style>"#;
|
||||||
|
|
||||||
let _debug_html = r#"<style>
|
let _debug_html = r#"<style>
|
||||||
*:not(path):not(g) {{
|
*:not(path):not(g) {{
|
||||||
color: hsla(210, 100%, 100%, 0.9) !important;
|
color: hsla(210, 100%, 100%, 0.9) !important;
|
||||||
@@ -43,9 +58,12 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
|
|||||||
<head>
|
<head>
|
||||||
<meta charset="utf-8">
|
<meta charset="utf-8">
|
||||||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||||||
<title>Hello Bulma!</title>
|
<title>Zoidberg</title>
|
||||||
|
<link rel="icon" href="data:image/svg+xml,%3Csvg%20xmlns='http://www.w3.org/2000/svg'%20viewBox='0%200%2016%2016'%3E%3Ctext%20x='0'%20y='14'%3E🦀%3C/text%3E%3C/svg%3E" type="image/svg+xml" />
|
||||||
|
|
||||||
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
|
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
|
||||||
{}
|
{}
|
||||||
|
{}
|
||||||
</head>
|
</head>
|
||||||
<body>
|
<body>
|
||||||
<section class="section">
|
<section class="section">
|
||||||
@@ -73,7 +91,7 @@ pub fn render(jobs: &[Job], workers: &[Worker]) -> String {
|
|||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
"#,
|
"#,
|
||||||
_debug_html, jobs_html, workers_html
|
style, _debug_html, jobs_html, workers_html
|
||||||
);
|
);
|
||||||
page
|
page
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user