Commit 19aab08e authored by Fardale's avatar Fardale

Correct the computation of the load on the node

parent ee613004
......@@ -16,6 +16,6 @@
(name ocluster)
(public_name ocluster)
(libraries containers containers.data atdgen lwt.unix lwt cmdliner logs.lwt
logs.fmt logs.cli fmt.cli fmt.tty printbox)
logs.fmt logs.cli fmt.cli fmt.tty printbox str)
(preprocess
(pps lwt_ppx)))
......@@ -132,7 +132,7 @@ let rec launch_job server_port pass () =
else CCDeque.remove_front jobs;
Lwt.return_unit )
>>= fun () ->
Lwt_unix.sleep 20. >>= fun () ->
Lwt_unix.sleep 2. >>= fun () ->
mutex := true;
launch_job server_port pass () ) )
else Lwt.return_unit
......@@ -188,7 +188,7 @@ let server_handler pass port sockaddr (ic, oc) =
<&> Logs_lwt.info (fun m ->
m "Receive submission: %s"
(Serialization_j.string_of_submission submission))
>>= fun () ->
>|= fun () ->
let empty = CCDeque.is_empty jobs in
let job =
{
......@@ -206,7 +206,7 @@ let server_handler pass port sockaddr (ic, oc) =
in
CCDeque.push_back jobs job;
incr jobs_id;
if empty then launch_job port pass () else Lwt.return_unit
if empty then Lwt.async (launch_job port pass)
| `JOBQ (id, delete) as jobq ->
Lwt_io.write_line oc (Serialization_j.string_of_answer `Ok)
>>= (fun () -> Lwt_io.flush oc)
......
......@@ -3,11 +3,13 @@ open Serialization_t
let max_std = 5120000 (* 5Mo *)
let current_load = ref 0.
let get_ram () =
CCIO.(with_in "/proc/meminfo" read_lines_l)
|> CCList.find_map (fun l ->
if CCString.prefix ~pre:"MemAvailable" l then
match String.split_on_char ':' l with
Lwt_io.lines_of_file "/proc/meminfo"
|> Lwt_stream.find_map (fun line ->
if CCString.prefix ~pre:"MemAvailable" line then
match String.split_on_char ':' line with
| [ _; ram ] ->
Option.bind
(String.split_on_char ' ' (String.trim ram) |> CCList.head_opt)
......@@ -15,30 +17,61 @@ let get_ram () =
| _ -> None
else None)
(** [get_load ()] reads the first line of "/proc/loadavg" synchronously and
    returns its first space-separated field as a float, or [None] when no
    line could be read.
    @raise Failure if that first field is not a valid float. *)
let get_load () =
  match CCIO.(with_in "/proc/loadavg" read_line) with
  | None -> None
  | Some line -> (
      match String.split_on_char ' ' line with
      | first :: _ -> Some (CCFloat.of_string_exn first)
      | [] -> None)
(** [get_stat ()] reads "/proc/stat" and returns, for every line matching
    [cpuN ...], a pair [(total, idle)]: [total] is the sum of all the numeric
    fields on the line and [idle] is the field at index 3. Pairs are returned
    in file order, as a promise of a list.
    @raise Failure (through the promise) if a field is not an integer. *)
let get_stat () =
  let cpu_line = Str.regexp {|^cpu[0-9][0-9]* \(.*\)|} in
  (* Accumulate the running total and remember the idle column (index 3). *)
  let accumulate (total, idle) index field =
    let ticks = int_of_string field in
    (total + ticks, if index = 3 then ticks else idle)
  in
  let parse line =
    if not (Str.string_match cpu_line line 0) then None
    else
      let fields = String.split_on_char ' ' (Str.matched_group 1 line) in
      Some (CCList.foldi accumulate (0, 0) fields)
  in
  Lwt_stream.to_list
    (Lwt_stream.filter_map parse (Lwt_io.lines_of_file "/proc/stat"))
(** [update_load stat_fst] loops forever: it sleeps one second, takes a fresh
    sample with [get_stat], and stores in [current_load] the sum over CPUs of
    the fraction of non-idle ticks elapsed between [stat_fst] and the new
    sample. The new sample then becomes the baseline for the next round.

    [stat_fst] is the previous [(total, idle)] list as returned by [get_stat].
    The returned promise never resolves on its own. *)
let rec update_load stat_fst =
  (* Busy fraction of one CPU over the interval. A CPU whose counters did not
     advance contributes 0. instead of the NaN that 0/0 would produce. *)
  let busy_fraction (total_fst, idle_fst) (total_snd, idle_snd) =
    let total = total_snd - total_fst in
    if total <= 0 then 0.
    else float (total - (idle_snd - idle_fst)) /. float total
  in
  Lwt_unix.sleep 1. >>= fun () ->
  get_stat () >>= fun stat ->
  (* If the number of cpuN lines changed between samples the two lists cannot
     be paired up ([fold_left2] would raise); keep the previous value and
     resynchronise on the new sample. *)
  if List.compare_lengths stat_fst stat = 0 then
    current_load :=
      CCList.fold_left2
        (fun load fst snd -> load +. busy_fraction fst snd)
        0. stat_fst stat;
  update_load stat
(** [get_loadavg ()] reads the first line of "/proc/loadavg" and returns a
    promise of its first space-separated field parsed as a float.

    The promise yields [None] when the file has no line or when that field is
    not a valid float, so a malformed value no longer rejects the promise. *)
let get_loadavg () =
  let parse_first_field line =
    match String.split_on_char ' ' line with
    | first :: _ -> float_of_string_opt first
    | [] -> None
  in
  Lwt_io.(with_file ~mode:input "/proc/loadavg" read_line_opt)
  >|= CCOpt.flat_map parse_first_field
(** [get_cpu_count ()] looks in "/proc/cpuinfo" for the first line that starts
    with "cpu cores", splits it on ':' and parses the trimmed right-hand side
    as an integer; the result is [None] when no such line yields a value.

    This diff view shows the removed synchronous lines (CCIO / CCList, binding
    [l]) followed by their replacements (Lwt_io / Lwt_stream, binding [line]).

    NOTE(review): "cpu cores" is presumably a per-package core count rather
    than the number of [cpuN] entries summed by [update_load] — confirm this
    is the intended quantity. *)
let get_cpu_count () =
CCIO.(with_in "/proc/cpuinfo" read_lines_l)
|> CCList.find_map (fun l ->
if CCString.prefix ~pre:"cpu cores" l then
match String.split_on_char ':' l with
Lwt_io.lines_of_file "/proc/cpuinfo"
|> Lwt_stream.find_map (fun line ->
if CCString.prefix ~pre:"cpu cores" line then
match String.split_on_char ':' line with
| [ _; ncpu ] -> String.trim ncpu |> CCInt.of_string
| _ -> None
else None)
(** [get_available_cpu ()] estimates how many CPUs are free: the CPU count
    minus the ceiling of the load plus 1.5, or 0 when either reading is
    missing.

    This diff view shows the removed lines (load taken from [get_load] alone)
    followed by their replacements, which take the larger of the load average
    and [!current_load] and return the result through Lwt.

    NOTE(review): nothing here clamps the result, so it looks like it can be
    negative when the load exceeds the CPU count — confirm callers accept
    that. *)
let get_available_cpu () =
match (get_cpu_count (), get_load ()) with
| Some cpu_count, Some load -> cpu_count - CCFloat.(ceil (load +. 1.5) |> to_int)
get_cpu_count () >>= fun cpu_count ->
get_loadavg () >|= fun loadavg ->
match (cpu_count, loadavg) with
| Some cpu_count, Some loadavg ->
cpu_count
- CCFloat.(ceil (CCFloat.max loadavg !current_load +. 1.5) |> to_int)
| _ -> 0
(** [get_available_ram ()] returns [(ram / 1024) - 4096] for the value given
    by [get_ram], or 0 when [get_ram] yields [None].

    This diff view shows the removed synchronous line followed by its Lwt
    replacement, which maps over the promise returned by [get_ram].

    NOTE(review): presumably [ram] is in kB, making this MiB with a 4096 MiB
    reserve — confirm the unit against [get_ram]. *)
let get_available_ram () =
match get_ram () with Some ram -> (ram / 1024) - 4096 | None -> 0
get_ram () >|= fun ram ->
match ram with Some ram -> (ram / 1024) - 4096 | None -> 0
let process_status_to_ret_code = function
| Unix.WEXITED c -> `WEXITED c
......@@ -135,9 +168,10 @@ let server_handler pass sockaddr (ic, oc) =
>|= fun () ->
Lwt.async (handle_computation sockaddr pass computation)
| `STAT ->
get_available_cpu () >>= fun cpu_count ->
get_available_ram () >>= fun ram ->
Lwt_io.write_line oc
(Serialization_j.string_of_stat
(get_available_cpu (), get_available_ram ()))
(Serialization_j.string_of_stat (cpu_count, ram))
>>= (fun () -> Lwt_io.flush oc)
<&> Logs_lwt.debug (fun m -> m "Receive stat command")
| _ -> Logs_lwt.warn (fun m -> m "Receive a unwanted command")
......@@ -161,10 +195,10 @@ let cmd config () =
let promise, resolver = Lwt.task () in
Logs_lwt.info (fun m -> m "Node at port %i with pass %s" conf.port conf.pass)
>>= fun () ->
Lwt_io.flush Lwt_io.stderr >>= fun () ->
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
(server_handler conf.pass)
( Lwt.async (fun () -> get_stat () >>= update_load);
Lwt_io.establish_server_with_client_address
(Unix.ADDR_INET (Unix.inet_addr_any, conf.port))
(server_handler conf.pass) )
>>= fun server ->
let _ = Lwt_unix.on_signal 15 (stop_server resolver server)
and _ = Lwt_unix.on_signal 2 (stop_server resolver server) in
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment