# -*- coding: iso-8859-1 -*- """ MoinMoin - CAS authentication Jasig CAS (see http://www.jasig.org/cas) authentication module. @copyright: 2012 MoinMoin:RichardLiao @license: GNU GPL, see COPYING for details. """ import sys import os import time import re import urlparse import urllib, urllib2 from lxml import etree from lxml.etree import XMLSyntaxError from MoinMoin import log logging = log.getLogger(__name__) from MoinMoin.auth import BaseAuth from MoinMoin import user, wikiutil from MoinMoin.theme import load_theme_fallback class PyCAS(object): """A class for working with a CAS server.""" def __init__(self, server_url, renew=False, login_path='/login', logout_path='/logout', validate_path='/validate', coding='utf-8'): self.server_url = server_url self.renew = renew self.login_path = login_path self.logout_path = logout_path self.validate_path = validate_path self.coding = coding def login_url(self, service): """Return the login URL for the given service.""" url = self.server_url + self.login_path + '?service=' + urllib.quote_plus(service) if self.renew: url += "&renew=true" return url def logout_url(self, redirect_url=None): """Return the logout URL.""" url = self.server_url + self.logout_path if redirect_url: url += '?url=' + urllib.quote_plus(redirect_url) url += '&service=' + urllib.quote_plus(redirect_url) return url def validate_url(self, service, ticket): """Return the validation URL for the given service. (For CAS 1.0)""" url = self.server_url + self.validate_path + '?service=' + urllib.quote_plus(service) + '&ticket=' + urllib.quote_plus(ticket) if self.renew: url += "&renew=true" return url def singlesignout(self, callback, body): try: nodes = etree.fromstring(body).xpath("/samlp:LogoutRequest/samlp:SessionIndex", namespaces={'samlp' : 'urn:oasis:names:tc:SAML:2.0:protocol'}) for node in nodes: callback(node.text) except XMLSyntaxError: pass def validate_ticket(self, service, ticket): """Validate the given ticket against the given service.""" f = urllib2.urlopen(self.validate_url(service, ticket)) valid = f.readline() valid = valid.strip() == 'yes' user = f.readline().strip() user = user.decode(self.coding) return valid, user class CASAuth(BaseAuth): """ handle login from CAS """ name = 'CAS' login_inputs = [] logout_possible = True def __init__(self, auth_server, login_path="/login", logout_path="/logout", validate_path="/validate", action="login_cas", create_user=False, fallback_url=None, ticket_path=None): BaseAuth.__init__(self) self.cas = PyCAS(auth_server, login_path=login_path, validate_path=validate_path, logout_path=logout_path) self.action = action self.create_user = create_user self.fallback_url = fallback_url self.ticket_path = ticket_path def request(self, request, user_obj, **kw): ticket = request.args.get("ticket", "") action = request.args.get("action", "") force = request.args.get("force", None) is not None logoutRequest = request.args.get("logoutRequest", []) p = urlparse.urlparse(request.url) url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", "")) def store_ticket(ticket, username): with open(self.ticket_path + ticket, 'w') as f: f.write(username) def username_of_ticket(ticket): try: with open(self.ticket_path + ticket) as f: username = f.read() os.remove(self.ticket_path + ticket) return username except IOError: return None def logout_user(ticket): username = username_of_ticket(ticket) if username: u = user.User(request, None, username) checks = [] if u.exists(): def user_matches(session): try: return session['user.id'] == u.id except KeyError: return False session_service = request.cfg.session_service for sid in session_service.get_all_session_ids(request): session = session_service.get_session(request, sid) if user_matches(session): session_service.destroy_session(request, session) # authenticated user if not force and user_obj and user_obj.valid: if (action == self.action or (ticket and ticket.startswith('ST-'))) and user_obj.auth_method == self.name: request.http_redirect(url) if self.ticket_path and request.method == 'POST': logoutRequest=request.form.get('logoutRequest', None) if logoutRequest is not None: sys.stderr.write("Tentative de deconnexion du CAS : %s\n" % logoutRequest) self.cas.singlesignout(logout_user, logoutRequest) # valid ticket on CAS if ticket and ticket.startswith('ST-'): valid, username = self.cas.validate_ticket(url, ticket) if valid: sys.stderr.write("Authentifiaction de %s sur le CAS\n" % username) u = user.User(request, auth_username=username, auth_method=self.name) # auto create user ? if self.create_user: u.valid = valid u.create_or_update(True) else: u.valid = u.exists() if self.fallback_url and not u.valid: request.http_redirect("%s?action=%s&wiki_url=%s" % (self.fallback_url, self.action, url)) if u.valid: store_ticket(ticket, username) load_theme_fallback(request, u.theme_name) return u, True else: request.http_redirect(self.cas.login_url(url)) elif self.action == action: # Redirect login request.http_redirect(self.cas.login_url(url)) return user_obj, True def logout(self, request, user_obj, **kw): if self.name and user_obj and user_obj.auth_method == self.name: user_obj.valid = False request.cfg.session_service.destroy_session(request, request.session) p = urlparse.urlparse(request.url) url = urlparse.urlunparse((p.scheme, p.netloc, p.path, "", "", "")) request.http_redirect(self.cas.logout_url(url)) return user_obj, False def login_hint(self, request): p = urlparse.urlparse(request.url) url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", "")) _ = request.getText msg = _('
Se connecter via le CAS (vous devez disposer d\'un compte Cr@ns pour cela)
' % self.cas.login_url(url)) return msg