Skip to content
Snippets Groups Projects

[moimoin-gendoc] Write ssh fingerprint instead of the full key that no one would ever read.

Merged Maxime Bombar requested to merge action_sshfp into newinfra
4 files
+ 83
3
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 75
0
import base64
import hashlib
from ansible.errors import AnsibleError
from ansible.plugins.action import ActionBase
from ansible.utils.display import Display
from ansible.module_utils._text import to_native
display = Display()
class ActionModule(ActionBase):
TRANSFERS_FILES = False
def _sshfp(self, key):
return(b'SHA256:'+base64.b64encode(
hashlib.sha256(base64.b64decode(key)
).digest()).replace(b'=',b'')).decode('utf-8')
def run(self, tmp=None, task_vars=None):
"""
The run method is the main Action Plugin driver. All work is done from within this method.
tmp: Temporary directory. Sometimes an action plugin sets up
a temporary directory and then calls another module. This parameter
allows us to reuse the same directory for both.
task_vars: The variables (host vars, group vars, config vars, etc) associated with this task.
Note that while this will contain Ansible facts from the host, they should be used
with caution as a user running Ansible can disable their collection. If you want
make sure that your Action Plugin always has access to the ones it needs, you may
want to consider running the setup module directly in the run the method and getting
the Ansible facts that way.
The strategy plugin which manages running tasks on instances uses an ansible.vars.manager
VariableManager instance to retrieve this context specific dict of variables.
"""
if task_vars is None:
task_vars = dict()
result = super(ActionModule, self).run(tmp, task_vars)
self._supports_check_mode = True
self._supports_async = False
setup_module_args=dict(
gather_subset='all',
gather_timeout=10
)
setup_result = self._execute_module(
module_name='setup',
module_args=setup_module_args,
persist_files=False,
task_vars=task_vars,
tmp=tmp,
)
hostname = task_vars.get('inventory_hostname')
for algo in ['rsa', 'ecdsa', 'ed25519']:
key = f'ansible_ssh_host_key_{algo}_public'
keyblob = setup_result['ansible_facts'].get(key)
if not keyblob:
display.vvv(f"host {hostname} doesn't offer {algo} ssh host key. Skipping...")
result[f'ssh_host_key_{algo}_fp'] = None
continue
try:
display.vvv("Trying to get fingerprint for {algo} ssh host key.")
fp = self._sshfp(keyblob)
result[f'ssh_host_key_{algo}_fp'] = fp
except Exception as e:
err_msg = to_native(e)
raise AnsibleError(err_msg)
return result
Loading