Commit 897a694e authored by Fardale's avatar Fardale Committed by Fardale

Initial commit

parents
(jbuild_version 1)
(executable (
(name ocluster)
(libraries (lwt.unix lwt_log lwt unix ppx_deriving.show))
(preprocess (pps (ppx_deriving.show lwt_ppx)))
(modules (ocluster))))
open Lwt.Infix
type computation = {
id : int;
script : string;
time : float option;
pass : string;
}
[@@deriving show];;
type query = COMPUTATION of computation | STAT
[@@deriving show];;
let show_process_status fmt = function
| Unix.WEXITED c -> Format.fprintf fmt "WEXITED %i" c
| Unix.WSIGNALED s -> Format.fprintf fmt "WSIGNALED %i" s
| Unix.WSTOPPED s -> Format.fprintf fmt "WSTOPPED %i" s
;;
type answer = {
id : int;
stdout : string;
stderr : string;
ret_code : Unix.process_status; [@printer show_process_status]
pass : string;
}
[@@deriving show];;
type stat = OK
[@@deriving show];;
let run_computation (computation:computation) =
let read_stderr, write_stderr = Lwt_unix.pipe_in () and read_stdout, write_stdout = Lwt_unix.pipe_in () in
Lwt_process.exec ?timeout:computation.time ~stdin:`Close ~stdout:(`FD_move write_stdout)
~stderr:(`FD_move write_stderr) (Lwt_process.shell computation.script)
>>= fun ret_code ->
let%lwt stdout = Lwt_io.read_line (Lwt_io.of_fd ~mode:Lwt_io.input read_stdout) and
stderr = Lwt_io.read_line (Lwt_io.of_fd ~mode:Lwt_io.input read_stderr) in
Lwt.return { id = computation.id; stdout; stderr; ret_code; pass = computation.pass }
let getinetaddrbyname name =
Lwt_unix.gethostbyname name
>|= fun entry -> entry.Lwt_unix.h_addr_list.(0)
let stat oc =
Lwt_io.write_value oc OK
let server_handler port sockaddr (ic, oc) =
let sockaddr = match sockaddr with
| Unix.ADDR_INET (a,_) -> Unix.ADDR_INET (a,port)
| s -> s
in
begin match%lwt Lwt_result.catch (Lwt_io.read_value ic) with
| Result.Ok query -> begin
match query with
| COMPUTATION computation ->
Lwt_io.write_value oc true
<&> Lwt_log.debug (Printf.sprintf "Receive computation: %s" (show_computation computation))
>>= fun () -> run_computation computation
>>= fun answer -> Lwt_io.with_connection sockaddr
(fun (ic, oc) -> Lwt_io.write_value oc answer)
>>= fun _ -> Lwt_log.debug (Printf.sprintf "End computation %i" computation.id)
| STAT -> stat oc
<&> Lwt_log.debug "Receive a stat command"
end
| Result.Error e ->
Lwt_log.error (Printf.sprintf "Error during the reception of the computation: %s" (Printexc.to_string e))
<&> Lwt_io.write_value oc false
end >>= fun () ->
Lwt_unix.sleep 2.
;;
let main port =
let template = "$(date).$(milliseconds) $(name)[$(pid)]: $(message)" in
Lwt_log.file ~template ~perm:0o600 ~file_name:"test.log" ()
>>= fun x -> Lwt.return (Lwt_log.default := Lwt_log.broadcast [Lwt_log.channel ~template ~close_mode:`Keep ~channel:Lwt_io.stderr (); x])
>>= fun () -> Lwt_io.establish_server_with_client_address (Unix.ADDR_INET (Unix.inet_addr_any, port)) (server_handler port)
>>= fun server -> Lwt_unix.sleep 200.
let _ =
Lwt_main.run (main 2121)
(*Lwt_io.with_connection (Lwt_unix.ADDR_INET (Unix.inet_addr_of_string "127.0.0.1", 2121)) (fun (ic,oc) -> Lwt_io.write_value oc (COMPUTATION {id = 0; script = "#echo penis\necho plop\nls"; time = None; pass = "wesh"}));;*)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment