test_federate.py 16 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2016 Valentin Samir
"""tests for the CAS federate mode"""
from cas_server import default_settings
from cas_server.default_settings import settings

import django
from django.test import TestCase, Client
from django.test.utils import override_settings

from six.moves import reload_module

22 23
from cas_server import utils, models
from cas_server.tests.mixin import BaseServicePattern, CanLogin, FederatedIendityProviderModel
24 25 26
from cas_server.tests import utils as tests_utils

PROVIDERS = {
27 28 29 30
    "example.com": ("http://127.0.0.1:8080", '1', "Example dot com"),
    "example.org": ("http://127.0.0.1:8081", '2', "Example dot org"),
    "example.net": ("http://127.0.0.1:8082", '3', "Example dot net"),
    "example.test": ("http://127.0.0.1:8083", 'CAS_2_SAML_1_0', 'Example fot test'),
31 32 33 34 35 36 37 38 39
}


@override_settings(
    CAS_FEDERATE=True,
    CAS_AUTH_CLASS="cas_server.auth.CASFederateAuth",
    # test with a non ascii username
    CAS_TEST_USER=u"dédé"
)
40 41 42
class FederateAuthLoginLogoutTestCase(
    TestCase, BaseServicePattern, CanLogin, FederatedIendityProviderModel
):
43 44 45 46
    """tests for the views login logout and federate then the federated mode is enabled"""
    def setUp(self):
        """Prepare the test context"""
        self.setup_service_patterns()
47
        self.setup_federated_identity_provider(PROVIDERS)
48 49 50 51 52 53 54 55 56 57 58 59

    def test_default_settings(self):
        """default settings should populated some default variable then CAS_FEDERATE is True"""
        del settings.CAS_AUTH_CLASS
        reload_module(default_settings)
        self.assertEqual(settings.CAS_AUTH_CLASS, "cas_server.auth.CASFederateAuth")

    def test_login_get_provider(self):
        """some assertion about the login page in federated mode"""
        client = Client()
        response = client.get("/login")
        self.assertEqual(response.status_code, 200)
60
        for provider in models.FederatedIendityProvider.objects.all():
61
            self.assertTrue('<option value="%s">%s</option>' % (
62 63
                provider.suffix,
                provider.verbose_name
64 65 66 67 68 69 70
            ) in response.content.decode("utf-8"))
        self.assertEqual(response.context['post_url'], '/federate')

    def test_login_post_provider(self, remember=False):
        """test a successful login wrokflow"""
        tickets = []
        # choose the example.com provider
71
        for (suffix, cas_port) in [
72 73 74
            ("example.com", 8080), ("example.org", 8081),
            ("example.net", 8082), ("example.test", 8083)
        ]:
75
            provider = models.FederatedIendityProvider.objects.get(suffix=suffix)
76 77 78 79 80 81 82 83
            # get a bare client
            client = Client()
            # fetch the login page
            response = client.get("/login")
            # in federated mode, we shoudl POST do /federate on the login page
            self.assertEqual(response.context['post_url'], '/federate')
            # get current form parameter
            params = tests_utils.copy_form(response.context["form"])
84
            params['provider'] = provider.suffix
85 86 87 88 89 90 91 92 93
            if remember:
                params['remember'] = 'on'
            # post the choosed provider
            response = client.post('/federate', params)
            # we are redirected to the provider CAS client url
            self.assertEqual(response.status_code, 302)
            if remember:
                self.assertEqual(response["Location"], '%s/federate/%s?remember=on' % (
                    'http://testserver' if django.VERSION < (1, 9) else "",
94
                    provider.suffix
95 96 97 98
                ))
            else:
                self.assertEqual(response["Location"], '%s/federate/%s' % (
                    'http://testserver' if django.VERSION < (1, 9) else "",
99
                    provider.suffix
100 101
                ))
            # let's follow the redirect
102
            response = client.get('/federate/%s' % provider.suffix)
103 104 105 106 107
            # we are redirected to the provider CAS for authentication
            self.assertEqual(response.status_code, 302)
            self.assertEqual(
                response["Location"],
                "%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
108 109
                    provider.server_url,
                    provider.suffix
110 111 112 113 114 115 116
                )
            )
            # let's generate a ticket
            ticket = utils.gen_st()
            # we lauch a dummy CAS server that only validate once for the service
            # http://testserver/federate/example.com with `ticket`
            tests_utils.DummyCAS.run(
117
                ("http://testserver/federate/%s" % provider.suffix).encode("ascii"),
118 119 120 121 122 123 124
                ticket.encode("ascii"),
                settings.CAS_TEST_USER.encode("utf8"),
                [],
                cas_port
            )
            # we normally provide a good ticket and should be redirected to /login as the ticket
            # get successfully validated again the dummy CAS
125
            response = client.get('/federate/%s' % provider.suffix, {'ticket': ticket})
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "%s/login" % (
                'http://testserver' if django.VERSION < (1, 9) else ""
            ))
            # follow the redirect
            response = client.get("/login")
            # we should get a page with a from with all widget hidden that auto POST to /login using
            # javascript. If javascript is disabled, a "connect" button is showed
            self.assertTrue(response.context['auto_submit'])
            self.assertEqual(response.context['post_url'], '/login')
            params = tests_utils.copy_form(response.context["form"])
            # POST ge prefiled from parameters
            response = client.post("/login", params)
            # the user should now being authenticated using username test@`provider`
            self.assert_logged(
141
                client, response, username=provider.build_username(settings.CAS_TEST_USER)
142 143 144 145 146 147 148 149 150 151 152 153
            )
            tickets.append((provider, ticket, client))

            # try to get a ticket
            response = client.get("/login", {'service': self.service})
            self.assertEqual(response.status_code, 302)
            self.assertTrue(response["Location"].startswith("%s?ticket=" % self.service))
        return tickets

    def test_login_twice(self):
        """Test that user id db is used for the second login (cf coverage)"""
        self.test_login_post_provider()
154 155 156 157 158 159 160 161
        tickets = self.test_login_post_provider()
        # trying to authenticated while being already authenticated should redirect to /login
        for (provider, _, client) in tickets:
            response = client.get("/federate/%s" % provider.suffix)
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "%s/login" % (
                'http://testserver' if django.VERSION < (1, 9) else ""
            ))
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202

    @override_settings(CAS_FEDERATE=False)
    def test_auth_federate_false(self):
        """federated view should redirect to /login then CAS_FEDERATE is False"""
        provider = "example.com"
        client = Client()
        response = client.get("/federate/%s" % provider)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], "%s/login" % (
            'http://testserver' if django.VERSION < (1, 9) else ""
        ))
        response = client.post("%s/federate/%s" % (
            'http://testserver' if django.VERSION < (1, 9) else "",
            provider
        ))
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], "%s/login" % (
            'http://testserver' if django.VERSION < (1, 9) else ""
        ))

    def test_auth_federate_errors(self):
        """
            The federated view should redirect to /login if the provider is unknown or not provided,
            try to fetch a new ticket if the provided ticket validation fail
            (network error or bad ticket)
        """
        good_provider = "example.com"
        bad_provider = "exemple.fr"
        client = Client()
        response = client.get("/federate/%s" % bad_provider)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], "%s/login" % (
            'http://testserver' if django.VERSION < (1, 9) else ""
        ))

        # test CAS not avaible
        response = client.get("/federate/%s" % good_provider, {'ticket': utils.gen_st()})
        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response["Location"],
            "%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
203
                models.FederatedIendityProvider.objects.get(suffix=good_provider).server_url,
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
                good_provider
            )
        )

        # test CAS avaible but bad ticket
        tests_utils.DummyCAS.run(
            ("http://testserver/federate/%s" % good_provider).encode("ascii"),
            utils.gen_st().encode("ascii"),
            settings.CAS_TEST_USER.encode("utf-8"),
            [],
            8080
        )
        response = client.get("/federate/%s" % good_provider, {'ticket': utils.gen_st()})
        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response["Location"],
            "%s/login?service=http%%3A%%2F%%2Ftestserver%%2Ffederate%%2F%s" % (
221
                models.FederatedIendityProvider.objects.get(suffix=good_provider).server_url,
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
                good_provider
            )
        )

        response = client.post("/federate")
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], "%s/login" % (
            'http://testserver' if django.VERSION < (1, 9) else ""
        ))

    def test_auth_federate_slo(self):
        """test that SLO receive from backend CAS log out the users"""
        # get tickets and connected clients
        tickets = self.test_login_post_provider()
        for (provider, ticket, client) in tickets:
            # SLO for an unkown ticket should do nothing
            response = client.post(
239
                "/federate/%s" % provider.suffix,
240 241 242 243 244 245
                {'logoutRequest': tests_utils.logout_request(utils.gen_st())}
            )
            self.assertEqual(response.status_code, 200)
            self.assertEqual(response.content, b"ok")
            # Bad SLO format should do nothing
            response = client.post(
246
                "/federate/%s" % provider.suffix,
247 248 249 250 251 252
                {'logoutRequest': ""}
            )
            self.assertEqual(response.status_code, 200)
            self.assertEqual(response.content, b"ok")
            # Bad SLO format should do nothing
            response = client.post(
253
                "/federate/%s" % provider.suffix,
254 255 256 257 258 259
                {'logoutRequest': "<root></root>"}
            )
            self.assertEqual(response.status_code, 200)
            self.assertEqual(response.content, b"ok")
            response = client.get("/login")
            self.assert_logged(
260
                client, response, username=provider.build_username(settings.CAS_TEST_USER)
261 262 263 264 265
            )

            # SLO for a previously logged ticket should log out the user if CAS version is
            # 3 or 'CAS_2_SAML_1_0'
            response = client.post(
266
                "/federate/%s" % provider.suffix,
267 268 269 270 271 272
                {'logoutRequest': tests_utils.logout_request(ticket)}
            )
            self.assertEqual(response.status_code, 200)
            self.assertEqual(response.content, b"ok")

            response = client.get("/login")
273
            if provider.cas_protocol_version in {'3', 'CAS_2_SAML_1_0'}:  # support SLO
274 275 276
                self.assert_login_failed(client, response)
            else:
                self.assert_logged(
277
                    client, response, username=provider.build_username(settings.CAS_TEST_USER)
278 279 280 281 282 283 284
                )

    def test_federate_logout(self):
        """
            test the logout function: the user should be log out
            and redirected to his CAS logout page
        """
285
        # get tickets and connected clients, then follow normal logout
286 287 288 289 290 291
        tickets = self.test_login_post_provider()
        for (provider, _, client) in tickets:
            response = client.get("/logout")
            self.assertEqual(response.status_code, 302)
            self.assertEqual(
                response["Location"],
292
                "%s/logout" % provider.server_url,
293 294 295 296
            )
            response = client.get("/login")
            self.assert_login_failed(client, response)

297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
            # test if the user is already logged out
            response = client.get("/logout")
            # no redirection
            self.assertEqual(response.status_code, 200)
            self.assertTrue(
                (
                    b"You were already logged out from the Central Authentication Service."
                ) in response.content
            )

        tickets = self.test_login_post_provider()
        if django.VERSION >= (1, 8):
            # assume the username session variable has been tempered (should not happend)
            for (provider, _, client) in tickets:
                session = client.session
                session["username"] = settings.CAS_TEST_USER
                session.save()
                response = client.get("/logout")
                self.assertEqual(response.status_code, 200)
                response = client.get("/login")
                self.assert_login_failed(client, response)

319 320 321 322 323 324 325 326 327 328 329 330
    def test_remember_provider(self):
        """
            If the user check remember, next login should not offer the chose of the backend CAS
            and use the one store in the cookie
        """
        tickets = self.test_login_post_provider(remember=True)
        for (provider, _, client) in tickets:
            client.get("/logout")
            response = client.get("/login")
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "%s/federate/%s" % (
                'http://testserver' if django.VERSION < (1, 9) else "",
331
                provider.suffix
332 333 334 335 336 337 338 339 340 341 342
            ))

    def test_login_bad_ticket(self):
        """
            Try login with a bad ticket:
            login should fail and the main login page should be displayed to the user
        """
        provider = "example.com"
        # get a bare client
        client = Client()
        session = client.session
343 344 345 346
        session["federate_username"] = models.FederatedIendityProvider.build_username_from_suffix(
            settings.CAS_TEST_USER,
            provider
        )
347
        session["federate_ticket"] = utils.gen_st()
348
        if django.VERSION >= (1, 8):
349 350 351 352 353 354 355 356 357 358
            session.save()
            response = client.get("/login")
            # we should get a page with a from with all widget hidden that auto POST to /login using
            # javascript. If javascript is disabled, a "connect" button is showed
            self.assertTrue(response.context['auto_submit'])
            self.assertEqual(response.context['post_url'], '/login')
            params = tests_utils.copy_form(response.context["form"])
            # POST, as (username, ticket) are not valid, we should get the federate login page
            response = client.post("/login", params)
            self.assertEqual(response.status_code, 200)
359 360 361 362 363 364 365 366
            for provider in models.FederatedIendityProvider.objects.all():
                self.assertIn(
                    '<option value="%s">%s</option>' % (
                        provider.suffix,
                        provider.verbose_name
                    ),
                    response.content.decode("utf-8")
                )
367
            self.assertEqual(response.context['post_url'], '/federate')