Commit 517795c8 authored by Fardale's avatar Fardale

change to dynamic resource handling

The server queries each node to find one with enough
resources.
parent c35bccdc
......@@ -5,87 +5,140 @@ module SHashtbl = CCHashtbl.Make (CCString)
(* TODO: store currently running jobs and check from time
 * to time that nodes are still alive *)

(** Known worker nodes, keyed by address string. Each entry holds the node's
    port and the computations currently recorded as sent to that node. *)
let nodes : (int * computation list) SHashtbl.t = SHashtbl.create 10

(** Queue of submitted jobs: new jobs are pushed at the back and the
    scheduler takes work from the front. *)
let jobs : job CCDeque.t = CCDeque.create ()

(** Identifier that will be given to the next submitted job. *)
let jobs_id = ref 0
(** [string_of_ret_code code] renders a process return code as the
    constructor name followed by its integer payload, e.g. ["WEXITED 0"]. *)
let string_of_ret_code = function
  | `WEXITED i -> Printf.sprintf "WEXITED %i" i
  | `WSIGNALED s -> Printf.sprintf "WSIGNALED %i" s
  | `WSTOPPED s -> Printf.sprintf "WSTOPPED %i" s
(** [string_of_sockaddr addr] gives the string used to identify a peer:
    the socket path for a Unix-domain address, or the textual IP for an
    Internet address (the port is deliberately dropped). *)
let string_of_sockaddr = function
  | Unix.ADDR_UNIX s -> s
  | Unix.ADDR_INET (ip, _) -> Unix.string_of_inet_addr ip
(** [end_job id sockaddr] removes the computation [id] from the list recorded
    for the node identified by [sockaddr] in [nodes].

    An unknown node or an unknown computation id is logged as a warning
    instead of raising [Not_found]: this function is called from inside
    [Lwt.async], where an escaping exception reaches the async exception
    hook. *)
let end_job id sockaddr =
  let key = string_of_sockaddr sockaddr in
  match SHashtbl.find_opt nodes key with
  | None ->
      Logs_lwt.warn (fun m ->
          m "Result %i,%i received from unknown node %s" (fst id) (snd id) key)
  | Some (port, running) -> (
      match List.find_opt (fun (c : computation) -> c.id = id) running with
      | None ->
          Logs_lwt.warn (fun m ->
              m "Result %i,%i does not match any computation running on %s"
                (fst id) (snd id) key)
      | Some finished ->
          (* Drop only the first entry carrying this id; the port is kept. *)
          SHashtbl.replace nodes key
            ( port,
              CCList.remove_one
                ~eq:(fun (c1 : computation) c2 -> c1.id = c2.id)
                finished running );
          Lwt.return_unit )
let rec launch_job port pass () =
if not (CCDeque.is_empty jobs) then (
(** [get_stats pass sockaddr] sends a [`STAT] query to the node at [sockaddr]
    and resolves to the [(cpu, ram)] pair it answers with.

    Any failure (connection error, unreadable or unparsable answer) is logged
    and reported as [(0, 0)]. *)
let get_stats pass sockaddr =
  (* Shared failure path: log the cause and report no available resources. *)
  let unavailable e =
    Logs_lwt.err (fun m ->
        m "Error during the reception of the stats: %s" (Printexc.to_string e))
    >|= fun () -> (0, 0)
  in
  try%lwt
    Lwt_io.with_connection sockaddr (fun (ic, oc) ->
        Lwt_io.write_line oc (Serialization_j.string_of_query (pass, `STAT))
        >>= fun () ->
        (* Flush explicitly before waiting for the answer, as the other
           request/response helpers in this file do. *)
        Lwt_io.flush oc >>= fun () ->
        Lwt_io.read_line ic >>= fun json ->
        match
          CCResult.guard (fun () -> Serialization_j.stat_of_string json)
        with
        | Ok (cpu, ram) ->
            Logs_lwt.debug (fun m -> m "Stat: cpu: %i ram: %i" cpu ram)
            >|= fun () -> (cpu, ram)
        | Error e -> unavailable e)
  with e -> unavailable e
(** [send_computation sockaddr pass computation] sends [computation] to the
    node at [sockaddr] and waits for its one-line answer.

    Resolves to [true] only when the node answers [`Ok]. Every failure —
    connection error, write error, EOF, unparsable answer, or an [`Error]
    answer — is logged and reported as [false], so callers never have to
    handle an exception from this function. *)
let send_computation sockaddr pass (computation : computation) =
  let job_id, task_id = computation.id in
  try%lwt
    Lwt_io.with_connection sockaddr (fun (ic, oc) ->
        Lwt_io.write_line oc
          (Serialization_j.string_of_query (pass, `COMPUTATION computation))
        >>= fun () ->
        Lwt_io.flush oc >>= fun () ->
        Lwt_io.read_line ic >>= fun json ->
        match
          CCResult.guard (fun () -> Serialization_j.answer_of_string json)
        with
        | Ok `Ok ->
            Logs_lwt.debug (fun m ->
                m "Computation %i,%i successfully sent." job_id task_id)
            >|= fun () -> true
        | Ok (`Error s) ->
            (* The node answered but refused the computation. *)
            Logs_lwt.err (fun m ->
                m "Node refused computation %i,%i: %s" job_id task_id s)
            >|= fun () -> false
        | Error e ->
            Logs_lwt.err (fun m ->
                m "Error during the reception of the answer: %s"
                  (Printexc.to_string e))
            >|= fun () -> false)
  with
  | End_of_file ->
      Logs_lwt.err (fun m -> m "Error during the read of the answer: EOF")
      >|= fun () -> false
  | e ->
      (* Typically a connection failure: the node is down or unreachable. *)
      Logs_lwt.err (fun m ->
          m "Error while sending computation %i,%i: %s" job_id task_id
            (Printexc.to_string e))
      >|= fun () -> false
(** Scheduler guard: [true] when no [launch_job] pass is in progress.
    [launch_job] clears it on entry and must set it back on every exit
    path, otherwise no further job is ever scheduled. *)
let mutex = ref true

(** [launch_job server_port pass ()] tries to dispatch the computation at the
    front of [jobs].

    Each known node is asked for its current resources through [get_stats];
    the first node reporting enough cpu and ram for the job receives the
    computation. [server_port] is stored in the computation as the port the
    node must send its result to. After a successful dispatch the function
    waits 2 seconds and tries the next computation. It does nothing when
    another pass is already running or when the queue is empty.

    The guard is released whether the dispatch succeeds, no node is free, the
    node refuses the computation, or an exception is raised. *)
let rec launch_job server_port pass () =
  if !mutex && not (CCDeque.is_empty jobs) then (
    mutex := false;
    (* One scheduling attempt; resolves to [true] iff a computation was
       actually accepted by a node. *)
    let attempt () =
      let job = CCDeque.peek_front jobs in
      SHashtbl.fold
        (fun addr (port, _) found ->
          found >>= function
          | Some _ as hit -> Lwt.return hit
          | None ->
              let sockaddr =
                Unix.ADDR_INET (Unix.inet_addr_of_string addr, port)
              in
              get_stats pass sockaddr >|= fun (cpu, ram) ->
              if cpu >= job.cpu && ram >= job.ram then Some (addr, port)
              else None)
        nodes (Lwt.return None)
      >>= function
      | None -> Logs_lwt.debug (fun m -> m "No free node") >|= fun () -> false
      | Some (addr, port) ->
          let sockaddr =
            Unix.ADDR_INET (Unix.inet_addr_of_string addr, port)
          in
          let computation =
            {
              id = (job.id, job.current);
              env =
                [
                  Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" job.current;
                  Printf.sprintf "OCLUSTER_TASK_ID=%i" job.id;
                ];
              script = job.script;
              args = job.args;
              time = job.time;
              port = server_port;
              cpu = job.cpu;
              ram = job.ram;
            }
          in
          send_computation sockaddr pass computation >|= fun sent ->
          if sent then begin
            (* Re-read the node's list now: it may have been updated by
               [end_job] while we were waiting on the network. *)
            let running =
              match SHashtbl.find_opt nodes addr with
              | Some (_, l) -> l
              | None -> []
            in
            SHashtbl.replace nodes addr (port, computation :: running);
            if job.current + 1 < job.iteration then
              job.current <- job.current + 1
            else CCDeque.remove_front jobs
          end;
          sent
    in
    (try%lwt attempt ()
     with e ->
       (* Never leave the guard held when the attempt blows up. *)
       mutex := true;
       Lwt.fail e)
    >>= fun dispatched ->
    if dispatched then (
      Lwt_unix.sleep 2. >>= fun () ->
      mutex := true;
      launch_job server_port pass () )
    else (
      mutex := true;
      Lwt.return_unit ) )
  else Lwt.return_unit
