(* node.ml *)
open Lwt.Infix
open Serialization_t

(** Maximum number of bytes kept from each captured output stream of a
    computation: 5000 KiB, i.e. 5 120 000 bytes. *)
let max_std = 5000 * 1024

(** [process_status_to_ret_code status] maps a [Unix.process_status] onto the
    polymorphic variant carrying the same tag and the same integer payload
    (exit code or signal number). *)
let process_status_to_ret_code (status : Unix.process_status) =
  match status with
  | Unix.WEXITED code ->
      `WEXITED code
  | Unix.WSIGNALED signal ->
      `WSIGNALED signal
  | Unix.WSTOPPED signal ->
      `WSTOPPED signal

(** [run_computation computation] writes [computation.script] to a temporary
    file (prefix ["ocluster"], mode [0o700]), executes that file with
    [computation.env] as its environment and stdin closed, and resolves to a
    result record holding the computation id, the captured stdout and stderr,
    the converted exit status and the computation password.

    [computation.time] is forwarded as the optional execution timeout. Each
    captured stream is reduced to its last [max_std] bytes when it is longer
    than that. *)
let run_computation (computation : computation) =
  (* Keep only the trailing [max_std] bytes of a captured stream. *)
  let keep_tail output =
    let len = String.length output in
    if len > max_std then CCString.drop (len - max_std) output else output
  in
  (* Read a pipe until EOF, then close the channel so that the underlying
     descriptor is released instead of leaking on every computation. *)
  let drain fd =
    let ic = Lwt_io.of_fd ~mode:Lwt_io.input fd in
    Lwt.finalize (fun () -> Lwt_io.read ic) (fun () -> Lwt_io.close ic)
  in
  Lwt_io.with_temp_file ~prefix:"ocluster" ~perm:0o700 (fun (name, oc) ->
      Lwt_io.write_line oc computation.script
      >>= fun () ->
      (* The script must be closed before it can be executed. *)
      Lwt_io.close oc
      >>= fun () ->
      (* NOTE(review): the reason for this 2 s pause is not visible here;
         presumably it lets the freshly written file settle before exec --
         confirm before removing. *)
      Lwt_unix.sleep 2.
      >>= fun () ->
      let read_stderr, write_stderr = Lwt_unix.pipe_in ()
      and read_stdout, write_stdout = Lwt_unix.pipe_in () in
      (* The process and both readers run concurrently so that a full pipe
         never blocks the child. *)
      let%lwt ret_code =
        Lwt_process.exec ?timeout:computation.time ~stdin:`Close
          ~env:(Array.of_list computation.env)
          ~stdout:(`FD_move write_stdout) ~stderr:(`FD_move write_stderr)
          (name, [||])
      and stdout = drain read_stdout
      and stderr = drain read_stderr in
      Lwt.return
        { id= computation.id
        ; stdout= keep_tail stdout
        ; stderr= keep_tail stderr
        ; ret_code= process_status_to_ret_code ret_code
        ; pass= computation.pass } )

(** [send_result sockaddr result] connects to [sockaddr], sends [result] as a
    [`RESULT] query on one line and reads a one-line answer.

    The exchange is repeated, on a fresh connection, whenever the answer is
    [`Error _], cannot be decoded, or the peer closes the connection before
    answering. The promise resolves once an [`Ok] answer has been received. *)
let rec send_result sockaddr result =
  (* One request/answer round-trip; resolves to [true] when the result has to
     be sent again. *)
  let exchange (ic, oc) =
    let payload = Serialization_j.string_of_query (`RESULT result) in
    let%lwt () = Lwt_io.write_line oc payload in
    let%lwt () = Lwt_io.flush oc in
    try%lwt
      let%lwt json = Lwt_io.read_line ic in
      match
        CCResult.guard (fun () -> Serialization_j.answer_of_string json)
      with
      | Ok `Ok ->
          let%lwt () =
            Logs_lwt.debug (fun m ->
                m "Result %i,%i successfully sent." (fst result.id)
                  (snd result.id) )
          in
          Lwt.return false
      | Ok (`Error s) ->
          let%lwt () =
            Logs_lwt.err (fun m ->
                m "Error during the reception of the result %i,%i: %s"
                  (fst result.id) (snd result.id) s )
          in
          Lwt.return true
      | Error e ->
          let%lwt () =
            Logs_lwt.err (fun m ->
                m "Error during the reception of the answer: %s"
                  (Printexc.to_string e) )
          in
          Lwt.return true
    with End_of_file ->
      let%lwt () =
        Logs_lwt.err (fun m -> m "Error during the read of the answer: EOF")
      in
      Lwt.return true
  in
  let%lwt must_retry = Lwt_io.with_connection sockaddr exchange in
  if must_retry then send_result sockaddr result else Lwt.return_unit

(** [handle_computation sockaddr computation ()] runs [computation] and sends
    its result back with [send_result].

    When [sockaddr] is an Internet address, the result goes to the same host
    on [computation.port]; a Unix-domain address is used unchanged.

    The trailing [unit] lets the partial application be handed to
    [Lwt.async]. Any exception raised while running the computation or
    sending the result is therefore caught and logged here, so that it does
    not reach [Lwt.async_exception_hook]. *)
