Commit c7fe0147 authored by Fardale's avatar Fardale
Browse files

format + do not write .err and .out file if empty

parent b0960be7
(lang dune 1.0)
(lang dune 1.11)
(using fmt 1.1)
(name ocluster)
......@@ -3,25 +3,23 @@ open Serialization_t
let cmd cpu ram time iteration port script_file pass addr () =
let script = CCIO.with_in script_file CCIO.read_all in
let submission = {script; time; iteration; cpu; ram; pass} in
let submission = { script; time; iteration; cpu; ram; pass } in
Lwt_io.with_connection
(Lwt_unix.ADDR_INET (Unix.inet_addr_of_string addr, port))
(fun (ic, oc) ->
Lwt_io.write_line oc (Serialization_j.string_of_query (`JOB submission))
>>= fun () ->
Lwt_io.read ic
>>= fun json ->
Lwt_io.read ic >>= fun json ->
match
CCResult.guard (fun () -> Serialization_j.answer_of_string json)
with
| Ok answer -> (
match answer with
| `Ok ->
Logs_lwt.info (fun m -> m "Computation successfully sent.")
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s" s ) )
match answer with
| `Ok -> Logs_lwt.info (fun m -> m "Computation successfully sent.")
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s" s) )
| Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the answer: %s"
(Printexc.to_string e) ) )
(Printexc.to_string e)))
......@@ -12,28 +12,22 @@ let jobs : computation CCDeque.t = CCDeque.create ()
let jobs_id = ref 0
let string_of_ret_code = function
| `WEXITED i ->
Printf.sprintf "WEXITED %i" i
| `WSIGNALED s ->
Printf.sprintf "WSIGNALED %i" s
| `WSTOPPED s ->
Printf.sprintf "WSTOPPED %i" s
| `WEXITED i -> Printf.sprintf "WEXITED %i" i
| `WSIGNALED s -> Printf.sprintf "WSIGNALED %i" s
| `WSTOPPED s -> Printf.sprintf "WSTOPPED %i" s
let string_of_sockaddr = function
| Unix.ADDR_UNIX s ->
s
| Unix.ADDR_INET (ip, _) ->
Unix.string_of_inet_addr ip
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (ip, _) -> Unix.string_of_inet_addr ip
let end_job id sockaddr =
let n, l = SHashtbl.find nodes (string_of_sockaddr sockaddr) in
let j = List.find (fun c -> c.id = id) l in
Lwt.pause ()
>|= fun () ->
Lwt.pause () >|= fun () ->
SHashtbl.replace nodes
(string_of_sockaddr sockaddr)
( {n with cpu= n.cpu + j.cpu; ram= n.ram + j.ram}
, CCList.remove ~eq:(fun c1 c2 -> c1.id = c2.id) ~key:j l )
( { n with cpu = n.cpu + j.cpu; ram = n.ram + j.ram },
CCList.remove ~eq:(fun c1 c2 -> c1.id = c2.id) ~key:j l )
let rec launch_job () =
if not (CCDeque.is_empty jobs) then (
......@@ -42,112 +36,122 @@ let rec launch_job () =
SHashtbl.fold
(fun k (n, l) node ->
match node with
| Some x ->
Some x
| Some x -> Some x
| None ->
if n.cpu >= j.cpu && n.ram >= j.ram then Some (k, n, l) else None
)
if n.cpu >= j.cpu && n.ram >= j.ram then Some (k, n, l) else None)
nodes None
with
| None ->
CCDeque.push_front jobs j ;
CCDeque.push_front jobs j;
Logs_lwt.debug (fun m -> m "No free node")
| Some (k, n, l) ->
let sockaddr =
Unix.ADDR_INET (Unix.inet_addr_of_string n.addr, n.port)
in
SHashtbl.replace nodes k
({n with cpu= n.cpu - j.cpu; ram= n.ram - j.ram}, j :: l) ;
({ n with cpu = n.cpu - j.cpu; ram = n.ram - j.ram }, j :: l);
Logs_lwt.debug (fun m ->
m "Send computation %i,%i to %s on %i" (fst j.id) (snd j.id) n.addr
n.port )
n.port)
>>= fun () ->
Lwt_io.with_connection sockaddr (fun (_ic, oc) ->
Lwt_io.write_line oc
(Serialization_j.string_of_query (`COMPUTATION j)) )
(Serialization_j.string_of_query (`COMPUTATION j)))
(* TODO: check return value *)
>>= Lwt.pause
>>= launch_job )
else Logs_lwt.debug (fun m -> m "No computation")
let server_handler pass port sockaddr (ic, oc) =
Lwt_io.read_line ic
>>= fun json ->
Lwt_io.read_line ic >>= fun json ->
match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok query -> (
match query with
| `RESULT result ->
if pass = result.pass then
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive result: %s"
(Serialization_j.string_of_result
{result with stdout= "<stdout>"; stderr= "<stderr>"}) )
>>= fun () ->
Lwt.return
(Lwt.async (fun () ->
Lwt.join
[ Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.out" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stdout)
; Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.err" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stderr)
; Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.log" (fst result.id)
(snd result.id))
(fun oc ->
Lwt_io.write oc
(Printf.sprintf
"Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)) )
; end_job result.id sockaddr >>= Lwt.pause >>= launch_job ]
))
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" result.pass)
| `JOB submission ->
if pass = submission.pass then (
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive submission: %s"
(Serialization_j.string_of_submission submission) )
>>= fun () ->
let empty = CCDeque.is_empty jobs in
for i = 0 to submission.iteration - 1 do
let computation =
{ id= (!jobs_id, i)
; env=
[ Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" i
; Printf.sprintf "OCLUSTER_TASK_ID=%i" !jobs_id ]
; script= submission.script
; time= submission.time
; pass
; port
; cpu= submission.cpu
; ram= submission.ram }
in
CCDeque.push_back jobs computation
done ;
incr jobs_id ;
if empty then launch_job () else Lwt.return_unit )
else
match query with
| `RESULT result ->
if pass = result.pass then
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive result: %s"
(Serialization_j.string_of_result
{
result with
stdout = "<stdout>";
stderr = "<stderr>";
}))
>>= fun () ->
Lwt.return
(Lwt.async (fun () ->
Lwt.join
[
( if String.length result.stdout > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.out" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stdout)
else Lwt.return_unit );
( if String.length result.stderr > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.err" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stderr)
else Lwt.return_unit );
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.log" (fst result.id)
(snd result.id))
(fun oc ->
Lwt_io.write oc
(Printf.sprintf
"Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)));
end_job result.id sockaddr >>= Lwt.pause >>= launch_job;
]))
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" result.pass)
| `JOB submission ->
if pass = submission.pass then (
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive submission: %s"
(Serialization_j.string_of_submission submission))
>>= fun () ->
let empty = CCDeque.is_empty jobs in
for i = 0 to submission.iteration - 1 do
let computation =
{
id = (!jobs_id, i);
env =
[
Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" i;
Printf.sprintf "OCLUSTER_TASK_ID=%i" !jobs_id;
];
script = submission.script;
time = submission.time;
pass;
port;
cpu = submission.cpu;
ram = submission.ram;
}
in
CCDeque.push_back jobs computation
done;
incr jobs_id;
if empty then launch_job () else Lwt.return_unit )
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" submission.pass)
| _ ->
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
(Serialization_j.string_of_answer (`Error "Unwanted command"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" submission.pass)
| _ ->
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Unwanted command"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
<&> Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
| Result.Error e ->
Lwt_io.write_line oc
(Serialization_j.string_of_answer
......@@ -158,7 +162,7 @@ let server_handler pass port sockaddr (ic, oc) =
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s"
(Printexc.to_string e) )
(Printexc.to_string e))
let stop_server resolver server _ = Lwt.wakeup_later resolver server
......@@ -166,7 +170,7 @@ let cmd config () =
let conf =
Serialization_j.master_conf_of_string (CCIO.with_in config CCIO.read_all)
in
List.iter (fun n -> SHashtbl.add nodes n.addr (n, [])) conf.nodes ;
List.iter (fun n -> SHashtbl.add nodes n.addr (n, [])) conf.nodes;
let promise, resolver = Lwt.task () in
Logs_lwt.info (fun m -> m "master at %i with pass %s" conf.port conf.pass)
>>= fun () ->
......@@ -176,7 +180,6 @@ let cmd config () =
>>= fun server ->
let _ = Lwt_unix.on_signal 15 (stop_server resolver server)
and _ = Lwt_unix.on_signal 2 (stop_server resolver server) in
promise
>>= fun server ->
promise >>= fun server ->
Lwt_io.shutdown_server server
<&> Logs_lwt.info (fun m -> m "Shuting down server")
......@@ -4,21 +4,15 @@ open Serialization_t
let max_std = 5120000 (* 5Mo *)
let process_status_to_ret_code = function
| Unix.WEXITED c ->
`WEXITED c
| Unix.WSIGNALED s ->
`WSIGNALED s
| Unix.WSTOPPED s ->
`WSTOPPED s
| Unix.WEXITED c -> `WEXITED c
| Unix.WSIGNALED s -> `WSIGNALED s
| Unix.WSTOPPED s -> `WSTOPPED s
let run_computation (computation : computation) =
Lwt_io.with_temp_file ~prefix:"ocluster" ~perm:0o700 (fun (name, oc) ->
Lwt_io.write_line oc computation.script
>>= fun () ->
Lwt_io.close oc
>>= fun () ->
Lwt_unix.sleep 2.
>>= fun () ->
Lwt_io.write_line oc computation.script >>= fun () ->
Lwt_io.close oc >>= fun () ->
Lwt_unix.sleep 2. >>= fun () ->
let read_stderr, write_stderr = Lwt_unix.pipe_in ()
and read_stdout, write_stdout = Lwt_unix.pipe_in () in
let%lwt ret_code =
......@@ -29,98 +23,93 @@ let run_computation (computation : computation) =
and stdout = Lwt_io.read (Lwt_io.of_fd ~mode:Lwt_io.input read_stdout)
and stderr = Lwt_io.read (Lwt_io.of_fd ~mode:Lwt_io.input read_stderr) in
Lwt.return
{ id= computation.id
; stdout=
{
id = computation.id;
stdout =
( if String.length stdout > max_std then
CCString.drop (String.length stdout - max_std) stdout
else stdout )
; stderr=
else stdout );
stderr =
( if String.length stderr > max_std then
CCString.drop (String.length stderr - max_std) stderr
else stderr )
; ret_code= process_status_to_ret_code ret_code
; pass= computation.pass } )
else stderr );
ret_code = process_status_to_ret_code ret_code;
pass = computation.pass;
})
let rec send_result sockaddr result =
if%lwt
Lwt_io.with_connection sockaddr (fun (ic, oc) ->
Lwt_io.write_line oc (Serialization_j.string_of_query (`RESULT result))
>>= fun () ->
Lwt_io.flush oc
>>= fun () ->
Lwt_io.flush oc >>= fun () ->
try%lwt
Lwt_io.read_line ic
>>= fun json ->
Lwt_io.read_line ic >>= fun json ->
match
CCResult.guard (fun () -> Serialization_j.answer_of_string json)
with
| Ok answer -> (
match answer with
| `Ok ->
Logs_lwt.debug (fun m ->
m "Result %i,%i successfully sent." (fst result.id)
(snd result.id) )
>>= fun () -> Lwt.return false
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the result %i,%i: %s"
(fst result.id) (snd result.id) s )
>>= fun () -> Lwt.return true )
match answer with
| `Ok ->
Logs_lwt.debug (fun m ->
m "Result %i,%i successfully sent." (fst result.id)
(snd result.id))
>>= fun () -> Lwt.return false
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the result %i,%i: %s"
(fst result.id) (snd result.id) s)
>>= fun () -> Lwt.return true )
| Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the answer: %s"
(Printexc.to_string e) )
(Printexc.to_string e))
>>= fun () -> Lwt.return true
with End_of_file ->
Logs_lwt.err (fun m -> m "Error during the read of the answer: EOF")
>>= fun () -> Lwt.return true )
>>= fun () -> Lwt.return true)
then send_result sockaddr result
let handle_computation sockaddr computation () =
run_computation computation
>>= fun result ->
run_computation computation >>= fun result ->
let sockaddr =
match sockaddr with
| Unix.ADDR_INET (a, _) ->
Unix.ADDR_INET (a, computation.port)
| s ->
s
| Unix.ADDR_INET (a, _) -> Unix.ADDR_INET (a, computation.port)
| s -> s
in
send_result sockaddr result
<&> Logs_lwt.debug (fun m ->
m "End computation %i,%i" (fst computation.id) (snd computation.id)
)
m "End computation %i,%i" (fst computation.id) (snd computation.id))
let stat oc = Lwt_io.write_line oc (Serialization_j.string_of_stat `OK)
let server_handler pass sockaddr (ic, oc) =
Lwt_io.read_line ic
>>= fun json ->
Lwt_io.read_line ic >>= fun json ->
( match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok query -> (
match query with
| `COMPUTATION (computation : computation) ->
if pass = computation.pass then
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive computation: %s"
(Serialization_j.string_of_computation
{computation with env= []; script= "<script>"}) )
>|= fun () -> Lwt.async (handle_computation sockaddr computation)
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" computation.pass)
| `STAT ->
stat oc <&> Logs_lwt.debug (fun m -> m "Receive a stat command")
| _ ->
Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
match query with
| `COMPUTATION (computation : computation) ->
if pass = computation.pass then
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive computation: %s"
(Serialization_j.string_of_computation
{ computation with env = []; script = "<script>" }))
>|= fun () -> Lwt.async (handle_computation sockaddr computation)
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m ->
m "Wrong password: %s" computation.pass)
| `STAT ->
stat oc <&> Logs_lwt.debug (fun m -> m "Receive a stat command")
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
| Result.Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s"
(Printexc.to_string e) ) )
(Printexc.to_string e)) )
>>= fun () -> Lwt_io.flush Lwt_io.stderr
let stop_server resolver server _ = Lwt.wakeup_later resolver server
......@@ -132,15 +121,13 @@ let cmd config () =
let promise, resolver = Lwt.task () in
Logs_lwt.info (fun m -> m "Node at port %i with pass %s" conf.port conf.pass)
>>= fun () ->
Lwt_io.flush Lwt_io.stderr
>>= fun () ->
Lwt_io.flush Lwt_io.stderr >>= fun () ->
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
(server_handler conf.pass)
>>= fun server ->
let _ = Lwt_unix.on_signal 15 (stop_server resolver server)
and _ = Lwt_unix.on_signal 2 (stop_server resolver server) in
promise
>>= fun server ->
promise >>= fun server ->
Lwt_io.shutdown_server server
<&> Logs_lwt.info (fun m -> m "Shuting down node server")
......@@ -3,10 +3,11 @@ open Cmdliner
let lwt_reporter () =
let buf_fmt ~like =
let b = Buffer.create 512 in
( Fmt.with_buffer ~like b
, fun () ->
( Fmt.with_buffer ~like b,
fun () ->
let m = Buffer.contents b in
Buffer.reset b ; m )
Buffer.reset b;
m )
in
let app, app_flush = buf_fmt ~like:Fmt.stdout in
let dst, dst_flush = buf_fmt ~like:Fmt.stderr in
......@@ -15,23 +16,24 @@ let lwt_reporter () =
let k () =
let write () =
match level with
| Logs.App ->
Lwt_io.write Lwt_io.stdout (app_flush ())
| _ ->
Lwt_io.write Lwt_io.stderr (dst_flush ())
| Logs.App -> Lwt_io.write Lwt_io.stdout (app_flush ())
| _ -> Lwt_io.write Lwt_io.stderr (dst_flush ())
in
let unblock () = over () ; Lwt.return_unit in
Lwt.async (fun () -> Lwt.finalize write unblock) ;
let unblock () =
over ();
Lwt.return_unit
in
Lwt.async (fun () -> Lwt.finalize write unblock);
k ()
in
reporter.Logs.report src level ~over:(fun () -> ()) k msgf
in
{Logs.report}
{ Logs.report }
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer () ;
Logs.set_level level ;
Logs.set_reporter (lwt_reporter ()) ;
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (lwt_reporter ());
()
let setup_log =
......@@ -45,13 +47,14 @@ let node_cmd =
let doc = "Start un computational node" in
let exits = Term.default_exits in
let man =
[ `S Manpage.s_description
; `P
"Start un computational node on the given port with the given password"
[
`S Manpage.s_description;
`P
"Start un computational node on the given port with the given password";
]
in
( Term.(const Lwt_main.run $ (const Node.cmd $ config $ setup_log))
, Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
( Term.(const Lwt_main.run $ (const Node.cmd $ config $ setup_log)),
Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let master_cmd =
let config =
......@@ -61,32 +64,35 @@ let master_cmd =
let doc = "Start un master server" in
let exits = Term.default_exits in
let man =
[ `S Manpage.s_description
; `P "Start un master server on the given port with the given password" ]
[
`S Manpage.s_description;
`P "Start un master server on the given port with the given password";
]
in
( Term.(const Lwt_main.run $ (const Master.cmd $ config $ setup_log))
, Term.info "master" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
( Term.(const Lwt_main.run $ (const Master.cmd $ config $ setup_log)),
Term.info "master" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let client_cmd =
let cpu =
let doc = "Number of cpu needed" in
Arg.(value & opt int 1 & info ["c"; "cpu"] ~docv:"CPU" ~doc)
Arg.(value & opt int 1 & info [ "c"; "cpu" ] ~docv:"CPU" ~doc)
in
let ram =
let doc = "Quantity of RAM needed" in
Arg.(value & opt int 1024 & info ["r"; "ram"] ~docv:"RAM" ~doc)
Arg.(value & opt int 1024 & info [ "r"; "ram" ] ~docv:"RAM" ~doc)
in