Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
"""
GPG binding
Copyright (C) 2010-2020 Cr@ns <roots@crans.org>
Authors : Daniel Stan <daniel.stan@crans.org>
Vincent Le Gallic <legallic@crans.org>
Alexandre Iooss <erdnaxe@crans.org>
SPDX-License-Identifier: GPL-3.0-or-later
"""
import sys
import subprocess
import logging
import time
import datetime
# Local logger
log = logging.getLogger(__name__)
# GPG Definitions
#: Path du binaire gpg
GPG = 'gpg'
#: Mapping (lettre de trustlevel) -> (signification, faut-il faire confiance à la clé)
GPG_TRUSTLEVELS = {
"-": ("inconnue (pas de valeur assignée)", False),
"o": ("inconnue (nouvelle clé)", False),
"i": ("invalide (self-signature manquante ?)", False),
"n": ("nulle (il ne faut pas faire confiance à cette clé)", False),
"m": ("marginale (pas assez de lien de confiance vers cette clé)", False),
"f": ("entière (clé dans le réseau de confiance)", True),
"u": ("ultime (c'est probablement ta clé)", True),
"r": ("révoquée", False),
"e": ("expirée", False),
"q": ("non définie", False),
}
def _parse_timestamp(string, canbenone=False):
"""Interprète ``string`` comme un timestamp depuis l'Epoch."""
if string == '' and canbenone:
return None
return datetime.datetime(*time.localtime(int(string))[:7])
def _parse_pub(data):
"""Interprète une ligne ``pub:``"""
d = {
'trustletter': data[1],
'length': int(data[2]),
'algorithm': int(data[3]),
'longid': data[4],
'signdate': _parse_timestamp(data[5]),
'expiredate': _parse_timestamp(data[6], canbenone=True),
'ownertrustletter': data[8],
'capabilities': data[11],
}
return d
def _parse_uid(data):
"""Interprète une ligne ``uid:``"""
d = {
'trustletter': data[1],
'signdate': _parse_timestamp(data[5], canbenone=True),
'hash': data[7],
'uid': data[9],
}
return d
def _parse_fpr(data):
"""Interprète une ligne ``fpr:``"""
d = {
'fpr': data[9],
}
return d
def _parse_sub(data):
"""Interprète une ligne ``sub:``"""
d = {
'trustletter': data[1],
'length': int(data[2]),
'algorithm': int(data[3]),
'longid': data[4],
'signdate': _parse_timestamp(data[5]),
'expiredate': _parse_timestamp(data[6], canbenone=True),
'capabilities': data[11],
}
return d
#: Functions to parse the recognized fields
GPG_PARSERS = {
'pub': _parse_pub,
'uid': _parse_uid,
'fpr': _parse_fpr,
'sub': _parse_sub,
}
def _parse_keys(gpgout, debug=False):
"""
Parse l'output d'un listing de clés gpg.
TODO: à déméler et simplifier
"""
ring = {}
# Valeur utilisée pour dire "cet objet n'a pas encore été rencontré pendant le parsing"
init_value = "initialize"
current_pub = init_value
current_sub = init_value
for line in iter(gpgout.readline, ''):
# La doc dit que l'output est en UTF-8 «regardless of any --display-charset setting»
try:
line = line.decode("utf-8")
except UnicodeDecodeError:
try:
line = line.decode("iso8859-1")
except UnicodeDecodeError:
line = line.decode("iso8859-1", "ignore")
log.warning(
"gpg is telling shit, it is neither ISO-8859-1 nor UTF-8. Dropping!")
line = line.split(":")
field = line[0]
if field in GPG_PARSERS.keys():
log.debug("begin loop, met %s :" % (field))
log.debug("current_pub : %r" % current_pub)
log.debug("current_sub : %r" % current_sub)
try:
content = GPG_PARSERS[field](line)
except KeyError:
log.error("*** FAILED *** Line: %s", line)
raise
if field == "pub":
# Nouvelle clé
# On sauvegarde d'abord le dernier sub (si il y en a un) dans son pub parent
if current_sub != init_value:
current_pub["subkeys"].append(current_sub)
# Ensuite on sauve le pub précédent (si il y en a un) dans le ring
if current_pub != init_value:
ring[current_pub["fpr"]] = current_pub
# On place le nouveau comme pub courant
current_pub = content
# Par défaut, il n'a ni subkeys, ni uids
current_pub["subkeys"] = []
current_pub["uids"] = []
# On oublié l'éventuel dernier sub rencontré
current_sub = init_value
elif field == "fpr":
if current_sub != init_value:
# On a lu un sub depuis le dernier pub, donc le fingerprint est celui du dernier sub rencontré
current_sub["fpr"] = content["fpr"]
else:
# Alors c'est le fingerprint du pub
current_pub["fpr"] = content["fpr"]
elif field == "uid":
current_pub["uids"].append(content)
elif field == "sub":
# Nouvelle sous-clé
# D'abord on sauvegarde la précédente (si il y en a une) dans son pub parent
if current_sub != init_value:
current_pub["subkeys"].append(current_sub)
# On place la nouvelle comme sub courant
current_sub = content
log.debug("current_pub : %r" % current_pub)
log.debug("current_sub : %r" % current_sub)
log.debug("parsed object : %r" % content)
# À la fin, il faut sauvegarder les derniers (sub, pub) rencontrés,
# parce que leur sauvegarde n'a pas encore été déclenchée
if current_sub != init_value:
current_pub["subkeys"].append(current_sub)
if current_pub != init_value:
ring[current_pub["fpr"]] = current_pub
return ring
def _gpg(*argv):
"""
Run GPG and return its standard input and output.
"""
full_command = [GPG] + list(argv)
log.info("Running `%s`" % " ".join(full_command))
proc = subprocess.Popen(
full_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=sys.stderr,
close_fds=True,
)
return proc.stdin, proc.stdout
def decrypt(content):
"""
Return decrypted content
"""
stdin, stdout = _gpg('-d')
stdin.write(content.encode("utf-8"))
stdin.close()
return stdout.read().decode("utf-8")
def encrypt(content: str, fingerprints: [str]) -> str:
"""
Return encrypted content for fingerprints
"""
stdin, stdout = _gpg('--armor', '-es', *fingerprints)
stdin.write(content.encode("utf-8"))
stdin.close()
return stdout.read().decode("utf-8")
def receive_keys(keys):
return _gpg('--recv-keys', *keys)
def list_keys():
# It is not an error, you need two --with-fingerprint
# to get subkeys fingerprints.
_, out = _gpg('--list-keys', '--with-colons', '--fixed-list-mode',
'--with-fingerprint', '--with-fingerprint')
return _parse_keys(out)