test_view.py 88.6 KB
Newer Older
1
# -*- coding: utf-8 -*-
Valentin Samir's avatar
Valentin Samir committed
2 3 4 5 6 7 8 9 10 11
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2016 Valentin Samir
12
"""Tests module for views"""
13
from cas_server.default_settings import settings
14

15
import django
16 17
from django.test import TestCase, Client
from django.test.utils import override_settings
18 19
from django.utils import timezone

20

21
import random
22
import json
23
import mock
24
from lxml import etree
25
from six.moves import range
26

Valentin Samir's avatar
Valentin Samir committed
27 28
from cas_server import models
from cas_server import utils
29 30 31 32 33 34
from cas_server.tests.utils import (
    copy_form,
    get_login_page_params,
    get_auth_client,
    get_user_ticket_request,
    get_pgt,
Valentin Samir's avatar
Valentin Samir committed
35
    get_proxy_ticket,
36 37 38
    get_validated_ticket,
    HttpParamsHandler,
    Http404Handler
39
)
40
from cas_server.tests.mixin import BaseServicePattern, XmlContent, CanLogin
41

42 43

@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
44
class LoginTestCase(TestCase, BaseServicePattern, CanLogin):
45 46 47
    """Tests for the login view"""
    def setUp(self):
        """Prepare the test context shared by every test of this class."""
        # register the service urls and service patterns the tests rely on
        self.setup_service_patterns()

51
    @override_settings(CAS_NEW_VERSION_HTML_WARNING=True)
52
    @mock.patch("cas_server.utils.last_version", lambda: "1.2.3")
53 54
    @mock.patch("cas_server.utils.VERSION", "0.1.2")
    def test_new_version_available_ok(self):
        """Test that the new version info box is displayed on the login page."""
        page = Client().get("/login")
        self.assertIn(b"A new version of the application is available", page.content)

    @override_settings(CAS_NEW_VERSION_HTML_WARNING=True)
61
    @mock.patch("cas_server.utils.last_version", lambda: None)
62 63
    @mock.patch("cas_server.utils.VERSION", "0.1.2")
    def test_new_version_available_badpypi(self):
        """
            Test the new version info box when pypi is not available
            (unable to retrieve the last version): the box must not be displayed.
        """
        page = Client().get("/login")
        self.assertNotIn(b"A new version of the application is available", page.content)

    @override_settings(CAS_NEW_VERSION_HTML_WARNING=False)
    def test_new_version_available_disabled(self):
        """Test that the new version info box is not displayed when disabled."""
        page = Client().get("/login")
        self.assertNotIn(b"A new version of the application is available", page.content)

78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
    @override_settings(CAS_INFO_MESSAGES_ORDER=["cas_explained"])
    def test_messages_info_box_enabled(self):
        """Test that the message info-box is displayed when enabled."""
        expected = (
            b"The Central Authentication Service grants you access to most of our websites by "
        )
        page = Client().get("/login")
        self.assertIn(expected, page.content)

    @override_settings(CAS_INFO_MESSAGES_ORDER=[])
    def test_messages_info_box_disabled(self):
        """Test that the message info-box is not displayed when disabled."""
        unexpected = (
            b"The Central Authentication Service grants you access to most of our websites by "
        )
        page = Client().get("/login")
        self.assertNotIn(unexpected, page.content)

    # test1 and test2 are malformed and should be ignored, test3 is ok, test5 does not
    # exist and should be ignored
    @override_settings(CAS_INFO_MESSAGES_ORDER=["test1", "test2", "test3", "test5"])
    @override_settings(CAS_INFO_MESSAGES={
        "test1": "test",  # not a dict, should be ignored
        "test2": {"type": "success"},  # not "message" key, should be ignored
        "test3": {"message": "test3"},
        "test4": {"message": "test4"},
    })
    def test_messages_info_box_bad_messages(self):
        """Test that malformed entries of the messages dict are ignored."""
        # fetching the page must not raise despite the malformed entries
        page = Client().get("/login")
        body = page.content
        # test3 is well formed and listed in CAS_INFO_MESSAGES_ORDER: it should be there
        self.assertIn(b"test3", body)
        # test4 is not in CAS_INFO_MESSAGES_ORDER and should not be there
        self.assertNotIn(b"test4", body)

117 118
    def test_login_view_post_goodpass_goodlt(self):
        """Test a successful login."""
        # get a client that has fetched the login page once, along with the login form
        # default parameters
        client, params = get_login_page_params()
        # fill in valid credentials
        params["username"] = settings.CAS_TEST_USER
        params["password"] = settings.CAS_TEST_PASSWORD
        # the LoginTicket of the form must match a valid LT stored in the user session
        self.assertIn(params['lt'], client.session['lt'])

        # post the login attempt
        response = client.post('/login', params)
        # username, password and LT are all valid: the login should succeed
        self.assert_logged(client, response)
        # the LoginTicket is consumed and should no longer be valid
        self.assertNotIn(params['lt'], client.session['lt'])

135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
    def test_login_post_missing_params(self):
        """Test a login attempt with missing POST parameters (username or password or both)"""
        incomplete_credentials = (
            # only the username is set
            {"username": settings.CAS_TEST_USER},
            # only the password is set
            {"password": settings.CAS_TEST_PASSWORD},
            # neither username nor password is set
            {},
        )
        for credentials in incomplete_credentials:
            # each attempt starts from a fresh client that has fetched the login page once,
            # along with the login form default parameters
            client, params = get_login_page_params()
            params.update(credentials)
            # post the login attempt
            response = client.post('/login', params)
            # as credentials are missing, the login should fail
            self.assert_login_failed(client, response)

166 167
    def test_login_view_post_goodpass_goodlt_warn(self):
        """Test a successful login asking to be warned before service tickets are created."""
        client, params = get_login_page_params()
        params.update({
            # valid credentials
            "username": settings.CAS_TEST_USER,
            "password": settings.CAS_TEST_PASSWORD,
            # this time, the warn checkbox is ticked
            "warn": "on",
        })

        response = client.post('/login', params)
        # username, password and LT are all valid: the login should succeed with warn enabled
        self.assert_logged(client, response, warn=True)

    def test_lt_max(self):
        """Check we only keep the last 100 Login Tickets for a user."""
        # get a client and initial login params
        client, params = get_login_page_params()
        # this first LT is valid for now
        current_lt = params["lt"]
        # the last 100 generated LT are kept per user, so after `i_in_test` extra generations
        # `current_lt` must still be valid
        i_in_test = random.randint(0, 99)
        # after `i_not_in_test` extra generations `current_lt` must have been evicted.
        # randint() includes both endpoints and the loop index stops at 149, so the upper
        # bound has to be 149: with 150 the eviction check would sometimes never run.
        i_not_in_test = random.randint(101, 149)
        # generate 150 more LT
        for i in range(150):
            if i == i_in_test:
                # at most 100 LT generated so far: the first LT should still be valid
                self.assertIn(current_lt, client.session['lt'])
            if i == i_not_in_test:
                # more than 100 LT generated: the first LT should be valid no more
                self.assertNotIn(current_lt, client.session['lt'])
                # and we never keep more than 100 valid LT
                self.assertLessEqual(len(client.session['lt']), 100)
            # generate a new LT by getting the login page
            client, params = get_login_page_params(client)
        # in the end, we still have at most 100 valid LT
        self.assertLessEqual(len(client.session['lt']), 100)

207
    def test_login_view_post_badlt(self):
        """Login attempt with a bad LoginTicket, login should fail"""
        # get a client and initial login params
        client, params = get_login_page_params()
        # valid credentials...
        params["username"] = settings.CAS_TEST_USER
        params["password"] = settings.CAS_TEST_PASSWORD
        # ...but a LT that was never issued
        params["lt"] = 'LT-random'

        response = client.post('/login', params)

        # as the LT is not valid, login should fail
        self.assert_login_failed(client, response)
        # the reason why login has failed is displayed to the user
        self.assertIn(b"Invalid login ticket", response.content)
224 225

    def test_login_view_post_badpass_good_lt(self):
        """Login attempt with a bad password"""
        # get a client and initial login params
        client, params = get_login_page_params()
        # valid username but invalid password
        params["username"] = settings.CAS_TEST_USER
        params["password"] = "test2"

        response = client.post('/login', params)

        # as the password is wrong, login should fail
        self.assert_login_failed(client, response)
        # the reason why login has failed is displayed to the user
        self.assertIn(
            (
                b"The credentials you provided cannot be "
                b"determined to be authentic"
            ),
            response.content
        )
244 245 246

    def assert_ticket_attributes(self, client, ticket_value):
        """
            Check the attributes of a service ticket stored in the db

            :param client: the test client whose session owns the ticket
            :param ticket_value: the value of the ticket to look up
        """
        # fetch the db user bound to the current session of the test user
        session_key = client.session.session_key
        user = models.User.objects.get(
            username=settings.CAS_TEST_USER,
            session_key=session_key
        )
        self.assertTrue(user)
        # fetch the ticket object corresponding to `ticket_value`
        ticket = models.ServiceTicket.objects.get(value=ticket_value)
        # the ticket belongs to the user
        self.assertEqual(ticket.user, user)
        # the attributes registered on the ticket are the test user attributes
        self.assertEqual(ticket.attributs, settings.CAS_TEST_ATTRIBUTES)
        # the ticket has not been validated yet
        self.assertEqual(ticket.validate, False)
        # the service pattern registered on the ticket is the one we use for tests
        self.assertEqual(ticket.service_pattern, self.service_pattern)
264

265 266
    def assert_service_ticket(self, client, response):
        """
            Check that a ticket is properly emitted when requested for an allowed service

            :param client: the test client that requested the ticket
            :param response: the response returned by the login view
        """
        # on ticket emission, we are redirected to the service url with the ticket set as
        # a GET parameter
        self.assertEqual(response.status_code, 302)
        self.assertTrue(response.has_header('Location'))
        location = response['Location']
        expected_prefix = "https://www.example.com?ticket=%s-" % settings.CAS_SERVICE_TICKET_PREFIX
        self.assertTrue(location.startswith(expected_prefix))
        # the value of the ticket GET parameter must match a ticket created in the db
        ticket_value = location.split('ticket=')[-1]
        self.assert_ticket_attributes(client, ticket_value)

    def test_view_login_get_allowed_service(self):
        """Request a ticket for an allowed service by an unauthenticated client"""
        # get a bare new http client
        client = Client()
        # we are not authenticated and ask for a ticket for https://www.example.com,
        # which is a valid service matched by self.service_pattern
        response = client.get("/login?service=https://www.example.com")
        # the login page should be displayed
        self.assertEqual(response.status_code, 200)
        # the user is told why authentication is needed
        self.assertIn(
            (
                b"Authentication required by service "
                b"example (https://www.example.com)"
            ),
            response.content
        )
297

298
    @override_settings(CAS_SHOW_SERVICE_MESSAGES=False)
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
    def test_view_login_get_allowed_service_no_message(self):
        """
            Request a ticket for an allowed service by an unauthenticated client,
            service messages being disabled
        """
        # get a bare new http client
        client = Client()
        # we are not authenticated and ask for a ticket for https://www.example.com,
        # which is a valid service matched by self.service_pattern
        response = client.get("/login?service=https://www.example.com")
        # the login page should be displayed
        self.assertEqual(response.status_code, 200)
        # the message explaining why authentication is needed must NOT be displayed
        self.assertNotIn(
            (
                b"Authentication required by service "
                b"example (https://www.example.com)"
            ),
            response.content
        )

316 317
    def test_view_login_get_denied_service(self):
        """Request a ticket for a denied service by an unauthenticated client"""
        # get a bare new http client
        client = Client()
        # we are not authenticated and ask for a ticket for https://www.example.net,
        # which is NOT a valid service
        response = client.get("/login?service=https://www.example.net")
        self.assertEqual(response.status_code, 200)
        # the user is warned that https://www.example.net is not an allowed service url
        self.assertIn(b"Service https://www.example.net not allowed", response.content)
326

327
    @override_settings(CAS_SHOW_SERVICE_MESSAGES=False)
328 329 330 331 332 333 334 335 336 337 338
    def test_view_login_get_denied_service_no_message(self):
        """
            Request a ticket for a denied service by an unauthenticated client,
            service messages being disabled
        """
        # get a bare new http client
        client = Client()
        # we are not authenticated and ask for a ticket for https://www.example.net,
        # which is NOT a valid service
        response = client.get("/login?service=https://www.example.net")
        self.assertEqual(response.status_code, 200)
        # the "not allowed" warning must NOT be displayed
        self.assertNotIn(b"Service https://www.example.net not allowed", response.content)

339 340
    def test_view_login_get_auth_allowed_service(self):
        """Request a ticket for an allowed service by an authenticated client"""
        # an already authenticated client asks for a ticket for https://www.example.com
        authenticated = get_auth_client()
        redirect = authenticated.get("/login?service=https://www.example.com")
        # https://www.example.com is a valid service: a ticket should be created and the
        # user redirected to the service url
        self.assert_service_ticket(authenticated, redirect)

    def test_view_login_get_auth_allowed_service_warn(self):
        """
            Request a ticket for an allowed service by an authenticated client that asked
            to be warned before tickets are generated
        """
        # get a client that is already authenticated and has asked to be warned before we
        # generate a ticket
        client = get_auth_client(warn="on")
        # ask for a ticket for https://www.example.com
        response = client.get("/login?service=https://www.example.com")
        # a warning is displayed, asking the user to confirm the ticket creation (instead
        # of generating the ticket and redirecting directly to the service url)
        self.assertEqual(response.status_code, 200)
        self.assertIn(
            (
                b"Authentication has been required by service "
                b"example (https://www.example.com)"
            ),
            response.content
        )
        # get the displayed form parameters
        params = copy_form(response.context["form"])
        # post them back, confirming we want a ticket
        response = client.post("/login", params)
        # https://www.example.com is a valid service: a ticket should be created and the
        # user redirected to the service url
        self.assert_service_ticket(client, response)
372 373

    def test_view_login_get_auth_denied_service(self):
        """Request a ticket for a not allowed service by an authenticated client"""
        # get a client that is already authenticated
        client = get_auth_client()
        # we are authenticated and ask for a ticket for https://www.example.org,
        # which is NOT a valid service
        response = client.get("/login?service=https://www.example.org")
        # the login page is displayed instead of a redirection to the service
        self.assertEqual(response.status_code, 200)
        # the user is warned that https://www.example.org is not an allowed service url
        self.assertIn(b"Service https://www.example.org not allowed", response.content)
384

385 386
    def test_user_logged_not_in_db(self):
        """If the user is logged but has been deleted from the database, it should be logged out"""
        # get a client that is already authenticated
        client = get_auth_client()
        # remove the db record backing the authenticated session
        session_user = models.User.objects.get(
            username=settings.CAS_TEST_USER,
            session_key=client.session.session_key
        )
        session_user.delete()

        # fetch the login page
        response = client.get("/login")

        # the user should be logged out
        self.assert_login_failed(client, response, code=302)
        # and redirected to the login page. The expected location depends on the django
        # version, as the test client behaviour changed after django 1.9
        if django.VERSION >= (1, 9):
            expected_location = "/login?"
        else:  # pragma: no cover coverage is computed with django 1.9
            expected_location = "http://testserver/login"
        self.assertEqual(response["Location"], expected_location)
405 406

    def test_service_restrict_user(self):
        """Test the user restriction capability of a service"""
        # get a client that is already authenticated
        client = get_auth_client()

        # try to get a ticket for a service url matched by a service pattern restricting
        # the usernames allowed to get tickets. The test user username is not one of them.
        response = client.get("/login", {'service': self.service_restrict_user_fail})
        self.assertEqual(response.status_code, 200)
        # the ticket is not created and a warning is displayed to the user
        self.assertIn(b"Username not allowed", response.content)

        # same, but with the test user username being one of the allowed usernames
        response = client.get("/login", {'service': self.service_restrict_user_success})
        # the ticket is created and we are redirected to the service url
        self.assertEqual(response.status_code, 302)
        self.assertTrue(
            response["Location"].startswith("%s?ticket=" % self.service_restrict_user_success)
        )
426 427 428

    def test_service_filter(self):
        """Test the filtering on user attributes"""
        # get a client that is already authenticated
        client = get_auth_client()

        # try to get a ticket for service urls matched by service patterns having
        # a restriction on the user attributes. The test user is failing these restrictions.
        # We try first with a single-valued attribute (aka a string) and then with
        # a multi-valued attribute (aka a list of strings)
        for service in [self.service_filter_fail, self.service_filter_fail_alt]:
            response = client.get("/login", {'service': service})
            # the ticket is not created and a warning is displayed to the user
            self.assertEqual(response.status_code, 200)
            self.assertIn(b"User characteristics not allowed", response.content)

        # same, but with restrictions that are satisfied by the test user attributes
        response = client.get("/login", {'service': self.service_filter_success})
        # the ticket is created and the user redirected to the service url
        self.assertEqual(response.status_code, 302)
        self.assertTrue(response["Location"].startswith("%s?ticket=" % self.service_filter_success))
447 448 449

    def test_service_user_field(self):
        """Test using a user attribute as username: cases where the attribute exists or not"""
        # get a client that is already authenticated
        client = get_auth_client()

        # try to get a ticket for a service url matched by a service pattern that uses
        # a particular attribute as username. The test user does NOT have this attribute
        response = client.get("/login", {'service': self.service_field_needed_fail})
        # the ticket is not created and a warning is displayed to the user
        self.assertEqual(response.status_code, 200)
        self.assertIn(b"The attribute uid is needed to use that service", response.content)

        # same, but with an attribute that the test user has
        response = client.get("/login", {'service': self.service_field_needed_success})
        # the ticket is created and the user redirected to the service url
        self.assertEqual(response.status_code, 302)
        self.assertTrue(
            response["Location"].startswith("%s?ticket=" % self.service_field_needed_success)
        )
467

468
    @override_settings(CAS_TEST_ATTRIBUTES={'alias': []})
469 470 471 472 473
    def test_service_user_field_evaluate_to_false(self):
        """
            Test using a user attribute as username:
            case where the attribute exists but evaluates to False
        """
        # get a client that is already authenticated
        client = get_auth_client()
        # try to get a ticket for a service url matched by a service pattern that uses
        # a particular attribute as username. The test user has this attribute, but it
        # evaluates to False (e.g. an empty string "" or an empty list [])
        response = client.get("/login", {"service": self.service_field_needed_success})
        # the ticket is not created and a warning is displayed to the user
        self.assertEqual(response.status_code, 200)
        self.assertIn(b"The attribute alias is needed to use that service", response.content)
483

484 485 486 487 488
    def test_gateway(self):
        """test gateway parameter"""
        service = "https://restrict_user_fail.example.com"
        query = {'service': service, 'gateway': 'on'}

        # first with an already authenticated client that fails to get a ticket for the
        # service, then with a bare client that is not yet authenticated
        for make_client in (get_auth_client, Client):
            client = make_client()
            response = client.get("/login", query)
            # as gateway is set, the client is redirected to the service url without any ticket
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], service)

505
    def test_renew(self):
        """test the authentication renewal request from a service"""
        # use the default test service
        service = "https://www.example.com"
        # get a client that is already authenticated
        client = get_auth_client()
        # ask for a ticket for the service, but ask for authentication renewal
        response = client.get("/login", {'service': service, 'renew': 'on'})
        # we are asked to reauthenticate and the user is told why
        self.assertEqual(response.status_code, 200)
        self.assertIn(
            (
                b"Authentication renewal required by "
                b"service example (https://www.example.com)"
            ),
            response.content
        )
        # get the form default parameters
        params = copy_form(response.context["form"])
        # set valid username/password
        params["username"] = settings.CAS_TEST_USER
        params["password"] = settings.CAS_TEST_PASSWORD
        # the renew parameter from the form should be True
        self.assertEqual(params["renew"], True)
        # post the authentication request
        response = client.post("/login", params)
        # the request succeeds, a ticket is created and we are redirected to the service url
        self.assertEqual(response.status_code, 302)
        ticket_value = response['Location'].split('ticket=')[-1]
        ticket = models.ServiceTicket.objects.get(value=ticket_value)
        # the created ticket is marked as obtained after a renew. Further testing about
        # renewing authentication is done in the validate and serviceValidate views tests
        self.assertEqual(ticket.renew, True)

538
    @override_settings(CAS_SHOW_SERVICE_MESSAGES=False)
539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
    def test_renew_message_disabled(self):
        """test the authentication renewal request from a service, service messages disabled"""
        # use the default test service
        service = "https://www.example.com"
        # get a client that is already authenticated
        client = get_auth_client()
        # ask for a ticket for the service, but ask for authentication renewal
        response = client.get("/login", {'service': service, 'renew': 'on'})
        # we are asked to reauthenticate, but the explanatory message must NOT be displayed
        self.assertEqual(response.status_code, 200)
        self.assertNotIn(
            (
                b"Authentication renewal required by "
                b"service example (https://www.example.com)"
            ),
            response.content
        )
        # get the form default parameters
        params = copy_form(response.context["form"])
        # set valid username/password
        params["username"] = settings.CAS_TEST_USER
        params["password"] = settings.CAS_TEST_PASSWORD
        # the renew parameter from the form should be True
        self.assertEqual(params["renew"], True)
        # post the authentication request
        response = client.post("/login", params)
        # the request succeeds, a ticket is created and we are redirected to the service url
        self.assertEqual(response.status_code, 302)
        ticket_value = response['Location'].split('ticket=')[-1]
        ticket = models.ServiceTicket.objects.get(value=ticket_value)
        # the created ticket is marked as obtained after a renew. Further testing about
        # renewing authentication is done in the validate and serviceValidate views tests
        self.assertEqual(ticket.renew, True)

572
    @override_settings(CAS_ENABLE_AJAX_AUTH=True)
573
    def test_ajax_login_required(self):
        """
            Test ajax, login required.
            The ajax methods allow to log a user in using javascript.
            For doing so, every 302 redirection is replaced by a 200 returning a json with the
            url to redirect to.
            By default, ajax login is disabled.
            If CAS_ENABLE_AJAX_AUTH is True, ajax login is enabled and only pages on the same
            domain as the CAS can do ajax requests. To allow pages on other domains, you need
            to use CORS. You can use the django app corsheaders for that. Be careful to only
            allow domains you completely trust, as any javascript on these domains will be able
            to authenticate as the user.
        """
        # a bare client fetches the login page, setting the custom header HTTP_X_AJAX to
        # tell we wish to do ajax requests
        response = Client().get("/login", HTTP_X_AJAX='on')
        # the response is a json telling us the user needs to be authenticated
        self.assertEqual(response.status_code, 200)
        payload = json.loads(response.content.decode("utf8"))
        self.assertEqual(payload["status"], "error")
        self.assertEqual(payload["detail"], "login required")
        self.assertEqual(payload["url"], "/login")
597

598
    @override_settings(CAS_ENABLE_AJAX_AUTH=True)
    def test_ajax_logged_user_deleted(self):
        """test ajax when the logged user has been deleted: login is required"""
        # start from a client that is already authenticated
        client = get_auth_client()
        # remove from the database the user attached to the client session
        session_key = client.session.session_key
        models.User.objects.get(
            username=settings.CAS_TEST_USER,
            session_key=session_key
        ).delete()
        # fetch the login page with ajax on
        reply = client.get("/login", HTTP_X_AJAX='on')
        # the response is a json telling us that the user needs to authenticate
        self.assertEqual(reply.status_code, 200)
        payload = json.loads(reply.content.decode("utf8"))
        for key, expected in (
            ("status", "error"),
            ("detail", "login required"),
            ("url", "/login"),
        ):
            self.assertEqual(payload[key], expected)
617

618
    @override_settings(CAS_ENABLE_AJAX_AUTH=True)
    def test_ajax_logged(self):
        """test ajax with a user that is successfully logged in"""
        # an already authenticated client fetches the login page with ajax on
        client = get_auth_client()
        reply = client.get("/login", HTTP_X_AJAX='on')
        # the response is a json telling us that the user is authenticated
        self.assertEqual(reply.status_code, 200)
        payload = json.loads(reply.content.decode("utf8"))
        for key, expected in (("status", "success"), ("detail", "logged")):
            self.assertEqual(payload[key], expected)

631
    @override_settings(CAS_ENABLE_AJAX_AUTH=True)
    def test_ajax_get_ticket_success(self):
        """test ajax: a ticket is retrieved for an allowed service"""
        # the default test service
        service = "https://www.example.com"
        # an already authenticated client fetches the login page for that service, ajax on
        client = get_auth_client()
        reply = client.get("/login", {'service': service}, HTTP_X_AJAX='on')
        # the response is a json telling us that the ticket has been created, along with
        # the url to fetch to authenticate the user to the service: it contains the ticket
        # as GET parameter
        self.assertEqual(reply.status_code, 200)
        payload = json.loads(reply.content.decode("utf8"))
        self.assertEqual(payload["status"], "success")
        self.assertEqual(payload["detail"], "auth")
        expected_prefix = '%s?ticket=' % service
        self.assertTrue(payload["url"].startswith(expected_prefix))

649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
    @override_settings(CAS_ENABLE_AJAX_AUTH=False)
    def test_ajax_get_ticket_success_alt(self):
        """
            test ajax retrieve a ticket for an allowed service.
            Same as test_ajax_get_ticket_success but with CAS_ENABLE_AJAX_AUTH=False
        """
        # CAS_ENABLE_AJAX_AUTH is explicitly forced to False above, so that the test does not
        # depend on the default value of the setting
        # using the default test service
        service = "https://www.example.com"
        # get a client that is already authenticated
        client = get_auth_client()
        # fetch the login page with ajax on
        response = client.get("/login", {'service': service}, HTTP_X_AJAX='on')
        # as CAS_ENABLE_AJAX_AUTH is False the ajax header is ignored and the view works
        # normally: 302 redirect to the service url with the ticket as GET parameter.
        # javascript cannot retrieve the ticket info: it tries to follow the redirect to
        # another domain and fails silently
        self.assertEqual(response.status_code, 302)
        # the redirection targets the service url and carries the ticket
        self.assertTrue(response['Location'].startswith('%s?ticket=' % service))

    @override_settings(CAS_ENABLE_AJAX_AUTH=True)
    def test_ajax_get_ticket_fail(self):
        """test ajax: no ticket is retrieved for a denied service"""
        # a service url that is denied
        service = "https://www.example.org"
        # an already authenticated client fetches the login page for that service, ajax on
        client = get_auth_client()
        reply = client.get("/login", {'service': service}, HTTP_X_AJAX='on')
        # the response is a json telling us that the service is not allowed
        self.assertEqual(reply.status_code, 200)
        payload = json.loads(reply.content.decode("utf8"))
        self.assertEqual(payload["status"], "error")
        self.assertEqual(payload["detail"], "auth")
        # the first message of the response carries the error
        first_message = payload["messages"][0]
        self.assertEqual(first_message["level"], "error")
        self.assertEqual(
            first_message["message"],
            "Service https://www.example.org not allowed."
        )

686
    @override_settings(CAS_ENABLE_AJAX_AUTH=True)
    def test_ajax_get_ticket_warn(self):
        """test ajax: get a ticket while the user asked to be warned"""
        # the default test service
        service = "https://www.example.com"
        # a client already authenticated with warn on fetches the login page, ajax on
        client = get_auth_client(warn="on")
        reply = client.get("/login", {'service': service}, HTTP_X_AJAX='on')
        # the response is a json telling us that a ticket cannot be delivered transparently
        # as the user has asked to be warned
        self.assertEqual(reply.status_code, 200)
        payload = json.loads(reply.content.decode("utf8"))
        for key, expected in (("status", "error"), ("detail", "confirmation needed")):
            self.assertEqual(payload[key], expected)

702