let handle_computation sockaddr computation () =
  Lwt.catch
    (fun () ->
      run_computation computation
      >>= fun result ->
      let reply_addr =
        match sockaddr with
        | Unix.ADDR_INET (host, _) ->
            Unix.ADDR_INET (host, computation.port)
        | Unix.ADDR_UNIX _ as addr ->
            addr
      in
      send_result reply_addr result
      <&> Logs_lwt.debug (fun m ->
              m "End computation %i,%i" (fst computation.id)
                (snd computation.id) ) )
    (fun exn ->
      Logs_lwt.err (fun m ->
          m "Computation %i,%i failed: %s" (fst computation.id)
            (snd computation.id) (Printexc.to_string exn) ) )
(** [stat oc] writes the serialized [`OK] status on [oc] as one line. *)
let stat oc =
  let payload = Serialization_j.string_of_stat `OK in
  Lwt_io.write_line oc payload

(** [server_handler pass sockaddr (ic, oc)] serves one client connection: it
    reads a single line from [ic], decodes it as a query and reacts to it.

    - [`COMPUTATION c] with [c.pass = pass]: answers [`Ok] and starts
      [handle_computation] in the background with [Lwt.async].
    - [`COMPUTATION c] with another password: answers
      [`Error "Wrong password"] and logs a warning.
    - [`STAT]: answers with [stat].
    - any other query, an undecodable line, or a connection closed before a
      line was received: only logged.

    [Lwt_io.stderr] is flushed before the handler returns. *)
let server_handler pass sockaddr (ic, oc) =
  (* Send one answer line to the client and flush it right away. *)
  let reply answer =
    Lwt_io.write_line oc (Serialization_j.string_of_answer answer)
    >>= fun () -> Lwt_io.flush oc
  in
  (* [read_line_opt] yields [None] on EOF instead of raising [End_of_file],
     so a client that disconnects without sending anything is just logged. *)
  Lwt_io.read_line_opt ic
  >>= (function
        | None ->
            Logs_lwt.warn (fun m ->
                m "Connection closed before any query was received" )
        | Some json -> (
          match
            CCResult.guard (fun () -> Serialization_j.query_of_string json)
          with
          | Result.Ok query -> (
            match query with
            | `COMPUTATION (computation : computation) ->
                if pass = computation.pass then
                  (* The environment and the script are masked in the log. *)
                  reply `Ok
                  <&> Logs_lwt.debug (fun m ->
                          m "Receive computation: %s"
                            (Serialization_j.string_of_computation
                               {computation with env= []; script= "<script>"})
                      )
                  >|= fun () ->
                  Lwt.async (handle_computation sockaddr computation)
                else
                  (* NOTE(review): the rejected password is written to the
                     log as-is. *)
                  reply (`Error "Wrong password")
                  <&> Logs_lwt.warn (fun m ->
                          m "Wrong password: %s" computation.pass )
            | `STAT ->
                stat oc
                <&> Logs_lwt.debug (fun m -> m "Receive a stat command")
            | _ ->
                Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
          | Result.Error e ->
              Logs_lwt.err (fun m ->
                  m "Error during the reception of the computation: %s"
                    (Printexc.to_string e) ) ) )
  >>= fun () -> Lwt_io.flush Lwt_io.stderr

(** [stop_server resolver server _] resolves the promise behind [resolver]
    with [server]. The last argument is ignored, which lets the partial
    application [stop_server resolver server] serve as a signal callback.

    The function may be called several times: [Lwt.wakeup_later] raises
    [Invalid_argument] when the promise is already resolved, and that case is
    deliberately ignored so a repeated stop request is a no-op. *)
let stop_server resolver server _ =
  try Lwt.wakeup_later resolver server with Invalid_argument _ -> ()

(** [cmd config ()] starts the node: it reads the node configuration from the
    file [config], listens on every local address at [conf.port] with
    [server_handler conf.pass], and runs until SIGTERM or SIGINT is received,
    at which point the server is shut down. *)
let cmd config () =
  let conf =
    Serialization_j.node_conf_of_string (CCIO.with_in config CCIO.read_all)
  in
  let promise, resolver = Lwt.task () in
  (* NOTE(review): the node password is written to the log at info level. *)
  Logs_lwt.info (fun m -> m "Node at port %i with pass %s" conf.port conf.pass)
  >>= fun () ->
  Lwt_io.flush Lwt_io.stderr
  >>= fun () ->
  Lwt_io.establish_server_with_client_address
    (Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
    (server_handler conf.pass)
  >>= fun server ->
  (* Both signals resolve [promise] with the running server. *)
  let request_stop = stop_server resolver server in
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigterm request_stop
  in
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigint request_stop
  in
  promise
  >>= fun server ->
  Lwt_io.shutdown_server server
  <&> Logs_lwt.info (fun m -> m "Shuting down node server")