Commit d204bbde authored by Charlie Jacomme's avatar Charlie Jacomme

wiki auth file gitted

parent 3f9ae0df
# -*- coding: iso-8859-1 -*-
"""
MoinMoin - CAS authentication
Jasig CAS (see http://www.jasig.org/cas) authentication module.
@copyright: 2012 MoinMoin:RichardLiao
@license: GNU GPL, see COPYING for details.
"""
import sys
import os
import time, re
import urlparse
import urllib, urllib2
from MoinMoin import log
logging = log.getLogger(__name__)
from MoinMoin.auth import BaseAuth
from MoinMoin import user, wikiutil
from MoinMoin.Page import Page
from werkzeug import get_host
class AnonymousAuth(BaseAuth):
name = 'AnonymousAuth'
login_inputs = []
logout_possible = False
def __init__(self, auth_username="Connexion"):
BaseAuth.__init__(self)
self.auth_username = auth_username
def can_view(self, request):
raise NotImplementedError
def request(self, request, user_obj, **kw):
action = request.args.get("action", "")
# Si l'utilisateur est en train de se connecter
# On droppe la pseudo connexion anonyme, si elle
# existe bien.
if action == 'login':
if user_obj:
user_obj.valid = False
return user_obj, True
# authenticated user
if user_obj and user_obj.valid and user_obj.auth_method != self.name and user_obj.name != self.auth_username:
return user_obj, True
p = urlparse.urlparse(request.url)
# Prevent preference edition and quicklink when anonymous
if action == "userprefs" or action == "quicklink":
url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", "")).encode('utf8')
request.http_redirect(url)
# anonymous
if self.can_view(request):
if user_obj and user_obj.valid:
return user_obj, True
sys.stderr.write("Authentification anonyme dans %s\n" % self.name)
u = user.User(request, auth_username=self.auth_username, auth_method=self.name)
u.auth_username=self.auth_username
u.name=self.auth_username
u.valid = True
u.auth_method = self.name
u.name = self.auth_username
elif user_obj and user_obj.valid:
u=user_obj
u.valid = False
else:
u = user_obj
# If reach anonymous user personnal page, redirect to referer with action=login
if p.path == "/%s" % self.auth_username and action != "login":
referer_p = urlparse.urlparse(request.http_referer)
if get_host(request.environ) == referer_p.netloc:
referer_url = urlparse.urlunparse(('https', p.netloc, referer_p.path, "", "", ""))
request.http_redirect(referer_url + "?action=login")
else:
request.http_redirect("https://"+ p.netloc + "/?action=login")
return u, True
# -*- coding: iso-8859-1 -*-
"""
MoinMoin - CAS authentication
Jasig CAS (see http://www.jasig.org/cas) authentication module.
@copyright: 2012 MoinMoin:RichardLiao
@license: GNU GPL, see COPYING for details.
"""
import sys
import os
import time
import re
import urlparse
import urllib, urllib2
from lxml import etree
from lxml.etree import XMLSyntaxError
from MoinMoin import log
logging = log.getLogger(__name__)
from MoinMoin.auth import BaseAuth
from MoinMoin import user, wikiutil
from MoinMoin.theme import load_theme_fallback
class PyCAS(object):
"""A class for working with a CAS server."""
def __init__(self, server_url, renew=False, login_path='/login', logout_path='/logout',
validate_path='/validate', coding='utf-8'):
self.server_url = server_url
self.renew = renew
self.login_path = login_path
self.logout_path = logout_path
self.validate_path = validate_path
self.coding = coding
def login_url(self, service):
"""Return the login URL for the given service."""
url = self.server_url + self.login_path + '?service=' + urllib.quote_plus(service.encode('utf8'))
if self.renew:
url += "&renew=true"
return url.decode('utf8')
def logout_url(self, redirect_url=None):
"""Return the logout URL."""
url = self.server_url + self.logout_path
if redirect_url:
url += '?url=' + urllib.quote_plus(redirect_url.encode('utf8'))
url += '&service=' + urllib.quote_plus(redirect_url.encode('utf8'))
return url
def validate_url(self, service, ticket):
"""Return the validation URL for the given service. (For CAS 1.0)"""
url = self.server_url + self.validate_path + '?service=' + urllib.quote_plus(service.encode('utf8')) + '&ticket=' + urllib.quote_plus(ticket)
if self.renew:
url += "&renew=true"
return url
def singlesignout(self, callback, body):
try:
nodes = etree.fromstring(body).xpath("/samlp:LogoutRequest/samlp:SessionIndex", namespaces={'samlp' : 'urn:oasis:names:tc:SAML:2.0:protocol'})
for node in nodes:
callback(node.text)
except XMLSyntaxError:
pass
def validate_ticket(self, service, ticket):
"""Validate the given ticket against the given service."""
f = urllib2.urlopen(self.validate_url(service, ticket))
valid = f.readline()
valid = valid.strip() == 'yes'
user = f.readline().strip()
return valid, user
class CASAuth(BaseAuth):
""" handle login from CAS """
name = 'CAS'
login_inputs = []
logout_possible = True
def __init__(self, auth_server, login_path="/login", logout_path="/logout", validate_path="/validate", action="login_cas", create_user=False, fallback_url=None, ticket_path=None, assoc_path=None):
BaseAuth.__init__(self)
self.cas = PyCAS(auth_server, login_path=login_path,
validate_path=validate_path, logout_path=logout_path)
self.action = action
self.create_user = create_user
self.fallback_url = fallback_url
self.ticket_path = ticket_path
self.assoc_path = assoc_path
def request(self, request, user_obj, **kw):
ticket = request.args.get("ticket", "")
action = request.args.get("action", "")
force = request.args.get("force", None) is not None
logoutRequest = request.args.get("logoutRequest", [])
p = urlparse.urlparse(request.url)
url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", ""))
def store_ticket(ticket, username):
with open(self.ticket_path + ticket, 'w') as f:
f.write(username)
def username_of_ticket(ticket):
try:
with open(self.ticket_path + ticket) as f:
username = f.read()
os.remove(self.ticket_path + ticket)
return username
except IOError:
return None
def logout_user(ticket):
username = username_of_ticket(ticket)
if username:
u = user.User(request, None, username)
checks = []
if u.exists():
def user_matches(session):
try:
return session['user.id'] == u.id
except KeyError:
return False
session_service = request.cfg.session_service
for sid in session_service.get_all_session_ids(request):
session = session_service.get_session(request, sid)
if user_matches(session):
session_service.destroy_session(request, session)
# authenticated user
if not force and user_obj and user_obj.valid:
if (action == self.action or (ticket and ticket.startswith('ST-'))) and user_obj.auth_method == self.name:
request.http_redirect(url)
if self.ticket_path and request.method == 'POST':
logoutRequest=request.form.get('logoutRequest', None)
if logoutRequest is not None:
sys.stderr.write(u"Tentative de déconnexion du CAS : %s\n" % logoutRequest.decode('ascii', 'ignore'))
self.cas.singlesignout(logout_user, logoutRequest)
# valid ticket on CAS
if ticket and ticket.startswith('ST-'):
valid, cas_login = self.cas.validate_ticket(url, ticket)
try:
f = open(os.path.join(self.assoc_path, cas_login), 'r')
username = f.read().decode('utf-8', "ignore").strip()
except IOError:
valid = False
# TODO redirect to explanation how to create/link account with cas_login
if valid:
sys.stderr.write(u"Authentifiction de %s sur le CAS\n" % username.decode("ascii", "ignore"))
u = user.User(request, auth_username=username.decode(self.cas.coding), auth_method=self.name)
# auto create user ?
if self.create_user:
u.valid = valid
u.create_or_update(True)
else:
u.valid = u.exists()
if self.fallback_url and not u.valid:
request.http_redirect("%s?action=%s&wiki_url=%s" % (self.fallback_url, self.action, url))
if u.valid:
store_ticket(ticket, username)
load_theme_fallback(request, u.theme_name)
return u, True
else:
request.http_redirect(self.cas.login_url(url))
elif self.action == action: # Redirect login
request.http_redirect(self.cas.login_url(url))
return user_obj, True
def logout(self, request, user_obj, **kw):
if self.name and user_obj and user_obj.auth_method == self.name:
user_obj.valid = False
request.cfg.session_service.destroy_session(request, request.session)
p = urlparse.urlparse(request.url)
url = urlparse.urlunparse((p.scheme, p.netloc, p.path, "", "", ""))
request.http_redirect(self.cas.logout_url(url))
return user_obj, False
def login_hint(self, request):
p = urlparse.urlparse(request.url)
url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", ""))
_ = request.getText
msg = _('<p><a href="%s">Se connecter via le CAS</a> (vous devez disposer d\'un compte Cr@ns pour cela)</p>' % self.cas.login_url(url))
return msg
# -*- coding: iso-8859-1 -*-
"""
MoinMoin - CAS authentication
Jasig CAS (see http://www.jasig.org/cas) authentication module.
@copyright: 2012 MoinMoin:RichardLiao
@license: GNU GPL, see COPYING for details.
"""
import sys
import os
import time, re
import urlparse
import urllib, urllib2
from netaddr import IPNetwork, IPAddress
from MoinMoin import log
logging = log.getLogger(__name__)
from MoinMoin.auth import BaseAuth
from MoinMoin import user, wikiutil
from MoinMoin.Page import Page
from anonymous_user import AnonymousAuth
class PublicCategories(AnonymousAuth):
name = 'PublicCategories'
def __init__(self, pub_cats=[], auth_username="Connexion"):
AnonymousAuth.__init__(self, auth_username=auth_username)
self.pub_cats=pub_cats
def can_view(self, request):
p = urlparse.urlparse(request.url)
if p.path[1:] == "":
return True
if request.page is not None:
sys.stderr.write("Getting page from cache")
page = request.page
else:
pagename= urllib.unquote(p.path[1:]).encode('ascii', 'ignore')
if pagename.endswith(u'/'):
pagename=pagename[:-1]
page = Page(request, pagename)
acl = page.getACL(request)
if acl.acl and [a[0] for a in acl.acl if a[0] == 'All' and a[1].get('read',False)]:
sys.stderr.write("acl +All:read\n")
return False
categories = page.getCategories(request)
if categories:
for cat in self.pub_cats:
if cat in categories:
sys.stderr.write("%r dans la page %r\n" % (cat, p.path[1:]))
return True
else:
sys.stderr.write("categories is empty, searching in page body of %s\n" % p.path[1:].encode('utf8'))
body = page.getPageText()
for cat in self.pub_cats:
if cat in body:
sys.stderr.write("%s dans la page %s\n" % (cat.encode('utf8'), p.path[1:].encode('utf8')))
return True
sys.stderr.write("%s n'est pas dans la page %s, %r\n" % (cat.encode('utf8'), p.path[1:].encode('utf8'), categories))
return False
def request(self, request, user_obj, **kw):
user_obj, cont = AnonymousAuth.request(self, request, user_obj, **kw)
if user_obj and not user_obj.valid and user_obj.auth_method == self.name:
p = urlparse.urlparse(request.url)
url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", ""))
action = request.args.get("action", "")
# pas de redirection si action est déjà deny ou que l'utilisateur se connecte
# sinon délogue si page pas publique et précédement pseudo-logué
if action not in [ "deny", "login"]:
request.http_redirect(url + "?action=deny")
return user_obj, cont
# -*- coding: iso-8859-1 -*-
"""
MoinMoin - CAS authentication
Jasig CAS (see http://www.jasig.org/cas) authentication module.
@copyright: 2012 MoinMoin:RichardLiao
@license: GNU GPL, see COPYING for details.
"""
import urlparse
from netaddr import IPNetwork, IPAddress
from MoinMoin.auth import BaseAuth
from MoinMoin.Page import Page
from MoinMoin import user
from anonymous_user import AnonymousAuth
class IpRange(AnonymousAuth):
name = 'IpRange'
def __init__(self, auth_username="Connexion", local_nets=[], actions=[], actions_msg={}, proxy_ips=[]):
AnonymousAuth.__init__(self, auth_username=auth_username)
self.local_nets = local_nets;
self.actions = actions
self.actions_msg=actions_msg
self.proxy_ips = [IPAddress(ip) for ip in proxy_ips]
def can_view(self, request):
if request.remote_addr is None:
# Pas de remote_addr => on est command-line
return True
remote_addr = IPAddress(request.remote_addr)
try:
if remote_addr in self.proxy_ips:
remote_addr = IPAddress(request.in_headers['P-Real-Ip'])
except KeyError:
return False
try:
for net in self.local_nets:
if remote_addr in IPNetwork(net):
return True
except:
pass
return False
def request(self, request, user_obj, **kw):
user_obj, cont = AnonymousAuth.request(self, request, user_obj, **kw)
if not user_obj or not user_obj.valid:
# Are we trying to do a protected action (eg newaccount)
action = request.args.get("action", "")
if action in self.actions:
if action in self.actions_msg.keys():
request.theme.add_msg(self.actions_msg[action])
p = urlparse.urlparse(request.url)
url = urlparse.urlunparse(('https', p.netloc, p.path, "", "", ""))
request.http_redirect(url.encode('ascii', 'ignore'))
return user_obj, True
# -*- coding: iso-8859-1 -*-
"""
MoinMoin - CAS authentication
Jasig CAS (see http://www.jasig.org/cas) authentication module.
@copyright: 2012 MoinMoin:RichardLiao
@license: GNU GPL, see COPYING for details.
"""
import sys
import os
import time, re
import urlparse
import urllib, urllib2
from netaddr import IPNetwork, IPAddress
from werkzeug import get_host
from MoinMoin import log
logging = log.getLogger(__name__)
from MoinMoin.auth import BaseAuth, ContinueLogin
from MoinMoin import user, wikiutil
from MoinMoin.theme import load_theme_fallback
class ThemeVhost(BaseAuth):
name = 'ThemeVhost'
login_inputs = []
logout_possible = False
def __init__(self, theme_vhost={}):
BaseAuth.__init__(self)
self.theme_vhost=theme_vhost;
def request(self, request, user_obj, **kw):
host = get_host(request.environ)
if not user_obj:
theme_name = self.theme_vhost.get(host, request.user.theme_name)
else:
theme_name = self.theme_vhost.get(host, user_obj.theme_name)
load_theme_fallback(request, theme_name)
return user_obj, True
def login(self, request, user_obj, **kw):
return ContinueLogin(user_obj)
def logout(self, request, user_obj, **kw):
load_theme_fallback(request, request.cfg.theme_default)
return user_obj, True
def login_hint(self, request):
_ = request.getText
msg = _(u'<p>Plus d\'informations sur comment cr&eacute;er un compte sont disponibles sur la page <a href="/Cr&eacute;erUnCompteWiki">Cr&eacute;erUnCompteWiki</a></p>')
return msg
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment