# ldap.py (5.56 KiB)
# Benjamin Graillot authored c2ffc73c
import ipaddress
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display
import ldap
# Module-level Ansible Display instance (not referenced by the code visible in
# this file; presumably kept for debug/warning output -- TODO confirm).
display = Display()
def decode_object(object):
    """Return a copy of an LDAP entry with every value decoded to ``str``.

    ``object`` maps attribute names to lists of byte strings (the shape the
    callers in this file pass in from LDAP search results).  Each value is
    decoded as UTF-8; attribute names are left untouched.  A value that is
    not valid UTF-8 raises ``UnicodeDecodeError``.
    """
    decoded = {}
    for name in object:
        decoded[name] = [raw.decode('utf-8') for raw in object[name]]
    return decoded
class LookupModule(LookupBase):
    """Ansible lookup plugin that reads host and network data from LDAP.

    The first lookup term selects the request, the remaining terms are its
    arguments::

        query('ldap', 'query', BASE, SCOPE[, FILTER[, ATTR]])
        query('ldap', 'ip', HOST, VLAN)
        query('ldap', 'subnet_ipv4', SUBNET)
        query('ldap', 'group')
        query('ldap', 'hosts', HOST, VLAN, ATTR)
        query('ldap', 'networks', NETWORK)
        query('ldap', 'vlanid', NETWORK)
        query('ldap', 'role', ROLE)

    Errors reported by the LDAP server (``ldap.LDAPError`` subclasses) are not
    caught here and propagate to Ansible.
    """

    def __init__(self, **kwargs):
        """Set up the (hard-coded) LDAP connection object and base DN."""
        # Let LookupBase consume the loader/templar arguments Ansible passes
        # when it instantiates the plugin.
        super().__init__(**kwargs)
        self.base = ldap.initialize('ldaps://localhost:1636/')
        # NOTE(review): OPT_X_TLS_ALLOW does not enforce verification of the
        # server certificate -- confirm this is intended for this endpoint.
        self.base.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
        # Must come after the other TLS options so that they take effect.
        self.base.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
        self.base_dn = 'dc=crans,dc=org'

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _search(self, base, scope, filterstr='(objectClass=*)', attrlist=None):
        """Run a synchronous search and return its ``(dn, attributes)`` pairs."""
        return self.base.search_s(base, scope, filterstr, attrlist)

    def _first_entry(self, dn, filterstr='(objectClass=*)'):
        """Return the attribute dict of the entry at *dn* (base-scope search).

        Raises ``AnsibleError`` when the search yields nothing, instead of
        failing later with a bare ``IndexError``.
        """
        entries = self._search(dn, ldap.SCOPE_BASE, filterstr)
        if not entries:
            raise AnsibleError(
                f"ldap lookup: no entry found at '{dn}' matching '{filterstr}'"
            )
        return entries[0][1]

    def _vlan_name(self, vlan):
        """Translate a numeric VLAN id into its network name.

        Integers are looked up under ``ou=networks`` through the
        ``description`` attribute and the first ``cn`` of the first match is
        returned.  Any other value is returned unchanged.
        """
        if not isinstance(vlan, int):
            return vlan
        networks = self._search(
            f"ou=networks,{self.base_dn}",
            ldap.SCOPE_ONELEVEL,
            f"description={vlan}",
        )
        if not networks:
            raise AnsibleError(f"ldap lookup: no network found for VLAN id {vlan}")
        return networks[0][1]['cn'][0].decode('utf-8')

    def _interface_values(self, host, vlan, attr):
        """Return the decoded values of *attr* on the *vlan* interface of *host*."""
        vlan = self._vlan_name(vlan)
        # Interfaces on the 'srv' network carry no VLAN label in their name.
        if vlan == 'srv':
            interface = f"{host}.crans.org"
        else:
            interface = f"{host}.{vlan}.crans.org"
        entry = self._first_entry(f"cn={interface},cn={host},ou=hosts,{self.base_dn}")
        return [value.decode('utf-8') for value in entry[attr]]

    def _groups(self):
        """Return the raw ``(dn, attributes)`` pairs of every posixGroup.

        Attribute values are left as returned by the LDAP library (not
        decoded), as callers of the 'group' request already receive them.
        """
        return self._search(
            f"ou=group,{self.base_dn}",
            ldap.SCOPE_SUBTREE,
            "objectClass=posixGroup",
        )

    def _network_entry(self, network):
        """Return the attributes of the ipNetwork entry named *network*."""
        return self._first_entry(
            f"cn={network},ou=networks,{self.base_dn}",
            "objectClass=ipNetwork",
        )

    def _network_cidr(self, network):
        """Return ``[CIDR]`` built from the network's number and netmask."""
        entry = self._network_entry(network)
        address = entry['ipNetworkNumber'][0].decode('utf-8')
        netmask = entry['ipNetmaskNumber'][0].decode('utf-8')
        return [str(ipaddress.ip_network('{}/{}'.format(address, netmask)))]

    def _vlan_id(self, network):
        """Return the network's ``description`` attribute as an ``int``.

        Note that this is a bare integer, not a list.
        """
        return int(self._network_entry(network)['description'][0])

    def _hosts_with_role(self, role):
        """Return every ``cn`` of the host entries whose description is *role*."""
        hosts = self._search(
            f"ou=hosts,{self.base_dn}",
            ldap.SCOPE_ONELEVEL,
            f"description={role}",
        )
        return [cn.decode('utf-8') for _dn, entry in hosts for cn in entry['cn']]

    # ------------------------------------------------------------------
    # Public requests
    # ------------------------------------------------------------------

    def query(self, base, scope, filter='(objectClass=*)', attr=None):
        """
        Make a LDAP query
        query('ldap', 'query', BASE, SCOPE[, FILTER[, ATTR]])
        BASE: base dn
        SCOPE: 'base', 'one' or 'sub'
        FILTER: ldap filter (optional)
        ATTR: list of attributes (optional)

        Returns a dict mapping each DN to its attributes, with every value
        decoded to ``str``.  Raises ``AnsibleError`` for an unknown SCOPE.
        """
        scopes = {
            'base': ldap.SCOPE_BASE,
            'one': ldap.SCOPE_ONELEVEL,
            'sub': ldap.SCOPE_SUBTREE,
        }
        try:
            ldap_scope = scopes[scope]
        except (KeyError, TypeError):
            raise AnsibleError(
                f"ldap lookup: invalid scope {scope!r}, expected 'base', 'one' or 'sub'"
            ) from None
        entries = self._search(f"{base}", ldap_scope, filter, attr)
        return {dn: decode_object(entry) for dn, entry in entries}

    def ip(self, host, vlan):
        """
        Retrieve IP addresses of an interface of a device
        query('ldap', 'ip', HOST, VLAN)

        VLAN is either a network name or an integer VLAN id.
        """
        return self._interface_values(host, vlan, 'ipHostNumber')

    def subnet_ipv4(self, subnet):
        """
        Retrieve used IP addresses on a subnet
        query('ldap', 'subnet_ipv4', SUBNET)

        SUBNET is the name of an entry under ou=networks; every ipHostNumber
        found under ou=hosts that belongs to that IPv4 network is returned.
        """
        network = self._first_entry(f"cn={subnet},ou=networks,{self.base_dn}")
        address = network['ipNetworkNumber'][0].decode('utf-8')
        netmask = network['ipNetmaskNumber'][0].decode('utf-8')
        ipv4_network = ipaddress.IPv4Network(f"{address}/{netmask}")

        hosts = self._search(
            f"ou=hosts,{self.base_dn}",
            ldap.SCOPE_SUBTREE,
            "objectClass=ipHost",
        )
        used = []
        for _dn, entry in hosts:
            for raw in entry['ipHostNumber']:
                text = raw.decode('utf-8')  # decode once, reuse for test and output
                if ipaddress.ip_address(text) in ipv4_network:
                    used.append(text)
        return used

    def run(self, terms, variables=None, **kwargs):
        """Dispatch a lookup on ``terms[0]`` and return its result.

        ``variables`` and ``kwargs`` are accepted for the Ansible lookup
        interface but are not used.  Raises ``AnsibleError`` when no request
        type is given or when it is not one of the supported ones.
        """
        if not terms:
            raise AnsibleError("ldap lookup: missing request type")

        kind = terms[0]
        if kind == 'query':
            return self.query(*terms[1:])
        if kind == 'ip':
            return self.ip(*terms[1:])
        if kind == 'subnet_ipv4':
            return self.subnet_ipv4(*terms[1:])
        if kind == 'group':
            return self._groups()
        if kind == 'hosts':
            # query interface attribute
            # query('ldap', 'hosts', HOST, VLAN, ATTR)
            # HOST: device name
            # VLAN: vlan name
            # ATTR: attribute
            return self._interface_values(terms[1], terms[2], terms[3])
        if kind == 'networks':
            return self._network_cidr(terms[1])
        if kind == 'vlanid':
            return self._vlan_id(terms[1])
        if kind == 'role':
            return self._hosts_with_role(terms[1])

        # Previously an unknown type ended in an UnboundLocalError.
        raise AnsibleError(f"ldap lookup: unknown request type {kind!r}")