Commit 252512c7 authored by Fardale's avatar Fardale

Use logs for logging

parent e2378129
......@@ -15,6 +15,6 @@
(executable
(name ocluster)
(public_name ocluster)
(libraries containers containers.data atdgen lwt.unix lwt cmdliner unix)
(libraries containers containers.data atdgen lwt.unix lwt cmdliner unix logs logs.lwt)
(preprocess
(pps lwt_ppx)))
......@@ -40,97 +40,99 @@ let rec launch_job () =
with
| None ->
CCDeque.push_front jobs j ;
Lwt_io.eprintf "No free node\n"
Logs_lwt.debug (fun m -> m "No free node\n")
| Some (k, n, l) ->
let sockaddr =
Unix.ADDR_INET (Unix.inet_addr_of_string n.addr, n.port)
in
SHashtbl.replace nodes k
({n with cpu= n.cpu - j.cpu; ram= n.ram - j.ram}, j :: l) ;
Lwt_io.eprintf "Send computation %i,%i to %s on %i\n" (fst j.id)
(snd j.id) n.addr n.port
>>= Lwt_io.flush_all
Logs_lwt.debug (fun m ->
m "Send computation %i,%i to %s on %i\n" (fst j.id) (snd j.id)
n.addr n.port )
>>= fun () ->
Lwt_io.flush Lwt_io.stderr
>>= fun () ->
Lwt_io.with_connection sockaddr (fun (_ic, oc) ->
Lwt_io.write oc (Serialization_j.string_of_query (`COMPUTATION j))
)
(* TODO: check return value *)
>>= launch_job )
else Lwt_io.eprintf "No compuatiton\n"
else Logs_lwt.debug (fun m -> m "No compuatiton\n")
let server_handler pass port sockaddr (ic, _oc) =
( match%lwt Lwt_io.read ic with
| "" ->
Lwt_io.eprintf "Error during the reception of the computation: No data\n"
| json -> (
match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok query -> (
match query with
| `RESULT result ->
if pass = result.pass then (
(*Lwt_io.write oc "true"
Lwt_io.read ic
>>= fun json ->
match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok query -> (
match query with
| `RESULT result ->
if pass = result.pass then (
(*Lwt_io.write oc "true"
<&>*)
Lwt_io.eprintf "Receive result: %s\n"
(Serialization_j.string_of_result
{result with stdout= "<stdout>"; stderr= "<stderr>"})
>>= fun () ->
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.out" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stdout)
>>= fun () ->
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.err" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stderr)
>>= fun () ->
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.log" (fst result.id)
(snd result.id))
(fun oc ->
Lwt_io.write oc
(Printf.sprintf "Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)) )
>>= fun () -> end_job result.id sockaddr ; launch_job () )
else
(*Lwt_io.write oc "Wrong password"
Logs_lwt.debug (fun m ->
m "Receive result: %s\n"
(Serialization_j.string_of_result
{result with stdout= "<stdout>"; stderr= "<stderr>"}) )
>>= fun () ->
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.out" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stdout)
>>= fun () ->
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.err" (fst result.id)
(snd result.id))
(fun oc -> Lwt_io.write oc result.stderr)
>>= fun () ->
Lwt_io.with_file ~mode:Lwt_io.output
(Printf.sprintf "ocluster_%i_%i.log" (fst result.id)
(snd result.id))
(fun oc ->
Lwt_io.write oc
(Printf.sprintf "Job completed at %f\nReturn code: %s\n"
(Unix.time ())
(string_of_ret_code result.ret_code)) )
>>= fun () -> end_job result.id sockaddr ; launch_job () )
else
(*Lwt_io.write oc "Wrong password"
<&>*)
Lwt_io.eprintf "Wrong password: %s\n" result.pass
| `JOB submission ->
if pass = submission.pass then (
(*Lwt_io.write oc "true"
Logs_lwt.warn (fun m -> m "Wrong password: %s\n" result.pass)
| `JOB submission ->
if pass = submission.pass then (
(*Lwt_io.write oc "true"
<&>*)
Lwt_io.eprintf "Receive submission: %s\n"
(Serialization_j.string_of_submission submission)
>>= fun () ->
let empty = CCDeque.is_empty jobs in
for i = 0 to submission.iteration - 1 do
let computation =
{ id= (!jobs_id, i)
; env=
[ Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" i
; Printf.sprintf "OCLUSTER_TASK_ID=%i" !jobs_id ]
; script= submission.script
; time= submission.time
; pass
; port
; cpu= submission.cpu
; ram= submission.ram }
in
CCDeque.push_back jobs computation
done ;
incr jobs_id ;
if empty then launch_job () else Lwt.return_unit )
else
(*Lwt_io.write oc "Wrong password"
Logs_lwt.debug (fun m ->
m "Receive submission: %s\n"
(Serialization_j.string_of_submission submission) )
>>= fun () ->
let empty = CCDeque.is_empty jobs in
for i = 0 to submission.iteration - 1 do
let computation =
{ id= (!jobs_id, i)
; env=
[ Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" i
; Printf.sprintf "OCLUSTER_TASK_ID=%i" !jobs_id ]
; script= submission.script
; time= submission.time
; pass
; port
; cpu= submission.cpu
; ram= submission.ram }
in
CCDeque.push_back jobs computation
done ;
incr jobs_id ;
if empty then launch_job () else Lwt.return_unit )
else
(*Lwt_io.write oc "Wrong password"
<&>*)
Lwt_io.eprintf "Wrong password: %s\n" submission.pass
| _ -> Lwt_io.eprintf "Receive a unwanted command\n" )
| Result.Error e ->
Lwt_io.eprintf "Error during the reception of the computation: %s\n"
(Printexc.to_string e) ) )
>>= Lwt_io.flush_all
Logs_lwt.warn (fun m -> m "Wrong password: %s\n" submission.pass)
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command\n") )
| Result.Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s\n"
(Printexc.to_string e) )
let stop_server resolver server _ = Lwt.wakeup_later resolver server
......@@ -140,8 +142,7 @@ let cmd config =
in
List.iter (fun n -> SHashtbl.add nodes n.addr (n, [])) conf.nodes ;
let promise, resolver = Lwt.task () in
Lwt_io.printf "master at %i with pass %s\n" conf.port conf.pass
>>= Lwt_io.flush_all
Logs_lwt.app (fun m -> m "master at %i with pass %s\n" conf.port conf.pass)
>>= fun () ->
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
......@@ -151,4 +152,5 @@ let cmd config =
and _ = Lwt_unix.on_signal 2 (stop_server resolver server) in
promise
>>= fun server ->
Lwt_io.shutdown_server server <&> Lwt_io.eprintf "Shuting down server\n"
Lwt_io.shutdown_server server
<&> Logs_lwt.app (fun m -> m "Shuting down server\n")
......@@ -42,34 +42,34 @@ let handle_computation sockaddr computation () =
Lwt_io.write oc (Serialization_j.string_of_query (`RESULT result)) )
(* TODO: rendre ça résistant au crash du serveur *)
>>= fun () ->
Lwt_io.eprintf "End computation %i,%i\n" (fst computation.id)
(snd computation.id)
Logs_lwt.debug (fun m ->
m "End computation %i,%i\n" (fst computation.id) (snd computation.id) )
>>= fun () -> Lwt_unix.sleep 2.
let stat oc = Lwt_io.write_value oc `OK
let server_handler pass sockaddr (ic, oc) =
( match%lwt Lwt_io.read_line_opt ic with
| Some json -> (
match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok query -> (
match query with
| `COMPUTATION (computation : computation) ->
if pass = computation.pass then
Lwt_io.eprintf "Receive computation: %s\n"
(Serialization_j.string_of_computation
{computation with env= []; script= "<script>"})
>|= fun () -> Lwt.async (handle_computation sockaddr computation)
else Lwt_io.eprintf "Wrong password: %s\n" computation.pass
| `STAT -> stat oc <&> Lwt_io.eprintf "Receive a stat command\n"
| _ -> Lwt_io.eprintf "Receive a unwanted command\n" )
| Result.Error e ->
Lwt_io.eprintf "Error during the reception of the computation: %s\n"
(Printexc.to_string e) )
| None ->
Lwt_io.eprintf "Error during the reception of the computation: No data\n"
)
>>= Lwt_io.flush_all
Lwt_io.read ic
>>= fun json ->
( match CCResult.guard (fun () -> Serialization_j.query_of_string json) with
| Result.Ok query -> (
match query with
| `COMPUTATION (computation : computation) ->
if pass = computation.pass then
Logs_lwt.debug (fun m ->
m "Receive computation: %s\n"
(Serialization_j.string_of_computation
{computation with env= []; script= "<script>"}) )
>|= fun () -> Lwt.async (handle_computation sockaddr computation)
else Logs_lwt.warn (fun m -> m "Wrong password: %s\n" computation.pass)
| `STAT ->
stat oc <&> Logs_lwt.debug (fun m -> m "Receive a stat command\n")
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command\n") )
| Result.Error e ->
Logs_lwt.err (fun m ->
m "Error during the reception of the computation: %s\n"
(Printexc.to_string e) ) )
>>= fun () -> Lwt_io.flush Lwt_io.stderr
let stop_server resolver server _ = Lwt.wakeup_later resolver server
......@@ -78,8 +78,10 @@ let cmd config =
Serialization_j.node_conf_of_string (CCIO.with_in config CCIO.read_all)
in
let promise, resolver = Lwt.task () in
Lwt_io.eprintf "Node at port %i with pass %s\n" conf.port conf.pass
>>= Lwt_io.flush_all
Logs_lwt.info (fun m ->
m "Node at port %i with pass %s\n" conf.port conf.pass )
>>= fun () ->
Lwt_io.flush Lwt_io.stderr
>>= fun () ->
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
......@@ -89,4 +91,5 @@ let cmd config =
and _ = Lwt_unix.on_signal 2 (stop_server resolver server) in
promise
>>= fun server ->
Lwt_io.shutdown_server server <&> Lwt_io.eprintf "Shuting down server\n"
Lwt_io.shutdown_server server
<&> Logs_lwt.info (fun m -> m "Shuting down server\n")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment