Commit b6246a99 authored by Fardale's avatar Fardale

Initial commit

# -*- mode: python; coding: utf-8 -*-
import os, sys, subprocess, time, signal
import argparse
from multiprocessing import Queue, Process, managers, Manager
from queue import Empty
from time import sleep
# Handle the sig, to end properly and save the result before dying
def sigterm_handler(_signo, _stack_frame):
# Raises SystemExit(0):
def execute(command, options, file_name=None):
Start command with options in a subprocess and return the result strinrg
if file_name:
with open(file_name) as f:
output = subprocess.check_output([command] + options, input =, universal_newlines = True)
output = subprocess.check_output([command] + options, universal_newlines = True)
return output
except subprocess.CalledProcessError:
print("Error with %s" % " ".join([command] + options), file=sys.stderr)
def find_file(path_dir):
Return the list of path of file that are in the directory and its
file_list = []
for directory in os.walk(path_dir):
for f in directory[2]:
file_list.append(os.path.abspath(directory[0] + "/" + f))
return file_list
def check_file(l):
Take a list of path and return the list of path that correspond to
a real file
file_list = []
for f in l:
if os.path.isfile(f):
print("Error with %s" % f, file = sys.stderr)
return file_list
def execute_workers(input_q, output_q):
A worker function to be launched in a separate process. Takes input from
input_q and compute the command on it. When this is done, the output is
placed into output_q. Runs until input_q is empty.
while True:
input_dic = input_q.get_nowait()
output = execute(input_dic["command"], input_dic["options"], input_dic["file_name"])
if output is not None:
output_q.put((input_dic["id"], output))
except Empty:
def mp_execute(share_input_q, share_output_q, nprocs):
Split the work into several processes. Launch each process with
execute as the worker function, and wait until all are finished.
if nprocs == 1:
execute_workers(share_input_q, share_output_q)
procs = []
for i in range(nprocs):
p = Process(target=execute_workers,
args=(share_input_q, share_output_q))
for p in procs:
def make_server_manager(port, authkey):
Create a manager for the server, listening on the given port.
Return a manager object with get_intput_q and get_output_q methods.
input_q = Queue()
output_q = Queue()
class QueueManager(managers.SyncManager):
QueueManager.register('get_input_q', callable=lambda: input_q)
QueueManager.register('get_output_q', callable=lambda: output_q)
manager = QueueManager(address=('', port), authkey=authkey)
print('Server started at port %s' % port, file= sys.stderr)
return manager
def runserver(command, options, portnum, authkey, file_list, stdin, fout):
Start a server, fill the queue with the list of input dic, receive the
Save the value obtient by in fout.
Return the value obtient in a dic.
# Start a shared manager server and access its queues
manager = make_server_manager(portnum, authkey)
shared_input_q = manager.get_input_q()
shared_output_q = manager.get_output_q()
N = len(file_list)
# Inputs are pushed into the input queue.
for f in file_list:
if stdin:
shared_input_q.put({"command":command,"options":options, "id":f, "file_name":f})
shared_input_q.put({"command":command,"options":options + [f], "id":f, "file_name":None})
# Wait until all results are ready in shared_output_q
numresults = 0
output_list = []
while numresults < N:
output = shared_output_q.get()
numresults += 1
# Sleep a bit before shutting down the server - to give clients time to
# realize the input queue is empty and exit in an orderly way.
# Capture the ctrl-C signal to allow user to stop the server without all the
# result (if a client crash)
save_result(output_list, fout)
print("Server stoped.", file=sys.stderr)
return output_list
def make_client_manager(ip, port, authkey):
Create a manager for a client. This manager connects to a server on the
given address and exposes the get_input_q and get_output_q methods for
accessing the shared queues from the server.
Return a manager object.
class ServerQueueManager(managers.SyncManager):
manager = ServerQueueManager(address=(ip, port), authkey=authkey)
print('Client connected to %s:%s' % (ip, port))
return manager
def runclient(ip, portnum, authkey, nproc):
Creater a client manager. Receive the shared queue for input and output.
And call mp_trace to do the calculation.
manager = make_client_manager(ip, portnum, authkey)
input_q = manager.get_input_q()
output_q = manager.get_output_q()
mp_execute(input_q, output_q, nproc)
print("Client stoped.")
def save_result(output_list, fout):
Take a list dic of result, sort it and save it in fout.
output_list.sort(key=lambda o:o[0])
for _, output in output_list:
if __name__ == "__main__":
# Parser option
PARSER = argparse.ArgumentParser(description="Run a programme on a list of input in parallel with a client/server architecture")
PARSER.add_argument("-o", "--output", type=argparse.FileType('w'), default=sys.stdout, help="Output file")
INPUT_GROUP = PARSER.add_mutually_exclusive_group(required=True)
INPUT_GROUP.add_argument("-d", "--dir", type=str, help="Directory of input file")
INPUT_GROUP.add_argument("-f", "--file", type=str, nargs="+", help="List of input file")
INPUT_GROUP.add_argument("-c", "--client", type=str, metavar="IP", help="Connect to the server with the IP and start the computation.")
PARSER.add_argument("-i", "--stdin", action="store_true", help="Write file on stdin")
PARSER.add_argument("-cmd", "--command", type=str, help="Programme to execute")
PARSER.add_argument("-opt", "--options", type=str, default=[], nargs="*", help="Options to give to the programme")
PARSER.add_argument("-s", "--server", action="store_true", help="Set up the server to use multiple computer.")
PARSER.add_argument("-p", "--port", type=int, default=2121, help="Port to use for server and client. Default:2121")
PARSER.add_argument("-k", "--key", type=str, default='pieP8cho', help="Authentification key to use for server and client connexion. Default:pieP8cho")
PARSER.add_argument("-m", "--multicore", type=int, default=1, choices=[i+1 for i in range(os.cpu_count())], help="Number of process to use for the computation. Valid for the client and simple computation.")
ARGS = PARSER.parse_args()
file_list = []
signal.signal(signal.SIGTERM, sigterm_handler)
if ARGS.dir is not None:
if not os.path.isdir(ARGS.dir):
print("Error: %s is not a directory" % ARGS.dir, file=sys.stderr)
file_list = find_file(ARGS.dir)
if ARGS.file is not None:
tmp_list = check_file(ARGS.file)
for i in range(ARGS.iter):
file_list += tmp_list
if ARGS.server:
runserver(ARGS.command, ARGS.options, ARGS.port, ARGS.key.encode(sys.getdefaultencoding()), file_list, ARGS.stdin, ARGS.output)
elif ARGS.client:
runclient(ARGS.client, ARGS.port, ARGS.key.encode(sys.getdefaultencoding()), ARGS.multicore)
resultdict = {}
# try:
# if ARGS.multicore == 1:
# for f in file_list:
# res = bench(f).split()
# if int(res[4]) in resultdict:
# resultdict[int(res[4])].append((float(res[8]),f))
# else:
# resultdict[int(res[4])] = [(float(res[8]),f)]
# else:
# manager = Manager()
# graph_q = manager.Queue()
# result_q = manager.Queue()
# for f in file_list:
# graph_q.put(f)
# mp_trace(graph_q, result_q, ARGS.multicore)
# while not result_q.empty():
# res = result_q.get()
# if res[0] in resultdict:
# resultdict[res[0]].append((res[1],res[2]))
# else:
# resultdict[res[0]] = [(res[1],res[2])]
# sleep(20)
# finally:
# print("Ending programme, saving result.", file=sys.stderr)
# save_result(resultdict,ARGS.output)
for i in {01..23};
if [ ! $(hostname) = $i ];
ssh $i.dptinfo $* &
for i in {01..23};
if [ ! $(hostname) = $i ];
rsync -e ssh -avz --delete-after $1$2 $i.local:$1
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment