Commit e595336a authored by Fardale's avatar Fardale

version 0.1.0

parent cb9b5f86
open Serialization_t
(** [cmd cpu ram time iteration port script_file pass addr] reads the whole
    content of [script_file], wraps it with the resource requirements into a
    submission, and writes it as a [`JOB] query to the server listening at
    [addr]:[port].

    [addr] is parsed with [Unix.inet_addr_of_string], so it must be a
    literal IP address. *)
let cmd cpu ram time iteration port script_file pass addr =
  let script = CCIO.with_in script_file CCIO.read_all in
  let submission = {script; time; iteration; cpu; ram; pass} in
  let master = Lwt_unix.ADDR_INET (Unix.inet_addr_of_string addr, port) in
  let send (_ic, oc) =
    Lwt_io.write oc (Serialization_j.string_of_query (`JOB submission))
  in
  Lwt_io.with_connection master send
; Generate the JSON (de)serialization module (serialization_j.ml/.mli) from
; the ATD description with "atdgen -j". "-j-std" asks atdgen for
; standard-compliant JSON.
; (mode fallback): the rule is only used when the targets are not already
; present in the source tree.
(rule
(targets serialization_j.ml serialization_j.mli)
(deps serialization.atd)
(mode fallback)
(action
(run atdgen -j -j-std %{deps})))
; Generate the OCaml type definitions (serialization_t.ml/.mli) from the
; ATD description with "atdgen -t".
; (mode fallback): the rule is only used when the targets are not already
; present in the source tree.
(rule
(targets serialization_t.ml serialization_t.mli)
(deps serialization.atd)
(mode fallback)
(action
(run atdgen -t -j-std %{deps})))
; The single "ocluster" binary; it is installed under the same public name.
; The sources are preprocessed with lwt_ppx (they use let%lwt / match%lwt).
(executable
(name ocluster)
(public_name ocluster)
(libraries containers containers.data atdgen lwt.unix lwt cmdliner unix)
(preprocess
(pps lwt_ppx)))
(lang dune 1.0)
(using fmt 1.1)
(name ocluster)
(jbuild_version 1)
(executable (
(name ocluster)
(libraries (lwt.unix lwt_log lwt cmdliner unix ppx_deriving.show))
(preprocess (pps (ppx_deriving.show lwt_ppx)))
(modules (ocluster))))
open Lwt.Infix
open Serialization_t
module SHashtbl = CCHashtbl.Make (CCString)
let nodes : (node * computation list) SHashtbl.t = SHashtbl.create 10
let jobs : computation CCDeque.t = CCDeque.create ()
let jobs_id = ref 0
(** Human-readable rendering of a computation's return code: the
    constructor name followed by its integer payload. *)
let string_of_ret_code ret_code =
  match ret_code with
  | `WSTOPPED signal -> Printf.sprintf "WSTOPPED %i" signal
  | `WSIGNALED signal -> Printf.sprintf "WSIGNALED %i" signal
  | `WEXITED code -> Printf.sprintf "WEXITED %i" code
(** Key used to identify a peer: the socket path for a Unix-domain address,
    or the textual IP for an internet address (the port is ignored). *)
let string_of_sockaddr (sockaddr : Unix.sockaddr) =
  match sockaddr with
  | Unix.ADDR_INET (ip, _port) -> Unix.string_of_inet_addr ip
  | Unix.ADDR_UNIX path -> path
(** [end_job id sockaddr] marks computation [id] as finished on the node
    identified by [sockaddr]: the computation is removed from the node's
    running list and its cpu/ram are given back to the node.

    A result coming from an address that is not a registered node, or for a
    computation the node is not known to be running, is reported on stderr
    and otherwise ignored. Previously both cases raised [Not_found], which
    rejected the server handler's promise. *)
let end_job id sockaddr =
  let key = string_of_sockaddr sockaddr in
  match SHashtbl.find nodes key with
  | exception Not_found ->
      Printf.eprintf "Result received from unknown node %s\n%!" key
  | n, l -> (
    match List.find (fun (c : computation) -> c.id = id) l with
    | exception Not_found ->
        Printf.eprintf "Node %s is not running computation %i,%i\n%!" key
          (fst id) (snd id)
    | j ->
        SHashtbl.replace nodes key
          ( {n with cpu= n.cpu + j.cpu; ram= n.ram + j.ram}
          , CCList.remove
              ~eq:(fun (c1 : computation) (c2 : computation) -> c1.id = c2.id)
              ~key:j l ) )
(** Try to start queued computations.

    Takes the computation at the front of [jobs] and looks for a node with
    enough free cpu and ram. If one is found, the node's free resources are
    decreased, the computation is recorded as running on it, the
    computation is sent to the node as a [`COMPUTATION] query, and the
    function recurses for the next queued computation. If no node fits, the
    computation is put back at the front of the queue. *)
let rec launch_job () =
  if CCDeque.is_empty jobs then Lwt_io.eprintf "No compuatiton\n"
  else
    let job = CCDeque.take_front jobs in
    (* Keep the first node, in table iteration order, that can host [job]. *)
    let pick key ((node : node), running) found =
      match found with
      | Some _ -> found
      | None ->
          if node.cpu >= job.cpu && node.ram >= job.ram then
            Some (key, node, running)
          else None
    in
    match SHashtbl.fold pick nodes None with
    | None ->
        CCDeque.push_front jobs job ;
        Lwt_io.eprintf "No free node\n"
    | Some (key, node, running) ->
        let target =
          Unix.ADDR_INET (Unix.inet_addr_of_string node.addr, node.port)
        in
        (* Reserve the resources before any I/O happens. *)
        SHashtbl.replace nodes key
          ( {node with cpu= node.cpu - job.cpu; ram= node.ram - job.ram}
          , job :: running ) ;
        Lwt_io.eprintf "Send computation %i,%i to %s on %i\n" (fst job.id)
          (snd job.id) node.addr node.port
        >>= Lwt_io.flush_all
        >>= fun () ->
        Lwt_io.with_connection target (fun (_ic, oc) ->
            Lwt_io.write oc
              (Serialization_j.string_of_query (`COMPUTATION job)) )
        (* TODO: check return value *)
        >>= launch_job
(** Handle a [`RESULT] query on the master. When the password matches, the
    captured stdout, stderr and a small log (completion time and return
    code) are written to [ocluster_<job>_<task>.out/.err/.log], the
    computation is released with [end_job], and [launch_job] is called. *)
let handle_result pass sockaddr (result : result) =
  if pass <> result.pass then
    (*Lwt_io.write oc "Wrong password"
      <&>*)
    Lwt_io.eprintf "Wrong password: %s\n" result.pass
  else
    let job_id, task_id = result.id in
    (* Open the per-computation file named by [name_format] and run [writer]
       on it. *)
    let save name_format writer =
      Lwt_io.with_file ~mode:Lwt_io.output
        (Printf.sprintf name_format job_id task_id)
        writer
    in
    (*Lwt_io.write oc "true"
      <&>*)
    Lwt_io.eprintf "Receive result: %s\n"
      (Serialization_j.string_of_result
         {result with stdout= "<stdout>"; stderr= "<stderr>"})
    >>= fun () ->
    save "ocluster_%i_%i.out" (fun oc -> Lwt_io.write oc result.stdout)
    >>= fun () ->
    save "ocluster_%i_%i.err" (fun oc -> Lwt_io.write oc result.stderr)
    >>= fun () ->
    save "ocluster_%i_%i.log" (fun oc ->
        Lwt_io.write oc
          (Printf.sprintf "Job completed at %f\nReturn code: %s\n"
             (Unix.time ())
             (string_of_ret_code result.ret_code)) )
    >>= fun () ->
    end_job result.id sockaddr ;
    launch_job ()

(** Handle a [`JOB] query on the master. When the password matches, one
    computation per iteration is appended to [jobs], all sharing the
    current value of [jobs_id] as first id component, then [jobs_id] is
    incremented. [launch_job] is only called when the queue was empty before
    the submission. *)
let handle_submission pass port (submission : submission) =
  if pass <> submission.pass then
    (*Lwt_io.write oc "Wrong password"
      <&>*)
    Lwt_io.eprintf "Wrong password: %s\n" submission.pass
  else
    (*Lwt_io.write oc "true"
      <&>*)
    Lwt_io.eprintf "Receive submission: %s\n"
      (Serialization_j.string_of_submission submission)
    >>= fun () ->
    let queue_was_empty = CCDeque.is_empty jobs in
    let batch = !jobs_id in
    for task = 0 to submission.iteration - 1 do
      CCDeque.push_back jobs
        { id= (batch, task)
        ; env= [Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" task]
        ; script= submission.script
        ; time= submission.time
        ; pass
        ; port
        ; cpu= submission.cpu
        ; ram= submission.ram }
    done ;
    incr jobs_id ;
    if queue_was_empty then launch_job () else Lwt.return_unit

(** Connection handler of the master. Reads the whole input of the
    connection, decodes it as a query and dispatches [`RESULT] and [`JOB]
    queries; anything else is reported on stderr. All Lwt output channels
    are flushed afterwards. *)
let server_handler pass port sockaddr (ic, _oc) =
  let on_payload = function
    | "" ->
        Lwt_io.eprintf
          "Error during the reception of the computation: No data\n"
    | json -> (
      match
        CCResult.guard (fun () -> Serialization_j.query_of_string json)
      with
      | Result.Error e ->
          Lwt_io.eprintf
            "Error during the reception of the computation: %s\n"
            (Printexc.to_string e)
      | Result.Ok (`RESULT result) -> handle_result pass sockaddr result
      | Result.Ok (`JOB submission) -> handle_submission pass port submission
      | Result.Ok _ -> Lwt_io.eprintf "Receive a unwanted command\n" )
  in
  Lwt_io.read ic >>= on_payload >>= Lwt_io.flush_all
(** Signal callback: resolve the shutdown promise with [server].

    [Lwt.wakeup_later] raises [Invalid_argument] when the promise is already
    resolved, which happens if a second signal arrives while the server is
    shutting down; that case is deliberately ignored. *)
let stop_server resolver server _signum =
  try Lwt.wakeup_later resolver server with Invalid_argument _ -> ()
(** [cmd config] runs the master: the configuration file [config] is
    parsed, every configured node is registered in [nodes] with no running
    computation, and a server is started on the configured port. The
    returned promise resolves once SIGTERM or SIGINT has been received and
    the server has been shut down. *)
let cmd config =
  let conf =
    Serialization_j.master_conf_of_string (CCIO.with_in config CCIO.read_all)
  in
  List.iter (fun n -> SHashtbl.add nodes n.addr (n, [])) conf.nodes ;
  let promise, resolver = Lwt.task () in
  Lwt_io.printf "master at %i with pass %s\n" conf.port conf.pass
  >>= Lwt_io.flush_all
  >>= fun () ->
  Lwt_io.establish_server_with_client_address
    (Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
    (server_handler conf.pass conf.port)
  >>= fun server ->
  (* Use the portable OCaml signal constants rather than raw system
     numbers (15 and 2 are only SIGTERM/SIGINT on some platforms). *)
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigterm (stop_server resolver server)
  and (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigint (stop_server resolver server)
  in
  promise
  >>= fun server ->
  Lwt_io.shutdown_server server <&> Lwt_io.eprintf "Shuting down server\n"
open Lwt.Infix
open Serialization_t
(** Convert a [Unix.process_status] into the polymorphic-variant form with
    the same constructor names and payloads. *)
let process_status_to_ret_code (status : Unix.process_status) =
  match status with
  | Unix.WSTOPPED signal -> `WSTOPPED signal
  | Unix.WSIGNALED signal -> `WSIGNALED signal
  | Unix.WEXITED code -> `WEXITED code
(** [run_computation computation] writes the computation's script to a
    temporary executable file, runs it with the computation's environment
    and optional timeout, and returns a [result] holding the captured
    stdout, stderr and return code, tagged with the computation's id and
    password.

    The process and the two pipe readers run concurrently. The channels
    wrapping the pipes' read ends are closed once drained, so no file
    descriptor is leaked per computation. *)
let run_computation (computation : computation) =
  (* Read a pipe until EOF, then close the channel (and its descriptor),
     whether the read succeeded or not. *)
  let read_and_close fd =
    let ic = Lwt_io.of_fd ~mode:Lwt_io.input fd in
    Lwt.finalize (fun () -> Lwt_io.read ic) (fun () -> Lwt_io.close ic)
  in
  Lwt_io.with_temp_file ~prefix:"ocluster" ~perm:0o700 (fun (name, oc) ->
      Lwt_io.write oc computation.script
      >>= fun () ->
      (* Close the script before executing it. *)
      Lwt_io.close oc
      >>= fun () ->
      Lwt_unix.sleep 2.
      >>= fun () ->
      let read_stderr, write_stderr = Lwt_unix.pipe_in ()
      and read_stdout, write_stdout = Lwt_unix.pipe_in () in
      let%lwt ret_code =
        Lwt_process.exec ?timeout:computation.time ~stdin:`Close
          ~env:(Array.of_list computation.env)
          ~stdout:(`FD_move write_stdout) ~stderr:(`FD_move write_stderr)
          (name, [||])
      and stdout = read_and_close read_stdout
      and stderr = read_and_close read_stderr in
      Lwt.return
        { id= computation.id
        ; stdout
        ; stderr
        ; ret_code= process_status_to_ret_code ret_code
        ; pass= computation.pass } )
(** [handle_computation sockaddr computation ()] runs [computation] and
    sends its result back as a [`RESULT] query. For an internet address the
    reply goes to the host of [sockaddr] on the port carried by the
    computation; any other address is used unchanged. *)
let handle_computation sockaddr (computation : computation) () =
  let job_id, task_id = computation.id in
  let reply_address = function
    | Unix.ADDR_INET (host, _) -> Unix.ADDR_INET (host, computation.port)
    | other -> other
  in
  let send result =
    Lwt_io.with_connection (reply_address sockaddr) (fun (_ic, oc) ->
        Lwt_io.write oc (Serialization_j.string_of_query (`RESULT result)) )
  in
  run_computation computation
  >>= send
  (* TODO: make this resilient to a crash of the server *)
  >>= fun () ->
  Lwt_io.eprintf "End computation %i,%i\n" job_id task_id
  >>= fun () -> Lwt_unix.sleep 2.
(** Answer a [`STAT] query by writing the marshalled value [`OK] on [oc]. *)
let stat oc = Lwt_io.write_value oc `OK
(** Connection handler of a node. Reads one line from the connection and
    decodes it as a query. A [`COMPUTATION] with the right password is
    started in the background with [Lwt.async]; [`STAT] is answered with
    [stat]; anything else is reported on stderr. All Lwt output channels
    are flushed afterwards. *)
let server_handler pass sockaddr (ic, oc) =
  let on_query (query : query) =
    match query with
    | `COMPUTATION (computation : computation) ->
        if pass <> computation.pass then
          Lwt_io.eprintf "Wrong password: %s\n" computation.pass
        else
          Lwt_io.eprintf "Receive computation: %s\n"
            (Serialization_j.string_of_computation
               {computation with env= []; script= "<script>"})
          >|= fun () -> Lwt.async (handle_computation sockaddr computation)
    | `STAT -> stat oc <&> Lwt_io.eprintf "Receive a stat command\n"
    | _ -> Lwt_io.eprintf "Receive a unwanted command\n"
  in
  let on_line = function
    | None ->
        Lwt_io.eprintf
          "Error during the reception of the computation: No data\n"
    | Some json -> (
      match
        CCResult.guard (fun () -> Serialization_j.query_of_string json)
      with
      | Result.Ok query -> on_query query
      | Result.Error e ->
          Lwt_io.eprintf
            "Error during the reception of the computation: %s\n"
            (Printexc.to_string e) )
  in
  Lwt_io.read_line_opt ic >>= on_line >>= Lwt_io.flush_all
(** Signal callback: resolve the shutdown promise with [server].

    [Lwt.wakeup_later] raises [Invalid_argument] when the promise is already
    resolved, which happens if a second signal arrives while the server is
    shutting down; that case is deliberately ignored. *)
let stop_server resolver server _signum =
  try Lwt.wakeup_later resolver server with Invalid_argument _ -> ()
(** [cmd config] runs a computation node: the configuration file [config]
    is parsed and a server is started on the configured port. The returned
    promise resolves once SIGTERM or SIGINT has been received and the
    server has been shut down. *)
let cmd config =
  let conf =
    Serialization_j.node_conf_of_string (CCIO.with_in config CCIO.read_all)
  in
  let promise, resolver = Lwt.task () in
  Lwt_io.eprintf "Node at port %i with pass %s\n" conf.port conf.pass
  >>= Lwt_io.flush_all
  >>= fun () ->
  Lwt_io.establish_server_with_client_address
    (Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
    (server_handler conf.pass)
  >>= fun server ->
  (* Use the portable OCaml signal constants rather than raw system
     numbers (15 and 2 are only SIGTERM/SIGINT on some platforms). *)
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigterm (stop_server resolver server)
  and (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigint (stop_server resolver server)
  in
  promise
  >>= fun server ->
  Lwt_io.shutdown_server server <&> Lwt_io.eprintf "Shuting down server\n"
open Lwt.Infix
(** A script to run: its identifier, the script text, an optional time
    limit and the password that accompanies the request. *)
type computation = {id: int; script: string; time: float option; pass: string}
[@@deriving show]
(** Requests understood by a node: run a computation, or report status. *)
type query = COMPUTATION of computation | STAT [@@deriving show]
(** Printer for [Unix.process_status]: prints the constructor name followed
    by its integer payload on the formatter [fmt]. *)
let show_process_status fmt (status : Unix.process_status) =
  match status with
  | Unix.WSTOPPED s -> Format.fprintf fmt "WSTOPPED %i" s
  | Unix.WSIGNALED s -> Format.fprintf fmt "WSIGNALED %i" s
  | Unix.WEXITED c -> Format.fprintf fmt "WEXITED %i" c
(** Outcome of a computation: its identifier, the captured output streams
    (optional), the process status and the password. [ret_code] is printed
    with [show_process_status] by the derived printer. *)
type answer =
{ id: int
; stdout: string option
; stderr: string option
; ret_code: Unix.process_status [@printer show_process_status]
; pass: string }
[@@deriving show]
(** Value written in reply to a [STAT] query. *)
type stat = OK [@@deriving show]
open Cmdliner
let run_computation (computation : computation) =
let read_stderr, write_stderr = Lwt_unix.pipe_in ()
and read_stdout, write_stdout = Lwt_unix.pipe_in () in
Lwt_process.exec ?timeout:computation.time ~stdin:`Close
~stdout:(`FD_move write_stdout) ~stderr:(`FD_move write_stderr)
(Lwt_process.shell computation.script)
>>= fun ret_code ->
let%lwt stdout =
Lwt_io.read_line_opt (Lwt_io.of_fd ~mode:Lwt_io.input read_stdout)
and stderr =
Lwt_io.read_line_opt (Lwt_io.of_fd ~mode:Lwt_io.input read_stderr)
let node_cmd =
let config =
let doc = "Configuration of the node" in
Arg.(required & pos 0 (some string) None & info [] ~docv:"CONF" ~doc)
in
Lwt.return
{id= computation.id; stdout; stderr; ret_code; pass= computation.pass}
(** [getinetaddrbyname name] looks [name] up with [Lwt_unix.gethostbyname]
    and resolves to the first address of the returned host entry. *)
let getinetaddrbyname name =
  Lwt.map
    (fun entry -> entry.Lwt_unix.h_addr_list.(0))
    (Lwt_unix.gethostbyname name)
(** Answer a [STAT] query by writing the marshalled value [OK] on [oc]. *)
let stat oc = Lwt_io.write_value oc OK
let server_handler pass port sockaddr (ic, oc) =
let sockaddr =
match sockaddr with
| Unix.ADDR_INET (a, _) -> Unix.ADDR_INET (a, port)
| s -> s
let doc = "Start un computational node" in
let exits = Term.default_exits in
let man =
[ `S Manpage.s_description
; `P
"Start un computational node on the given port with the given password"
]
in
( match%lwt Lwt_result.catch (Lwt_io.read_value ic) with
| Result.Ok query -> (
match query with
| COMPUTATION computation ->
if pass = computation.pass then
Lwt_io.write_value oc true
<&> Lwt_log.debug
(Printf.sprintf "Receive computation: %s"
(show_computation computation))
>>= fun () ->
run_computation computation
>>= fun answer ->
Lwt_io.with_connection sockaddr (fun (ic, oc) ->
Lwt_io.write_value oc answer )
>>= fun () ->
Lwt_log.debug (Printf.sprintf "End computation %i" computation.id)
else Lwt_io.write_value oc false <&> Lwt_log.warning "Wrong password"
| STAT -> stat oc <&> Lwt_log.debug "Receive a stat command" )
| Result.Error e ->
Lwt_log.error
(Printf.sprintf "Error during the reception of the computation: %s"
(Printexc.to_string e))
<&> Lwt_io.write_value oc false )
>>= fun () -> Lwt_unix.sleep 2.
(** Signal callback: resolve the shutdown promise with [server].

    [Lwt.wakeup_later] raises [Invalid_argument] when the promise is already
    resolved (a second signal, or a handler registered twice); that case is
    deliberately ignored. *)
let stop_server resolver server _signum =
  try Lwt.wakeup_later resolver server with Invalid_argument _ -> ()
(** [node pass port] runs a computation node listening on [port] and
    accepting requests carrying the password [pass]. Log messages are
    broadcast to stderr and to the file [test.log]. The returned promise
    resolves once SIGTERM or SIGINT has been received and the server has
    been shut down. *)
let node pass port =
  let promise, resolver = Lwt.task () in
  let template = "$(date).$(milliseconds) $(name)[$(pid)]: $(message)" in
  Lwt_log.file ~template ~perm:0o600 ~file_name:"test.log" ()
  >|= (fun file_logger ->
        Lwt_log.default :=
          Lwt_log.broadcast
            [ Lwt_log.channel ~template ~close_mode:`Keep
                ~channel:Lwt_io.stderr ()
            ; file_logger ] )
  >>= fun () ->
  Lwt_io.establish_server_with_client_address
    (Unix.ADDR_INET (Unix.inet_addr_any, port))
    (server_handler pass port)
  >>= fun server ->
  (* One handler per signal: the SIGTERM handler used to be registered
     twice, so a single SIGTERM woke the resolver twice. The portable OCaml
     signal constants replace the raw numbers 15 and 2. *)
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigterm (stop_server resolver server)
  and (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigint (stop_server resolver server)
  in
  promise
  >>= fun server ->
  Lwt_io.shutdown_server server <&> Lwt_log.info "Shuting down server"
( Term.(const Lwt_main.run $ (const Node.cmd $ config))
, Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
open Cmdliner
(** Cmdliner sub-command ["master"]: takes the master configuration file as
    its first positional argument and runs [Master.cmd] on it under
    [Lwt_main.run]. *)
let master_cmd =
  let term_info =
    let doc = "Start un master server" in
    let man =
      [ `S Manpage.s_description
      ; `P "Start un master server on the given port with the given password"
      ]
    in
    Term.info "master" ~doc ~sdocs:Manpage.s_common_options
      ~exits:Term.default_exits ~man
  in
  let config =
    Arg.(
      required
      & pos 0 (some file) None
      & info [] ~docv:"CONF_FILE" ~doc:"Configuration file of the master")
  in
  (Term.(const Lwt_main.run $ (const Master.cmd $ config)), term_info)
let node_cmd =
let client_cmd =
let cpu =
let doc = "Number of cpu needed" in
Arg.(value & opt int 1 & info ["c"; "cpu"] ~docv:"CPU" ~doc)
in
let ram =
let doc = "Quantity of RAM needed" in
Arg.(value & opt int 1024 & info ["r"; "ram"] ~docv:"RAM" ~doc)
in
let time =
let doc = "Time limit" in
Arg.(value & opt (some float) None & info ["t"; "time"] ~docv:"TIME" ~doc)
in
let iteration =
let doc = "Number of iteration" in
Arg.(value & opt int 1 & info ["i"; "iteration"] ~docv:"N" ~doc)
in
let port =
let doc = "port on which client listen" in
Arg.(value & opt int 4242 & info ["p"; "port"] ~docv:"PORT" ~doc)
in
let script =
let doc = "Ocluster script" in
Arg.(required & pos 0 (some file) None & info [] ~docv:"SCRIPT" ~doc)
in
let pass =
let doc = "Password of the node" in
Arg.(required & pos 0 (some string) None & info [] ~docv:"PASS" ~doc)
Arg.(required & pos 1 (some string) None & info [] ~docv:"PASS" ~doc)
in
let doc = "Start un compatational node" in
let exits = Term.default_exits in
let man =
[ `S Manpage.s_description
; `P
"Start un compatational node on port $(port) with the password $(pass)"
]
let addr =
let doc = "Ip of the master" in
Arg.(required & pos 2 (some string) None & info [] ~docv:"IP" ~doc)
in
( Term.(const Lwt_main.run $ (const node $ pass $ port))
, Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let doc = "Client to use the cluster" in
let exits = Term.default_exits in
let man = [`S Manpage.s_description; `P "Client to use the cluster"] in
( Term.(
const Lwt_main.run
$ ( const Client.cmd $ cpu $ ram $ time $ iteration $ port $ script $ pass
$ addr ))
, Term.info "client" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let default_cmd =
let doc = "Transforme un pool of computer into a cluster." in
let doc = "Use a pool of computer as a cluster." in
let man =
[ `S Manpage.s_bugs
; `P "Email bug reports to <fardale+ocluster at crans.org>." ]
......@@ -126,8 +81,7 @@ let default_cmd =
let sdocs = Manpage.s_common_options in
let exits = Term.default_exits in
( Term.(ret (const (fun _ -> `Help (`Pager, None)) $ const ()))
, Term.info "ocluster" ~version:"v0.0.1" ~doc ~sdocs ~exits ~man )
let () = Term.(exit @@ eval_choice default_cmd [node_cmd])
, Term.info "ocluster" ~version:"v0.1.0" ~doc ~sdocs ~exits ~man )
(*Lwt_io.with_connection (Lwt_unix.ADDR_INET (Unix.inet_addr_of_string "127.0.0.1", 2121)) (fun (ic,oc) -> Lwt_io.write_value oc (COMPUTATION {id = 0; script = "#echo penis\necho plop\nls"; time = None; pass = "wesh"}));;*)
(* Program entry point: evaluate the command line against the node, master
   and client sub-commands (falling back to [default_cmd]) and exit with the
   resulting status. *)
let () =
Term.(exit @@ eval_choice default_cmd [node_cmd; master_cmd; client_cmd])
opam-version: "2.0"
maintainer: "fardale+git@crans.org"
authors: ["Fardale"]
homepage: ""
bug-reports: ""
dev-repo: ""
license: "MIT"
version: "0.1.0"
depends: [
"dune" {build}
"containers"
"atdgen"
"lwt"
"lwt_ppx"
"cmdliner"
"base-unix"
]
build: [
["dune" "build" "-p" name "-j" jobs]
]
synopsis: "Use a pool of computers as a cluster"
description: """
dune is a build system that was designed to simplify the release of
Jane Street packages. It reads metadata from "dune" files following a
very simple s-expression syntax.
dune is fast, it has very low-overhead and support parallel builds on
all platforms. It has no system dependencies, all you need to build
dune and packages using dune is OCaml. You don't need or make or bash
as long as the packages themselves don't use bash explicitly.
dune supports multi-package development by simply dropping multiple
repositories into the same directory.
It also supports multi-context builds, such as building against
several opam roots/switches simultaneously. This helps maintaining
packages across several versions of OCaml and gives cross-compilation
for free.
"""
(* A computation node as listed in the master configuration: its address,
   its port (4242 when omitted) and the cpu / ram it offers. *)
type node = {addr: string; ~port <ocaml default="4242">: int; cpu: int; ram: int}
(* Master configuration: shared password, listening port (4242 when
   omitted) and the list of nodes. *)
type master_conf = {pass: string; ~port <ocaml default="4242">: int; nodes: node list}
(* Node configuration: shared password and listening port (4242 when
   omitted). *)
type node_conf = {pass: string; ~port <ocaml default="4242">: int}
(* One unit of work sent by the master to a node. id is the pair
   (submission number, iteration index); port is the port the result must
   be sent back to. *)
type computation = {id: (int * int); env: string list; script: string; time: float option; pass: string; port: int; cpu: int; ram: int}
(* Termination status of a computation. *)
type ret_code = [WEXITED of int | WSIGNALED of int | WSTOPPED of int]
(* A job submitted by a client: the script, an optional time limit, the
   number of iterations to run and the cpu / ram required by each one.
   NOTE(review): unlike the "~port <ocaml default=...>" fields above, the
   default annotations here are attached to the type and the fields are not
   marked with "~"; presumably the defaults are not applied and the fields
   are required - confirm against the atdgen documentation. *)
type submission = {script: string; time: float option; iteration: int <ocaml default="1">;
cpu: int <ocaml default="1">; ram: int <ocaml default="1024">;
pass: string}
(* Every message exchanged between client, master and nodes. *)
type query = [COMPUTATION of computation | STAT | RESULT of result | JOB of submission]
(* Reply to a STAT query. *)
type stat = [OK]
(* Outcome of a computation: its id, the captured output streams, the
   termination status and the password. *)
type result =
{id: (int * int); stdout: string; stderr: string; ret_code: ret_code; pass: string}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment