(* ocluster.ml *)
open Lwt.Infix

(** A job sent to a node for execution. *)
type computation = {
  id : int;  (* copied unchanged into the matching answer *)
  script : string;  (* command handed to the shell by [run_computation] *)
  time : float option;  (* optional timeout forwarded to the process runner *)
  pass : string;  (* compared with the node's password before running *)
}
[@@deriving show]

(** Requests a node can receive: run a computation, or a status probe. *)
type query =
  | COMPUTATION of computation
  | STAT
[@@deriving show]

(** [show_process_status fmt status] prints the constructor name of [status]
    followed by its integer payload on [fmt]. Used as the custom printer for
    the [ret_code] field of [answer]. *)
let show_process_status fmt status =
  let emit template n = Format.fprintf fmt template n in
  match status with
  | Unix.WEXITED code -> emit "WEXITED %i" code
  | Unix.WSIGNALED signal -> emit "WSIGNALED %i" signal
  | Unix.WSTOPPED signal -> emit "WSTOPPED %i" signal

(** Result of a computation, sent back to the client. *)
type answer = {
  id : int;  (* identifier of the computation this answers *)
  stdout : string option;  (* captured standard output, if any *)
  stderr : string option;  (* captured standard error, if any *)
  ret_code : Unix.process_status; [@printer show_process_status]
  pass : string;  (* password copied from the originating computation *)
}
[@@deriving show]

(** Reply to a [STAT] query. *)
type stat =
  | OK
[@@deriving show]

(** [run_computation computation] runs [computation.script] through the shell
    with stdin closed and, when [computation.time] is set, that timeout.

    The returned answer carries the computation's [id] and [pass], the exit
    status, and the first line written to stdout and to stderr ([None] when a
    stream produced no line).

    Both pipes are read while the process runs and are drained to end of
    file. Reading only after the process has exited would block forever as
    soon as the script fills a pipe buffer. The read ends are closed once
    drained. *)
let run_computation (computation:computation) =
  let read_stderr, write_stderr = Lwt_unix.pipe_in ()
  and read_stdout, write_stdout = Lwt_unix.pipe_in () in
  (* Keep the first line, discard the rest so the child never blocks on a
     full pipe, then close the channel (which closes the descriptor). *)
  let first_line fd =
    let channel = Lwt_io.of_fd ~mode:Lwt_io.input fd in
    Lwt.finalize
      (fun () ->
         Lwt_io.read_line_opt channel >>= fun line ->
         Lwt_io.read channel >|= fun (_ : string) -> line)
      (fun () -> Lwt_io.close channel)
  in
  (* The write ends are moved into the child, so the readers see end of file
     once every process holding them has exited. *)
  let status =
    Lwt_process.exec ?timeout:computation.time ~stdin:`Close
      ~stdout:(`FD_move write_stdout) ~stderr:(`FD_move write_stderr)
      (Lwt_process.shell computation.script)
  in
  let stdout = first_line read_stdout
  and stderr = first_line read_stderr in
  status >>= fun ret_code ->
  stdout >>= fun stdout ->
  stderr >>= fun stderr ->
  Lwt.return { id = computation.id; stdout; stderr; ret_code; pass = computation.pass }

(** [getinetaddrbyname name] looks [name] up with [Lwt_unix.gethostbyname] and
    resolves to the first entry of the host's address list. Indexing fails if
    that list is empty. *)
let getinetaddrbyname name =
  let first_address { Lwt_unix.h_addr_list; _ } = h_addr_list.(0) in
  Lwt.map first_address (Lwt_unix.gethostbyname name)

(** [stat oc] answers a status probe by writing the value [OK] on [oc]. *)
let stat oc = OK |> Lwt_io.write_value oc

(** [server_handler pass port sockaddr (ic, oc)] serves one client connection.

    It reads a single [query] from [ic]:
    - [COMPUTATION c]: if [c.pass] equals [pass], writes [true] on [oc], runs
      the computation and sends the answer over a {e new} connection to the
      peer's address on [port]; otherwise writes [false].
    - [STAT]: writes [OK] on [oc].
    - unreadable request: logs the error and writes [false].

    Any exception raised while serving is logged instead of propagating. A
    rejected handler would otherwise reach [Lwt.async_exception_hook], and one
    failed computation or unreachable client could stop the whole node. The
    handler then waits two seconds before returning.

    NOTE(review): [Lwt_io.read_value] unmarshals bytes received from the
    network, and the debug log of a computation includes its password. Both
    deserve a look if nodes are exposed to untrusted peers. *)
let server_handler pass port sockaddr (ic, oc) =
  (* Answers go back to the peer's address but on [port], not on the
     ephemeral source port of this connection. Other address kinds are used
     unchanged. *)
  let sockaddr = match sockaddr with
    | Unix.ADDR_INET (a,_) -> Unix.ADDR_INET (a,port)
    | s -> s
  in
  let serve () =
    match%lwt Lwt_result.catch (Lwt_io.read_value ic) with
    | Result.Ok query -> begin
        match query with
        | COMPUTATION computation ->
          if pass = computation.pass then begin
            Lwt_io.write_value oc true
            <&> Lwt_log.debug (Printf.sprintf "Receive computation: %s" (show_computation computation))
            >>= fun () -> run_computation computation
            >>= fun answer -> Lwt_io.with_connection sockaddr
              (fun (_ic, oc) -> Lwt_io.write_value oc answer)
            >>= fun () -> Lwt_log.debug (Printf.sprintf "End computation %i" computation.id)
          end
          else Lwt_io.write_value oc false <&> Lwt_log.warning "Wrong password"
        | STAT -> stat oc
                    <&> Lwt_log.debug "Receive a stat command"
      end
    | Result.Error e ->
      Lwt_log.error (Printf.sprintf "Error during the reception of the computation: %s" (Printexc.to_string e))
      <&> Lwt_io.write_value oc false
  in
  Lwt.catch serve
    (fun e ->
       Lwt_log.error (Printf.sprintf "Error while serving a request: %s" (Printexc.to_string e)))
  >>= fun () ->
  Lwt_unix.sleep 2.

(** [stop_server resolver server _] resolves the promise behind [resolver]
    with [server]; the last argument (the signal number when used as a signal
    handler) is ignored.

    Safe to call more than once: [Lwt.wakeup_later] raises [Invalid_argument]
    when the promise is already resolved, which would otherwise happen when a
    second signal arrives during shutdown. *)
let stop_server resolver server _ =
  try Lwt.wakeup_later resolver server with Invalid_argument _ -> ()

(** [node pass port] runs a computation node until it is interrupted.

    It installs a logger that writes to both stderr and the file [test.log]
    (created with mode 0o600), starts a server on every interface on [port]
    whose connections are served by [server_handler pass port], then waits.
    On SIGINT or SIGTERM the server is shut down and the returned promise
    resolves. *)
let node pass port =
  let promise, resolver = Lwt.task () in
  let template = "$(date).$(milliseconds) $(name)[$(pid)]: $(message)" in
  Lwt_log.file ~template ~perm:0o600 ~file_name:"test.log" ()
  >>= fun file_logger ->
  let stderr_logger =
    Lwt_log.channel ~template ~close_mode:`Keep ~channel:Lwt_io.stderr ()
  in
  Lwt_log.default := Lwt_log.broadcast [stderr_logger; file_logger];
  Lwt_io.establish_server_with_client_address
    (Unix.ADDR_INET (Unix.inet_addr_any, port))
    (server_handler pass port)
  >>= fun server ->
  (* One handler per signal: registering the same signal twice would run
     [stop_server] twice for a single delivery. The portable [Sys] constants
     replace raw, platform-specific signal numbers. *)
  List.iter
    (fun signum ->
       ignore (Lwt_unix.on_signal signum (stop_server resolver server)
               : Lwt_unix.signal_handler_id))
    [Sys.sigint; Sys.sigterm];
  promise
  >>= fun server -> Lwt_io.shutdown_server server <&> Lwt_log.info "Shuting down server"
open Cmdliner


(** Cmdliner term and info for the [node] sub-command: an optional
    [-p]/[--port] (default 4242) and a required positional password, both
    passed to [node], whose promise is run with [Lwt_main.run]. *)
let node_cmd =
  let port =
    let doc =
      "Port on which the node listens; answers are sent back to the client \
       on the same port"
    in
    Arg.(value & opt int 4242 & info ["p"; "port"] ~docv:"PORT" ~doc)
  in
  let pass =
    let doc = "Password of the node" in
    Arg.(required & pos 0 (some string) None & info [] ~docv:"PASS"
           ~doc)
  in
  let doc = "Start a computational node" in
  let exits = Term.default_exits in
  let man =
    (* Arguments are referred to through their docv with $(i,...) markup;
       $(port) and $(pass) are not variables of Cmdliner's doc language. *)
    [`S Manpage.s_description;
     `P "Start a computational node on port $(i,PORT) with the password $(i,PASS).";]
  in
  Term.(const Lwt_main.run $ (const node $ pass $ port)),
  Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man


(** Cmdliner term and info used when no sub-command is given: the term always
    asks for the help page to be shown in a pager. *)
let default_cmd =
  let show_help = Term.(ret (const (fun _ -> `Help (`Pager, None)) $ const ())) in
  let info =
    let bug_section = [
      `S Manpage.s_bugs;
      `P "Email bug reports to <fardale+ocluster at crans.org>." ]
    in
    Term.info "ocluster" ~version:"v0.0.1"
      ~doc:"Transforme un pool of computer into a cluster."
      ~sdocs:Manpage.s_common_options
      ~exits:Term.default_exits
      ~man:bug_section
  in
  (show_help, info)


(* Entry point: evaluate the command line against the available commands and
   exit with the resulting status. *)
let () = Term.exit (Term.eval_choice default_cmd [node_cmd])

(* Example client request, kept for manual testing:
   Lwt_io.with_connection
     (Lwt_unix.ADDR_INET (Unix.inet_addr_of_string "127.0.0.1", 2121))
     (fun (ic,oc) ->
        Lwt_io.write_value oc
          (COMPUTATION {id = 0; script = "#echo hello\necho plop\nls"; time = None; pass = "wesh"}));; *)