Commit 01b5b478 authored by Fardale's avatar Fardale

Master checks the status of each node every 30 min

The master requeues a node's jobs if the node fails
parent 40910dc9
......@@ -190,7 +190,7 @@ let server_handler pass port sockaddr (ic, oc) =
"Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)));
end_job result.id sockaddr
end_job result.id sockaddr;
])
| `JOB submission ->
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
......@@ -249,7 +249,8 @@ let server_handler pass port sockaddr (ic, oc) =
List.map (fun (ip, (_, computations)) ->
( ip,
List.filter_map
(fun (c:computation) -> if fst c.id = id then Some c else None)
(fun (c : computation) ->
if fst c.id = id then Some c else None)
computations ))
in
Lwt_io.write_line oc
......@@ -277,7 +278,62 @@ let server_handler pass port sockaddr (ic, oc) =
m "Error during the reception of the computation: %s"
(Printexc.to_string e))
let rec timer f = Lwt_unix.sleep 180. >>= f >>= fun () -> timer f
(** [timer delay f] sleeps for [delay] seconds, runs [f ()], waits for it to
    finish and then starts over. The loop has no exit condition. *)
let rec timer delay f =
  Lwt_unix.sleep delay >>= fun () ->
  f () >>= fun () ->
  timer delay f
(** [get_running_jobs pass sockaddr] connects to [sockaddr], sends a
    [`RUNNING] query authenticated with [pass] and decodes the single line
    of JSON sent back as a list of running computations.

    Any failure (connection, I/O or decoding) is logged as an error and the
    empty list is returned instead; the returned promise is not rejected. *)
let get_running_jobs pass sockaddr =
  try%lwt
    Lwt_io.with_connection sockaddr (fun (ic, oc) ->
        Lwt_io.write_line oc (Serialization_j.string_of_query (pass, `RUNNING))
        >>= fun () ->
        Lwt_io.read_line ic >>= fun json ->
        (* The decoder raises on malformed input: turn that into a result. *)
        match
          CCResult.guard (fun () -> Serialization_j.running_jobs_of_string json)
        with
        | Ok l -> Lwt.return l
        | Error e ->
            Logs_lwt.err (fun m ->
                m "Error during the reception of the running jobs: %s"
                  (Printexc.to_string e))
            >|= fun () -> [])
  with e ->
    (* Connection or I/O failure. The message names the running jobs query
       so that it cannot be mistaken for a failure of the STAT query. *)
    Logs_lwt.err (fun m ->
        m "Error during the collect of the running jobs: %s"
          (Printexc.to_string e))
    >|= fun () -> []
(** [check_jobs pass ()] asks every node registered in [nodes] which
    computations it is running. Each computation recorded for a node but
    missing from its answer is pushed to the front of [jobs] as a
    "Restart computation" job, and the node's entry in [nodes] is reduced to
    the computations it reported.

    [get_running_jobs] yields the empty list when the query fails, so all
    the computations recorded for an unreachable node are put back in the
    queue. The returned promise resolves once every node has been handled. *)
let check_jobs pass () =
  (* Put a lost computation back in the queue, limited to its own step. *)
  let requeue (lost : computation) =
    let job_id, step = lost.id in
    CCDeque.push_front jobs
      {
        id = job_id;
        name = "Restart computation";
        current = step;
        iteration = step + 1;
        script = lost.script;
        args = lost.args;
        time = lost.time;
        port = lost.port;
        cpu = lost.cpu;
        ram = lost.ram;
      }
  in
  let check_node ip (port, computations) =
    let sockaddr = Unix.ADDR_INET (Unix.inet_addr_of_string ip, port) in
    get_running_jobs pass sockaddr >|= fun running_jobs ->
    let still_running c = List.mem c running_jobs in
    List.iter
      (fun computation ->
        if not (still_running computation) then requeue computation)
      computations;
    SHashtbl.update nodes ~k:ip ~f:(fun _ entry ->
        match entry with
        | None -> Some (port, [])
        | Some (port, tracked) -> Some (port, List.filter still_running tracked))
  in
  SHashtbl.map_list check_node nodes |> Lwt.join
(** [stop_server resolver server _] resolves [resolver] with [server] through
    [Lwt.wakeup_later]. The last argument is ignored. *)
let stop_server resolver server _ =
  Lwt.wakeup_later resolver server
......@@ -289,7 +345,8 @@ let cmd config () =
let promise, resolver = Lwt.task () in
Logs_lwt.info (fun m -> m "master at %i with pass %s" conf.port conf.pass)
>>= fun () ->
( Lwt.async (fun () -> timer (launch_job conf.port conf.pass));
( Lwt.async (fun () -> timer 180. (launch_job conf.port conf.pass));
Lwt.async (fun () -> timer 1800. (check_jobs conf.pass));
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
(server_handler conf.pass conf.port) )
......
......@@ -5,6 +5,8 @@ let max_std = 5120000 (* 5Mo *)
(* NOTE(review): readers and writers of this reference are outside this
   excerpt; presumably the load of the computations in progress - confirm. *)
let current_load = ref 0.
(* Computations this node is executing: [handle_computation] pushes a
   computation here before running it and filters it out once it is done;
   the content is sent back in answer to a RUNNING query. *)
let running_jobs = CCVector.create ()
let get_ram () =
Lwt_io.lines_of_file "/proc/meminfo"
|> Lwt_stream.find_map (fun line ->
......@@ -142,14 +144,16 @@ let rec send_result sockaddr pass result =
then send_result sockaddr pass result
(** [handle_computation sockaddr pass computation ()] runs [computation] and
    sends its result back.

    The computation is listed in [running_jobs] for as long as
    [run_computation] is in progress. When [sockaddr] is an [ADDR_INET], the
    result goes to the same address on port [computation.port]; any other
    kind of address is used unchanged. The end of the computation is logged
    at info level while the result is being sent. *)
let handle_computation sockaddr pass computation () =
  CCVector.push running_jobs computation;
  run_computation computation >>= fun result ->
  let sockaddr =
    match sockaddr with
    | Unix.ADDR_INET (a, _) -> Unix.ADDR_INET (a, computation.port)
    | s -> s
  in
  (* Compare the two integers of the identifier by value. [!=] is physical
     inequality: on tuples it only spots the very same allocation, so an
     entry holding an equal identifier in another record would never be
     removed. *)
  let job_id, step = computation.id in
  CCVector.filter'
    (fun (c : computation) ->
      let other_id, other_step = c.id in
      other_id <> job_id || other_step <> step)
    running_jobs;
  send_result sockaddr pass result
  <&> Logs_lwt.info (fun m ->
          m "End computation %i,%i" (fst computation.id) (snd computation.id))
let server_handler pass sockaddr (ic, oc) =
......@@ -173,7 +177,12 @@ let server_handler pass sockaddr (ic, oc) =
Lwt_io.write_line oc
(Serialization_j.string_of_stat (cpu_count, ram))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m -> m "Receive stat command")
<&> Logs_lwt.debug (fun m -> m "Receive STAT command")
| `RUNNING ->
Lwt_io.write_line oc
(Serialization_j.string_of_running_jobs (CCVector.to_list running_jobs))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m -> m "Receive RUNNING command")
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command")
else
Lwt_io.write_line oc
......
......@@ -17,7 +17,9 @@ type result =
(* Payload of a query. RUNNING carries no data; a node answers it with the
   list of computations it is running (see running_jobs). *)
type query_data = [ COMPUTATION of computation | JOBQ of (int option * bool)
| RESULT of result | JOB of submission
| STAT ]
| STAT | RUNNING ]
(* Answer to a RUNNING query: the computations a node is executing. *)
type running_jobs = computation list
(* NOTE(review): presumably the queued jobs together with, for each node
   address, the computations assigned to it - confirm against the master. *)
type jobs_status = (job list * (string * computation list) list)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment