Commit 0e0354d6 authored by Fardale's avatar Fardale

Correct the use of logs

parent 8305db54
......@@ -15,6 +15,7 @@
(executable
(name ocluster)
(public_name ocluster)
(libraries containers containers.data atdgen lwt.unix lwt cmdliner unix logs logs.lwt)
(libraries containers containers.data atdgen lwt.unix lwt cmdliner
unix logs logs.lwt logs.fmt logs.cli fmt fmt.cli fmt.tty)
(preprocess
(pps lwt_ppx)))
......@@ -42,7 +42,7 @@ let rec launch_job () =
with
| None ->
CCDeque.push_front jobs j ;
Logs_lwt.debug (fun m -> m "No free node\n")
Logs_lwt.debug (fun m -> m "No free node")
| Some (k, n, l) ->
let sockaddr =
Unix.ADDR_INET (Unix.inet_addr_of_string n.addr, n.port)
......@@ -50,7 +50,7 @@ let rec launch_job () =
SHashtbl.replace nodes k
({n with cpu= n.cpu - j.cpu; ram= n.ram - j.ram}, j :: l) ;
Logs_lwt.debug (fun m ->
m "Send computation %i,%i to %s on %i\n" (fst j.id) (snd j.id)
m "Send computation %i,%i to %s on %i" (fst j.id) (snd j.id)
n.addr n.port )
>>= fun () ->
Lwt_io.flush Lwt_io.stderr
......@@ -60,7 +60,7 @@ let rec launch_job () =
)
(* TODO: check return value *)
>>= Lwt.pause >>= launch_job )
else Logs_lwt.debug (fun m -> m "No compuatiton\n")
else Logs_lwt.debug (fun m -> m "No compuatiton")
let server_handler pass port sockaddr (ic, _oc) =
Lwt_io.read ic
......@@ -71,7 +71,7 @@ let server_handler pass port sockaddr (ic, _oc) =
| `RESULT result ->
if pass = result.pass then
Logs_lwt.debug (fun m ->
m "Receive result: %s\n"
m "Receive result: %s"
(Serialization_j.string_of_result
{result with stdout= "<stdout>"; stderr= "<stderr>"}) )
>>= fun () ->
......@@ -100,13 +100,13 @@ let server_handler pass port sockaddr (ic, _oc) =
else
(*Lwt_io.write oc "Wrong password"
<&>*)
Logs_lwt.warn (fun m -> m "Wrong password: %s\n" result.pass)
Logs_lwt.warn (fun m -> m "Wrong password: %s" result.pass)
| `JOB submission ->
if pass = submission.pass then (
(*Lwt_io.write oc "true"
<&>*)
Logs_lwt.debug (fun m ->
m "Receive submission: %s\n"
m "Receive submission: %s"
(Serialization_j.string_of_submission submission) )
>>= fun () ->
let empty = CCDeque.is_empty jobs in
......@@ -130,22 +130,22 @@ let server_handler pass port sockaddr (ic, _oc) =
else
(*Lwt_io.write oc "Wrong password"
<&>*)
Logs_lwt.warn (fun m -> m "Wrong password: %s\n" submission.pass)
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command\n") )
Logs_lwt.warn (fun m -> m "Wrong password: %s" submission.pass)
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
| Result.Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s\n"
m "Error during the reception of the computation: %s"
(Printexc.to_string e) )
let stop_server resolver server _ = Lwt.wakeup_later resolver server
let cmd config =
let cmd config () =
let conf =
Serialization_j.master_conf_of_string (CCIO.with_in config CCIO.read_all)
in
List.iter (fun n -> SHashtbl.add nodes n.addr (n, [])) conf.nodes ;
let promise, resolver = Lwt.task () in
Logs_lwt.app (fun m -> m "master at %i with pass %s\n" conf.port conf.pass)
Logs_lwt.info (fun m -> m "master at %i with pass %s" conf.port conf.pass)
>>= fun () ->
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
......@@ -156,4 +156,4 @@ let cmd config =
promise
>>= fun server ->
Lwt_io.shutdown_server server
<&> Logs_lwt.app (fun m -> m "Shuting down server\n")
<&> Logs_lwt.info (fun m -> m "Shuting down server")
......@@ -43,7 +43,7 @@ let handle_computation sockaddr computation () =
(* TODO: rendre ça résistant au crash du serveur *)
>>= fun () ->
Logs_lwt.debug (fun m ->
m "End computation %i,%i\n" (fst computation.id) (snd computation.id) )
m "End computation %i,%i" (fst computation.id) (snd computation.id) )
let stat oc = Lwt_io.write_value oc `OK
......@@ -56,29 +56,29 @@ let server_handler pass sockaddr (ic, oc) =
| `COMPUTATION (computation : computation) ->
if pass = computation.pass then
Logs_lwt.debug (fun m ->
m "Receive computation: %s\n"
m "Receive computation: %s"
(Serialization_j.string_of_computation
{computation with env= []; script= "<script>"}) )
>|= fun () -> Lwt.async (handle_computation sockaddr computation)
else Logs_lwt.warn (fun m -> m "Wrong password: %s\n" computation.pass)
else Logs_lwt.warn (fun m -> m "Wrong password: %s" computation.pass)
| `STAT ->
stat oc <&> Logs_lwt.debug (fun m -> m "Receive a stat command\n")
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command\n") )
stat oc <&> Logs_lwt.debug (fun m -> m "Receive a stat command")
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command") )
| Result.Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s\n"
m "Error during the reception of the computation: %s"
(Printexc.to_string e) ) )
>>= fun () -> Lwt_io.flush Lwt_io.stderr
let stop_server resolver server _ = Lwt.wakeup_later resolver server
let cmd config =
let cmd config () =
let conf =
Serialization_j.node_conf_of_string (CCIO.with_in config CCIO.read_all)
in
let promise, resolver = Lwt.task () in
Logs_lwt.info (fun m ->
m "Node at port %i with pass %s\n" conf.port conf.pass )
m "Node at port %i with pass %s" conf.port conf.pass )
>>= fun () ->
Lwt_io.flush Lwt_io.stderr
>>= fun () ->
......@@ -91,4 +91,4 @@ let cmd config =
promise
>>= fun server ->
Lwt_io.shutdown_server server
<&> Logs_lwt.info (fun m -> m "Shuting down server\n")
<&> Logs_lwt.info (fun m -> m "Shuting down node server")
open Cmdliner
let lwt_reporter () =
let buf_fmt ~like =
let b = Buffer.create 512 in
( Fmt.with_buffer ~like b
, fun () ->
let m = Buffer.contents b in
Buffer.reset b ; m )
in
let app, app_flush = buf_fmt ~like:Fmt.stdout in
let dst, dst_flush = buf_fmt ~like:Fmt.stderr in
let reporter = Logs_fmt.reporter ~app ~dst () in
let report src level ~over k msgf =
let k () =
let write () =
match level with
| Logs.App -> Lwt_io.write Lwt_io.stdout (app_flush ())
| _ -> Lwt_io.write Lwt_io.stderr (dst_flush ())
in
let unblock () = over () ; Lwt.return_unit in
Lwt.async (fun () -> Lwt.finalize write unblock) ;
k ()
in
reporter.Logs.report src level ~over:(fun () -> ()) k msgf
in
{Logs.report}
let setup_log style_renderer level =
Fmt_tty.setup_std_outputs ?style_renderer () ;
Logs.set_level level ;
Logs.set_reporter (lwt_reporter ()) ;
()
let setup_log =
Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ())
let node_cmd =
let config =
let doc = "Configuration of the node" in
......@@ -13,7 +48,7 @@ let node_cmd =
"Start un computational node on the given port with the given password"
]
in
( Term.(const Lwt_main.run $ (const Node.cmd $ config))
( Term.(const Lwt_main.run $ (const Node.cmd $ config $ setup_log))
, Term.info "node" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let master_cmd =
......@@ -27,7 +62,7 @@ let master_cmd =
[ `S Manpage.s_description
; `P "Start un master server on the given port with the given password" ]
in
( Term.(const Lwt_main.run $ (const Master.cmd $ config))
( Term.(const Lwt_main.run $ (const Master.cmd $ config $ setup_log))
, Term.info "master" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let client_cmd =
......
......@@ -15,6 +15,7 @@ depends: [
"cmdliner"
"base-unix"
"logs"
"fmt"
]
build: [
["dune" "build" "-p" name "-j" jobs]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment