#!/usr/bin/env python # -*- coding: utf-8 -*- """ Raccourcis pour se connecter facilement à la base LDAP avec le binding lc_ldap. """ import sys import os import functools from ldap import SERVER_DOWN from time import sleep # Import du fichier de secrets. Le kludge de path ci-dessous ne devrait plus # avoir lieu dans le futur try: from gestion import secrets_new as secrets except ImportError: sys.stderr.write("lc_ldap shortcuts: shaa, cannot import secrets_new. " + "try again with /usr/scripts/ in PYTHONPATH " + "(argv: %s)\n" % " ".join(getattr(sys, 'argv', []))) sys.path.append("/usr/scripts") from gestion import secrets_new as secrets sys.path.pop() import lc_ldap as module_qui_a_le_meme_nom_que_sa_classe_principale import variables #: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script #: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo) #: est plus pertinente que USER (qui sera root) #: À noter que Cron, par exemple, n'a ni USER ni SUDO_USER mais possède bien LOGNAME import getpass current_user = os.getenv("SUDO_USER") or os.getenv("USER") or os.getenv("LOGNAME") or getpass.getuser() if isinstance(current_user, str): current_user = current_user.decode("utf-8") #: Les racourcis définis plus bas pointent tous vers lc_ldap_test en mode # debug DB_TEST_OVERRIDE = bool(os.getenv('DBG_LDAP', False)) def lc_ldap(*args, **kwargs): """Renvoie une connexion à la base LDAP.""" return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs) def lc_ldap_test(*args, **kwargs): """Renvoie une connexion LDAP à la base de tests.""" module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root='/tmp/cimetiere_lc_%s/' % (os.getenv("USER") or os.getenv("LOGNAME")) try: os.mkdir(module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root) except OSError: pass # On impose le serveur kwargs["uri"] = 'ldap://vo.adm.crans.org' kwargs.setdefault("dn", 'cn=admin,dc=crans,dc=org') # Le mot de passe de la base de test kwargs.setdefault("cred", variables.ldap_test_password) # On en a aussi besoin pour le lookup en readonly kwargs.setdefault("readonly_dn", variables.readonly_dn) kwargs.setdefault("readonly_password", variables.ldap_test_password) kwargs.setdefault("user", current_user) return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs) def lc_ldap_admin(*args, **kwargs): """Renvoie une connexion LDAP à la vraie base, en admin. Possible seulement si on peut lire secrets.py """ kwargs["uri"] = 'ldap://ldap.adm.crans.org/' kwargs["dn"] = secrets.get('ldap_auth_dn') kwargs["cred"] = secrets.get('ldap_password') kwargs.setdefault("user", current_user) return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs) def lc_ldap_readonly(*args, **kwargs): """Connexion LDAP à la vraie base, en readonly. Possible seulement si on peut lire secrets.py """ kwargs["uri"] = 'ldap://ldap.adm.crans.org/' kwargs["dn"] = secrets.get('ldap_readonly_auth_dn') kwargs["cred"] = secrets.get('ldap_readonly_password') kwargs.setdefault("user", current_user) return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs) def lc_ldap_local(*args, **kwargs): """Connexion LDAP en lecture seule sur la base locale. L'idée est que les machines avec un réplica bossent avec elles-mêmes pour la lecture, pas avec vert. Attention, les accès internes en lecture seule ou avec une socket ldapi semblent moins prioritaires qu'avec cn=admin. Ne pas utiliser cette fonction si vous souhaitez faire beaucoup de recherches indépendantes (c'est le temps d'accès à la socket qui est problématique). """ if os.path.exists('/var/run/slapd/ldapi'): ro_uri = 'ldapi://%2fvar%2frun%2fslapd%2fldapi/' elif os.path.exists('/var/run/ldapi'): ro_uri = 'ldapi://%2fvar%2frun%2fldapi/' else: ro_uri = 'ldap://127.0.0.1' kwargs["uri"] = ro_uri if not kwargs.has_key('dn'): kwargs['dn'] = secrets.get('ldap_readonly_auth_dn') if not kwargs.has_key('cred'): kwargs['cred'] = secrets.get('ldap_readonly_password') kwargs.setdefault("user", current_user) return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs) def lc_ldap_anonymous(*args, **kwargs): kwargs.update({ 'user': None, 'dn': None, 'cred': None, }) return lc_ldap_local(*args, **kwargs) if DB_TEST_OVERRIDE: lc_ldap_admin = lc_ldap_test lc_ldap_readonly = lc_ldap_test lc_ldap_local = lc_ldap_test lc_ldap_anonymous = lc_ldap_test class with_ldap_conn(object): """Décorateur (instance = decorator) pour les fonctions nécessitant une connexion ldap. Rajoute un argument à la fonction (à la fin) pour passer la connexion""" #: fonction à appeler (sans argument) pour créer une connexion constructor = None #: nombre d'essais de rattrage de serveurs down, avant échec retries = 0 #: délai entre deux essais (en secondes) delay = 1 #: la connexion ldap conn = None def __init__(self, constructor=lc_ldap_local, retries=2, delay=1): self.constructor = constructor self.retries = retries self.delay = delay def apply(self, f, nargs, kargs): attempt = 0 while True: attempt += 1 try: if not self.conn: self.conn = self.constructor() return f(*(nargs + (self.conn, )), **kargs) except SERVER_DOWN: if attempt >= self.retries: raise else: print "one caught !" self.conn = None sleep(self.delay) def __call__(self, f): @functools.wraps(f) def new_f(*nargs, **kargs): return self.apply(f, nargs, kargs) return new_f