This commit is contained in:
@@ -3,13 +3,17 @@ use actix_web::{
|
||||
dev, get, middleware::Logger, post, web, App, Error, FromRequest, HttpRequest, HttpResponse,
|
||||
HttpServer, Responder, Result,
|
||||
};
|
||||
use chrono::Utc;
|
||||
use clap;
|
||||
use env_logger::Env;
|
||||
use futures::future::{err, ok, Ready};
|
||||
use log;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use uuid::Uuid;
|
||||
use zoidberg_lib::types::{
|
||||
FetchResponse, Heartbeat, Job, RegisterResponse, StatusRequest, Update, Worker,
|
||||
FetchRequest, FetchResponse, Heartbeat, Job, RegisterResponse, Status, StatusRequest, Update,
|
||||
Worker,
|
||||
};
|
||||
|
||||
mod webpage;
|
||||
@@ -17,7 +21,6 @@ mod webpage;
|
||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
||||
struct State {
|
||||
counter_workers: Mutex<i32>,
|
||||
counter_jobs: Mutex<i32>,
|
||||
workers: Mutex<Vec<Worker>>,
|
||||
new_jobs: Mutex<Vec<Job>>,
|
||||
@@ -27,7 +30,6 @@ struct State {
|
||||
impl State {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
counter_workers: Mutex::new(0),
|
||||
counter_jobs: Mutex::new(0),
|
||||
workers: Mutex::new(Vec::new()),
|
||||
new_jobs: Mutex::new(Vec::new()),
|
||||
@@ -68,24 +70,40 @@ async fn index(data: web::Data<State>) -> impl Responder {
|
||||
|
||||
#[get("/register")]
|
||||
async fn register(data: web::Data<State>, _: Authorization) -> Result<impl Responder> {
|
||||
let mut counter_workers = data.counter_workers.lock().unwrap();
|
||||
*counter_workers += 1;
|
||||
|
||||
let mut workers = data.workers.lock().unwrap();
|
||||
let uuid = Uuid::new_v4().to_string();
|
||||
workers.push(Worker {
|
||||
id: *counter_workers,
|
||||
id: uuid.clone(),
|
||||
last_heartbeat: None,
|
||||
});
|
||||
|
||||
log::info!("Registered worker node with id: {}", *counter_workers);
|
||||
Ok(web::Json(RegisterResponse {
|
||||
id: *counter_workers,
|
||||
}))
|
||||
log::info!("Registered worker node with id: {}", uuid);
|
||||
Ok(web::Json(RegisterResponse { id: uuid }))
|
||||
}
|
||||
|
||||
#[get("/fetch")]
|
||||
async fn fetch(data: web::Data<State>, _: Authorization) -> Result<impl Responder> {
|
||||
#[post("/fetch")]
|
||||
async fn fetch(
|
||||
data: web::Data<State>,
|
||||
f: web::Json<FetchRequest>,
|
||||
_: Authorization,
|
||||
) -> Result<impl Responder> {
|
||||
let requesting_worker = f.into_inner().worker_id;
|
||||
{
|
||||
let workers = data.workers.lock().unwrap();
|
||||
if workers.iter().filter(|w| w.id == requesting_worker).count() != 1 {
|
||||
return Ok(web::Json(FetchResponse::Terminate(
|
||||
"Worker not found".into(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
let mut new_jobs = data.new_jobs.lock().unwrap();
|
||||
if let Some(j) = new_jobs.pop() {
|
||||
let mut jobs = data.jobs.lock().unwrap();
|
||||
for cj in jobs.iter_mut() {
|
||||
if cj.id == j.id {
|
||||
cj.status = Status::Running(requesting_worker.clone())
|
||||
}
|
||||
}
|
||||
return Ok(web::Json(FetchResponse::Jobs(vec![j])));
|
||||
}
|
||||
Ok(web::Json(FetchResponse::Nop))
|
||||
@@ -133,8 +151,18 @@ async fn update(
|
||||
}
|
||||
|
||||
#[post("/heartbeat")]
|
||||
async fn heartbeat(heartbeat: web::Json<Heartbeat>, _: Authorization) -> Result<String> {
|
||||
async fn heartbeat(
|
||||
heartbeat: web::Json<Heartbeat>,
|
||||
data: web::Data<State>,
|
||||
_: Authorization,
|
||||
) -> Result<String> {
|
||||
log::info!("Heartbeat from worker {}", heartbeat.id);
|
||||
let mut workers = data.workers.lock().unwrap();
|
||||
for w in workers.iter_mut() {
|
||||
if w.id == heartbeat.id {
|
||||
w.last_heartbeat = Some(Utc::now().timestamp());
|
||||
}
|
||||
}
|
||||
Ok(format!("Heartbeat from worker {}", heartbeat.id))
|
||||
}
|
||||
|
||||
@@ -181,6 +209,18 @@ async fn main() -> std::io::Result<()> {
|
||||
|
||||
let state = web::Data::new(State::new());
|
||||
|
||||
let s = state.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
let mut workers = s.workers.lock().unwrap();
|
||||
workers.retain(|w| match w.last_heartbeat {
|
||||
None => true,
|
||||
Some(t) => Utc::now().timestamp() - t < 60,
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.wrap(Logger::default())
|
||||
@@ -264,8 +304,8 @@ mod tests {
|
||||
FetchResponse::Nop => {
|
||||
panic!("did not expect FetchResponse::Nop")
|
||||
}
|
||||
FetchResponse::StopWorking => {
|
||||
panic!("did not expect FetchResponse::NotWorking")
|
||||
FetchResponse::Terminate(w) => {
|
||||
panic!("did not expect FetchResponse::Terminate from worker {}", w)
|
||||
}
|
||||
FetchResponse::Jobs(new_jobs) => {
|
||||
assert_eq!(new_jobs[0].id, 0);
|
||||
@@ -288,7 +328,7 @@ mod tests {
|
||||
jobs: Mutex::new(vec![Job {
|
||||
id: 1,
|
||||
cmd: cmd.clone(),
|
||||
status: Status::Running,
|
||||
status: Status::Running(0),
|
||||
}]),
|
||||
}))
|
||||
.service(status),
|
||||
@@ -317,7 +357,7 @@ mod tests {
|
||||
.set_json(vec![Update {
|
||||
worker: 0,
|
||||
job: 0,
|
||||
status: Status::Running,
|
||||
status: Status::Running(0),
|
||||
}])
|
||||
.uri("/update")
|
||||
.to_request();
|
||||
@@ -339,7 +379,7 @@ mod tests {
|
||||
.set_json(vec![Job {
|
||||
id: 0,
|
||||
cmd: String::from("hi"),
|
||||
status: Status::Running,
|
||||
status: Status::Running(0),
|
||||
}])
|
||||
.uri("/submit")
|
||||
.to_request();
|
||||
|
||||
Reference in New Issue
Block a user