703
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
class LogoutTestCase(TestCase):
    """Tests for the logout view"""

    # text expected in the body of the login page when the client is authenticated
    LOGGED_IN_MSG = (
        b"You have successfully logged into "
        b"the Central Authentication Service"
    )
    # text expected in the body of the logout page once the client is logged out
    LOGGED_OUT_MSG = (
        b"You have successfully logged out from "
        b"the Central Authentication Service"
    )
    # xml namespace of the SAML elements looked up in the SLO requests
    SAMLP_NAMESPACES = {"samlp": "urn:oasis:names:tc:SAML:2.0:protocol"}

    def setUp(self):
        """Prepare the test context"""
        # for testing SingleLogOut we need to use a service on localhost where we launch
        # a simple one request http server
        self.service = 'http://127.0.0.1:45678'
        self.service_pattern = models.ServicePattern.objects.create(
            name="localhost",
            pattern=r"^https?://127\.0\.0\.1(:[0-9]+)?(/.*)?$",
            single_log_out=True
        )
        # return all user attributes
        models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern)

    def test_logout(self):
        """logout is idempotent"""
        # get a bare client
        client = Client()

        # call logout
        client.get("/logout")

        # we are still not logged
        self.assertFalse(client.session.get("username"))
        self.assertFalse(client.session.get("authenticated"))

    def test_logout_view(self):
        """test simple logout: a user is logged out from one and only one session"""
        # get two authenticated clients with the same test user (but two different sessions)
        client = get_auth_client()
        client2 = get_auth_client()

        # fetch login, the first client is authenticated
        response = client.get("/login")
        self.assertEqual(response.status_code, 200)
        self.assertIn(self.LOGGED_IN_MSG, response.content)
        # and the session variables are set
        self.assertEqual(client.session["username"], settings.CAS_TEST_USER)
        self.assertIs(client.session["authenticated"], True)

        # call logout with the first client
        response = client.get("/logout")
        # the client is logged out
        self.assertEqual(response.status_code, 200)
        self.assertIn(self.LOGGED_OUT_MSG, response.content)
        # and the session variables are cleaned
        self.assertFalse(client.session.get("username"))
        self.assertFalse(client.session.get("authenticated"))
        # client2 is still logged
        self.assertEqual(client2.session["username"], settings.CAS_TEST_USER)
        self.assertIs(client2.session["authenticated"], True)

        # fetch login again with the first client: it is no longer authenticated
        response = client.get("/login")
        self.assertEqual(response.status_code, 200)
        self.assertNotIn(self.LOGGED_IN_MSG, response.content)

    def test_logout_from_all_session(self):
        """test logout from all my sessions"""
        # get two authenticated clients with the same test user (but two different sessions)
        client = get_auth_client()
        client2 = get_auth_client()

        # call logout with the first client, asking to be logged out from all of the
        # user sessions
        client.get("/logout?all=1")

        # both clients are logged out
        self.assertFalse(client.session.get("username"))
        self.assertFalse(client.session.get("authenticated"))
        self.assertFalse(client2.session.get("username"))
        self.assertFalse(client2.session.get("authenticated"))

    def assert_redirect_to_service(self, client, response):
        """
            assert that ``response`` is a logout redirection to https://www.example.com
            and that ``client`` is no longer logged in
        """
        # assert a redirection to the service
        self.assertEqual(response.status_code, 302)
        self.assertTrue(response.has_header("Location"))
        self.assertEqual(response["Location"], "https://www.example.com")

        response = client.get("/login")
        self.assertEqual(response.status_code, 200)
        # assert we are no longer logged in
        self.assertNotIn(self.LOGGED_IN_MSG, response.content)

    def test_logout_view_url(self):
        """test logout redirect to url parameter"""
        # get a client that is authenticated
        client = get_auth_client()

        # logout with an url parameter
        response = client.get('/logout?url=https://www.example.com')
        # we are redirected to the address of the url parameter
        self.assert_redirect_to_service(client, response)

    def test_logout_view_service(self):
        """test logout redirect to service parameter"""
        # get a client that is authenticated
        client = get_auth_client()

        # logout with a service parameter
        response = client.get('/logout?service=https://www.example.com')
        # we are redirected to the address of the service parameter
        self.assert_redirect_to_service(client, response)

    def test_logout_slo(self):
        """test logout from a service with SLO support"""
        parameters = []

        # test normal SLO
        # setup a simple one request http server
        (httpd, host, port) = HttpParamsHandler.run()[0:3]
        # build a service url depending on which port the http server has bound
        service = "http://%s:%s" % (host, port)
        # get a ticket requested by client and being validated by the service
        (client, ticket) = get_validated_ticket(service)[:2]
        # the client logout triggers the sending of the SLO requests
        client.get('/logout')
        # we store the POST parameters sent for this ticket for further analysis
        parameters.append((httpd.PARAMS, ticket))

        # test SLO with a single_log_out_callback
        # setup a simple one request http server
        (httpd, host, port) = HttpParamsHandler.run()[0:3]
        # set the default test service pattern to use the http server port for SLO requests.
        # in fact, this single_log_out_callback parameter is useful to implement SLO
        # for non http services like imap or ftp
        self.service_pattern.single_log_out_callback = "http://%s:%s" % (host, port)
        self.service_pattern.save()
        # get a ticket requested by client and being validated by the service
        (client, ticket) = get_validated_ticket(self.service)[:2]
        # the client logout triggers the sending of the SLO requests
        client.get('/logout')
        # we store the POST parameters sent for this ticket for further analysis
        parameters.append((httpd.PARAMS, ticket))

        # for each POST parameters and corresponding ticket
        for (params, ticket) in parameters:
            # there is a non empty POST parameter 'logoutRequest'
            self.assertIn(b'logoutRequest', params)
            self.assertTrue(params[b'logoutRequest'])

            # it is a valid xml
            root = etree.fromstring(params[b'logoutRequest'][0])
            # containing a <samlp:LogoutRequest> tag
            self.assertTrue(
                root.xpath("//samlp:LogoutRequest", namespaces=self.SAMLP_NAMESPACES)
            )
            # with a tag <samlp:SessionIndex> enclosing the value of the ticket
            session_index = root.xpath(
                "//samlp:SessionIndex",
                namespaces=self.SAMLP_NAMESPACES
            )
            self.assertEqual(len(session_index), 1)
            self.assertEqual(session_index[0].text, ticket.value)

        # SLO errors are displayed on the logout page
        client = get_validated_ticket(self.service)[0]
        # the client logout triggers the sending of the SLO requests but
        # no http server is started for this round
        response = client.get('/logout')
        self.assertIn(b"Error during service logout", response.content)

    @override_settings(CAS_ENABLE_AJAX_AUTH=True)
    def test_ajax_logout(self):
        """
            test ajax logout. These methods are here, but I do not really see a use case for
            javascript logout
        """
        # get a client that is authenticated
        client = get_auth_client()

        # fetch the logout page with ajax on
        response = client.get('/logout', HTTP_X_AJAX='on')
        # we get a json telling us the user is logged out
        self.assertEqual(response.status_code, 200)
        data = json.loads(response.content.decode("utf8"))
        self.assertEqual(data["status"], "success")
        self.assertEqual(data["detail"], "logout")
        self.assertEqual(data['session_nb'], 1)

    @override_settings(CAS_ENABLE_AJAX_AUTH=True)
    def test_ajax_logout_all_session(self):
        """test ajax logout from a random number of sessions"""
        # draw a random int in [2, 10] (random.randint includes both bounds)
        nb_client = random.randint(2, 10)
        # get this many logged clients, all for the test user
        clients = [get_auth_client() for _ in range(nb_client)]
        # fetch the logout page with ajax on, requesting to logout from all sessions
        response = clients[0].get('/logout?all=1', HTTP_X_AJAX='on')
        # we get a json telling us the user is logged out and the number of sessions
        # the user has been logged out from
        self.assertEqual(response.status_code, 200)
        data = json.loads(response.content.decode("utf8"))
        self.assertEqual(data["status"], "success")
        self.assertEqual(data["detail"], "logout")
        self.assertEqual(data['session_nb'], nb_client)

    @override_settings(CAS_REDIRECT_TO_LOGIN_AFTER_LOGOUT=True)
    def test_redirect_after_logout(self):
        """Test redirect to login after logout parameter"""
        # get a client that is authenticated
        client = get_auth_client()

        # fetch the logout page
        response = client.get('/logout')
        # as CAS_REDIRECT_TO_LOGIN_AFTER_LOGOUT is True, we are redirected to the login page
        self.assertEqual(response.status_code, 302)
        if django.VERSION < (1, 9):  # pragma: no cover coverage is computed with django 1.9
            self.assertEqual(response["Location"], "http://testserver/login")
        else:
            self.assertEqual(response["Location"], "/login")
        self.assertFalse(client.session.get("username"))
        self.assertFalse(client.session.get("authenticated"))

    @override_settings(CAS_REDIRECT_TO_LOGIN_AFTER_LOGOUT=True)
    def test_redirect_after_logout_to_service(self):
        """test prevalence of redirect url/service parameter over redirect to login after logout"""
        # get a client that is authenticated
        client = get_auth_client()

        # fetch the logout page with an url parameter
        response = client.get('/logout?url=https://www.example.com')
        # we are redirected to the url parameter and not to the login page
        self.assert_redirect_to_service(client, response)

        # the previous client is now logged out: use a freshly authenticated one so the
        # service parameter is also exercised from a logged in session
        client = get_auth_client()

        # fetch the logout page with a service parameter
        response = client.get('/logout?service=https://www.example.com')
        # we are redirected to the service parameter and not to the login page
        self.assert_redirect_to_service(client, response)

    @override_settings(CAS_REDIRECT_TO_LOGIN_AFTER_LOGOUT=True, CAS_ENABLE_AJAX_AUTH=True)
    def test_ajax_redirect_after_logout(self):
        """Test ajax redirect to login after logout parameter"""
        # get a client that is authenticated
        client = get_auth_client()

        # fetch the logout page with ajax on
        response = client.get('/logout', HTTP_X_AJAX='on')
        # we get a json telling us the user is logged out. An url key is added to ask for
        # a redirection to the login page
        self.assertEqual(response.status_code, 200)
        data = json.loads(response.content.decode("utf8"))
        self.assertEqual(data["status"], "success")
        self.assertEqual(data["detail"], "logout")
        self.assertEqual(data['session_nb'], 1)
        self.assertEqual(data['url'], '/login')

974

975
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
976
class AuthTestCase(TestCase):
Valentin Samir's avatar
Valentin Samir committed
977 978 979 980
    """
        Test for the auth view, used for external services
        to validate (user, pass, service) tuples.
    """
981
    def setUp(self):
        """Prepare the test context: a default service url and a service pattern"""
        # the default test service url
        self.service = 'https://www.example.com'
        # a service pattern whose regular expression matches the url above
        pattern_fields = {
            "name": "example",
            "pattern": r"^https://www\.example\.com(/.*)?$",
        }
        models.ServicePattern.objects.create(**pattern_fields)

990
    @override_settings(CAS_AUTH_SHARED_SECRET='test')
    def test_auth_view_goodpass(self):
        """a successful request is answered by yes"""
        # a valid (username, password, service) along with the shared secret
        credentials = {
            'username': settings.CAS_TEST_USER,
            'password': settings.CAS_TEST_PASSWORD,
            'service': self.service,
            'secret': 'test'
        }
        # post them to the auth view from a bare client.
        # To test the user against the service, a user is created in the database for the
        # current session and is then deleted as the user is not authenticated
        bare_client = Client()
        reply = bare_client.post('/auth', credentials)
        # as (username, password, service) and the shared secret are valid, we get yes
        # as a response
        self.assertEqual(reply.status_code, 200)
        self.assertEqual(reply.content, b'yes\n')

    @override_settings(CAS_AUTH_SHARED_SECRET='test')
    def test_auth_view_goodpass_logged(self):
        """a successful request is answered by yes, using a logged session"""
        # a valid (username, password, service) along with the shared secret
        credentials = {
            'username': settings.CAS_TEST_USER,
            'password': settings.CAS_TEST_PASSWORD,
            'service': self.service,
            'secret': 'test'
        }
        # post them to the auth view, this time from an already authenticated client.
        # To test the user against the service, a user is fetched in the database for the
        # current session and is NOT deleted as the user is currently logged.
        # Deleting the user from the database would cause the user to be logged out as
        # shown in the login tests
        logged_client = get_auth_client()
        reply = logged_client.post('/auth', credentials)
        # as (username, password, service) and the shared secret are valid, we get yes
        # as a response
        self.assertEqual(reply.status_code, 200)
        self.assertEqual(reply.content, b'yes\n')
1032

1033
    @override_settings(CAS_AUTH_SHARED_SECRET='test')
    def test_auth_view_badpass(self):
        """bad user password => no"""
        # valid username, service and shared secret, but a wrong password
        credentials = {
            'username': settings.CAS_TEST_USER,
            'password': 'badpass',
            'service': self.service,
            'secret': 'test'
        }
        bare_client = Client()
        reply = bare_client.post('/auth', credentials)
        # the request is answered by no
        self.assertEqual(reply.status_code, 200)
        self.assertEqual(reply.content, b'no\n')
1048

1049
    @override_settings(CAS_AUTH_SHARED_SECRET='test')
1050
    def test_auth_view_badservice(self):
Valentin Samir's avatar
Valentin Samir committed
1051
        """bad service => no"""
1052
        client = Client()
Valentin Samir's avatar
Valentin Samir committed
1053 1054 1055 1056 1057 1058 1059 1060 1061
        response = client.post(
            '/auth',
            {
                'username': settings.CAS_TEST_USER,
                'password': settings.CAS_TEST_PASSWORD,
                'service': 'https://www.example.org',