Commit e7b1fd8a authored by Fardale's avatar Fardale

use a more compact representation of job on master

parent bdaeb637
......@@ -21,30 +21,29 @@ let cmd id delete addr port pass () =
if not delete then
Lwt_io.read_line ic
>>= fun json ->
let open PrintBox in
match
CCResult.guard (fun () ->
Serialization_j.query_data_of_string json)
with
| Ok (`JOBS jobs) ->
let current_id = ref (-1) in
List.filter
(fun j ->
if !current_id != fst j.id then (
current_id := fst j.id ;
true )
else false)
jobs
|> Array.of_list
Array.of_list jobs
|> fun jobs ->
Array.init
(Array.length jobs + 1)
(fun i ->
if i = 0 then [|PrintBox.text "ID"; PrintBox.text "ARG"|]
if i = 0 then
[| text "ID"
; text "Name"
; text "Iteration(current)"
; text "ARG" |]
else
[| PrintBox.sprintf "%i" (fst jobs.(i - 1).id)
; PrintBox.text (String.concat " " jobs.(i - 1).args)
|])
|> PrintBox.grid |> PrintBox.frame
[| sprintf "%i" jobs.(i - 1).id
; text jobs.(i - 1).name
; sprintf "%i(%i)" jobs.(i - 1).iteration
jobs.(i - 1).current
; text (String.concat " " jobs.(i - 1).args) |])
|> grid |> frame
|> PrintBox_text.output stdout ;
Format.printf "@." ;
Lwt.return_unit
......
......@@ -7,7 +7,7 @@ module SHashtbl = CCHashtbl.Make (CCString)
let nodes : (node * computation list) SHashtbl.t = SHashtbl.create 10
let jobs : computation CCDeque.t = CCDeque.create ()
let jobs : job CCDeque.t = CCDeque.create ()
let jobs_id = ref 0
......@@ -27,45 +27,60 @@ let string_of_sockaddr = function
let end_job id sockaddr =
let n, l = SHashtbl.find nodes (string_of_sockaddr sockaddr) in
let j = List.find (fun c -> c.id = id) l in
let j = List.find (fun (c : computation) -> c.id = id) l in
SHashtbl.replace nodes
(string_of_sockaddr sockaddr)
( {n with cpu= n.cpu + j.cpu; ram= n.ram + j.ram}
, CCList.remove_one ~eq:(fun c1 c2 -> c1.id = c2.id) j l ) ;
, CCList.remove_one ~eq:(fun (c1 : computation) c2 -> c1.id = c2.id) j l ) ;
Lwt.return_unit
let rec launch_job pass () =
let rec launch_job port pass () =
if not (CCDeque.is_empty jobs) then (
let j = CCDeque.take_front jobs in
let job = CCDeque.peek_front jobs in
match
SHashtbl.fold
(fun k (n, l) node ->
(fun k ((n : node), l) node ->
match node with
| Some x ->
Some x
| None ->
if n.cpu >= j.cpu && n.ram >= j.ram then Some (k, n, l) else None)
if n.cpu >= job.cpu && n.ram >= job.ram then Some (k, n, l)
else None)
nodes None
with
| None ->
CCDeque.push_front jobs j ;
Logs_lwt.debug (fun m -> m "No free node")
| Some (k, n, l) ->
let sockaddr =
Unix.ADDR_INET (Unix.inet_addr_of_string n.addr, n.port)
in
let computation =
{ id= (job.id, job.current)
; env=
[ Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" job.current
; Printf.sprintf "OCLUSTER_TASK_ID=%i" job.id ]
; script= job.script
; args= job.args
; time= job.time
; port
; cpu= job.cpu
; ram= job.ram }
in
SHashtbl.replace nodes k
({n with cpu= n.cpu - j.cpu; ram= n.ram - j.ram}, j :: l) ;
( {n with cpu= n.cpu - computation.cpu; ram= n.ram - computation.ram}
, computation :: l ) ;
if job.current + 1 < job.iteration then job.current <- job.current + 1
else ignore (CCDeque.take_front jobs) ;
Logs_lwt.debug (fun m ->
m "Send computation %i,%i to %s on %i" (fst j.id) (snd j.id) n.addr
n.port)
m "Send computation %i,%i to %s on %i" (fst computation.id)
(snd computation.id) n.addr n.port)
>>= fun () ->
Lwt_io.with_connection sockaddr (fun (_ic, oc) ->
Lwt_io.write_line oc
(Serialization_j.string_of_query (pass, `COMPUTATION j)))
(Serialization_j.string_of_query (pass, `COMPUTATION computation)))
(* TODO: check return value *)
>>= Lwt.pause
>>= launch_job pass )
>>= launch_job port pass )
else Logs_lwt.debug (fun m -> m "No computation")
let server_handler pass port sockaddr (ic, oc) =
......@@ -108,7 +123,7 @@ let server_handler pass port sockaddr (ic, oc) =
(Unix.time ())
(string_of_ret_code result.ret_code)))
; end_job result.id sockaddr >>= Lwt.pause
>>= launch_job pass ]))
>>= launch_job port pass ]))
| `JOB submission ->
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
......@@ -117,23 +132,21 @@ let server_handler pass port sockaddr (ic, oc) =
(Serialization_j.string_of_submission submission))
>>= fun () ->
let empty = CCDeque.is_empty jobs in
for i = 0 to submission.iteration - 1 do
let computation =
{ id= (!jobs_id, i)
; env=
[ Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" i
; Printf.sprintf "OCLUSTER_TASK_ID=%i" !jobs_id ]
; script= submission.script
; args= submission.args
; time= submission.time
; port
; cpu= submission.cpu
; ram= submission.ram }
in
CCDeque.push_back jobs computation
done ;
let job =
{ id= !jobs_id
; name = submission.name
; current= 0
; iteration= submission.iteration
; script= submission.script
; args= submission.args
; time= submission.time
; port
; cpu= submission.cpu
; ram= submission.ram }
in
CCDeque.push_back jobs job ;
incr jobs_id ;
if empty then launch_job pass () else Lwt.return_unit
if empty then launch_job port pass () else Lwt.return_unit
| `JOBQ (id, delete) as jobq ->
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
......@@ -143,10 +156,10 @@ let server_handler pass port sockaddr (ic, oc) =
>>= fun () ->
if delete then (
let last_id =
try fst (CCDeque.peek_back jobs).id with CCDeque.Empty -> 0
try (CCDeque.peek_back jobs).id with CCDeque.Empty -> 0
in
let id = CCOpt.get_or ~default:last_id id in
CCDeque.filter_in_place jobs (fun j -> fst j.id != id) ;
CCDeque.filter_in_place jobs (fun j -> j.id != id) ;
Logs_lwt.info (fun m -> m "Removing jobs with id %i" id) )
else
(let jobs_list =
......@@ -154,7 +167,8 @@ let server_handler pass port sockaddr (ic, oc) =
| None ->
CCDeque.to_list jobs
| Some id ->
CCDeque.(filter (fun j -> fst j.id = id) jobs |> to_list)
CCDeque.(
filter (fun (j : job) -> j.id = id) jobs |> to_list)
in
Lwt_io.write_line oc
(Serialization_j.string_of_query_data (`JOBS jobs_list))
......
......@@ -3,18 +3,20 @@ type node = {addr: string; ~port <ocaml default="4242">: int; cpu: int; ram: int
type master_conf = {pass: string; ~port <ocaml default="4242">: int; nodes: node list}
type node_conf = {pass: string; ~port <ocaml default="4242">: int}
type job = {id: int; name: string; current <ocaml mutable>: int; iteration: int; script: string; args: string list; time: float option; port: int; cpu: int; ram: int}
type computation = {id: (int * int); env: string list; script: string; args: string list; time: float option; port: int; cpu: int; ram: int}
type ret_code = [WEXITED of int | WSIGNALED of int | WSTOPPED of int]
type submission = {script: string; args: string list; time: float option; iteration: int <ocaml default="1">;
type submission = {name: string; script: string; args: string list; time: float option; iteration: int <ocaml default="1">;
cpu: int <ocaml default="1">; ram: int <ocaml default="1024">;}
type result =
{id: (int * int); stdout: string; stderr: string; ret_code: ret_code}
type query_data = [ COMPUTATION of computation | JOBQ of (int option * bool)
| RESULT of result | JOB of submission | JOBS of (computation list) ]
| RESULT of result | JOB of submission | JOBS of (job list) ]
type query = (string * query_data)
......
......@@ -3,7 +3,7 @@ open Serialization_t
let cmd cpu ram time iteration port script_file pass addr args () =
let script = CCIO.with_in script_file CCIO.read_all in
let submission = {script; args; time; iteration; cpu; ram} in
let submission = {name = script_file; script; args; time; iteration; cpu; ram} in
Lwt_io.with_connection
(Lwt_unix.ADDR_INET (Unix.inet_addr_of_string addr, port))
(fun (ic, oc) ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment