From e7b1fd8a22159c5e5981c3e48aa05cadbc6cbde5 Mon Sep 17 00:00:00 2001 From: Fardale Date: Fri, 25 Oct 2019 14:56:43 +0200 Subject: [PATCH] use a more compact representation of job on master --- src/jobq.ml | 27 +++++++-------- src/master.ml | 80 +++++++++++++++++++++++++------------------ src/serialization.atd | 6 ++-- src/submit.ml | 2 +- 4 files changed, 65 insertions(+), 50 deletions(-) diff --git a/src/jobq.ml b/src/jobq.ml index b48a4e1..94c53ca 100644 --- a/src/jobq.ml +++ b/src/jobq.ml @@ -21,30 +21,29 @@ let cmd id delete addr port pass () = if not delete then Lwt_io.read_line ic >>= fun json -> + let open PrintBox in match CCResult.guard (fun () -> Serialization_j.query_data_of_string json) with | Ok (`JOBS jobs) -> - let current_id = ref (-1) in - List.filter - (fun j -> - if !current_id != fst j.id then ( - current_id := fst j.id ; - true ) - else false) - jobs - |> Array.of_list + Array.of_list jobs |> fun jobs -> Array.init (Array.length jobs + 1) (fun i -> - if i = 0 then [|PrintBox.text "ID"; PrintBox.text "ARG"|] + if i = 0 then + [| text "ID" + ; text "Name" + ; text "Iteration(current)" + ; text "ARG" |] else - [| PrintBox.sprintf "%i" (fst jobs.(i - 1).id) - ; PrintBox.text (String.concat " " jobs.(i - 1).args) - |]) - |> PrintBox.grid |> PrintBox.frame + [| sprintf "%i" jobs.(i - 1).id + ; text jobs.(i - 1).name + ; sprintf "%i(%i)" jobs.(i - 1).iteration + jobs.(i - 1).current + ; text (String.concat " " jobs.(i - 1).args) |]) + |> grid |> frame |> PrintBox_text.output stdout ; Format.printf "@." 
; Lwt.return_unit diff --git a/src/master.ml b/src/master.ml index 8f9dfca..88420df 100644 --- a/src/master.ml +++ b/src/master.ml @@ -7,7 +7,7 @@ module SHashtbl = CCHashtbl.Make (CCString) let nodes : (node * computation list) SHashtbl.t = SHashtbl.create 10 -let jobs : computation CCDeque.t = CCDeque.create () +let jobs : job CCDeque.t = CCDeque.create () let jobs_id = ref 0 @@ -27,45 +27,60 @@ let string_of_sockaddr = function let end_job id sockaddr = let n, l = SHashtbl.find nodes (string_of_sockaddr sockaddr) in - let j = List.find (fun c -> c.id = id) l in + let j = List.find (fun (c : computation) -> c.id = id) l in SHashtbl.replace nodes (string_of_sockaddr sockaddr) ( {n with cpu= n.cpu + j.cpu; ram= n.ram + j.ram} - , CCList.remove_one ~eq:(fun c1 c2 -> c1.id = c2.id) j l ) ; + , CCList.remove_one ~eq:(fun (c1 : computation) c2 -> c1.id = c2.id) j l ) ; Lwt.return_unit -let rec launch_job pass () = +let rec launch_job port pass () = if not (CCDeque.is_empty jobs) then ( - let j = CCDeque.take_front jobs in + let job = CCDeque.peek_front jobs in match SHashtbl.fold - (fun k (n, l) node -> + (fun k ((n : node), l) node -> match node with | Some x -> Some x | None -> - if n.cpu >= j.cpu && n.ram >= j.ram then Some (k, n, l) else None) + if n.cpu >= job.cpu && n.ram >= job.ram then Some (k, n, l) + else None) nodes None with | None -> - CCDeque.push_front jobs j ; Logs_lwt.debug (fun m -> m "No free node") | Some (k, n, l) -> let sockaddr = Unix.ADDR_INET (Unix.inet_addr_of_string n.addr, n.port) in + let computation = + { id= (job.id, job.current) + ; env= + [ Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" job.current + ; Printf.sprintf "OCLUSTER_TASK_ID=%i" job.id ] + ; script= job.script + ; args= job.args + ; time= job.time + ; port + ; cpu= job.cpu + ; ram= job.ram } + in SHashtbl.replace nodes k - ({n with cpu= n.cpu - j.cpu; ram= n.ram - j.ram}, j :: l) ; + ( {n with cpu= n.cpu - computation.cpu; ram= n.ram - computation.ram} + , computation :: l ) ; 
if job.current + 1 < job.iteration then job.current <- job.current + 1 + else ignore (CCDeque.take_front jobs) ; Logs_lwt.debug (fun m -> - m "Send computation %i,%i to %s on %i" (fst j.id) (snd j.id) n.addr - n.port) + m "Send computation %i,%i to %s on %i" (fst computation.id) + (snd computation.id) n.addr n.port) >>= fun () -> Lwt_io.with_connection sockaddr (fun (_ic, oc) -> Lwt_io.write_line oc - (Serialization_j.string_of_query (pass, `COMPUTATION j))) + (Serialization_j.string_of_query (pass, `COMPUTATION computation))) (* TODO: check return value *) >>= Lwt.pause - >>= launch_job pass ) + >>= launch_job port pass ) else Logs_lwt.debug (fun m -> m "No computation") let server_handler pass port sockaddr (ic, oc) = @@ -108,7 +123,7 @@ let server_handler pass port sockaddr (ic, oc) = (Unix.time ()) (string_of_ret_code result.ret_code))) ; end_job result.id sockaddr >>= Lwt.pause - >>= launch_job pass ])) + >>= launch_job port pass ])) | `JOB submission -> Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok) >>= (fun () -> Lwt_io.flush oc) @@ -117,23 +132,21 @@ let server_handler pass port sockaddr (ic, oc) = (Serialization_j.string_of_submission submission)) >>= fun () -> let empty = CCDeque.is_empty jobs in - for i = 0 to submission.iteration - 1 do - let computation = - { id= (!jobs_id, i) - ; env= - [ Printf.sprintf "OCLUSTER_ARRAY_TASK_ID=%i" i - ; Printf.sprintf "OCLUSTER_TASK_ID=%i" !jobs_id ] - ; script= submission.script - ; args= submission.args - ; time= submission.time - ; port - ; cpu= submission.cpu - ; ram= submission.ram } - in - CCDeque.push_back jobs computation - done ; + let job = + { id= !jobs_id + ; name = submission.name + ; current= 0 + ; iteration= submission.iteration + ; script= submission.script + ; args= submission.args + ; time= submission.time + ; port + ; cpu= submission.cpu + ; ram= submission.ram } + in + if submission.iteration > 0 then CCDeque.push_back jobs job ; (* as with the removed loop, a submission with no iteration enqueues nothing *) incr jobs_id ; - if empty then launch_job pass () else Lwt.return_unit + if empty then 
launch_job port pass () else Lwt.return_unit | `JOBQ (id, delete) as jobq -> Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok) >>= (fun () -> Lwt_io.flush oc) @@ -143,10 +156,10 @@ let server_handler pass port sockaddr (ic, oc) = >>= fun () -> if delete then ( let last_id = - try fst (CCDeque.peek_back jobs).id with CCDeque.Empty -> 0 + try (CCDeque.peek_back jobs).id with CCDeque.Empty -> 0 in let id = CCOpt.get_or ~default:last_id id in - CCDeque.filter_in_place jobs (fun j -> fst j.id != id) ; + CCDeque.filter_in_place jobs (fun j -> j.id <> id) ; Logs_lwt.info (fun m -> m "Removing jobs with id %i" id) ) else (let jobs_list = @@ -154,7 +167,8 @@ let server_handler pass port sockaddr (ic, oc) = | None -> CCDeque.to_list jobs | Some id -> - CCDeque.(filter (fun j -> fst j.id = id) jobs |> to_list) + CCDeque.( + filter (fun (j : job) -> j.id = id) jobs |> to_list) in Lwt_io.write_line oc (Serialization_j.string_of_query_data (`JOBS jobs_list)) diff --git a/src/serialization.atd b/src/serialization.atd index 513ddc4..e27823b 100644 --- a/src/serialization.atd +++ b/src/serialization.atd @@ -3,18 +3,20 @@ type node = {addr: string; ~port : int; cpu: int; ram: int type master_conf = {pass: string; ~port : int; nodes: node list} type node_conf = {pass: string; ~port : int} +type job = {id: int; name: string; current <ocaml mutable> : int; iteration: int; script: string; args: string list; time: float option; port: int; cpu: int; ram: int} (* current is mutable: the master advances it in place after dispatching each iteration *) + type computation = {id: (int * int); env: string list; script: string; args: string list; time: float option; port: int; cpu: int; ram: int} type ret_code = [WEXITED of int | WSIGNALED of int | WSTOPPED of int] -type submission = {script: string; args: string list; time: float option; iteration: int ; +type submission = {name: string; script: string; args: string list; time: float option; iteration: int ; cpu: int ; ram: int ;} type result = {id: (int * int); stdout: string; stderr: string; ret_code: ret_code} type query_data = [ 
COMPUTATION of computation | JOBQ of (int option * bool) - | RESULT of result | JOB of submission | JOBS of (computation list) ] + | RESULT of result | JOB of submission | JOBS of (job list) ] type query = (string * query_data) diff --git a/src/submit.ml b/src/submit.ml index 73a4681..c178e52 100644 --- a/src/submit.ml +++ b/src/submit.ml @@ -3,7 +3,7 @@ open Serialization_t let cmd cpu ram time iteration port script_file pass addr args () = let script = CCIO.with_in script_file CCIO.read_all in - let submission = {script; args; time; iteration; cpu; ram} in + let submission = {name = script_file; script; args; time; iteration; cpu; ram} in Lwt_io.with_connection (Lwt_unix.ADDR_INET (Unix.inet_addr_of_string addr, port)) (fun (ic, oc) -> -- GitLab