shortcuts.py 6.09 KB
Newer Older
1 2 3 4 5 6
#!/usr/bin/env python
# -*- coding: utf-8 -*-

""" Raccourcis pour se connecter facilement à la base LDAP avec le binding lc_ldap. """

import sys
7
import os
8

9 10 11 12
import functools
from ldap import SERVER_DOWN
from time import sleep

13 14 15 16 17 18 19
# Import du fichier de secrets. Le kludge de path ci-dessous ne devrait plus
# avoir lieu dans le futur
try:
    from gestion import secrets_new as secrets
except ImportError:
    sys.stderr.write("lc_ldap shortcuts: shaa, cannot import secrets_new. " +
        "try again with /usr/scripts/ in PYTHONPATH " +
20
        "(argv: %s)\n" % " ".join(getattr(sys, 'argv', [])))
21 22 23 24 25 26 27
    sys.path.append("/usr/scripts")
    from gestion import secrets_new as secrets
    sys.path.pop()

import lc_ldap as module_qui_a_le_meme_nom_que_sa_classe_principale
import variables

28 29 30
#: Pour enregistrer dans l'historique, on a besoin de savoir qui exécute le script
#: Si le script a été exécuté via un sudo, la variable SUDO_USER (l'utilisateur qui a effectué le sudo)
#:  est plus pertinente que USER (qui sera root)
31
#: À noter que Cron, par exemple, n'a ni USER ni SUDO_USER mais possède bien LOGNAME
Daniel STAN's avatar
Daniel STAN committed
32 33
import getpass
current_user = os.getenv("SUDO_USER") or os.getenv("USER") or os.getenv("LOGNAME") or getpass.getuser()
34 35
if isinstance(current_user, str):
    current_user = current_user.decode("utf-8")
36

37 38 39 40
#: Les racourcis définis plus bas pointent tous vers lc_ldap_test en mode
# debug
DB_TEST_OVERRIDE = bool(os.getenv('DBG_LDAP', False))

41 42 43 44 45 46
def lc_ldap(*args, **kwargs):
    """Renvoie une connexion à la base LDAP."""
    return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)

def lc_ldap_test(*args, **kwargs):
    """Renvoie une connexion LDAP à la base de tests."""
47
    module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root='/tmp/cimetiere_lc_%s/' % (os.getenv("USER") or os.getenv("LOGNAME"))
48 49 50 51
    try:
        os.mkdir(module_qui_a_le_meme_nom_que_sa_classe_principale.cimetiere.cimetiere_root)
    except OSError:
        pass
52 53
    # On impose le serveur
    kwargs["uri"] = 'ldap://vo.adm.crans.org'
54
    kwargs.setdefault("dn", 'cn=admin,dc=crans,dc=org')
55 56 57 58 59 60 61 62 63 64 65
    # Le mot de passe de la base de test
    kwargs.setdefault("cred", variables.ldap_test_password)
    # On en a aussi besoin pour le lookup en readonly
    kwargs.setdefault("readonly_dn", variables.readonly_dn)
    kwargs.setdefault("readonly_password", variables.ldap_test_password)
    kwargs.setdefault("user", current_user)
    return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)

def lc_ldap_admin(*args, **kwargs):
    """Renvoie une connexion LDAP à la vraie base, en admin.
       Possible seulement si on peut lire secrets.py
66

67
       """
68
    kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
69 70
    kwargs["dn"] = secrets.get('ldap_auth_dn')
    kwargs["cred"] = secrets.get('ldap_password')
71
    kwargs.setdefault("user", current_user)
72
    return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
73 74 75 76

def lc_ldap_readonly(*args, **kwargs):
    """Connexion LDAP à la vraie base, en readonly.
       Possible seulement si on peut lire secrets.py
77

78 79
       """
    kwargs["uri"] = 'ldap://ldap.adm.crans.org/'
80 81
    kwargs["dn"] = secrets.get('ldap_readonly_auth_dn')
    kwargs["cred"] = secrets.get('ldap_readonly_password')
82 83 84 85 86 87 88 89 90 91 92 93 94 95
    kwargs.setdefault("user", current_user)
    return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)

def lc_ldap_local(*args, **kwargs):
    """Connexion LDAP en lecture seule sur la base locale.
       L'idée est que les machines avec un réplica bossent
       avec elles-mêmes pour la lecture, pas avec vert.

       Attention, les accès internes en lecture seule
       ou avec une socket ldapi semblent moins prioritaires
       qu'avec cn=admin. Ne pas utiliser cette fonction
       si vous souhaitez faire beaucoup de recherches
       indépendantes (c'est le temps d'accès à la socket
       qui est problématique).
96

97 98 99 100 101 102 103 104
       """
    if os.path.exists('/var/run/slapd/ldapi'):
        ro_uri = 'ldapi://%2fvar%2frun%2fslapd%2fldapi/'
    elif os.path.exists('/var/run/ldapi'):
        ro_uri = 'ldapi://%2fvar%2frun%2fldapi/'
    else:
        ro_uri = 'ldap://127.0.0.1'
    kwargs["uri"] = ro_uri
105 106 107 108
    if not kwargs.has_key('dn'):
        kwargs['dn'] = secrets.get('ldap_readonly_auth_dn')
    if not kwargs.has_key('cred'):
        kwargs['cred'] = secrets.get('ldap_readonly_password')
109 110
    kwargs.setdefault("user", current_user)
    return module_qui_a_le_meme_nom_que_sa_classe_principale.lc_ldap(*args, **kwargs)
111

112 113 114 115 116 117 118
def lc_ldap_anonymous(*args, **kwargs):
    kwargs.update({
        'user': None,
        'dn': None,
        'cred': None,
    })
    return lc_ldap_local(*args, **kwargs)
119

120 121 122 123 124 125
if DB_TEST_OVERRIDE:
    lc_ldap_admin = lc_ldap_test
    lc_ldap_readonly = lc_ldap_test
    lc_ldap_local = lc_ldap_test
    lc_ldap_anonymous = lc_ldap_test

126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
class with_ldap_conn(object):
    """Décorateur (instance = decorator) pour les fonctions nécessitant une
    connexion ldap. Rajoute un argument à la fonction (à la fin) pour passer
    la connexion"""

    #: fonction à appeler (sans argument) pour créer une connexion
    constructor = None

    #: nombre d'essais de rattrage de serveurs down, avant échec
    retries = 0

    #: délai entre deux essais (en secondes)
    delay = 1

    #: la connexion ldap
    conn = None

    def __init__(self, constructor=lc_ldap_local, retries=2, delay=1):
        self.constructor = constructor
        self.retries = retries
        self.delay = delay

    def apply(self, f, nargs, kargs):
        attempt = 0
        while True:
            attempt += 1
            try:
                if not self.conn:
                    self.conn = self.constructor()
                return f(*(nargs + (self.conn, )), **kargs)
            except SERVER_DOWN:
                if attempt >= self.retries:
                    raise
                else:
                    print "one caught !"
                    self.conn = None
                    sleep(self.delay)

    def __call__(self, f):
        @functools.wraps(f)
        def new_f(*nargs, **kargs):
            return self.apply(f, nargs, kargs)

        return new_f