Commit 72216538 authored by Fardale's avatar Fardale
Browse files

ocamlformat change

parent 7d59031e
(lang dune 1.11)
(using fmt 1.1)
(using fmt 1.2)
(name ocluster)
......@@ -3,22 +3,24 @@ open Serialization_t
let cmd cpu ram time iteration port script_file pass addr args () =
let script = CCIO.with_in script_file CCIO.read_all in
let submission = { script; args; time; iteration; cpu; ram; pass } in
let submission = {script; args; time; iteration; cpu; ram; pass} in
Lwt_io.with_connection
(Lwt_unix.ADDR_INET (Unix.inet_addr_of_string addr, port))
(fun (ic, oc) ->
Lwt_io.write_line oc (Serialization_j.string_of_query (`JOB submission))
>>= fun () ->
Lwt_io.read ic >>= fun json ->
Lwt_io.read ic
>>= fun json ->
match
CCResult.guard (fun () -> Serialization_j.answer_of_string json)
with
| Ok answer -> (
match answer with
| `Ok -> Logs_lwt.info (fun m -> m "Computation successfully sent.")
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s" s) )
match answer with
| `Ok ->
Logs_lwt.info (fun m -> m "Computation successfully sent.")
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s" s) )
| Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the answer: %s"
......
......@@ -12,13 +12,18 @@ let jobs : computation CCDeque.t = CCDeque.create ()
let jobs_id = ref 0
let string_of_ret_code = function
| `WEXITED i -> Printf.sprintf "WEXITED %i" i
| `WSIGNALED s -> Printf.sprintf "WSIGNALED %i" s
| `WSTOPPED s -> Printf.sprintf "WSTOPPED %i" s
| `WEXITED i ->
Printf.sprintf "WEXITED %i" i
| `WSIGNALED s ->
Printf.sprintf "WSIGNALED %i" s
| `WSTOPPED s ->
Printf.sprintf "WSTOPPED %i" s
let string_of_sockaddr = function
| Unix.ADDR_UNIX s -> s
| Unix.ADDR_INET (ip, _) -> Unix.string_of_inet_addr ip
| Unix.ADDR_UNIX s ->
s
| Unix.ADDR_INET (ip, _) ->
Unix.string_of_inet_addr ip
let end_job id sockaddr =
let n, l = SHashtbl.find nodes (string_of_sockaddr sockaddr) in
......@@ -36,7 +41,7 @@ let end_job id sockaddr =
SHashtbl.replace nodes
(string_of_sockaddr sockaddr)
( {n with cpu= n.cpu + j.cpu; ram= n.ram + j.ram}
, CCList.remove_one ~eq:(fun c1 c2 -> c1.id = c2.id) j l );
, CCList.remove_one ~eq:(fun c1 c2 -> c1.id = c2.id) j l ) ;
Lwt.return_unit
let rec launch_job () =
......@@ -46,20 +51,21 @@ let rec launch_job () =
SHashtbl.fold
(fun k (n, l) node ->
match node with
| Some x -> Some x
| Some x ->
Some x
| None ->
if n.cpu >= j.cpu && n.ram >= j.ram then Some (k, n, l) else None)
nodes None
with
| None ->
CCDeque.push_front jobs j;
CCDeque.push_front jobs j ;
Logs_lwt.debug (fun m -> m "No free node")
| Some (k, n, l) ->
let sockaddr =
Unix.ADDR_INET (Unix.inet_addr_of_string n.addr, n.port)
in
SHashtbl.replace nodes k
({ n with cpu = n.cpu - j.cpu; ram = n.ram - j.ram }, j :: l);
({n with cpu= n.cpu - j.cpu; ram= n.ram - j.ram}, j :: l) ;
Logs_lwt.debug (fun m ->
m "Send computation %i,%i to %s on %i" (fst j.id) (snd j.id) n.addr
n.port)
......@@ -73,96 +79,87 @@ let rec launch_job () =
else Logs_lwt.debug (fun m -> m "No computation")
let server_handler pass port sockaddr (ic, oc) =
Lwt_io.read_line ic >>= fun json ->
Lwt_io.read_line ic
>>= fun json ->
match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok query -> (
match query with
| `RESULT result ->
if pass = result.pass then
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive result: %s"
(Serialization_j.string_of_result
{
result with
stdout = "<stdout>";
stderr = "<stderr>";
}))
>>= fun () ->
Lwt.return
(Lwt.async (fun () ->
Lwt.join
[
( if String.length result.stdout > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.out" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stdout)
else Lwt.return_unit );
( if String.length result.stderr > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.err" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stderr)
else Lwt.return_unit );
match query with
| `RESULT result ->
if pass = result.pass then
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive result: %s"
(Serialization_j.string_of_result
{result with stdout= "<stdout>"; stderr= "<stderr>"}))
>>= fun () ->
Lwt.return
(Lwt.async (fun () ->
Lwt.join
[ ( if String.length result.stdout > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.log" (fst result.id)
(Printf.sprintf "ocluster_%i_%i.out" (fst result.id)
(snd result.id))
(fun oc ->
Lwt_io.write oc
(Printf.sprintf
"Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)));
end_job result.id sockaddr >>= Lwt.pause >>= launch_job;
]))
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" result.pass)
| `JOB submission ->
if pass = submission.pass then (
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive submission: %s"
(Serialization_j.string_of_submission submission))
>>= fun () ->
let empty = CCDeque.is_empty jobs in
for i = 0 to submission.iteration - 1 do
let computation =
{
id = (!jobs_id, i);
env =
[
Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" i;
Printf.sprintf "OCLUSTER_TASK_ID=%i" !jobs_id;
];
script = submission.script;
args = submission.args;
time = submission.time;
pass;
port;
cpu = submission.cpu;
ram = submission.ram;
}
in
CCDeque.push_back jobs computation
done;
incr jobs_id;
if empty then launch_job () else Lwt.return_unit )
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" submission.pass)
| _ ->
(fun oc -> Lwt_io.write oc result.stdout)
else Lwt.return_unit )
; ( if String.length result.stderr > 0 then
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.err" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stderr)
else Lwt.return_unit )
; Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.log" (fst result.id)
(snd result.id))
(fun oc ->
Lwt_io.write oc
(Printf.sprintf
"Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)))
; end_job result.id sockaddr >>= Lwt.pause >>= launch_job ]))
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Unwanted command"))
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" result.pass)
| `JOB submission ->
if pass = submission.pass then (
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive submission: %s"
(Serialization_j.string_of_submission submission))
>>= fun () ->
let empty = CCDeque.is_empty jobs in
for i = 0 to submission.iteration - 1 do
let computation =
{ id= (!jobs_id, i)
; env=
[ Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" i
; Printf.sprintf "OCLUSTER_TASK_ID=%i" !jobs_id ]
; script= submission.script
; args= submission.args
; time= submission.time
; pass
; port
; cpu= submission.cpu
; ram= submission.ram }
in
CCDeque.push_back jobs computation
done ;
incr jobs_id ;
if empty then launch_job () else Lwt.return_unit )
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" submission.pass)
| _ ->
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Unwanted command"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
| Result.Error e ->
Lwt_io.write_line oc
(Serialization_j.string_of_answer
......@@ -181,7 +178,7 @@ let cmd config () =
let conf =
Serialization_j.master_conf_of_string (CCIO.with_in config CCIO.read_all)
in
List.iter (fun n -> SHashtbl.add nodes n.addr (n, [])) conf.nodes;
List.iter (fun n -> SHashtbl.add nodes n.addr (n, [])) conf.nodes ;
let promise, resolver = Lwt.task () in
Logs_lwt.info (fun m -> m "master at %i with pass %s" conf.port conf.pass)
>>= fun () ->
......@@ -191,6 +188,7 @@ let cmd config () =
>>= fun server ->
let _ = Lwt_unix.on_signal 15 (stop_server resolver server)
and _ = Lwt_unix.on_signal 2 (stop_server resolver server) in
promise >>= fun server ->
promise
>>= fun server ->
Lwt_io.shutdown_server server
<&> Logs_lwt.info (fun m -> m "Shuting down server")
......@@ -4,15 +4,21 @@ open Serialization_t
let max_std = 5120000 (* 5Mo *)
let process_status_to_ret_code = function
| Unix.WEXITED c -> `WEXITED c
| Unix.WSIGNALED s -> `WSIGNALED s
| Unix.WSTOPPED s -> `WSTOPPED s
| Unix.WEXITED c ->
`WEXITED c
| Unix.WSIGNALED s ->
`WSIGNALED s
| Unix.WSTOPPED s ->
`WSTOPPED s
let run_computation (computation : computation) =
Lwt_io.with_temp_file ~prefix:"ocluster" ~perm:0o700 (fun (name, oc) ->
Lwt_io.write_line oc computation.script >>= fun () ->
Lwt_io.close oc >>= fun () ->
Lwt_unix.sleep 2. >>= fun () ->
Lwt_io.write_line oc computation.script
>>= fun () ->
Lwt_io.close oc
>>= fun () ->
Lwt_unix.sleep 2.
>>= fun () ->
let read_stderr, write_stderr = Lwt_unix.pipe_in ()
and read_stdout, write_stdout = Lwt_unix.pipe_in () in
let%lwt ret_code =
......@@ -23,43 +29,43 @@ let run_computation (computation : computation) =
and stdout = Lwt_io.read (Lwt_io.of_fd ~mode:Lwt_io.input read_stdout)
and stderr = Lwt_io.read (Lwt_io.of_fd ~mode:Lwt_io.input read_stderr) in
Lwt.return
{
id = computation.id;
stdout =
{ id= computation.id
; stdout=
( if String.length stdout > max_std then
CCString.drop (String.length stdout - max_std) stdout
else stdout );
stderr =
else stdout )
; stderr=
( if String.length stderr > max_std then
CCString.drop (String.length stderr - max_std) stderr
else stderr );
ret_code = process_status_to_ret_code ret_code;
pass = computation.pass;
})
else stderr )
; ret_code= process_status_to_ret_code ret_code
; pass= computation.pass })
let rec send_result sockaddr result =
if%lwt
Lwt_io.with_connection sockaddr (fun (ic, oc) ->
Lwt_io.write_line oc (Serialization_j.string_of_query (`RESULT result))
>>= fun () ->
Lwt_io.flush oc >>= fun () ->
Lwt_io.flush oc
>>= fun () ->
try%lwt
Lwt_io.read_line ic >>= fun json ->
Lwt_io.read_line ic
>>= fun json ->
match
CCResult.guard (fun () -> Serialization_j.answer_of_string json)
with
| Ok answer -> (
match answer with
| `Ok ->
Logs_lwt.debug (fun m ->
m "Result %i,%i successfully sent." (fst result.id)
(snd result.id))
>>= fun () -> Lwt.return false
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the result %i,%i: %s"
(fst result.id) (snd result.id) s)
>>= fun () -> Lwt.return true )
match answer with
| `Ok ->
Logs_lwt.debug (fun m ->
m "Result %i,%i successfully sent." (fst result.id)
(snd result.id))
>>= fun () -> Lwt.return false
| `Error s ->
Logs_lwt.err (fun m ->
m "Error during the reception of the result %i,%i: %s"
(fst result.id) (snd result.id) s)
>>= fun () -> Lwt.return true )
| Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the answer: %s"
......@@ -71,11 +77,14 @@ let rec send_result sockaddr result =
then send_result sockaddr result
let handle_computation sockaddr computation () =
run_computation computation >>= fun result ->
run_computation computation
>>= fun result ->
let sockaddr =
match sockaddr with
| Unix.ADDR_INET (a, _) -> Unix.ADDR_INET (a, computation.port)
| s -> s
| Unix.ADDR_INET (a, _) ->
Unix.ADDR_INET (a, computation.port)
| s ->
s
in
send_result sockaddr result
<&> Logs_lwt.debug (fun m ->
......@@ -84,28 +93,29 @@ let handle_computation sockaddr computation () =
let stat oc = Lwt_io.write_line oc (Serialization_j.string_of_stat `OK)
let server_handler pass sockaddr (ic, oc) =
Lwt_io.read_line ic >>= fun json ->
Lwt_io.read_line ic
>>= fun json ->
( match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok query -> (
match query with
| `COMPUTATION (computation : computation) ->
if pass = computation.pass then
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive computation: %s"
(Serialization_j.string_of_computation
{ computation with env = []; script = "<script>" }))
>|= fun () -> Lwt.async (handle_computation sockaddr computation)
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m ->
m "Wrong password: %s" computation.pass)
| `STAT ->
stat oc <&> Logs_lwt.debug (fun m -> m "Receive a stat command")
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
match query with
| `COMPUTATION (computation : computation) ->
if pass = computation.pass then
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive computation: %s"
(Serialization_j.string_of_computation
{computation with env= []; script= "<script>"}))
>|= fun () -> Lwt.async (handle_computation sockaddr computation)
else
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Wrong password"))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.warn (fun m -> m "Wrong password: %s" computation.pass)
| `STAT ->
stat oc <&> Logs_lwt.debug (fun m -> m "Receive a stat command")
| _ ->
Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
| Result.Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s"
......@@ -121,13 +131,15 @@ let cmd config () =
let promise, resolver = Lwt.task () in
Logs_lwt.info (fun m -> m "Node at port %i with pass %s" conf.port conf.pass)
>>= fun () ->
Lwt_io.flush Lwt_io.stderr >>= fun () ->
Lwt_io.flush Lwt_io.stderr
>>= fun () ->
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
(server_handler conf.pass)
>>= fun server ->
let _ = Lwt_unix.on_signal 15 (stop_server resolver server)
and _ = Lwt_unix.on_signal 2 (stop_server resolver server) in
promise >>= fun server ->
promise
>>= fun server ->
Lwt_io.shutdown_server server
<&> Logs_lwt.info (fun m -> m "Shuting down node server")
......@@ -3,11 +3,10 @@ open Cmdliner
let lwt_reporter () =
let buf_fmt ~like =
let b = Buffer.create 512 in
( Fmt.with_buffer ~like b,
fun () ->
( Fmt.with_buffer ~like b
, fun () ->
let m = Buffer.contents b in
Buffer.reset b;
m )
Buffer.reset b ; m )
in
let app, app_flush = buf_fmt ~like:Fmt.stdout in
let dst, dst_flush = buf_fmt ~like:Fmt.stderr in
......@@ -16,24 +15,23 @@ let lwt_reporter () =
let k () =
let write () =
match level with
| Logs.App -> Lwt_io.write Lwt_io.stdout (app_flush ())
| _ -> Lwt_io.write Lwt_io.stderr (dst_flush ())
| Logs.App ->
Lwt_io.write Lwt_io.stdout (app_flush ())
| _ ->
Lwt_io.write Lwt_io.stderr (dst_flush ())
in
let unblock () =
over ();
Lwt.return_unit
in
Lwt.async (fun () -> Lwt.finalize write unblock);
let unblock () = over () ; Lwt.return_unit in
Lwt.async (fun () -> Lwt.finalize write unblock) ;
k ()
in
reporter.Logs.report src level ~over:(fun () -> ()) k msgf
in
{ Logs.report }
{Logs.report}
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer ();
Logs.set_level level;
Logs.set_reporter (lwt_reporter ());
Fmt_tty.setup_std_outputs ?style_renderer () ;
Logs.set_level level ;
Logs.set_reporter (lwt_reporter ()) ;
()
let setup_log =
......@@ -47,14 +45,13 @@ let node_cmd =
let doc = "Start un computational node" in
let exits = Term.default_exits in
let man =
[
`S Manpage.s_description;
`P
"Start un computational node on the given port with the given password";
[ `S Manpage.s_description
; `P
"Start un computational node on the given port with the given password"
]
in
( Term.(const Lwt_main.run $ (const Node.cmd $ config $ setup_log)),
Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
( Term.(const Lwt_main.run $ (const Node.cmd $ config $ setup_log))
, Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let master_cmd =
let config =
......@@ -64,35 +61,32 @@ let master_cmd =
let doc = "Start un master server" in
let exits = Term.default_exits in
let man =
[
`S Manpage.s_description;
`P "Start un master server on the given port with the given password";
]
[ `S Manpage.s_description
; `P "Start un master server on the given port with the given password" ]
in
( Term.(const Lwt_main.run $ (const Master.cmd $ config $ setup_log)),
Term.info "master" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
( Term.(const Lwt_main.run $ (const Master.cmd $ config $ setup_log))
, Term.info "master" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let client_cmd =
let cpu =
let doc = "Number of cpu needed" in
Arg.(value & opt int 1 & info [ "c"; "cpu" ] ~docv:"CPU" ~doc)
Arg.(value & opt int 1 & info ["c"; "cpu"] ~docv:"CPU" ~doc)
in
let ram =
let doc = "Quantity of RAM needed" in
Arg.(value & opt int 1024 & info [ "r"; "ram" ] ~docv:"RAM" ~doc)
Arg.(value & opt int 1024 & info ["r"; "ram"] ~docv:"RAM" ~doc)
in
let time =
let doc = "Time limit" in
Arg.(
value & opt (some float) None & info [ "t"; "time" ] ~docv:"TIME" ~doc)
Arg.(value & opt (some float) None & info ["t"; "time"] ~docv:"TIME" ~doc)
in
let iteration =
let doc = "Number of iteration" in
Arg.(value & opt int 1 & info [ "i"; "iteration" ] ~docv:"N" ~doc)
Arg.(value & opt int 1 & info ["i"; "iteration"] ~docv:"N" ~doc)
in
let port =
let doc = "port on which client listen" in
Arg.(value & opt int 4242 & info [ "p"; "port" ] ~docv:"PORT" ~doc)
Arg.(value & opt int 4242 & info ["p"; "port"] ~docv:"PORT" ~doc)
in
let script =
let doc = "Ocluster script" in
......@@ -112,25 +106,23 @@ let client_cmd =
in
let doc = "Client to use the cluster" in
let exits = Term.default_exits in
let man = [ `S Manpage.s_description; `P "Client to use the cluster" ] in
let man = [`S Manpage.s_description; `P "Client to use the cluster"] in
( Term.(
const Lwt_main.run
$ ( const Client.cmd $ cpu $ ram $ time $ iteration $ port $ script $ pass
$ addr $ args $ setup_log )),
Term.info "client" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
$ addr $ args $ setup_log ))
, Term.info "client" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let default_cmd =
let doc = "Use a pool of computer as a cluster." in
let man =
[
`S Manpage.s_bugs;
`P "Email bug reports to <fardale+ocluster at crans.org>.";
]