Commit 667483fc authored by Valentin Samir's avatar Valentin Samir

initial commit

parents
*.pyc
bootstrap3
cas/
db.sqlite3
manage.py
import default_settings
from django.contrib import admin
from models import *
from forms import *
# Register your models here.
class ServiceTicketInline(admin.TabularInline):
model = ServiceTicket
extra = 0
form = TicketForm
class ProxyTicketInline(admin.TabularInline):
model = ProxyTicket
extra = 0
form = TicketForm
class ProxyGrantingInline(admin.TabularInline):
model = ProxyGrantingTicket
extra = 0
form = TicketForm
class UserAdmin(admin.ModelAdmin):
inlines = (ServiceTicketInline, ProxyTicketInline, ProxyGrantingInline)
class ServicePatternAdmin(admin.ModelAdmin):
list_display = ('pos', 'pattern', 'proxy')
admin.site.register(User, UserAdmin)
admin.site.register(ServicePattern, ServicePatternAdmin)
#admin.site.register(ProxyGrantingTicketIOU, admin.ModelAdmin)
from django.conf import settings
def setting_default(name, default_value):
value = getattr(settings, name, default_value)
setattr(settings, name, value)
class AuthUser(object):
def __init__(self, username):
self.username = username
def test_password(self, password):
return self.username == "test" and password == "test"
def attributs(self):
return {'nom':'Nymous', 'prenom':'Ano', 'email':'anonymous@example.net'}
setting_default('CAS_LOGIN_TEMPLATE', 'cas_server/login.html')
setting_default('CAS_WARN_TEMPLATE', 'cas_server/warn.html')
setting_default('CAS_LOGGED_TEMPLATE', 'cas_server/logged.html')
setting_default('CAS_AUTH_CLASS', AuthUser)
setting_default('CAS_ST_LEN', 30)
setting_default('CAS_TICKET_VALIDITY', 300)
setting_default('CAS_PROXY_CA_CERTIFICATE_PATH', True)
from django import forms
from django.conf import settings
import models
class UserCredential(forms.Form):
username = forms.CharField(label='login')
service = forms.CharField(widget=forms.HiddenInput(), required=False)
password = forms.CharField(label='password', widget=forms.PasswordInput)
method = forms.CharField(widget=forms.HiddenInput(), required=False)
warn = forms.BooleanField(label='warn', required=False)
def __init__(self, *args, **kwargs):
super(UserCredential, self).__init__(*args, **kwargs)
def clean(self):
cleaned_data = super(UserCredential, self).clean()
auth = settings.CAS_AUTH_CLASS(cleaned_data.get("username"))
if auth.test_password(cleaned_data.get("password")):
try:
user = models.User.objects.get(username=auth.username)
user.attributs=auth.attributs()
user.save()
except models.User.DoesNotExist:
user = models.User.objects.create(username=auth.username, attributs=auth.attributs())
user.save()
self.user = user
else:
raise forms.ValidationError("Bad user")
class TicketForm(forms.ModelForm):
class Meta:
model = models.Ticket
exclude = []
service = forms.CharField(widget=forms.TextInput)
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
import cas_server.models
import picklefield.fields
class Migration(migrations.Migration):
dependencies = [
]
operations = [
migrations.CreateModel(
name='Proxy',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('url', models.CharField(max_length=255)),
],
options={
'ordering': ('-pk',),
},
bases=(models.Model,),
),
migrations.CreateModel(
name='ProxyGrantingTicket',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('attributs', picklefield.fields.PickledObjectField(editable=False)),
('validate', models.BooleanField(default=False)),
('service', models.TextField()),
('creation', models.DateTimeField(auto_now_add=True)),
('renew', models.BooleanField(default=False)),
('value', models.CharField(default=cas_server.models._gen_pgt, unique=True, max_length=255)),
],
options={
'abstract': False,
},
bases=(models.Model,),
),
migrations.CreateModel(
name='ProxyTicket',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('attributs', picklefield.fields.PickledObjectField(editable=False)),
('validate', models.BooleanField(default=False)),
('service', models.TextField()),
('creation', models.DateTimeField(auto_now_add=True)),
('renew', models.BooleanField(default=False)),
('value', models.CharField(default=cas_server.models._gen_pt, unique=True, max_length=255)),
],
options={
'abstract': False,
},
bases=(models.Model,),
),
migrations.CreateModel(
name='ServicePattern',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('pos', models.IntegerField(default=100)),
('pattern', models.CharField(unique=True, max_length=255)),
('user_field', models.CharField(default=b'', help_text=b"Nom de l'attribut transmit comme username, vide = login", max_length=255, blank=True)),
('usernames', models.CharField(default=b'', help_text=b"Liste d'utilisateurs accept\xc3\xa9s s\xc3\xa9par\xc3\xa9 par des virgules, vide = tous les utilisateur", max_length=255, blank=True)),
('attributs', models.CharField(default=b'', help_text=b"Liste des nom d'attributs \xc3\xa0 transmettre au service, s\xc3\xa9par\xc3\xa9 par une virgule. vide = aucun", max_length=255, blank=True)),
('proxy', models.BooleanField(default=False, help_text=b"Un ProxyGrantingTicket peut \xc3\xaatre d\xc3\xa9livr\xc3\xa9 au service pour s'authentifier en temps que l'utilisateur sur d'autres services")),
('filter', models.CharField(default=b'', help_text=b'Une lambda fonction pour filtrer sur les utilisateur o\xc3\xb9 leurs attribut, arg1: username, arg2:attrs_dict. vide = pas de filtre', max_length=255, blank=True)),
],
options={
'ordering': ('pos',),
},
bases=(models.Model,),
),
migrations.CreateModel(
name='ServiceTicket',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('attributs', picklefield.fields.PickledObjectField(editable=False)),
('validate', models.BooleanField(default=False)),
('service', models.TextField()),
('creation', models.DateTimeField(auto_now_add=True)),
('renew', models.BooleanField(default=False)),
('value', models.CharField(default=cas_server.models._gen_st, unique=True, max_length=255)),
],
options={
'abstract': False,
},
bases=(models.Model,),
),
migrations.CreateModel(
name='User',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('username', models.CharField(unique=True, max_length=30)),
('attributs', picklefield.fields.PickledObjectField(editable=False)),
('date', models.DateTimeField(auto_now=True, auto_now_add=True)),
],
options={
},
bases=(models.Model,),
),
migrations.AddField(
model_name='serviceticket',
name='user',
field=models.ForeignKey(related_name='serviceticket', to='cas_server.User'),
preserve_default=True,
),
migrations.AddField(
model_name='proxyticket',
name='user',
field=models.ForeignKey(related_name='proxyticket', to='cas_server.User'),
preserve_default=True,
),
migrations.AddField(
model_name='proxygrantingticket',
name='user',
field=models.ForeignKey(related_name='proxygrantingticket', to='cas_server.User'),
preserve_default=True,
),
migrations.AddField(
model_name='proxy',
name='proxy_ticket',
field=models.ForeignKey(related_name='proxies', to='cas_server.ProxyTicket'),
preserve_default=True,
),
]
# ⁻*- coding: utf-8 -*-
from django.conf import settings
from django.db import models
from django.contrib import messages
from picklefield.fields import PickledObjectField
import re
import os
import time
import random
import string
import requests
import utils
def _gen_ticket(prefix):
return '%s-%s' % (prefix, ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(settings.CAS_ST_LEN)))
def _gen_st():
return _gen_ticket('ST')
def _gen_pt():
return _gen_ticket('PT')
def _gen_pgt():
return _gen_ticket('PGT')
class User(models.Model):
username = models.CharField(max_length=30, unique=True)
attributs = PickledObjectField()
date = models.DateTimeField(auto_now_add=True, auto_now=True)
def __unicode__(self):
return self.username
def logout(self, request):
for ticket in ServiceTicket.objects.filter(user=self):
ticket.logout(request)
ticket.delete()
for ticket in ProxyTicket.objects.filter(user=self):
ticket.logout(request)
ticket.delete()
for ticket in ProxyGrantingTicket.objects.filter(user=self):
ticket.logout(request)
ticket.delete()
def delete(self):
super(User, self).delete()
def get_service_url(self, service, service_pattern, renew):
attributs = [s.strip() for s in service_pattern.attributs.split(',')]
ticket = ServiceTicket.objects.create(user=self, attributs = dict([(k, v) for (k, v) in self.attributs.items() if k in attributs]), service=service, renew=renew)
ticket.save()
url = utils.update_url(service, {'ticket':ticket.value})
return url
class Ticket(models.Model):
class Meta:
abstract = True
user = models.ForeignKey(User, related_name="%(class)s")
attributs = PickledObjectField()
validate = models.BooleanField(default=False)
service = models.TextField()
creation = models.DateTimeField(auto_now_add=True)
renew = models.BooleanField(default=False)
def __unicode__(self):
return u"%s: %s %s" % (self.user, self.value, self.service)
def logout(self, request):
#if self.validate:
xml = """<samlp:LogoutRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
ID="%(id)s" Version="2.0" IssueInstant="%(datetime)s">
<saml:NameID xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"></saml:NameID>
<samlp:SessionIndex>%(ticket)s</samlp:SessionIndex>
</samlp:LogoutRequest>""" % {'id' : os.urandom(20).encode("hex"), 'datetime' : int(time.time()), 'ticket': self.value}
headers = {'Content-Type': 'text/xml'}
try:
requests.post(self.service.encode('utf-8'), data=xml.encode('utf-8'), headers=headers)
except Exception as e:
messages.add_message(request, messages.WARNING, u'Erreur lors de la déconnexion du service %s:\n%s' % (self.service, e))
class ServiceTicket(Ticket):
value = models.CharField(max_length=255, default=_gen_st, unique=True)
class ProxyTicket(Ticket):
value = models.CharField(max_length=255, default=_gen_pt, unique=True)
class ProxyGrantingTicket(Ticket):
value = models.CharField(max_length=255, default=_gen_pgt, unique=True)
#class ProxyGrantingTicketIOU(Ticket):
# value = models.CharField(max_length=255, default=lambda:_gen_ticket('PGTIOU'), unique=True)
class Proxy(models.Model):
class Meta:
ordering = ("-pk", )
url = models.CharField(max_length=255)
proxy_ticket = models.ForeignKey(ProxyTicket, related_name="proxies")
class BadUsername(Exception):
pass
class BadFilter(Exception):
pass
class ServicePattern(models.Model):
class Meta:
ordering = ("pos", )
pos = models.IntegerField(default=100)
pattern = models.CharField(max_length=255, unique=True)
user_field = models.CharField(max_length=255, default="", blank=True, help_text="Nom de l'attribut transmit comme username, vide = login")
usernames = models.CharField(max_length=255, default="", blank=True, help_text="Liste d'utilisateurs acceptés séparé par des virgules, vide = tous les utilisateur")
attributs = models.CharField(max_length=255, default="", blank=True, help_text="Liste des nom d'attributs à transmettre au service, séparé par une virgule. vide = aucun")
proxy = models.BooleanField(default=False, help_text="Un ProxyGrantingTicket peut être délivré au service pour s'authentifier en temps que l'utilisateur sur d'autres services")
filter = models.CharField(max_length=255, default="", blank=True, help_text="Une lambda fonction pour filtrer sur les utilisateur où leurs attribut, arg1: username, arg2:attrs_dict. vide = pas de filtre")
def __unicode__(self):
return u"%s: %s" % (self.pos, self.pattern)
def check_user(self, user):
if self.usernames and not user.username in self.usernames.split(','):
raise BadUsername()
if self.filter and self.filter.startswith("lambda") and not eval(str(self.filter))(user.username, user.attributs):
raise BadFilter()
return True
@classmethod
def validate(cls, service):
for s in cls.objects.all():
if re.match(s.pattern, service):
return s
raise cls.DoesNotExist()
body {
padding-top: 40px;
padding-bottom: 40px;
background-color: #eee;
}
/*.form-signin {
max-width: 330px;
padding: 15px;
margin: 0 auto;
}
*/
.form-signin .form-signin-heading,
.form-signin .checkbox {
margin-bottom: 10px;
}
.form-signin .checkbox {
font-weight: normal;
}
.form-signin .form-control {
position: relative;
height: auto;
-webkit-box-sizing: border-box;
-moz-box-sizing: border-box;
box-sizing: border-box;
padding: 10px;
font-size: 16px;
}
.form-signin .form-control:focus {
z-index: 2;
}
.form-signin input[type="text"] {
margin-bottom: -1px;
border-bottom-right-radius: 0;
border-bottom-left-radius: 0;
}
.form-signin input[type="password"] {
margin-bottom: 10px;
border-top-left-radius: 0;
border-top-right-radius: 0;
}
{% extends 'bootstrap3/bootstrap3.html' %}
{% block bootstrap3_title %}{% block title %}{% endblock %}{% endblock %}
{% load url from future %}
{% load bootstrap3 %}
{% extends "cas_server/base.html" %}
{% load bootstrap3 %}
{% load staticfiles %}
{% block bootstrap3_extra_head %}
<link href="{% static "cas_server/login.css" %}" rel="stylesheet">
{% endblock %}
{% block bootstrap3_content %}
<div class="container">
<div class="row">
<div class="col-md-3"></div>
<div class="col-md-6">
{% bootstrap_messages %}
<div class="alert alert-success" role="alert">Logged</div>
{% bootstrap_button 'Arrêter' size='lg' button_class="btn-danger btn-block" href="logout" %}
</div>
<div class="col-md-3"></div>
</div>
</div> <!-- /container -->
{% endblock %}
{% extends "cas_server/base.html" %}
{% load bootstrap3 %}
{% load staticfiles %}
{% block bootstrap3_extra_head %}
<link href="{% static "cas_server/login.css" %}" rel="stylesheet">
{% endblock %}
{% block bootstrap3_content %}
<div class="container">
<div class="row">
<div class="col-md-3"></div>
<div class="col-md-6">
{% bootstrap_messages %}
<form class="form-signin" method="post">
<h2 class="form-signin-heading">Merci de se connecter</h2>
{% csrf_token %}
{% bootstrap_form form %}
{% bootstrap_button 'Connection' size='lg' button_type="submit" button_class="btn-block"%}
</form>
</div>
<div class="col-md-3"></div>
</div>
</div> <!-- /container -->
{% endblock %}
<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
<cas:proxySuccess>
<cas:proxyTicket>{{ticket}}</cas:proxyTicket>
</cas:proxySuccess>
</cas:serviceResponse>
<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
<cas:authenticationSuccess>
<cas:user>{{username}}</cas:user>
<cas:attributes>
{% for key, value in attributes %} <cas:{{key}}>{{value}}</cas:{{key}}>
{% endfor %}
</cas:attributes>
{% if proxyGrantingTicket %}
<cas:proxyGrantingTicket>{{proxyGrantingTicket}}</cas:proxyGrantingTicket>
{% endif %}
{% if proxies %}
<cas:proxies>
{% for proxy in proxies %}
<cas:proxy>{{proxy}}</cas:proxy>
{% endfor %}
</cas:proxies>
{% endif %}
</cas:authenticationSuccess>
</cas:serviceResponse>
<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
<cas:authenticationFailure code="{{code}}">
{{msg}}
</cas:authenticationFailure>
</cas:serviceResponse>
{% extends "cas_server/base.html" %}
{% load bootstrap3 %}
{% load staticfiles %}
{% block bootstrap3_extra_head %}
<link href="{% static "cas_server/login.css" %}" rel="stylesheet">
{% endblock %}
{% block bootstrap3_content %}
<div class="container">
<div class="row">
<div class="col-md-3"></div>
<div class="col-md-6">
{% bootstrap_messages %}
<div class="alert alert-warning" role="alert">Une demande d'authentification a été émise pour le service {{service}}</div>
{% bootstrap_button 'Se connecter au service' size='lg' button_class="btn-primary btn-block" href=service_ticket_url %}
</div>
<div class="col-md-3"></div>
</div>
</div> <!-- /container -->
{% endblock %}
from django.test import TestCase
# Create your tests here.
# ⁻*- coding: utf-8 -*-
from django.conf.urls import patterns, url
from django.views.generic import RedirectView
import views
urlpatterns = patterns('',
url(r'^$', RedirectView.as_view(pattern_name="login")),
url('^login$', views.login, name='login'),
url('^logout$', views.logout, name='logout'),
url('^validate$', views.validate, name='validate'),
url('^serviceValidate$', views.serviceValidate, name='serviceValidate'),
url('^proxyValidate$', views.proxyValidate, name='proxyValidate'),
url('^proxy$', views.proxy, name='proxy'),
url('^p3/serviceValidate$', views.p3_serviceValidate, name='p3_serviceValidate'),
url('^p3/proxyValidate$', views.p3_proxyValidate, name='p3_proxyValidate'),
)
import urlparse
import urllib
def update_url(url, params):
url = urlparse.urlparse(url)
url_parts = list(urlparse.urlparse(service))
query = dict(urlparse.parse_qsl(url_parts[4]))
query.update(params)
url_parts[4] = urllib.urlencode(query)
return urlparse.urlunparse(url_parts)
# ⁻*- coding: utf-8 -*-
from django.shortcuts import render, redirect
from django.http import HttpResponse, StreamingHttpResponse
from django.conf import settings
from django.contrib import messages
import requests
from datetime import datetime, timedelta
import utils
import forms
import models
def _logout(request):
try: del request.session["authenticated"]
except KeyError: pass
try: del request.session["username"]
except KeyError: pass
try: del request.session["warn"]
except KeyError: pass
def login(request):
user = None
form = None
service_pattern = None
renewed = False
if request.method == 'POST':
service = request.POST.get('service')
renew = True if request.POST.get('renew') else False
gateway = request.POST.get('gateway')
method = request.POST.get('method')
if not request.session.get("authenticated") or renew:
form = forms.UserCredential(request.POST, initial={'service':service,'method':method,'warn':request.session.get("warn")})
if form.is_valid():
user = models.User.objects.get(username=form.cleaned_data['username'])
request.session["username"] = form.cleaned_data['username']
request.session["warn"] = True if form.cleaned_data.get("warn") else False
request.session["authenticated"] = True
renewed = True
else:
_logout(request)
else:
service = request.GET.get('service')
renew = True if request.GET.get('renew') else False
gateway = request.GET.get('gateway')
method = request.GET.get('method')
if not request.session.get("authenticated") or renew:
form = forms.UserCredential(initial={'service':service,'method':method,'warn':request.session.get("warn")})
# if authenticated and successfully renewed authentication if needed
if request.session.get("authenticated") and (not renew or renewed):
try:
user = models.User.objects.get(username=request.session["username"])
except models.User.DoesNotExist:
_logout(request)
# if login agains a service is requestest
if service:
try:
# is the service allowed
service_pattern = models.ServicePattern.validate(service)
# is the current user allowed on this service
service_pattern.check_user(user)
# if the user has asked to be warned before any login to a service (no transparent SSO)
if request.session["warn"]:
return render(request, settings.CAS_WARN_TEMPLATE, {'service_ticket_url':user.get_service_url(service, service_pattern, renew=renew),'service':service})
else:
return redirect(user.get_service_url(service, service_pattern, renew=renew)) # redirect, using method ?
except models.ServicePattern.DoesNotExist:
messages.add_message(request, messages.ERROR, u'Service %s non autorisé.' % service)
except models.BadUsername:
messages.add_message(request, messages.ERROR, u"Nom d'utilisateur non autorisé")
except models.BadFilter:
messages.add_message(request, messages.ERROR, u"Caractéristique utilisateur non autorisé")
# if gateway is set and auth failed redirect to the service without authentication
if gateway:
list(messages.get_messages(request)) # clean messages before leaving the django app
return redirect(service)
return render(request, settings.CAS_LOGGED_TEMPLATE, {})
else:
if service:
if gateway:
list(messages.get_messages(request)) # clean messages before leaving the django app
return redirect(service)
return render(request, settings.CAS_LOGIN_TEMPLATE, {'form':form})
def logout(request):
service = request.GET.get('service')
if request.session.get("authenticated"):
user = models.User.objects.get(username=request.session["username"])
user.logout(request)
user.delete()
_logout(request)
# if service is set, redirect to service after logout
if service:
list(messages.get_messages(request)) # clean messages before leaving the django app
return redirect(service)
# else redirect to login page
else:
messages.add_message(request, messages.SUCCESS, u'Déconnecté avec succès')
return redirect("login")
def validate(request):
service = request.GET.get('service')
ticket = request.GET.get('ticket')
renew = True if request.GET.get('renew') else False
if service and ticket:
try:
ticket = models.ServiceTicket.objects.get(value=ticket, service=service, validate=False, renew=renew, creation__gt=(datetime.now() - timedelta(seconds=settings.CAS_TICKET_VALIDITY)))
ticket.validate = True
ticket.save()
return HttpResponse("yes\n", content_type="text/plain")
except models.ServiceTicket.DoesNotExist:
return HttpResponse("no\n", content_type="text/plain")
else:
return HttpResponse("no\n", content_type="text/plain")
def psValidate(request, typ=['ST']):
service = request.GET.get('service')
ticket = request.GET.get('ticket')
pgtUrl = request.GET.get('pgtUrl')
renew = True if request.GET.get('renew') else False
if service and ticket:
for t in typ:
if ticket.startswith(t):
break
else:
return render(request, "cas_server/serviceValidateError.xml", {'code':'INVALID_TICKET'}, content_type="text/xml; charset=utf-8")
try:
proxies = []
if ticket.startswith("ST"):
ticket = models.ServiceTicket.objects.get(value=ticket, service=service, validate=False, renew=renew, creation__gt=(datetime.now() - timedelta(seconds=settings.CAS_TICKET_VALIDITY)))
elif ticket.startswith("PT"):
ticket = models.ProxyTicket.objects.get(value=ticket, service=service, validate=False, renew=renew, creation__gt=(datetime.now() - timedelta(seconds=settings.CAS_TICKET_VALIDITY)))
for p in ticket.proxies.all():
proxies.add(p.url)
ticket.validate = True
ticket.save()
attributes = []
for key, value in ticket.attributs.items():
if isinstance(value, list):
for v in value:
attributes.append((key, v))
else:
attributes.append((key, value))
params = {'username':ticket.user.username, 'attributes':attributes, 'proxies':proxies}
if pgtUrl and pgtUrl.startswith("https://"):
pattern =