In this blog post, I'm going to integrate FastAPI with Keycloak so that users authenticate through Keycloak. Instead of implementing an authentication layer in each service, we let Keycloak handle it at a centralized point. We achieve this with JWT access tokens signed with RSA.

Install the requirements below.

Requirements
- uvicorn, fastapi, pydantic, pydantic-settings, python-keycloak, pyjwt, python-multipart

Keycloak Realm and Client Configuration

  • First, create a realm named local in Keycloak and select it.
  • Under Clients, create a new client with the specs below
    Client type: OpenID Connect
    Client ID: local-api
  • Click Next and enable the capabilities below
    Client Authentication: On
    Direct Access Grants: On
    Standard Flow: On
  • Click Next and configure the login settings
    Root URL: http://localhost:8000
    Home URL: http://localhost:8000
    Valid Redirect URIs: http://localhost:8000/*
    Web Origins: http://localhost:8000
  • Click Save to create the client

Obtain Client Credentials

  • Go to the client details page of the local-api client and select the Credentials tab.
  • Copy the client secret; the FastAPI app will use it.

Create a Test User in Keycloak

  • Click the Users tab, then click the Create new user (or Add user) button.
  • Fill in the form and click the Create button.
  • On the user details page, click the Credentials tab and set a new password. Don't forget to turn off the Temporary toggle so the password stays permanent.

Create settings.py

I'm going to use pydantic-settings to load environment variables based on the runtime environment. If RUNTIME_ENV is set to local or is unset, the application looks for .local.env in the directory you start the app from; otherwise it loads .env. You must set the KEYCLOAK variables in that file. An example .local.env file looks like this.

RUNTIME_ENV=local
KEYCLOAK_SERVER_URL=http://localhost:8080/auth
KEYCLOAK_CLIENT_ID=local-api
KEYCLOAK_REALM_NAME=local
KEYCLOAK_CLIENT_SECRET_KEY=<secret that you get from keycloak>

from pydantic_settings import BaseSettings, SettingsConfigDict
import os


class CommonSettings(BaseSettings):
    # Inherit from BaseSettings so pydantic registers the annotated fields below.
    VERSION: str = "0.1.0"

    KEYCLOAK_SERVER_URL: str
    KEYCLOAK_CLIENT_ID: str
    KEYCLOAK_REALM_NAME: str
    KEYCLOAK_CLIENT_SECRET_KEY: str


class LocalSettings(CommonSettings, BaseSettings):
    model_config = SettingsConfigDict(env_file='.local.env', env_file_encoding='utf-8')
    RUNTIME_ENV: str = "local"


class ProdSettings(CommonSettings, BaseSettings):
    model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8')
    RUNTIME_ENV: str = "prod"


runtime_env = os.environ.get("RUNTIME_ENV", "local")
settings = LocalSettings() if runtime_env == "local" else ProdSettings()
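
If a required variable is missing, pydantic-settings raises a validation error at import time. As a quick pre-flight check, here is a minimal sketch that uses only the standard library to report which KEYCLOAK variables are absent from an env file. The helper names parse_env and missing_keys are my own and not part of pydantic-settings, and the parser only understands plain KEY=VALUE lines.

```python
# Variable names taken from the settings class above.
REQUIRED_KEYS = (
    "KEYCLOAK_SERVER_URL",
    "KEYCLOAK_CLIENT_ID",
    "KEYCLOAK_REALM_NAME",
    "KEYCLOAK_CLIENT_SECRET_KEY",
)


def parse_env(text: str) -> dict:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values may contain "=" themselves.
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_keys(text: str) -> list:
    """Return the required keys that are absent or empty in the env text."""
    values = parse_env(text)
    return [key for key in REQUIRED_KEYS if not values.get(key)]
```

You could call missing_keys(open(".local.env").read()) before starting the app and print whatever it returns.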

Create security.py

In this module, we initialize and configure the Keycloak client class that connects to the Keycloak server. We will add helper functions to it later.

from keycloak import KeycloakOpenID
from .settings import settings as s


idp = KeycloakOpenID(
    s.KEYCLOAK_SERVER_URL,
    s.KEYCLOAK_REALM_NAME,
    s.KEYCLOAK_CLIENT_ID,
    s.KEYCLOAK_CLIENT_SECRET_KEY
)
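
To make it clearer what the server URL and realm name are used for, here is a small sketch of how the standard Keycloak OpenID Connect endpoints are laid out under a realm. The function oidc_endpoints is a hypothetical helper of mine for illustration; python-keycloak builds these URLs internally, so you don't need this in the app.

```python
def oidc_endpoints(server_url: str, realm: str) -> dict:
    """Build the usual OpenID Connect endpoint URLs for a Keycloak realm."""
    # Drop a trailing slash so we never produce a double "//" in the path.
    base = f"{server_url.rstrip('/')}/realms/{realm}/protocol/openid-connect"
    return {
        "token": f"{base}/token",        # where credentials are exchanged for tokens
        "certs": f"{base}/certs",        # public signing keys in JWKS format
        "userinfo": f"{base}/userinfo",  # claims about the authenticated user
    }
```

With the example values from .local.env, the token endpoint ends up under http://localhost:8080/auth/realms/local/. If these URLs return 404 in your setup, the server URL is the first thing I would double check.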

Create auth.py and Login endpoint

In this module we implement the login endpoint and obtain a token from Keycloak using the credentials provided by the user. That's why Direct Access Grants must be enabled in the client capabilities: it allows exchanging a username and password for a token. We return the access and refresh tokens to the user.

from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel


from .settings import settings as s
from .security import idp
from keycloak.exceptions import KeycloakAuthenticationError, KeycloakError


class OpenIdToken(BaseModel):
    token_type: str
    access_token: str 
    refresh_token: str
    expires_in: int 
    refresh_expires_in: int 
    

app = FastAPI(title="Code Reference API",
              description="API for code reference",
              version=s.VERSION)


@app.post("/login")
async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
    try: 
        # Await the async variant so the HTTP call to Keycloak does not block the event loop.
        resp = await idp.a_token(form.username, form.password, scope="openid profile email")
        return OpenIdToken(**resp)

    except KeycloakAuthenticationError as e:
        raise HTTPException(401, "Username or password is incorrect!") from e

    except KeycloakError as e:
        raise HTTPException(400, "Request malformed!") from e

At this point, we have managed to log in through Keycloak. You should now be able to get an access token from the /login endpoint. Give it a try!
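
If you prefer trying it from a script instead of Swagger UI, here is a minimal standard-library sketch. It assumes the app runs at http://localhost:8000, and the function names are mine. The endpoint uses OAuth2PasswordRequestForm, so the credentials go in a form-encoded body rather than JSON.

```python
import json
from urllib import parse, request


def build_login_request(base_url: str, username: str, password: str) -> request.Request:
    """Prepare a form-encoded POST to the /login endpoint (nothing is sent yet)."""
    body = parse.urlencode(
        {"grant_type": "password", "username": username, "password": password}
    ).encode("utf-8")
    return request.Request(
        f"{base_url.rstrip('/')}/login",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def fetch_tokens(base_url: str, username: str, password: str) -> dict:
    """Send the login request and return the parsed JSON token response."""
    req = build_login_request(base_url, username, password)
    with request.urlopen(req, timeout=10) as resp:
        return json.load(resp)
```

Calling fetch_tokens("http://localhost:8000", "<username>", "<password>") with the test user created earlier should return a dictionary with the access_token and refresh_token fields.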

Protect an Endpoint and Validate Access Tokens

Now it's time to make JWT tokens mandatory for protected endpoints. Let's decode and validate JWT tokens.

Add the lines below to security.py.

  • oauth_token tells Swagger UI where to obtain the token from.
  • decode_token calls python-keycloak's a_decode_token function to decode and verify the token by fetching the public key from Keycloak. (Later I show how to verify it without fetching the key every time.)
    The function also handles exceptions and returns proper error messages.

from fastapi import HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwcrypto.common import JWException
from jwcrypto.jws import InvalidJWSSignature
from jwcrypto.jwt import JWTExpired


oauth_token = OAuth2PasswordBearer("/login")


async def decode_token(token: str) -> dict:
    try: 
        return await idp.a_decode_token(token, validate=True)
        
    except JWTExpired as e:
        raise HTTPException(401, "Token has expired") from e 
    
    except InvalidJWSSignature as e: 
        raise HTTPException(400, "Token signature couldn't be verified") from e 
    
    except JWException as e: 
        raise HTTPException(400, "Token is malformed") from e 
    
    except Exception as e:
        print(e)
        raise HTTPException(500, "Error while decoding token") from e 
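
While debugging, it helps to see what is inside a token. A JWT is three base64url segments joined by dots: header, payload, and signature. The sketch below only peeks at the first two segments with the standard library. It does NOT verify the signature, so never use it for authorization; the function name peek_jwt is my own.

```python
import base64
import json


def _b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def peek_jwt(token: str) -> tuple:
    """Return (header, claims) of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected three dot-separated segments")
    header_b64, payload_b64, _signature = parts
    header = json.loads(_b64url_decode(header_b64))
    claims = json.loads(_b64url_decode(payload_b64))
    return header, claims
```

The header normally names the signing algorithm and the key id, which is useful when a signature check fails and you want to know which key the token was signed with.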

Then we need a dependency function that makes the Authorization header required and injects the decoded token into the endpoint.

from typing import Annotated
from fastapi import Depends
from .security import oauth_token, decode_token


async def get_user(token: Annotated[str, Depends(oauth_token)]):
    # Decode and validate the bearer token; the result is the token's claims.
    return await decode_token(token)


@app.post("/me")
async def me(user: Annotated[dict, Depends(get_user)]):
    # Return the decoded claims injected by the get_user dependency.
    return user
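
Conceptually, the oauth_token dependency reads the Authorization header and hands the part after "Bearer" to get_user. The sketch below is my own simplified illustration of that idea, not FastAPI's actual implementation; extract_bearer_token is a hypothetical name.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value."""
    if not authorization:
        return None
    # Split into the scheme ("Bearer") and the credentials (the token itself).
    scheme, _, credentials = authorization.partition(" ")
    if scheme.lower() != "bearer" or not credentials.strip():
        return None
    return credentials.strip()
```

When the header is missing or uses another scheme, the real dependency responds with 401 before the endpoint body runs, which is why /me never sees an unauthenticated request.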

When you visit the Swagger UI, you will see a lock icon next to the /me endpoint and an Authorize button in the top-right corner. Log in and send a request to the /me endpoint. You should see the claims of the token issued by Keycloak.

How to Verify Without Fetching the Key Every Time?

On the first startup, we can load the key and store it in memory. While decoding a token, we use the key from memory instead of fetching it from Keycloak. This reduces latency and the load on Keycloak, but it comes with a consequence: if Keycloak rotates its signing key, the cached key goes stale until the app reloads it. Just keep that in mind.

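We're going to use the lifespan feature in FastAPI. We load the key on app startup and save it on the settings object to make it available everywhere.
Don't forget to add a JWT_KEY field to your settings class, because pydantic rejects assignment to attributes that are not declared. Since the field holds a jwcrypto JWK object rather than a plain string, a loose annotation such as Any with a default of None is one simple option.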

from contextlib import asynccontextmanager
from fastapi import FastAPI
from jwcrypto import jwk
from .settings import settings as s
from .security import idp


@asynccontextmanager
async def lifespan(app: FastAPI):
    key = (
        "-----BEGIN PUBLIC KEY-----\n"
        + await idp.a_public_key()
        + "\n-----END PUBLIC KEY-----"
    )
    s.JWT_KEY = jwk.JWK.from_pem(key.encode("utf-8"))
    yield 


app = FastAPI(version=s.VERSION,
              lifespan=lifespan)
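
Keycloak returns the public key as bare base64 text, which is why the lifespan function adds the BEGIN and END lines by hand. If you want the result to look like a conventional PEM file, with the body wrapped at 64 characters as RFC 7468 describes, a small helper like the one below can do the wrapping. The name to_pem_public_key is my own, and this is only a sketch.

```python
def to_pem_public_key(b64_der: str) -> str:
    """Wrap bare base64 key material in PEM armor with 64-character lines."""
    # Remove any whitespace or newlines already present in the input.
    compact = "".join(b64_der.split())
    lines = [compact[i:i + 64] for i in range(0, len(compact), 64)]
    body = "\n".join(lines)
    return f"-----BEGIN PUBLIC KEY-----\n{body}\n-----END PUBLIC KEY-----\n"
```

Inside lifespan you would pass the awaited result of idp.a_public_key() to this helper and encode the returned string before handing it to jwk.JWK.from_pem.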

Now we should pass the key to the Keycloak client instead of letting it fetch the key. Find the decode_token function and change its return statement.

return await idp.a_decode_token(token, validate=True, key=s.JWT_KEY)
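
As for the key rotation concern mentioned earlier, one possible approach (an assumption on my side, not something Keycloak or python-keycloak prescribes) is to let the cached key expire after a while so it gets reloaded. Here is a minimal sketch of such a cache. KeyCache, its ttl_seconds parameter, and the injectable clock are all hypothetical names for illustration.

```python
import time
from typing import Any, Callable, Optional


class KeyCache:
    """Hold a key in memory and treat it as missing once the TTL has passed."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so the expiry logic can be tested
        self._key: Optional[Any] = None
        self._stored_at = 0.0

    def get(self) -> Optional[Any]:
        """Return the cached key, or None when it is absent or expired."""
        if self._key is None or self._clock() - self._stored_at >= self._ttl:
            return None
        return self._key

    def set(self, key: Any) -> None:
        """Store a freshly fetched key and remember when it was stored."""
        self._key = key
        self._stored_at = self._clock()
```

In decode_token you would call get() first; when it returns None, fetch the key from Keycloak again the same way lifespan does, call set() with the new key, and then decode. Choosing the TTL is a trade-off between extra requests to Keycloak and how long a rotated key goes unnoticed.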