Commit 56545bf7 authored by Fardale's avatar Fardale

Neoformat

parent b26ee1ad
Pipeline #881 failed with stages
open Lwt.Infix
type computation = {
id : int;
script : string;
time : float option;
pass : string;
}
[@@deriving show];;
type computation = {id: int; script: string; time: float option; pass: string}
[@@deriving show]
type query = COMPUTATION of computation | STAT
[@@deriving show];;
type query = COMPUTATION of computation | STAT [@@deriving show]
let show_process_status fmt = function
| Unix.WEXITED c -> Format.fprintf fmt "WEXITED %i" c
| Unix.WSIGNALED s -> Format.fprintf fmt "WSIGNALED %i" s
| Unix.WSTOPPED s -> Format.fprintf fmt "WSTOPPED %i" s
;;
type answer = {
id : int;
stdout : string option;
stderr : string option;
ret_code : Unix.process_status; [@printer show_process_status]
pass : string;
}
[@@deriving show];;
type stat = OK
[@@deriving show];;
let run_computation (computation:computation) =
let read_stderr, write_stderr = Lwt_unix.pipe_in () and read_stdout, write_stdout = Lwt_unix.pipe_in () in
Lwt_process.exec ?timeout:computation.time ~stdin:`Close ~stdout:(`FD_move write_stdout)
~stderr:(`FD_move write_stderr) (Lwt_process.shell computation.script)
type answer =
{ id: int
; stdout: string option
; stderr: string option
; ret_code: Unix.process_status [@printer show_process_status]
; pass: string }
[@@deriving show]
type stat = OK [@@deriving show]
let run_computation (computation : computation) =
let read_stderr, write_stderr = Lwt_unix.pipe_in ()
and read_stdout, write_stdout = Lwt_unix.pipe_in () in
Lwt_process.exec ?timeout:computation.time ~stdin:`Close
~stdout:(`FD_move write_stdout) ~stderr:(`FD_move write_stderr)
(Lwt_process.shell computation.script)
>>= fun ret_code ->
let%lwt stdout = Lwt_io.read_line_opt (Lwt_io.of_fd ~mode:Lwt_io.input read_stdout) and
stderr = Lwt_io.read_line_opt (Lwt_io.of_fd ~mode:Lwt_io.input read_stderr) in
Lwt.return { id = computation.id; stdout; stderr; ret_code; pass = computation.pass }
let%lwt stdout =
Lwt_io.read_line_opt (Lwt_io.of_fd ~mode:Lwt_io.input read_stdout)
and stderr =
Lwt_io.read_line_opt (Lwt_io.of_fd ~mode:Lwt_io.input read_stderr)
in
Lwt.return
{id= computation.id; stdout; stderr; ret_code; pass= computation.pass}
let getinetaddrbyname name =
Lwt_unix.gethostbyname name
>|= fun entry -> entry.Lwt_unix.h_addr_list.(0)
Lwt_unix.gethostbyname name >|= fun entry -> entry.Lwt_unix.h_addr_list.(0)
let stat oc =
Lwt_io.write_value oc OK
let stat oc = Lwt_io.write_value oc OK
let server_handler pass port sockaddr (ic, oc) =
let sockaddr = match sockaddr with
| Unix.ADDR_INET (a,_) -> Unix.ADDR_INET (a,port)
let sockaddr =
match sockaddr with
| Unix.ADDR_INET (a, _) -> Unix.ADDR_INET (a, port)
| s -> s
in
begin match%lwt Lwt_result.catch (Lwt_io.read_value ic) with
| Result.Ok query -> begin
match query with
| COMPUTATION computation ->
if pass = computation.pass then begin
Lwt_io.write_value oc true
<&> Lwt_log.debug (Printf.sprintf "Receive computation: %s" (show_computation computation))
>>= fun () -> run_computation computation
>>= fun answer -> Lwt_io.with_connection sockaddr
(fun (ic, oc) -> Lwt_io.write_value oc answer)
>>= fun () -> Lwt_log.debug (Printf.sprintf "End computation %i" computation.id)
end
else Lwt_io.write_value oc false <&> Lwt_log.warning "Wrong password"
| STAT -> stat oc
<&> Lwt_log.debug "Receive a stat command"
end
| Result.Error e ->
Lwt_log.error (Printf.sprintf "Error during the reception of the computation: %s" (Printexc.to_string e))
<&> Lwt_io.write_value oc false
end >>= fun () ->
Lwt_unix.sleep 2.
;;
( match%lwt Lwt_result.catch (Lwt_io.read_value ic) with
| Result.Ok query -> (
match query with
| COMPUTATION computation ->
if pass = computation.pass then
Lwt_io.write_value oc true
<&> Lwt_log.debug
(Printf.sprintf "Receive computation: %s"
(show_computation computation))
>>= fun () ->
run_computation computation
>>= fun answer ->
Lwt_io.with_connection sockaddr (fun (ic, oc) ->
Lwt_io.write_value oc answer )
>>= fun () ->
Lwt_log.debug (Printf.sprintf "End computation %i" computation.id)
else Lwt_io.write_value oc false <&> Lwt_log.warning "Wrong password"
| STAT -> stat oc <&> Lwt_log.debug "Receive a stat command" )
| Result.Error e ->
Lwt_log.error
(Printf.sprintf "Error during the reception of the computation: %s"
(Printexc.to_string e))
<&> Lwt_io.write_value oc false )
>>= fun () -> Lwt_unix.sleep 2.
let stop_server resolver server _ = Lwt.wakeup_later resolver server
......@@ -79,16 +77,26 @@ let node pass port =
let promise, resolver = Lwt.task () in
let template = "$(date).$(milliseconds) $(name)[$(pid)]: $(message)" in
Lwt_log.file ~template ~perm:0o600 ~file_name:"test.log" ()
>|= (fun x -> Lwt_log.default := Lwt_log.broadcast [Lwt_log.channel ~template ~close_mode:`Keep ~channel:Lwt_io.stderr (); x])
>>= fun () -> Lwt_io.establish_server_with_client_address (Unix.ADDR_INET (Unix.inet_addr_any, port)) (server_handler pass port)
>>= fun server -> let _ = Lwt_unix.on_signal 15 (stop_server resolver server) and
_ = Lwt_unix.on_signal 2 (stop_server resolver server) and
_ = Lwt_unix.on_signal 15 (stop_server resolver server) in promise
>>= fun server -> Lwt_io.shutdown_server server <&> Lwt_log.info "Shuting down server"
>|= (fun x ->
Lwt_log.default :=
Lwt_log.broadcast
[ Lwt_log.channel ~template ~close_mode:`Keep
~channel:Lwt_io.stderr ()
; x ] )
>>= fun () ->
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, port))
(server_handler pass port)
>>= fun server ->
let _ = Lwt_unix.on_signal 15 (stop_server resolver server)
and _ = Lwt_unix.on_signal 2 (stop_server resolver server)
and _ = Lwt_unix.on_signal 15 (stop_server resolver server) in
promise
>>= fun server ->
Lwt_io.shutdown_server server <&> Lwt_log.info "Shuting down server"
open Cmdliner
let node_cmd =
let port =
let doc = "port on which client listen" in
......@@ -96,30 +104,29 @@ let node_cmd =
in
let pass =
let doc = "Password of the node" in
Arg.(required & pos 0 (some string) None & info [] ~docv:"PASS"
~doc)
Arg.(required & pos 0 (some string) None & info [] ~docv:"PASS" ~doc)
in
let doc = "Start un compatational node" in
let exits = Term.default_exits in
let man =
[`S Manpage.s_description;
`P "Start un compatational node on port $(port) with the password $(pass)";]
[ `S Manpage.s_description
; `P
"Start un compatational node on port $(port) with the password $(pass)"
]
in
Term.(const Lwt_main.run $ (const node $ pass $ port)),
Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man
( Term.(const Lwt_main.run $ (const node $ pass $ port))
, Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let default_cmd =
let doc = "Transforme un pool of computer into a cluster." in
let man = [
`S Manpage.s_bugs;
`P "Email bug reports to <fardale+ocluster at crans.org>." ]
let man =
[ `S Manpage.s_bugs
; `P "Email bug reports to <fardale+ocluster at crans.org>." ]
in
let sdocs = Manpage.s_common_options in
let exits = Term.default_exits in
Term.(ret (const (fun _ -> `Help (`Pager, None)) $ const ())),
Term.info "ocluster" ~version:"v0.0.1" ~doc ~sdocs ~exits ~man
( Term.(ret (const (fun _ -> `Help (`Pager, None)) $ const ()))
, Term.info "ocluster" ~version:"v0.0.1" ~doc ~sdocs ~exits ~man )
let () = Term.(exit @@ eval_choice default_cmd [node_cmd])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment