diff --git a/cpasswords/client.py b/cpasswords/client.py index 42776ce54208abc862b9c209e7f400442a3b2a51..83a9cdfb27ecd4ba4a9def983b4c90f5b8527c17 100755 --- a/cpasswords/client.py +++ b/cpasswords/client.py @@ -18,8 +18,6 @@ import tempfile import os import argparse import re -import time -import datetime import copy import logging from configparser import ConfigParser @@ -29,6 +27,9 @@ from secrets import token_urlsafe from paramiko.client import SSHClient from paramiko.ssh_exception import SSHException +# Import modules +from .gpg import decrypt, encrypt, receive_keys, list_keys, GPG_TRUSTLEVELS + # Configuration loading # On n'a pas encore accès à la config donc on devine le nom bootstrap_cmd_name = os.path.split(sys.argv[0])[1] @@ -39,8 +40,7 @@ config = ConfigParser() if not config.read(config_path + "/clientconfig.ini"): # If config could not be imported, display an error if required ducktape_display_error = sys.stderr.isatty() and \ - not any([opt in sys.argv for opt in ["-q", "--quiet"]]) and \ - __name__ == '__main__' + not any([opt in sys.argv for opt in ["-q", "--quiet"]]) if ducktape_display_error: # Do not use logger as it has not been initialized yet print("%s/clientconfig.ini could not be found or read.\n" @@ -48,201 +48,13 @@ if not config.read(config_path + "/clientconfig.ini"): "repository and customize." % config_path) exit(1) -# Logger local -log = logging.getLogger(bootstrap_cmd_name) +# Local logger +log = logging.getLogger(__name__) #: Pattern utilisé pour détecter la ligne contenant le mot de passe dans les fichiers pass_regexp = re.compile('[\t ]*pass(?:word)?[\t ]*:[\t ]*(.*)\r?\n?$', flags=re.IGNORECASE) -# GPG Definitions -#: Path du binaire gpg -GPG = 'gpg' - -#: Paramètres à fournir à gpg en fonction de l'action désirée -GPG_ARGS = { - 'decrypt': ['-d'], - 'encrypt': ['--armor', '-es'], - 'receive-keys': ['--recv-keys'], - 'list-keys': ['--list-keys', '--with-colons', '--fixed-list-mode', - '--with-fingerprint', '--with-fingerprint'], # Ce n'est pas une erreur. Il faut 2 --with-fingerprint pour avoir les fingerprints des subkeys. -} - -#: Mapping (lettre de trustlevel) -> (signification, faut-il faire confiance à la clé) -GPG_TRUSTLEVELS = { - "-": ("inconnue (pas de valeur assignée)", False), - "o": ("inconnue (nouvelle clé)", False), - "i": ("invalide (self-signature manquante ?)", False), - "n": ("nulle (il ne faut pas faire confiance à cette clé)", False), - "m": ("marginale (pas assez de lien de confiance vers cette clé)", False), - "f": ("entière (clé dans le réseau de confiance)", True), - "u": ("ultime (c'est probablement ta clé)", True), - "r": ("révoquée", False), - "e": ("expirée", False), - "q": ("non définie", False), -} - - -def gpg(options, command, args=None): - """Lance gpg pour la commande donnée avec les arguments - donnés. Renvoie son entrée standard et sa sortie standard.""" - full_command = [GPG] - full_command.extend(GPG_ARGS[command]) - if args: - full_command.extend(args) - if options.verbose: - stderr = sys.stderr - else: - stderr = subprocess.PIPE - full_command.extend(['--debug-level=1']) - - # Run gpg - log.info("Running `%s`" % " ".join(full_command)) - proc = subprocess.Popen(full_command, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=stderr, - close_fds=True) - if not options.verbose: - proc.stderr.close() - return proc.stdin, proc.stdout - - -def _parse_timestamp(string, canbenone=False): - """Interprète ``string`` comme un timestamp depuis l'Epoch.""" - if string == '' and canbenone: - return None - return datetime.datetime(*time.localtime(int(string))[:7]) - - -def _parse_pub(data): - """Interprète une ligne ``pub:``""" - d = { - 'trustletter': data[1], - 'length': int(data[2]), - 'algorithm': int(data[3]), - 'longid': data[4], - 'signdate': _parse_timestamp(data[5]), - 'expiredate': _parse_timestamp(data[6], canbenone=True), - 'ownertrustletter': data[8], - 'capabilities': data[11], - } - return d - - -def _parse_uid(data): - """Interprète une ligne ``uid:``""" - d = { - 'trustletter': data[1], - 'signdate': _parse_timestamp(data[5], canbenone=True), - 'hash': data[7], - 'uid': data[9], - } - return d - - -def _parse_fpr(data): - """Interprète une ligne ``fpr:``""" - d = { - 'fpr': data[9], - } - return d - - -def _parse_sub(data): - """Interprète une ligne ``sub:``""" - d = { - 'trustletter': data[1], - 'length': int(data[2]), - 'algorithm': int(data[3]), - 'longid': data[4], - 'signdate': _parse_timestamp(data[5]), - 'expiredate': _parse_timestamp(data[6], canbenone=True), - 'capabilities': data[11], - } - return d - - -#: Functions to parse the recognized fields -GPG_PARSERS = { - 'pub': _parse_pub, - 'uid': _parse_uid, - 'fpr': _parse_fpr, - 'sub': _parse_sub, -} - - -def parse_keys(gpgout, debug=False): - """Parse l'output d'un listing de clés gpg.""" - ring = {} - # Valeur utilisée pour dire "cet objet n'a pas encore été rencontré pendant le parsing" - init_value = "initialize" - current_pub = init_value - current_sub = init_value - for line in iter(gpgout.readline, ''): - # La doc dit que l'output est en UTF-8 «regardless of any --display-charset setting» - try: - line = line.decode("utf-8") - except UnicodeDecodeError: - try: - line = line.decode("iso8859-1") - except UnicodeDecodeError: - line = line.decode("iso8859-1", "ignore") - log.warning( - "gpg is telling shit, it is neither ISO-8859-1 nor UTF-8. Dropping!") - line = line.split(":") - field = line[0] - if field in GPG_PARSERS.keys(): - log.debug("begin loop, met %s :" % (field)) - log.debug("current_pub : %r" % current_pub) - log.debug("current_sub : %r" % current_sub) - try: - content = GPG_PARSERS[field](line) - except KeyError: - log.error("*** FAILED *** Line: %s", line) - raise - if field == "pub": - # Nouvelle clé - # On sauvegarde d'abord le dernier sub (si il y en a un) dans son pub parent - if current_sub != init_value: - current_pub["subkeys"].append(current_sub) - # Ensuite on sauve le pub précédent (si il y en a un) dans le ring - if current_pub != init_value: - ring[current_pub["fpr"]] = current_pub - # On place le nouveau comme pub courant - current_pub = content - # Par défaut, il n'a ni subkeys, ni uids - current_pub["subkeys"] = [] - current_pub["uids"] = [] - # On oublié l'éventuel dernier sub rencontré - current_sub = init_value - elif field == "fpr": - if current_sub != init_value: - # On a lu un sub depuis le dernier pub, donc le fingerprint est celui du dernier sub rencontré - current_sub["fpr"] = content["fpr"] - else: - # Alors c'est le fingerprint du pub - current_pub["fpr"] = content["fpr"] - elif field == "uid": - current_pub["uids"].append(content) - elif field == "sub": - # Nouvelle sous-clé - # D'abord on sauvegarde la précédente (si il y en a une) dans son pub parent - if current_sub != init_value: - current_pub["subkeys"].append(current_sub) - # On place la nouvelle comme sub courant - current_sub = content - log.debug("current_pub : %r" % current_pub) - log.debug("current_sub : %r" % current_sub) - log.debug("parsed object : %r" % content) - # À la fin, il faut sauvegarder les derniers (sub, pub) rencontrés, - # parce que leur sauvegarde n'a pas encore été déclenchée - if current_sub != init_value: - current_pub["subkeys"].append(current_sub) - if current_pub != init_value: - ring[current_pub["fpr"]] = current_pub - return ring - class simple_memoize(object): """ Memoization/Lazy """ @@ -270,7 +82,7 @@ class simple_memoize(object): # Remote commands @simple_memoize -def create_ssh_client(): +def create_ssh_client(options): """ Create a SSH client with paramiko module """ @@ -295,7 +107,7 @@ def remote_command(options, command, arg=None, stdin_contents=None): """ Execute remote command and return output """ - client = create_ssh_client() + client = create_ssh_client(options) # Build command if "remote_cmd" not in options.serverdata: @@ -372,7 +184,7 @@ def put_files(options, files): return remote_command(options, "putfiles", stdin_contents=files) -def rm_file(filename): +def rm_file(options, filename): """Supprime le fichier sur le serveur distant""" return remote_command(options, "rmfile", filename) @@ -403,8 +215,7 @@ def update_keys(options): keys = all_keys(options) - _, stdout = gpg(options, "receive-keys", - [key for _, key in keys.values() if key]) + _, stdout = receive_keys([key for _, key in keys.values() if key]) return stdout.read().decode("utf-8") @@ -450,8 +261,7 @@ def check_keys(options, recipients=None, quiet=False): keys = {u: val for (u, val) in keys.iteritems() if u in recipients} if speak: print("M : le mail correspond à un uid du fingerprint\nC : confiance OK (inclut la vérification de non expiration).\n") - _, gpgout = gpg(options, 'list-keys') - localring = parse_keys(gpgout) + localring = list_keys() for (recipient, (mail, fpr)) in keys.iteritems(): failed = "" if fpr is not None: @@ -516,7 +326,7 @@ def get_dest_of_roles(options, roles): for rec in get_recipients_of_roles(options, roles) if allkeys[rec][1]] -def encrypt(options, roles, contents): +def my_encrypt(options, roles, contents): """Chiffre le contenu pour les roles donnés""" allkeys = all_keys(options) @@ -529,28 +339,18 @@ def encrypt(options, roles, contents): fpr_recipients.append("-r") fpr_recipients.append(fpr) - stdin, stdout = gpg(options, "encrypt", fpr_recipients) - stdin.write(contents.encode("utf-8")) - stdin.close() - out = stdout.read().decode("utf-8") + out = encrypt(contents, fpr_recipients) + if out == '': - return [False, "Échec de chiffrement"] + return False, "Échec de chiffrement" else: - return [True, out] - - -def decrypt(options, contents): - """Déchiffre le contenu""" - stdin, stdout = gpg(options, "decrypt") - stdin.write(contents.encode("utf-8")) - stdin.close() - return stdout.read().decode("utf-8") + return True, out def put_password(options, roles, contents): """Dépose le mot de passe après l'avoir chiffré pour les destinataires dans ``roles``.""" - success, enc_pwd_or_error = encrypt(options, roles, contents) + success, enc_pwd_or_error = my_encrypt(options, roles, contents) if success: enc_pwd = enc_pwd_or_error return put_files(options, [{'filename': options.fname, 'roles': roles, 'contents': enc_pwd}])[0] @@ -676,7 +476,6 @@ def show_file(options): log.warn(value) # value contient le message d'erreur return passfile = value - (sin, sout) = gpg(options, 'decrypt') content = passfile['contents'] # Kludge (broken db ?) @@ -685,9 +484,7 @@ def show_file(options): content = content[-1] # Déchiffre le contenu - sin.write(content.encode("utf-8")) - sin.close() - texte = sout.read().decode("utf-8") + texte = decrypt(content) # Est-ce une clé ssh ? is_key = texte.startswith('-----BEGIN RSA PRIVATE KEY-----') @@ -782,15 +579,12 @@ Enregistrez le fichier vide pour annuler.\n""" return else: passfile = value - (sin, sout) = gpg(options, 'decrypt') contents = passfile['contents'] # <ducktape> (waddle waddle) if isinstance(contents, list): contents = contents[-1] # </ducktape> - sin.write(contents.encode("utf-8")) - sin.close() - texte = sout.read().decode("utf-8") + texte = decrypt(contents) if new_roles is None: new_roles = passfile['roles'] @@ -865,7 +659,7 @@ def remove_file(options): fname = options.fname if not confirm(options, 'Êtes-vous sûr de vouloir supprimer %s ?' % (fname,)): return - message = rm_file(fname) + message = rm_file(options, fname) print(message) @@ -923,7 +717,7 @@ def recrypt_files(options, strict=False): # On rechiffre to_put = [{'filename': f['filename'], 'roles': f['roles'], - 'contents': encrypt(options, f['roles'], decrypt(options, f['contents']))[-1]} + 'contents': my_encrypt(options, f['roles'], decrypt(f['contents']))[-1]} for [success, f] in files] if to_put: if not options.quiet: diff --git a/cpasswords/gpg.py b/cpasswords/gpg.py new file mode 100644 index 0000000000000000000000000000000000000000..8f1f8310d4786983974305da9f79f5e509525a6d --- /dev/null +++ b/cpasswords/gpg.py @@ -0,0 +1,223 @@ +""" +GPG binding + +Copyright (C) 2010-2020 Cr@ns <roots@crans.org> +Authors : Daniel Stan <daniel.stan@crans.org> + Vincent Le Gallic <legallic@crans.org> + Alexandre Iooss <erdnaxe@crans.org> +SPDX-License-Identifier: GPL-3.0-or-later +""" + +import sys +import subprocess +import logging +import time +import datetime + +# Local logger +log = logging.getLogger(__name__) + +# GPG Definitions +#: Path du binaire gpg +GPG = 'gpg' + +#: Mapping (lettre de trustlevel) -> (signification, faut-il faire confiance à la clé) +GPG_TRUSTLEVELS = { + "-": ("inconnue (pas de valeur assignée)", False), + "o": ("inconnue (nouvelle clé)", False), + "i": ("invalide (self-signature manquante ?)", False), + "n": ("nulle (il ne faut pas faire confiance à cette clé)", False), + "m": ("marginale (pas assez de lien de confiance vers cette clé)", False), + "f": ("entière (clé dans le réseau de confiance)", True), + "u": ("ultime (c'est probablement ta clé)", True), + "r": ("révoquée", False), + "e": ("expirée", False), + "q": ("non définie", False), +} + + +def _parse_timestamp(string, canbenone=False): + """Interprète ``string`` comme un timestamp depuis l'Epoch.""" + if string == '' and canbenone: + return None + return datetime.datetime(*time.localtime(int(string))[:7]) + + +def _parse_pub(data): + """Interprète une ligne ``pub:``""" + d = { + 'trustletter': data[1], + 'length': int(data[2]), + 'algorithm': int(data[3]), + 'longid': data[4], + 'signdate': _parse_timestamp(data[5]), + 'expiredate': _parse_timestamp(data[6], canbenone=True), + 'ownertrustletter': data[8], + 'capabilities': data[11], + } + return d + + +def _parse_uid(data): + """Interprète une ligne ``uid:``""" + d = { + 'trustletter': data[1], + 'signdate': _parse_timestamp(data[5], canbenone=True), + 'hash': data[7], + 'uid': data[9], + } + return d + + +def _parse_fpr(data): + """Interprète une ligne ``fpr:``""" + d = { + 'fpr': data[9], + } + return d + + +def _parse_sub(data): + """Interprète une ligne ``sub:``""" + d = { + 'trustletter': data[1], + 'length': int(data[2]), + 'algorithm': int(data[3]), + 'longid': data[4], + 'signdate': _parse_timestamp(data[5]), + 'expiredate': _parse_timestamp(data[6], canbenone=True), + 'capabilities': data[11], + } + return d + + +#: Functions to parse the recognized fields +GPG_PARSERS = { + 'pub': _parse_pub, + 'uid': _parse_uid, + 'fpr': _parse_fpr, + 'sub': _parse_sub, +} + + +def _parse_keys(gpgout, debug=False): + """ + Parse l'output d'un listing de clés gpg. + TODO: à déméler et simplifier + """ + ring = {} + # Valeur utilisée pour dire "cet objet n'a pas encore été rencontré pendant le parsing" + init_value = "initialize" + current_pub = init_value + current_sub = init_value + for line in iter(gpgout.readline, ''): + # La doc dit que l'output est en UTF-8 «regardless of any --display-charset setting» + try: + line = line.decode("utf-8") + except UnicodeDecodeError: + try: + line = line.decode("iso8859-1") + except UnicodeDecodeError: + line = line.decode("iso8859-1", "ignore") + log.warning( + "gpg is telling shit, it is neither ISO-8859-1 nor UTF-8. Dropping!") + line = line.split(":") + field = line[0] + if field in GPG_PARSERS.keys(): + log.debug("begin loop, met %s :" % (field)) + log.debug("current_pub : %r" % current_pub) + log.debug("current_sub : %r" % current_sub) + try: + content = GPG_PARSERS[field](line) + except KeyError: + log.error("*** FAILED *** Line: %s", line) + raise + if field == "pub": + # Nouvelle clé + # On sauvegarde d'abord le dernier sub (si il y en a un) dans son pub parent + if current_sub != init_value: + current_pub["subkeys"].append(current_sub) + # Ensuite on sauve le pub précédent (si il y en a un) dans le ring + if current_pub != init_value: + ring[current_pub["fpr"]] = current_pub + # On place le nouveau comme pub courant + current_pub = content + # Par défaut, il n'a ni subkeys, ni uids + current_pub["subkeys"] = [] + current_pub["uids"] = [] + # On oublié l'éventuel dernier sub rencontré + current_sub = init_value + elif field == "fpr": + if current_sub != init_value: + # On a lu un sub depuis le dernier pub, donc le fingerprint est celui du dernier sub rencontré + current_sub["fpr"] = content["fpr"] + else: + # Alors c'est le fingerprint du pub + current_pub["fpr"] = content["fpr"] + elif field == "uid": + current_pub["uids"].append(content) + elif field == "sub": + # Nouvelle sous-clé + # D'abord on sauvegarde la précédente (si il y en a une) dans son pub parent + if current_sub != init_value: + current_pub["subkeys"].append(current_sub) + # On place la nouvelle comme sub courant + current_sub = content + log.debug("current_pub : %r" % current_pub) + log.debug("current_sub : %r" % current_sub) + log.debug("parsed object : %r" % content) + # À la fin, il faut sauvegarder les derniers (sub, pub) rencontrés, + # parce que leur sauvegarde n'a pas encore été déclenchée + if current_sub != init_value: + current_pub["subkeys"].append(current_sub) + if current_pub != init_value: + ring[current_pub["fpr"]] = current_pub + return ring + + +def _gpg(*argv): + """ + Run GPG and return its standard input and output. + """ + full_command = [GPG] + list(argv) + log.info("Running `%s`" % " ".join(full_command)) + proc = subprocess.Popen( + full_command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=sys.stderr, + close_fds=True, + ) + return proc.stdin, proc.stdout + + +def decrypt(content): + """ + Return decrypted content + """ + stdin, stdout = _gpg('-d') + stdin.write(content.encode("utf-8")) + stdin.close() + return stdout.read().decode("utf-8") + + +def encrypt(content: str, fingerprints: [str]) -> str: + """ + Return encrypted content for fingerprints + """ + stdin, stdout = _gpg('--armor', '-es', *fingerprints) + stdin.write(content.encode("utf-8")) + stdin.close() + return stdout.read().decode("utf-8") + + +def receive_keys(keys): + return _gpg('--recv-keys', *keys) + + +def list_keys(): + # It is not an error, you need two --with-fingerprint + # to get subkeys fingerprints. + _, out = _gpg('--list-keys', '--with-colons', '--fixed-list-mode', + '--with-fingerprint', '--with-fingerprint') + return _parse_keys(out)