let server_handler pass port sockaddr (ic, oc) =
Lwt_io.read_line ic
>>= fun json ->
Lwt_io.read_line ic >>= fun json ->
match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok (query_pass, query) ->
if query_pass = pass then
......@@ -96,33 +149,39 @@ let server_handler pass port sockaddr (ic, oc) =
<&> Logs_lwt.debug (fun m ->
m "Receive result: %s"
(Serialization_j.string_of_result
{result with stdout= "<stdout>"; stderr= "<stderr>"}))
{
result with
stdout = "<stdout>";
stderr = "<stderr>";
}))
>|= fun () ->
(Lwt.async (fun () ->
Lwt.join
[ ( if String.length result.stdout > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.out" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stdout)
else Lwt.return_unit )
; ( if String.length result.stderr > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.err" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stderr)
else Lwt.return_unit )
; Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.log" (fst result.id)
(snd result.id))
(fun oc ->
Lwt_io.write oc
(Printf.sprintf
"Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)))
; end_job result.id sockaddr >>= Lwt.pause
>>= launch_job port pass ]))
Lwt.async (fun () ->
Lwt.join
[
( if String.length result.stdout > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.out" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stdout)
else Lwt.return_unit );
( if String.length result.stderr > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.err" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stderr)
else Lwt.return_unit );
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.log" (fst result.id)
(snd result.id))
(fun oc ->
Lwt_io.write oc
(Printf.sprintf
"Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)));
end_job result.id sockaddr >>= Lwt.pause
>>= launch_job port pass;
])
| `JOB submission ->
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
......@@ -132,19 +191,21 @@ let server_handler pass port sockaddr (ic, oc) =
>>= fun () ->
let empty = CCDeque.is_empty jobs in
let job =
{ id= !jobs_id
; name = submission.name
; current= 0
; iteration= submission.iteration
; script= submission.script
; args= submission.args
; time= submission.time
; port
; cpu= submission.cpu
; ram= submission.ram }
{
id = !jobs_id;
name = submission.name;
current = 0;
iteration = submission.iteration;
script = submission.script;
args = submission.args;
time = submission.time;
port;
cpu = submission.cpu;
ram = submission.ram;
}
in
CCDeque.push_back jobs job ;
incr jobs_id ;
CCDeque.push_back jobs job;
incr jobs_id;
if empty then launch_job port pass () else Lwt.return_unit
| `JOBQ (id, delete) as jobq ->
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
......@@ -158,20 +219,19 @@ let server_handler pass port sockaddr (ic, oc) =
try (CCDeque.peek_back jobs).id with CCDeque.Empty -> 0
in
let id = CCOpt.get_or ~default:last_id id in
CCDeque.filter_in_place jobs (fun j -> j.id != id) ;
CCDeque.filter_in_place jobs (fun j -> j.id != id);
Logs_lwt.info (fun m -> m "Removing jobs with id %i" id) )
else
(let jobs_list =
match id with
| None ->
CCDeque.to_list jobs
| Some id ->
CCDeque.(
filter (fun (j : job) -> j.id = id) jobs |> to_list)
in
Lwt_io.write_line oc
(Serialization_j.string_of_query_data (`JOBS jobs_list))
>>= fun () -> Lwt_io.flush oc)
let jobs_list =
match id with
| None -> CCDeque.to_list jobs
| Some id ->
CCDeque.(
filter (fun (j : job) -> j.id = id) jobs |> to_list)
in
Lwt_io.write_line oc
(Serialization_j.string_of_query_data (`JOBS jobs_list))
>>= fun () -> Lwt_io.flush oc
| _ ->
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Unwanted command"))
......@@ -186,31 +246,32 @@ let server_handler pass port sockaddr (ic, oc) =
Lwt_io.write_line oc
(Serialization_j.string_of_answer
(`Error
(Printf.sprintf
"Error during the reception of the computation: %s"
(Printf.sprintf "Error during the reception of the computation: %s"
(Printexc.to_string e))))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s"
(Printexc.to_string e))
(** [timer f] sleeps 1800 seconds, runs [f ()], and starts over; the
    resulting promise only ends if the sleep or [f] fails. *)
let rec timer f =
  Lwt_unix.sleep 1800. >>= fun () ->
  f () >>= fun () -> timer f
(** [stop_server resolver server _] is a signal handler that resolves
    [resolver] with [server]; the last argument (the signal number) is
    ignored.

    [Lwt.wakeup_later] raises [Invalid_argument] when the promise is already
    resolved, which happens when a second termination signal arrives during
    shutdown; that case is deliberately ignored. *)
let stop_server resolver server _ =
  try Lwt.wakeup_later resolver server with Invalid_argument _ -> ()
(** [cmd config ()] runs the master: it reads the master configuration from
    the file [config], registers every configured node in [nodes] with an
    empty computation list, starts the periodic [launch_job] timer and the
    TCP server, then waits for SIGTERM or SIGINT before shutting the server
    down. *)
let cmd config () =
  let conf =
    Serialization_j.master_conf_of_string (CCIO.with_in config CCIO.read_all)
  in
  List.iter (fun n -> SHashtbl.add nodes n.addr (n.port, [])) conf.nodes;
  let promise, resolver = Lwt.task () in
  (* NOTE(review): this writes the shared password to the log in clear text;
     confirm that is intended. *)
  Logs_lwt.info (fun m -> m "master at %i with pass %s" conf.port conf.pass)
  >>= fun () ->
  (* Periodically retry scheduling in the background. *)
  Lwt.async (fun () -> timer (launch_job conf.port conf.pass));
  Lwt_io.establish_server_with_client_address
    (Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
    (server_handler conf.pass conf.port)
  >>= fun server ->
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigterm (stop_server resolver ())
  in
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigint (stop_server resolver ())
  in
  promise >>= fun () ->
  Lwt_io.shutdown_server server
  <&> Logs_lwt.info (fun m -> m "Shuting down server")
......@@ -3,22 +3,53 @@ open Serialization_t
(** Maximum number of characters of stdout/stderr kept in a result; when an
    output is longer, only its last [max_std] characters are kept. *)
let max_std = 5120000 (* 5 MB *)
(** [get_ram ()] returns the integer found on the first [MemAvailable] line
    of [/proc/meminfo], or [None] when no such line can be parsed.
    NOTE(review): the value is presumably in kB — confirm. *)
let get_ram () =
  let parse_line line =
    if not (CCString.prefix ~pre:"MemAvailable" line) then None
    else
      match String.split_on_char ':' line with
      | [ _; value ] -> (
          (* The amount is the first space-separated token after the colon. *)
          match String.split_on_char ' ' (String.trim value) with
          | [] -> None
          | amount :: _ -> CCInt.of_string amount )
      | _ -> None
  in
  CCList.find_map parse_line CCIO.(with_in "/proc/meminfo" read_lines_l)
(** [get_load ()] returns the first space-separated field of the first line
    of [/proc/loadavg] as a float, or [None] when the file is empty or the
    field is not a valid float. *)
let get_load () =
  match CCIO.(with_in "/proc/loadavg" read_line) with
  | None -> None
  | Some line -> (
      match String.split_on_char ' ' line with
      (* A malformed field yields [None] rather than an exception, in line
         with the option return type. *)
      | first :: _ -> float_of_string_opt first
      | [] -> None )
(** [get_cpu_count ()] returns the integer found on the first [cpu cores]
    line of [/proc/cpuinfo], or [None] when no such line can be parsed.
    NOTE(review): this field may describe a single physical package rather
    than the whole machine — confirm it is the intended count. *)
let get_cpu_count () =
  let parse_line line =
    if not (CCString.prefix ~pre:"cpu cores" line) then None
    else
      match String.split_on_char ':' line with
      | [ _; count ] -> CCInt.of_string (String.trim count)
      | _ -> None
  in
  CCList.find_map parse_line CCIO.(with_in "/proc/cpuinfo" read_lines_l)
(** [get_available_cpu ()] is the cpu count minus [ceil (load + 1.5)], or [0]
    when either the cpu count or the load is unknown. The result can be
    negative on a loaded machine. *)
let get_available_cpu () =
  let load = get_load () in
  let cores = get_cpu_count () in
  match cores with
  | None -> 0
  | Some cpu_count -> (
      match load with
      | None -> 0
      | Some load -> cpu_count - CCFloat.to_int (CCFloat.ceil (load +. 1.5)) )
(** [get_available_ram ()] is the value of [get_ram ()] divided by 1024,
    minus 4096, or [0] when that value is unknown. The result can be
    negative. NOTE(review): presumably kB converted to MB with a 4096 MB
    reserve — confirm the units. *)
let get_available_ram () =
  Option.fold ~none:0 ~some:(fun ram -> (ram / 1024) - 4096) (get_ram ())
(** [process_status_to_ret_code status] converts a [Unix.process_status] into
    the polymorphic variant with the same constructor name and payload. *)
let process_status_to_ret_code = function
  | Unix.WEXITED c -> `WEXITED c
  | Unix.WSIGNALED s -> `WSIGNALED s
  | Unix.WSTOPPED s -> `WSTOPPED s
let run_computation (computation : computation) =
Lwt_io.with_temp_file ~prefix:"ocluster" ~perm:0o700 (fun (name, oc) ->
Lwt_io.write_line oc computation.script
>>= fun () ->
Lwt_io.close oc
>>= fun () ->
Lwt_unix.sleep 2.
>>= fun () ->
Lwt_io.write_line oc computation.script >>= fun () ->
Lwt_io.close oc >>= fun () ->
Lwt_unix.sleep 2. >>= fun () ->
let read_stderr, write_stderr = Lwt_unix.pipe_in ()
and read_stdout, write_stdout = Lwt_unix.pipe_in () in
let%lwt ret_code =
......@@ -28,20 +59,20 @@ let run_computation (computation : computation) =
(name, Array.of_list (name :: computation.args))
and stdout = Lwt_io.read (Lwt_io.of_fd ~mode:Lwt_io.input read_stdout)
and stderr = Lwt_io.read (Lwt_io.of_fd ~mode:Lwt_io.input read_stderr) in
Lwt_unix.close read_stderr
>>= fun () ->
Lwt_unix.close read_stdout
>|= fun () ->
{ id= computation.id
; stdout=
( if String.length stdout > max_std then
CCString.drop (String.length stdout - max_std) stdout
else stdout )
; stderr=
( if String.length stderr > max_std then
CCString.drop (String.length stderr - max_std) stderr
else stderr )
; ret_code= process_status_to_ret_code ret_code })
Lwt_unix.close read_stderr >>= fun () ->
Lwt_unix.close read_stdout >|= fun () ->
{
id = computation.id;
stdout =
( if String.length stdout > max_std then
CCString.drop (String.length stdout - max_std) stdout
else stdout );
stderr =
( if String.length stderr > max_std then
CCString.drop (String.length stderr - max_std) stderr
else stderr );
ret_code = process_status_to_ret_code ret_code;
})
let rec send_result sockaddr pass result =
if%lwt
......@@ -49,26 +80,24 @@ let rec send_result sockaddr pass result =
Lwt_io.write_line oc
(Serialization_j.string_of_query (pass, `RESULT result))
>>= fun () ->
Lwt_io.flush oc
>>= fun () ->
Lwt_io.flush oc >>= fun () ->
try%lwt
Lwt_io.read_line ic
>>= fun json ->
Lwt_io.read_line ic >>= fun json ->
match
CCResult.guard (fun () -> Serialization_j.answer_of_string json)
with
| Ok answer -> (
match answer with
| `Ok ->
Logs_lwt.debug (fun m ->
m "Result %i,%i successfully sent." (fst result.id)
(snd result.id))
>|= fun () -> false
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the result %i,%i: %s"
(fst result.id) (snd result.id) s)
>|= fun () -> true )
match answer with
| `Ok ->
Logs_lwt.debug (fun m ->
m "Result %i,%i successfully sent." (fst result.id)
(snd result.id))
>|= fun () -> false
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the result %i,%i: %s"
(fst result.id) (snd result.id) s)
>|= fun () -> true )
| Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the answer: %s"
......@@ -80,22 +109,18 @@ let rec send_result sockaddr pass result =
then send_result sockaddr pass result
(** [handle_computation sockaddr pass computation ()] runs [computation]
    once, then sends its result back to the peer at [sockaddr].

    For an Internet address the peer's IP is kept but the port is replaced by
    [computation.port]; a Unix-domain address is used unchanged. The end of
    the computation is logged at debug level alongside the sending of the
    result. *)
let handle_computation sockaddr pass computation () =
  run_computation computation >>= fun result ->
  let sockaddr =
    match sockaddr with
    | Unix.ADDR_INET (a, _) -> Unix.ADDR_INET (a, computation.port)
    | Unix.ADDR_UNIX _ as s -> s
  in
  send_result sockaddr pass result
  <&> Logs_lwt.debug (fun m ->
          m "End computation %i,%i" (fst computation.id) (snd computation.id))
let server_handler pass sockaddr (ic, oc) =
Lwt_io.read_line ic
>>= fun json ->
Lwt_io.read_line ic >>= fun json ->
( match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok (query_pass, query) ->
if query_pass = pass then
......@@ -106,11 +131,16 @@ let server_handler pass sockaddr (ic, oc) =
<&> Logs_lwt.debug (fun m ->
m "Receive computation: %s"
(Serialization_j.string_of_computation
{computation with env= []; script= "<script>"}))
{ computation with env = []; script = "<script>" }))
>|= fun () ->
Lwt.async (handle_computation sockaddr pass computation)
| _ ->
Logs_lwt.warn (fun m -> m "Receive a unwanted command")
| `STAT ->
Lwt_io.write_line oc
(Serialization_j.string_of_stat
(get_available_cpu (), get_available_ram ()))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m -> m "Receive stat command")
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command")
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
......@@ -131,15 +161,13 @@ let cmd config () =
let promise, resolver = Lwt.task () in
Logs_lwt.info (fun m -> m "Node at port %i with pass %s" conf.port conf.pass)
>>= fun () ->
Lwt_io.flush Lwt_io.stderr
>>= fun () ->
Lwt_io.flush Lwt_io.stderr >>= fun () ->
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
(server_handler conf.pass)
>>= fun server ->
let _ = Lwt_unix.on_signal 15 (stop_server resolver server)
and _ = Lwt_unix.on_signal 2 (stop_server resolver server) in
promise
>>= fun server ->
promise >>= fun server ->
Lwt_io.shutdown_server server
<&> Logs_lwt.info (fun m -> m "Shuting down node server")
(* A worker node as listed in the master configuration: its address and the
   port it listens on (4242 when omitted). *)
type node = {addr: string; ~port <ocaml default="4242">: int;}

(* Master configuration: shared password, listening port (4242 when omitted)
   and the list of worker nodes. *)
type master_conf = {pass: string; ~port <ocaml default="4242">: int; nodes: node list}

(* Node configuration: shared password and listening port (4242 when
   omitted). *)
type node_conf = {pass: string; ~port <ocaml default="4242">: int}
......@@ -16,7 +16,10 @@ type result =
{id: (int * int); stdout: string; stderr: string; ret_code: ret_code}
(* Payload of a query; STAT carries no data and asks a node for its
   currently available resources. *)
type query_data = [ COMPUTATION of computation | JOBQ of (int option * bool)
                  | RESULT of result | JOB of submission | JOBS of (job list)
                  | STAT ]

(* Answer to a STAT query: (available cpu, available ram). *)
type stat = (int * int)

(* A query as sent on the wire: (password, payload). *)
type query = (string * query_data)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment