Commit 7a7d07b2 authored by Fardale's avatar Fardale

add jobq command

The jobq command allows listing the jobs queued on the master and deleting a job.
parent 17660e17
......@@ -13,6 +13,7 @@ depends: [
"cmdliner"
"logs"
"fmt"
"printbox"
]
build: [
["dune" "build" "-p" name "-j" jobs]
......
......@@ -16,6 +16,6 @@
(name ocluster)
(public_name ocluster)
(libraries containers containers.data atdgen lwt.unix lwt cmdliner logs.lwt
logs.fmt logs.cli fmt.cli fmt.tty)
logs.fmt logs.cli fmt.cli fmt.tty printbox)
(preprocess
(pps lwt_ppx)))
open Lwt.Infix
open Serialization_t
(* [print_jobs jobs] prints a framed table on standard output, one row per
   job: the first component of the job id and the job arguments joined by
   single spaces. Among consecutive jobs sharing the same first id component
   only the first one is kept. The initial "previous id" is [-1], so a
   leading job whose id is [-1] is not displayed. *)
let print_jobs jobs =
  let _, kept_rev =
    List.fold_left
      (fun (previous_id, acc) j ->
        let job_id = fst j.id in
        (* Structural comparison: the ids are compared by value. *)
        if job_id <> previous_id then (job_id, j :: acc)
        else (previous_id, acc))
      (-1, []) jobs
  in
  List.rev kept_rev |> Array.of_list
  |> Array.map (fun j ->
         [| PrintBox.sprintf "%i" (fst j.id)
          ; PrintBox.text (String.concat " " j.args) |])
  |> PrintBox.grid |> PrintBox.frame
  |> PrintBox_text.output stdout ;
  (* Terminate the table with a newline and flush the formatter. *)
  Format.printf "@."

(* [receive_jobs ic] reads one line from [ic], decodes it as a [query_data]
   value and prints the job table when it is a [`JOBS] payload. Any other
   payload, or a decoding exception, is reported through the error log. *)
let receive_jobs ic =
  Lwt_io.read_line ic
  >>= fun json ->
  match
    CCResult.guard (fun () -> Serialization_j.query_data_of_string json)
  with
  | Ok (`JOBS jobs) ->
      print_jobs jobs ;
      Lwt.return_unit
  | Ok q ->
      Logs_lwt.err (fun m ->
          m "Error received the wrong answer: %s"
            (Serialization_j.string_of_query_data q))
  | Error e ->
      Logs_lwt.err (fun m ->
          m "Error during the reception of the answer: %s"
            (Printexc.to_string e))

(** [cmd id delete addr port pass ()] connects to [addr]:[port], sends a
    [`JOBQ (id, delete)] query authenticated with [pass] and reads the
    one-line answer.

    - On an [`Ok] answer an info message is logged; when [delete] is [false]
      a second line is then read and displayed as a table of jobs.
    - On an [`Error] answer, or when the answer cannot be decoded, an error
      is logged and nothing else is read. *)
let cmd id delete addr port pass () =
  Lwt_io.with_connection
    (Lwt_unix.ADDR_INET (Unix.inet_addr_of_string addr, port))
    (fun (ic, oc) ->
      Lwt_io.write_line oc
        (Serialization_j.string_of_query (pass, `JOBQ (id, delete)))
      >>= fun () ->
      Lwt_io.read_line ic
      >>= fun json ->
      match
        CCResult.guard (fun () -> Serialization_j.answer_of_string json)
      with
      | Ok `Ok ->
          Logs_lwt.info (fun m -> m "Jobq query successfully sent.")
          >>= fun () ->
          (* A deletion request has no further payload to read. *)
          if delete then Lwt.return_unit else receive_jobs ic
      | Ok (`Error s) ->
          Logs_lwt.err (fun m ->
              m "Error during the reception of the jobq query: %s" s)
      | Error e ->
          Logs_lwt.err (fun m ->
              m "Error during the reception of the answer: %s"
                (Printexc.to_string e)))
......@@ -134,6 +134,29 @@ let server_handler pass port sockaddr (ic, oc) =
done ;
incr jobs_id ;
if empty then launch_job pass () else Lwt.return_unit
| `JOBQ (id, delete) as jobq ->
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m ->
m "Receive JOBQ command: %s"
(Serialization_j.string_of_query_data jobq))
>>= fun () ->
if delete then (
let id = CCOpt.get_or ~default:(!jobs_id - 1) id in
CCDeque.filter_in_place jobs (fun j -> fst j.id != id) ;
Logs_lwt.info (fun m -> m "Removing jobs with id %i" id) )
else
(let jobs_list =
match id with
| None ->
CCDeque.to_list jobs
| Some id ->
CCDeque.(filter (fun j -> fst j.id = id) jobs |> to_list)
in
Lwt_io.write_line oc
(Serialization_j.string_of_query_data (`JOBS jobs_list))
>>= fun () -> Lwt_io.flush oc)
>>= fun () -> Lwt.return_unit
| _ ->
Lwt_io.write_line oc
(Serialization_j.string_of_answer (`Error "Unwanted command"))
......
......@@ -94,8 +94,6 @@ let handle_computation sockaddr pass computation () =
<&> Logs_lwt.debug (fun m ->
m "End computation %i,%i" (fst computation.id) (snd computation.id))
let stat oc = Lwt_io.write_line oc (Serialization_j.string_of_stat `OK)
let server_handler pass sockaddr (ic, oc) =
Lwt_io.read_line ic
>>= fun json ->
......@@ -112,8 +110,6 @@ let server_handler pass sockaddr (ic, oc) =
{computation with env= []; script= "<script>"}))
>|= fun () ->
Lwt.async (handle_computation sockaddr pass computation)
| `STAT ->
stat oc <&> Logs_lwt.debug (fun m -> m "Receive a stat command")
| _ ->
Logs_lwt.warn (fun m -> m "Receive a unwanted command")
else
......
......@@ -67,7 +67,7 @@ let master_cmd =
( Term.(const Lwt_main.run $ (const Master.cmd $ config $ setup_log))
, Term.info "master" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let client_cmd =
let submit_cmd =
let cpu =
let doc = "Number of cpu needed" in
Arg.(value & opt int 1 & info ["c"; "cpu"] ~docv:"CPU" ~doc)
......@@ -85,7 +85,7 @@ let client_cmd =
Arg.(value & opt int 1 & info ["i"; "iteration"] ~docv:"N" ~doc)
in
let port =
let doc = "port on which client listen" in
let doc = "port on which master listen" in
Arg.(value & opt int 4242 & info ["p"; "port"] ~docv:"PORT" ~doc)
in
let script =
......@@ -104,14 +104,43 @@ let client_cmd =
let doc = "Arguments passed to the script" in
Arg.(value & pos_right 2 string [] & info [] ~docv:"ARGS" ~doc)
in
let doc = "Client to use the cluster" in
let doc = "Submit jobs to the cluster" in
let exits = Term.default_exits in
let man = [`S Manpage.s_description; `P "Client to use the cluster"] in
let man = [`S Manpage.s_description; `P "Submit jobs to the cluster"] in
( Term.(
const Lwt_main.run
$ ( const Client.cmd $ cpu $ ram $ time $ iteration $ port $ script $ pass
$ ( const Submit.cmd $ cpu $ ram $ time $ iteration $ port $ script $ pass
$ addr $ args $ setup_log ))
, Term.info "client" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
, Term.info "submit" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
(** Cmdliner definition of the [jobq] sub-command: the term runs [Jobq.cmd]
    under [Lwt_main.run] with the parsed options, paired with the command
    information (name, documentation, exit codes and man page). *)
let jobq_cmd =
  (* Optional job id; [None] when the option is absent. *)
  let id =
    let doc = "Id of the job to consider" in
    Arg.(value & opt (some int) None & info ["i"; "id"] ~docv:"ID" ~doc)
  in
  (* Deletion flag. The help text states the fallback used when no id is
     supplied. *)
  let delete =
    let doc = "Delete a job. If no job id is given, delete the last job." in
    Arg.(value & flag & info ["d"; "delete"] ~doc)
  in
  let port =
    let doc = "port on which master listen" in
    Arg.(value & opt int 4242 & info ["p"; "port"] ~docv:"PORT" ~doc)
  in
  (* Required positional arguments: the password first, then the address. *)
  let pass =
    let doc = "Password of the node" in
    Arg.(required & pos 0 (some string) None & info [] ~docv:"PASS" ~doc)
  in
  let addr =
    let doc = "Ip of the master" in
    Arg.(required & pos 1 (some string) None & info [] ~docv:"IP" ~doc)
  in
  let doc = "Interact with the job queue." in
  let exits = Term.default_exits in
  let man = [`S Manpage.s_description; `P "Interact with the job queue."] in
  ( Term.(
      const Lwt_main.run
      $ (const Jobq.cmd $ id $ delete $ addr $ port $ pass $ setup_log))
  , Term.info "jobq" ~doc ~sdocs:Manpage.s_common_options ~exits ~man )
let default_cmd =
let doc = "Use a pool of computer as a cluster." in
......@@ -125,4 +154,5 @@ let default_cmd =
, Term.info "ocluster" ~version:"v0.1.0" ~doc ~sdocs ~exits ~man )
let () =
Term.(exit @@ eval_choice default_cmd [node_cmd; master_cmd; client_cmd])
Term.(
exit @@ eval_choice default_cmd [node_cmd; master_cmd; submit_cmd; jobq_cmd])
......@@ -13,10 +13,9 @@ type submission = {script: string; args: string list; time: float option; iterat
type result =
{id: (int * int); stdout: string; stderr: string; ret_code: ret_code}
type query_data = [COMPUTATION of computation | STAT | RESULT of result | JOB of submission]
type query_data = [ COMPUTATION of computation | JOBQ of (int option * bool)
| RESULT of result | JOB of submission | JOBS of (computation list) ]
type query = (string * query_data)
type answer = [Ok | Error of string]
type stat = [OK]
......@@ -7,7 +7,8 @@ let cmd cpu ram time iteration port script_file pass addr args () =
Lwt_io.with_connection
(Lwt_unix.ADDR_INET (Unix.inet_addr_of_string addr, port))
(fun (ic, oc) ->
Lwt_io.write_line oc (Serialization_j.string_of_query (pass, `JOB submission))
Lwt_io.write_line oc
(Serialization_j.string_of_query (pass, `JOB submission))
>>= fun () ->
Lwt_io.read ic
>>= fun json ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment