Fork me 🍴

Willian Antunes

Validating JWT: Authentication using Django Rest Framework

23 minute read

django, drf, auth0, jwt

Table of contents
  1. Where should JWT validation be implemented in DRF?
    1. Do not reinvent the wheel
  2. What we need to do
  3. Starting with tests
    1. Raise error if no authorization header is available
    2. Raise error if the authorization header value is invalid
    3. Raise error if the bearer has an invalid JWT in terms of its format
    4. Raise error if the provided token has no key ID
    5. Raise error if the provided token has an invalid signature
    6. Should return token user
    7. Things you should handle, but we didn't cover
  4. Seeing the authentication class in action
  5. Next steps and conclusion

In my last article, I described how Auth0 Deploy CLI works with a practical example. Unfortunately, that example does not have a resource server. An essential factor appears when you need one: how to properly validate a JWT to accept an incoming request on your backend? Auth0 explains what you need to do, but how to achieve it using Django Rest Framework?

Where should JWT validation be implemented in DRF?

It doesn't matter which framework you are using; it's crucial to understand its API to configure your project without mistakes. DRF has an excellent API guide for this sole purpose:

DRF has many topics about its API. In addition, its website has a dedicated menu about it.

So, where should JWT validation be implemented? A good hint is that we want to authenticate the JWT to guarantee it's a valid one. Another interesting tip is looking at other projects, and Auth0 has many samples. Our situation is quite evident because of this post's purpose: we will talk about Authentication. But know this: every time you're feeling you're reinventing the wheel, maybe you're doing it the wrong way. If there is a framework, then someone has already solved your issue.

Do not reinvent the wheel

The title describes what I do when creating a project from scratch. My focus should be on business code, not on infrastructure, let's say. This is even more true when using a mature framework such as Django. So, before coding anything, I try to find an open-source project to handle my problem. In this case, I tried to find one to handle the JWT validation. As I'm using Auth0 as the identity provider, only validation is required, nothing more. Searching, I encountered DRF Simple JWT. It's fantastic, but unfortunately, it has too many features. Exploring the project, I discovered an experimental feature called JWTTokenUserAuthentication backend. It is an Authentication mechanism. Let's use it as an example to create ours 😁!

What we need to do

The custom authentication mechanism must be able to:

  • Retrieve and store the JSON Web Key Set (JWKS) as it contains the public keys used to verify any JWT issued by the authorization server.
  • Consult the authorization header and then analyze the value of the bearer token.
  • If the token is valid, proceed with the request returning its details.
  • Raise a 401 error if anything different occurs.

According to the documentation:

To implement a custom authentication scheme, subclass BaseAuthentication and override the .authenticate(self, request) method. The method should return a two-tuple of (user, auth) if authentication succeeds, or None otherwise.

In some circumstances instead of returning None, you may want to raise an AuthenticationFailed exception from the .authenticate() method.

So this is how we are going to start:

from rest_framework import authentication


class JWTAccessTokenAuthentication(authentication.BaseAuthentication):
    def __init__(self, *args, **kwargs):
        pass

    def authenticate(self, request):
        pass

Starting with tests

DRF has a testing guide, so let's use it to create our first test scenario.

Raise error if no authorization header is available

The method authenticate receives a request. We can create a fake one using APIRequestFactory. So this is our first test:

import pytest

from rest_framework.exceptions import AuthenticationFailed
from rest_framework.test import APIRequestFactory

from authentication_django_rest_framework.apps.core.api.authentication.authentications import JWTAccessTokenAuthentication


class TestAccessTokenValidation:
    def test_should_raise_error_if_no_authorization_header_is_available(self, jwt_access_token_authentication_scenario):
        # Arrange
        factory = APIRequestFactory()
        request = factory.get("/your-endpoint/v1/salt")
        backend = JWTAccessTokenAuthentication()
        # Act
        with pytest.raises(AuthenticationFailed) as authentication_failed_exception:
            backend.authenticate(request)
        # Assert
        assert authentication_failed_exception.value.status_code == 401
        assert authentication_failed_exception.value.detail == "Authorization header is not present"

To make this test pass:

class JWTAccessTokenAuthentication(authentication.BaseAuthentication):
    def __init__(self, *args, **kwargs):
        pass

    def authenticate(self, request: HttpRequest):
        # Extract header
        header_authorization_value = request.headers.get("authorization")
        if not header_authorization_value:
            raise exceptions.AuthenticationFailed("Authorization header is not present")
        raise NotImplementedError

Raise error if the authorization header value is invalid

About this test:

def test_should_raise_error_when_authorization_header_value_is_invalid(self):
    # Arrange
    headers = {
        "HTTP_AUTHORIZATION": "Salt addicted",
    }
    factory = APIRequestFactory()
    request = factory.get("/your-endpoint/v1/salt", **headers)
    backend = JWTAccessTokenAuthentication()
    # Act
    with pytest.raises(AuthenticationFailed) as authentication_failed_exception:
        backend.authenticate(request)
    # Assert
    assert authentication_failed_exception.value.status_code == 401
    assert (
        authentication_failed_exception.value.detail
        == "Authorization header must start with Bearer followed by its token"
    )

A regular expression is an easy way to validate the value of the authorization header. Using bearer token, a regex such as ^[Bb]earer (.*)$ is enough. This is the concrete code to make the test pass:

def authenticate(self, request: HttpRequest):
    # Extract header
    header_authorization_value = request.headers.get("authorization")
    if not header_authorization_value:
        raise exceptions.AuthenticationFailed("Authorization header is not present")
    # Extract raw JWT
    match = self.regex_bearer.match(header_authorization_value)
    if not match:
        raise exceptions.AuthenticationFailed("Authorization header must start with Bearer followed by its token")
    raw_jwt = match.groups()[-1]

    raise NotImplementedError

Why did I use HTTP_AUTHORIZATION instead of Authorization? I recommend this Django comment about HTTP_ prefix.

Raise error if the bearer has an invalid JWT in terms of its format

This is the part where we need to extract the "kid" (key ID), which denotes which key was used to secure the JWT, and we can do this with the help of a library. We'll use PyJWT. So, this is the test:

def test_should_raise_error_if_bearer_has_invalid_jwt(self):
    # Arrange
    headers = {
        "HTTP_AUTHORIZATION": f"Bearer the-one-where-monica-gets-a-new-roommate",
    }
    factory = APIRequestFactory()
    request = factory.get("/your-endpoint/v1/friends", **headers)
    backend = JWTAccessTokenAuthentication()
    # Act
    with pytest.raises(AuthenticationFailed) as authentication_failed_exception:
        backend.authenticate(request)
    # Assert
    assert authentication_failed_exception.value.status_code == 401
    assert authentication_failed_exception.value.detail == "Bearer does not contain a valid JWT"

How is our authentication class so far:

import re

from django.http import HttpRequest
from jwt import DecodeError
from jwt import PyJWKClient
from rest_framework import authentication
from rest_framework import exceptions

from authentication_django_rest_framework import settings


class JWTAccessTokenAuthentication(authentication.BaseAuthentication):
    regex_bearer = re.compile(r"^[Bb]earer (.*)$")

    def __init__(self, *args, **kwargs):
        self.jwks_client = PyJWKClient(settings.AUTH0_TENANT_JWKS)

    def authenticate(self, request: HttpRequest):
        # Extract header
        header_authorization_value = request.headers.get("authorization")
        if not header_authorization_value:
            raise exceptions.AuthenticationFailed("Authorization header is not present")
        # Extract supposed raw JWT
        match = self.regex_bearer.match(header_authorization_value)
        if not match:
            raise exceptions.AuthenticationFailed("Authorization header must start with Bearer followed by its token")
        raw_jwt = match.groups()[-1]
        # Extract Key ID
        try:
            key_id = self.jwks_client.get_signing_key_from_jwt(raw_jwt)
        except DecodeError:
            raise exceptions.AuthenticationFailed("Bearer does not contain a valid JWT")

        raise NotImplementedError

Raise error if the provided token has no key ID

To test this scenario, we'll need a JWT, and we can generate as many as we want in Token DEV. Another important thing is the JWKS. The library PyJWT requires it to verify the token. Looking at its implementation, I could understand its internal process and see where we could return the JWKS using a mock. To do it, let's use the fixture below, and by the way, I already included three kinds of tokens so we can use them in our tests:

@pytest.fixture
def jwt_access_token_authentication_scenario(mocker):
    factory = APIRequestFactory()
    # You can create invalid tokens on TOKEN DEV: https://token.dev/
    token_with_invalid_kid = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6ImRvLXlvdS1saWtlLXNhbHQta2V5LWlkIn0.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkNhcmwgU2FnYW4iLCJhZG1pbiI6dHJ1ZSwiaWF0IjoxNjQ2NjAwNTIxLCJleHAiOjE2NDY2MDQxMjF9.eyrVJEU84OsRUPIhsHpJVTqltn4ITD8LTJbhdgLU20VkDVzZS80u7HsTE_J4Ih4wdxOS3l6jBUOv-6DmBbGkahHM59SBY4aibsIQdGA9s2BWNatl90LpifI4Wjs-0ptkMDVO_i_Pie6RlscThM_jHdj8agi50YTJKRonXjYjLd1wNbleU53tss0ePslh3yynV8lvIjjNT2bSRHpcllh6qFLpiPm_k7K4Ft69oGq3k9BvXCaNGKd5zjsyzP8704aRj0DaXGrqZ-yYwo0FoAGheVl2EKccFn7l4kJTjnwnaeP3eO6FadLjjHS1KEMs6du4AzmSPJSY_T3thV-FmWjekg"
    token_with_valid_kid = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsImtpZCI6ImZhWVp3X0NFSTBJUnotU2FHOWJoaSJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkNhcmwgU2FnYW4iLCJhZG1pbiI6dHJ1ZSwiaWF0IjoxNjQ2NjAwNTIxLCJleHAiOjE2NDY2MDQxMjF9.Obmnybm2oFwC0jOFSrZjVjY3F0W6w1zjCJFwp64rEeLcFa6yTh3zWGqiEtzlCYNGWNg1KS3JkycWqCRDGcsVkFhV1WRiLSHz8kA5m1rEk9A4pl1uSqt3vkGrC7X9h9LkPU2wp4YnCp3fZhIp7Z66rfy1L7Rebu6FyLnM-MFsk6IDikv01kFZkUNFQCYn5Uv1dY3xLWfdnOYllHmOs8boXt5z2DJKtWsNSe7-PBnrW0haQtihrI2cp9jVRj8815r1RBBfVbTQWslAQxdMxvk2ZtxCOvjv3UYG81k-ezVn2zoQXjG-JS4Uox6UQ5j6hR1arF-spkP1mXEtKH3EeB7G-g"
    valid_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6ImZhWVp3X0NFSTBJUnotU2FHOWJoaSJ9.eyJpc3MiOiJodHRwczovL2FudHVuZXMudXMuYXV0aDAuY29tLyIsInN1YiI6ImZhY2Vib29rfDEwMjE4OTI1OTU2NDkxNjQyIiwiYXVkIjpbInVzZXItbWFuYWdlbWVudC9hcGl2aWV3LWRyZi1hcGkvYXBpL3YxIiwiaHR0cHM6Ly9hbnR1bmVzLnVzLmF1dGgwLmNvbS91c2VyaW5mbyJdLCJpYXQiOjE2NDY2MDI2ODUsImV4cCI6MTY0NjY4OTA4NSwiYXpwIjoiUDRjQkI4YThuMFJybE5FR1c3OWRKbmhBNFpyTVZ5S2oiLCJzY29wZSI6Im9wZW5pZCBwcm9maWxlIGVtYWlsIn0.vKE-EdlbSx1KVXenEBkTG62pnUwoWIbHCCoW_Td2rr6pLIbuQzI2XW0lRtVRqlNhVifvFOAkRaascnm24S9_9KFii2IjMYJ4uQ0HpKTC55LgOV1KMkiE-qLS6aSVu5qftWpQmJRQJ9sPoie7siFL8cXznrkWlE4JFtEU4abFkjt5HEbL-BeX_70jrt60til1ImlMuDNmK6g0TiE0Oc6TKilHoRppUE1gN9XFvaaISJLZg5jtYvc-Gj0jeNOsjhyXZaiiCFSCyOZ8VNlZxyJ7EEyRHjVymRNH9vp_u8kKyMzV324Wlzmcbw5tQPUrk3hvnYf3IOT0QM5FFj6AvM228Q"
    fake_jwks = {
        "keys": [
            {
                "alg": "RS256",
                "kty": "RSA",
                "use": "sig",
                "n": "01EW-npmkOYEpwM6LLKpr6OJ1s_gQQz3biUzBY5QdH3JwWS37h6WFUdyv-CJEBWBetbzHLBYx_58HbGcGwmhht7bXJ8WDlRroxvt7MoYhINMaG8aXo3Giw0_st-VaEC8BuNEemfhJHBlcpJR8-ZdSLx5Q-rFojePOdnVrbcGIviVu9b6pOPHI1jnW_WmyBfG5XmXPHy2aL3OxjLFa8uVkxyHIu1mN3hWEdzZqewUqrFe91egCwT7u4MOkLgfmym_meXjXgIJZSp-GvNGJzk8Iyr0EszlrimP8eBgLg4AjEmwQzRkcRSXYsGCjO8-Dy4ecch-YNhOXpzWSf4bC22XYw",
                "e": "AQAB",
                "kid": "faYZw_CEI0IRz-SaG9bhi",
                "x5t": "ovFav4LfCHs4qZaOImYWpxoXdzA",
                "x5c": [
                    "MIIDAzCCAeugAwIBAgIJbqiVa0rLk9wpMA0GCSqGSIb3DQEBCwUAMB8xHTAbBgNVBAMTFGFudHVuZXMudXMuYXV0aDAuY29tMB4XDTIxMTAwNjE1MDIwNVoXDTM1MDYxNTE1MDIwNVowHzEdMBsGA1UEAxMUYW50dW5lcy51cy5hdXRoMC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDTURb6emaQ5gSnAzossqmvo4nWz+BBDPduJTMFjlB0fcnBZLfuHpYVR3K/4IkQFYF61vMcsFjH/nwdsZwbCaGG3ttcnxYOVGujG+3syhiEg0xobxpejcaLDT+y35VoQLwG40R6Z+EkcGVyklHz5l1IvHlD6sWiN4852dWttwYi+JW71vqk48cjWOdb9abIF8bleZc8fLZovc7GMsVry5WTHIci7WY3eFYR3Nmp7BSqsV73V6ALBPu7gw6QuB+bKb+Z5eNeAgllKn4a80YnOTwjKvQSzOWuKY/x4GAuDgCMSbBDNGRxFJdiwYKM7z4PLh5xyH5g2E5enNZJ/hsLbZdjAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYEFC21wXqeOFczGIEjlB+FCDVuaqleMA4GA1UdDwEB/wQEAwIChDANBgkqhkiG9w0BAQsFAAOCAQEAl1I7bmq6ihafH9Zts+Fc9/Pe6kQ6C8yUMTJheNpiX6FTBfEWPuX/KWDz2WcC2/1S8tsQZPD3GJEF899LDa8F+mHY2adWMgFep5e5AejcwdmnZlCZoKmVAZ2HZHMgQr7RM0c0HZ2laBbzv4XcZPiDBP8YCuJlmL4zQFMeuWlA4ShCPB8Vk0VhDIJ/GBHvKYgy2pSa7mfZpoC4JcUc5XV4q6fZahEL27eqC3l4ffXaEcBK1axy769SaJpxHgpEeniMkfGcbuAYamInO64lhqKLf0hq9kQ6WId17hOt9nMa2q2ct88s5ZJirDzkE9uEKr0m9tqqaTgupN/xgq0xHVXkww=="
                ],
            },
            {
                "alg": "RS256",
                "kty": "RSA",
                "use": "sig",
                "n": "uPw-p0UpUVWd54qkPEfxt6GRqt1kJFDmzWmwVBfJxtRLp4m7jixzX9KNQrRWhBNJ1rlAxqpookqeB6cm74aEJ_UAJ-uPHnGKqYdA41VBOMrCgMl-DH86peK-HtGg_0vg6D0qMkcmXZJBGeKdK6UAhw0uwALEqN_twlBwdvtVocS30fvYdt_JqTnSb8uimRnoaA5GoAet5fAG7cph5ZnZuIAYdVf4T3RiPBdRNtHJbP9cuCZatJWb7CabjuIN9wmztAsex8n9wuSp06_wuVWJQQiCDGQF8tT11yn4TlFnzdlwxpQ8ngrvsoAt0KPfA_1rrFBL9vhGIGFkkRvfC3WFUw",
                "e": "AQAB",
                "kid": "1Yjr6qd1riVeCrHC-DuhH",
                "x5t": "oLlrWY6HThi71U-AJZwN4Jn24IU",
                "x5c": [
                    "MIIDAzCCAeugAwIBAgIJU21sCpl+udZDMA0GCSqGSIb3DQEBCwUAMB8xHTAbBgNVBAMTFGFudHVuZXMudXMuYXV0aDAuY29tMB4XDTIxMTAwNjE1MDIwNloXDTM1MDYxNTE1MDIwNlowHzEdMBsGA1UEAxMUYW50dW5lcy51cy5hdXRoMC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC4/D6nRSlRVZ3niqQ8R/G3oZGq3WQkUObNabBUF8nG1EunibuOLHNf0o1CtFaEE0nWuUDGqmiiSp4HpybvhoQn9QAn648ecYqph0DjVUE4ysKAyX4Mfzql4r4e0aD/S+DoPSoyRyZdkkEZ4p0rpQCHDS7AAsSo3+3CUHB2+1WhxLfR+9h238mpOdJvy6KZGehoDkagB63l8AbtymHlmdm4gBh1V/hPdGI8F1E20cls/1y4Jlq0lZvsJpuO4g33CbO0Cx7Hyf3C5KnTr/C5VYlBCIIMZAXy1PXXKfhOUWfN2XDGlDyeCu+ygC3Qo98D/WusUEv2+EYgYWSRG98LdYVTAgMBAAGjQjBAMA8GA1UdEwEB/wQFMAMBAf8wHQYDVR0OBBYEFG1wVasCVyhsSQnaDuAxSj0AF3/qMA4GA1UdDwEB/wQEAwIChDANBgkqhkiG9w0BAQsFAAOCAQEArfE7/0Khu6Dupfyy9dy5FdL9HUxBF1YgFeOWPBZg8VilRkldjq+S8axYUdbhpyCuEcnnInqO17t+KJ5/oRdEb1Ma4Lj4XD2GdyN1wTniUoq2P/r5aGRqToISAEtwpvVgYmQZflDC9xYq+d5ddZ43LfouWKSkE01OfL7YQJM+yNWm4dwQAT0gXNchnBGRtlajhStYLDb6Sci/AizEIMcqZnkXoBJQXwaEBYZCsXCkWgUoiQFVPzZr07m4iQF4FtoyPsjlbxbhQ2ymEbVCS986zmApkTfv9GcTSpIonoom7fMjkww+7s5/CoKgPCvYeEu+phmW8nzY8o0FsSeNUfI6nw=="
                ],
            },
        ]
    }
    mocked_jwks_client = PyJWKClient("https://agrabah/.well-known/jwks.json")
    mocker.patch.object(mocked_jwks_client, "get_jwk_set", lambda: PyJWKSet.from_dict(fake_jwks))
    extra_internal_options = {
        "internal_extra_jwt_decode_options": {"verify_exp": False},
        "internal_jwks_client": mocked_jwks_client,
    }
    backend = JWTAccessTokenAuthentication(**extra_internal_options)
    return factory, backend, (token_with_invalid_kid, token_with_valid_kid, valid_token)

Did you notice the extra_internal_options variable and how we're using it to instantiate our authentication class? We'll get there soon. Before showing how is the authentication class, let's see the test:

def test_should_raise_error_if_provided_jwt_has_no_kid(self, jwt_access_token_authentication_scenario):
    # Arrange
    factory, backend, tokens = jwt_access_token_authentication_scenario
    token_with_invalid_kid, _ = tokens
    headers = {
        "HTTP_AUTHORIZATION": f"Bearer {token_with_invalid_kid}",
    }
    request = factory.get("/your-endpoint/v1/friends", **headers)
    # Act
    with pytest.raises(AuthenticationFailed) as authentication_failed_exception:
        backend.authenticate(request)
    # Assert
    assert authentication_failed_exception.value.status_code == 401
    assert authentication_failed_exception.value.detail == "JWT does not have a valid Key ID"

The authentication class so far:

import re

from django.http import HttpRequest
from jwt import DecodeError
from jwt import PyJWKClient
from jwt import PyJWKClientError
from rest_framework import authentication
from rest_framework import exceptions

from authentication_django_rest_framework import settings


class JWTAccessTokenAuthentication(authentication.BaseAuthentication):
    regex_bearer = re.compile(r"^[Bb]earer (.*)$")

    def __init__(self, *args, **kwargs):
        internal_extra_jwt_decode_options = kwargs.get("internal_extra_jwt_decode_options")
        if internal_extra_jwt_decode_options:
            self.internal_extra_jwt_decode_options = internal_extra_jwt_decode_options
        # Retrieving JWKS
        jwks_client = kwargs.get("internal_jwks_client")
        if jwks_client:
            self.jwks_client = jwks_client
        else:
            self.jwks_client = PyJWKClient(settings.AUTH0_TENANT_JWKS)

    def authenticate(self, request: HttpRequest):
        # Extract header
        header_authorization_value = request.headers.get("authorization")
        if not header_authorization_value:
            raise exceptions.AuthenticationFailed("Authorization header is not present")
        # Extract supposed raw JWT
        match = self.regex_bearer.match(header_authorization_value)
        if not match:
            raise exceptions.AuthenticationFailed("Authorization header must start with Bearer followed by its token")
        raw_jwt = match.groups()[-1]
        # Extract "kid"
        try:
            key_id = self.jwks_client.get_signing_key_from_jwt(raw_jwt)
        except PyJWKClientError as e:
            error_message = str(e)
            if "Unable to find a signing key" in error_message:
                raise exceptions.AuthenticationFailed("JWT does not have a valid Key ID")
            else:
                raise NotImplementedError
        except DecodeError:
            raise exceptions.AuthenticationFailed("Bearer does not contain a valid JWT")

        raise NotImplementedError

Depending on which error PyJWKClientError has, we raise NotImplementedError. Actually, this is bad practice. So instead, we should ask ourselves: do we raise an exception to show a 5XX to whoever is calling the API, or do we treat it as 401? If I were doing this for real, I would scrutinize the library I'm using for tips. So I leave this up to you. Let's move on.

Raise error if the provided token has an invalid signature

This is our last test where we want an error to be raised. This is our test:

def test_should_raise_error_if_provided_jwt_has_invalid_signature(self, jwt_access_token_authentication_scenario):
    # Arrange
    factory, backend, tokens = jwt_access_token_authentication_scenario
    _, token_with_valid_kid, _ = tokens
    headers = {
        "HTTP_AUTHORIZATION": f"Bearer {token_with_valid_kid}",
    }
    request = factory.get("/your-endpoint/v1/friends", **headers)
    # Act
    with pytest.raises(AuthenticationFailed) as authentication_failed_exception:
        backend.authenticate(request)
    # Assert
    assert authentication_failed_exception.value.status_code == 401
    assert authentication_failed_exception.value.detail == "Bearer token is invalid"

The business code:

def authenticate(self, request: HttpRequest):
    # Extract header
    # (...)
    # Extract "kid"
    # (...)

    options = None
    extra_params = {"algorithms": ["RS256"], "audience": settings.AUTH0_MY_APPLICATION_AUDIENCE}
    # See the constructor method to understand this
    if hasattr(self, "internal_extra_jwt_decode_options"):
        options = getattr(self, "internal_extra_jwt_decode_options")
    try:
        data = jwt.decode(raw_jwt, key_id.key, **extra_params, options=options)
    except InvalidTokenError:
        raise exceptions.AuthenticationFailed("Bearer token is invalid")

    raise NotImplementedError

Do you remember the extra_internal_options dictionary? We're using it to activate some flags during the decoding process, which is vital for our tests to continue working as all tokens JWT might expire.

Another important thing is about InvalidTokenError. This is a generic base class for all token validation exceptions.

Should return token user

Finally, the point where the authentication class is functional! The test scenario:

    def test_should_return_jwt_user(self, jwt_access_token_authentication_scenario, settings, mocker):
        # Arrange
        settings.AUTH0_MY_APPLICATION_AUDIENCE = "user-management/apiview-drf-api/api/v1"
        mocker.patch(
            "authentication_django_rest_framework.apps.core.api.authentication.authentications.settings", settings
        )
        factory, backend, tokens = jwt_access_token_authentication_scenario
        *_, valid_token = tokens
        headers = {
            "HTTP_AUTHORIZATION": f"Bearer {valid_token}",
        }
        request = factory.get("/your-endpoint/v1/friends", **headers)
        expected_token = {
            "iss": "https://antunes.us.auth0.com/",
            "sub": "facebook|10218925956491642",
            "aud": ["user-management/apiview-drf-api/api/v1", "https://antunes.us.auth0.com/userinfo"],
            "iat": 1646602685,
            "exp": 1646689085,
            "azp": "P4cBB8a8n0RrlNEGW79dJnhA4ZrMVyKj",
            "scope": "openid profile email",
        }
        # Act
        result = backend.authenticate(request)
        # Assert
        assert result == (TokenUser(expected_token), expected_token)

Instead of ending with raise NotImplementedError, I just changed it to the following:

return TokenUser(data), data

That's it 😛!

Things you should handle, but we didn't cover

What happens if we can't retrieve the JWKS? How about the audience? Well, I could keep going, but I think you get the point. It's critical to cover other scenarios if they make sense for you.

Seeing the authentication class in action

Here's the example view using it:

class ExampleView(APIView):
    authentication_classes = [JWTAccessTokenAuthentication]

    def get(self, request):
        authenticated_user: TokenUser = request.user
        body = {
            "user": authenticated_user.id,
        }
        return Response(body, status=status.HTTP_200_OK)

Here's the test validating it:

import pytest


@pytest.fixture
def accept_fake_access_token(mocker):
    mock_class = mocker.patch(
        "authentication_django_rest_framework.apps.core.api.authentication.authentications.PyJWKClient"
    )
    mock_class.return_value = mocker.MagicMock()
    mocked_jwt = mocker.patch("authentication_django_rest_framework.apps.core.api.authentication.authentications.jwt")
    fake_data = {
        "iss": "https://antunes.us.auth0.com/",
        "sub": "facebook|10218925956491642",
        "aud": ["user-management/apiview-drf-api/api/v1", "https://antunes.us.auth0.com/userinfo"],
        "iat": 1646602685,
        "exp": 1646689085,
        "azp": "P4cBB8a8n0RrlNEGW79dJnhA4ZrMVyKj",
        "scope": "openid profile email",
    }
    mocked_jwt.decode.return_value = fake_data
    return fake_data


class TestExampleView:
    def test_should_return_200_with_user_attributes(self, accept_fake_access_token, client):
        # Arrange
        token_body = accept_fake_access_token
        header = {
            "HTTP_AUTHORIZATION": "Bearer you-should-watch-arcane",
        }
        # Act
        response = client.get("/api/v1/friends", content_type="application/json", **header)
        # Assert
        assert response.status_code == 200
        result = response.json()
        assert result == {"user": token_body["sub"]}

There are other ways to configure it, for instance, you can configure it globally.

Next steps and conclusion

The authentication class we created has an issue. Every time Django starts, it will download the JWKS. So, if we're using gunicorn with 5 workers, that means it will download the same thing 5 times. It worsens if our service runs using 5 pods, representing 25 requests, mainly because Auht0 has a rate limit for the JWKS endpoint. We can solve it using a cache solution, which we'll see in the next blog entry.

See everything we did here on GitHub.

Posted listening to Closer Than Close, Bee Gees 🎶.


Have you found any mistakes 👀? Feel free to submit a PR editing this blog entry 😄.