From f5e62f579f8c31c55ee1f08da39272d610af0715 Mon Sep 17 00:00:00 2001 From: Maxime Bombar Date: Wed, 27 May 2020 17:21:12 +0200 Subject: [PATCH] [remote.py] If authentic fingerprint found in DNS, then automatically accept connection. Otherwise, ask user. --- cpasswords/remote.py | 92 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 4 deletions(-) diff --git a/cpasswords/remote.py b/cpasswords/remote.py index aaec0db..d400d86 100644 --- a/cpasswords/remote.py +++ b/cpasswords/remote.py @@ -8,15 +8,22 @@ Authors : Daniel Stan SPDX-License-Identifier: GPL-3.0-or-later """ -from functools import lru_cache -from getpass import getpass +import base64 import json import logging +from functools import lru_cache +from getpass import getpass +from hashlib import sha1, sha256 from pathlib import Path -from paramiko.client import SSHClient +from dns import flags, resolver +from paramiko.client import MissingHostKeyPolicy, SSHClient from paramiko.config import SSHConfig -from paramiko.ssh_exception import SSHException, PasswordRequiredException, AuthenticationException +from paramiko.ssh_exception import ( + AuthenticationException, + PasswordRequiredException, + SSHException, +) from .locale import _ @@ -24,6 +31,82 @@ from .locale import _ log = logging.getLogger(__name__) +AUTHENTICITY_MSG = """ +The authenticity of host '{host}' can't be established. +{ktype} key fingerprint is SHA256:{kfp}.{dnsfp} +Are you sure you want to continue connecting (yes/no)? +""" + +_key_algorithms = { + "ssh-rsa": "1", + "ssh-dss": "2", + "ecdsa-sha2-nistp256": "3", + "ecdsa-sha2-nistp384": "3", + "ecdsa-sha2-nistp521": "3", + "ssh-ed25519": "4", +} + +_hash_funcs = { + "1": sha1, + "2": sha256, +} + + +class AskUserOrDNSPolicy(MissingHostKeyPolicy): + """ + Policy for automatically trusting DNSSEC authenticated fingerprint + or asking the user for unknown hostname & key. This is used by `.SSHClient` + """ + + def missing_host_key(self, client, hostname, key): + kfp256 = sha256(key.asbytes()).digest() + kfp = base64.b64encode(kfp256).decode("utf-8").replace("=", "") + ktype = key.get_name().upper().replace("SSH-", "") + kalg = _key_algorithms.get(key.get_name()) + kres = resolver.Resolver() + kres.use_edns(True, flags.DO, 1280) + dnssec = False + found = False + dnsfp = "" + try: + kans = kres.query(hostname, "SSHFP") + if kans.response.flags & flags.AD: + dnssec = True + for rdata in kans: + try: + alg, fptype, fp = rdata.to_text().split() + except ValueError: # Invalid SSHFP record format, don't care. + pass + if alg != kalg: + continue + if fptype not in _hash_funcs: + continue + expected = _hash_funcs.get(fptype)(key.asbytes()).hexdigest() + if expected == fp: + found = True + break + except resolver.NoAnswer: # Can't find SSHFP for this host. + pass + if found and dnssec: + log.debug( + "Authentic {} host key for {} found in DNS".format( + key.get_name(), hostname + ) + ) + return + if found: + dnsfp = "\nMatching host key fingerprint found in DNS." + inp = input( + AUTHENTICITY_MSG.format(host=hostname, ktype=ktype, kfp=kfp, dnsfp=dnsfp) + ) + if inp not in ["yes", "y", ""]: + log.debug("Rejecting {} host key for {}: {}".format(ktype, hostname, kfp)) + raise SSHException( + "Connection to {!r} rejected by the user".format(hostname) + ) + log.debug("Accepting {} host key for {}: {}".format(ktype, hostname, kfp256)) + + @lru_cache() def create_ssh_client(host, password=None): """ @@ -31,6 +114,7 @@ def create_ssh_client(host, password=None): """ # Create SSH client with system host keys and agent client = SSHClient() + client.set_missing_host_key_policy(AskUserOrDNSPolicy) # Load config file and use the right username try: -- GitLab