Skip to content
Snippets Groups Projects
gpg.py 7.03 KiB
Newer Older
"""
GPG binding

Copyright (C) 2010-2020 Cr@ns <roots@crans.org>
Authors : Daniel Stan <daniel.stan@crans.org>
          Vincent Le Gallic <legallic@crans.org>
          Alexandre Iooss <erdnaxe@crans.org>
SPDX-License-Identifier: GPL-3.0-or-later
"""

import sys
import subprocess
import logging
import time
import datetime

# Local logger
log = logging.getLogger(__name__)

# GPG Definitions
#: Path du binaire gpg
GPG = 'gpg'

#: Mapping (lettre de trustlevel) -> (signification, faut-il faire confiance à la clé)
GPG_TRUSTLEVELS = {
    "-": ("inconnue (pas de valeur assignée)", False),
    "o": ("inconnue (nouvelle clé)", False),
    "i": ("invalide (self-signature manquante ?)", False),
    "n": ("nulle (il ne faut pas faire confiance à cette clé)", False),
    "m": ("marginale (pas assez de lien de confiance vers cette clé)", False),
    "f": ("entière (clé dans le réseau de confiance)", True),
    "u": ("ultime (c'est probablement ta clé)", True),
    "r": ("révoquée", False),
    "e": ("expirée", False),
    "q": ("non définie", False),
}


def _parse_timestamp(string, canbenone=False):
    """Interprète ``string`` comme un timestamp depuis l'Epoch."""
    if string == '' and canbenone:
        return None
    return datetime.datetime(*time.localtime(int(string))[:7])


def _parse_pub(data):
    """Interprète une ligne ``pub:``"""
    d = {
        'trustletter': data[1],
        'length': int(data[2]),
        'algorithm': int(data[3]),
        'longid': data[4],
        'signdate': _parse_timestamp(data[5]),
        'expiredate': _parse_timestamp(data[6], canbenone=True),
        'ownertrustletter': data[8],
        'capabilities': data[11],
    }
    return d


def _parse_uid(data):
    """Interprète une ligne ``uid:``"""
    d = {
        'trustletter': data[1],
        'signdate': _parse_timestamp(data[5], canbenone=True),
        'hash': data[7],
        'uid': data[9],
    }
    return d


def _parse_fpr(data):
    """Interprète une ligne ``fpr:``"""
    d = {
        'fpr': data[9],
    }
    return d


def _parse_sub(data):
    """Interprète une ligne ``sub:``"""
    d = {
        'trustletter': data[1],
        'length': int(data[2]),
        'algorithm': int(data[3]),
        'longid': data[4],
        'signdate': _parse_timestamp(data[5]),
        'expiredate': _parse_timestamp(data[6], canbenone=True),
        'capabilities': data[11],
    }
    return d


#: Functions to parse the recognized fields
GPG_PARSERS = {
    'pub': _parse_pub,
    'uid': _parse_uid,
    'fpr': _parse_fpr,
    'sub': _parse_sub,
}


def _parse_keys(gpgout, debug=False):
    """
    Parse l'output d'un listing de clés gpg.
    TODO: à déméler et simplifier
    """
    ring = {}
    # Valeur utilisée pour dire "cet objet n'a pas encore été rencontré pendant le parsing"
    init_value = "initialize"
    current_pub = init_value
    current_sub = init_value
    for line in iter(gpgout.readline, ''):
        # La doc dit que l'output est en UTF-8 «regardless of any --display-charset setting»
        try:
            line = line.decode("utf-8")
        except UnicodeDecodeError:
            try:
                line = line.decode("iso8859-1")
            except UnicodeDecodeError:
                line = line.decode("iso8859-1", "ignore")
                log.warning(
                    "gpg is telling shit, it is neither ISO-8859-1 nor UTF-8. Dropping!")
        line = line.split(":")
        field = line[0]
        if field in GPG_PARSERS.keys():
            log.debug("begin loop, met %s :" % (field))
            log.debug("current_pub : %r" % current_pub)
            log.debug("current_sub : %r" % current_sub)
            try:
                content = GPG_PARSERS[field](line)
            except KeyError:
                log.error("*** FAILED *** Line: %s", line)
                raise
            if field == "pub":
                # Nouvelle clé
                # On sauvegarde d'abord le dernier sub (si il y en a un) dans son pub parent
                if current_sub != init_value:
                    current_pub["subkeys"].append(current_sub)
                # Ensuite on sauve le pub précédent (si il y en a un) dans le ring
                if current_pub != init_value:
                    ring[current_pub["fpr"]] = current_pub
                # On place le nouveau comme pub courant
                current_pub = content
                # Par défaut, il n'a ni subkeys, ni uids
                current_pub["subkeys"] = []
                current_pub["uids"] = []
                # On oublié l'éventuel dernier sub rencontré
                current_sub = init_value
            elif field == "fpr":
                if current_sub != init_value:
                    # On a lu un sub depuis le dernier pub, donc le fingerprint est celui du dernier sub rencontré
                    current_sub["fpr"] = content["fpr"]
                else:
                    # Alors c'est le fingerprint du pub
                    current_pub["fpr"] = content["fpr"]
            elif field == "uid":
                current_pub["uids"].append(content)
            elif field == "sub":
                # Nouvelle sous-clé
                # D'abord on sauvegarde la précédente (si il y en a une) dans son pub parent
                if current_sub != init_value:
                    current_pub["subkeys"].append(current_sub)
                # On place la nouvelle comme sub courant
                current_sub = content
            log.debug("current_pub : %r" % current_pub)
            log.debug("current_sub : %r" % current_sub)
            log.debug("parsed object : %r" % content)
    # À la fin, il faut sauvegarder les derniers (sub, pub) rencontrés,
    # parce que leur sauvegarde n'a pas encore été déclenchée
    if current_sub != init_value:
        current_pub["subkeys"].append(current_sub)
    if current_pub != init_value:
        ring[current_pub["fpr"]] = current_pub
    return ring


def _gpg(*argv):
    """
    Run GPG and return its standard input and output.
    """
    full_command = [GPG] + list(argv)
    log.info("Running `%s`" % " ".join(full_command))
    proc = subprocess.Popen(
        full_command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=sys.stderr,
        close_fds=True,
    )
    return proc.stdin, proc.stdout


def decrypt(content):
    """
    Return decrypted content
    """
    stdin, stdout = _gpg('-d')
    stdin.write(content.encode("utf-8"))
    stdin.close()
    return stdout.read().decode("utf-8")


def encrypt(content: str, fingerprints: [str]) -> str:
    """
    Return encrypted content for fingerprints
    """
    stdin, stdout = _gpg('--armor', '-es', *fingerprints)
    stdin.write(content.encode("utf-8"))
    stdin.close()
    return stdout.read().decode("utf-8")


def receive_keys(keys):
    return _gpg('--recv-keys', *keys)


def list_keys():
    # It is not an error, you need two --with-fingerprint
    # to get subkeys fingerprints.
    _, out = _gpg('--list-keys', '--with-colons', '--fixed-list-mode',
                  '--with-fingerprint', '--with-fingerprint')
    return _parse_keys(out)