FastAPI - KeyCloak OAuth2 Integration
In this blog post, I'm going to integrate FastAPI with KeyCloak so that users are authenticated through KeyCloak. Instead of implementing an authentication layer in each service, we take advantage of KeyCloak to handle it at a single, centralized point. We achieve that with JWT tokens signed with RSA, which the service verifies using the realm's public key.
Install the requirements below.
Requirements
- uvicorn, fastapi, pydantic, pydantic-settings, python-keycloak, pyjwt, python-multipart
KeyCloak Realm and Client Configuration
- First, create a realm in KeyCloak named local and select it.
- Under Clients, create a new client with the specs below
  Client type: OpenID Connect
  Client ID: local-api
- Click Next and enable the capabilities below
  Client Authentication: On
  Direct Access Grants: On
  Standard Flow: On
- Click Next and configure the login settings
  Root URL: http://localhost:8000
  Home URL: http://localhost:8000
  Valid Redirect URL: http://localhost:8000/*
  Web Origins: http://localhost:8000
- Click Save to create the client
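For orientation, the realm name chosen above ends up in the OpenID Connect URLs the application will talk to later. The sketch below builds those URLs by hand, assuming Keycloak's usual path layout (`/realms/<realm>/protocol/openid-connect/...`). The helper name `oidc_endpoints` is made up for illustration, and the `/auth` prefix in the base URL only applies if your Keycloak is served under that path.

```python
def oidc_endpoints(server_url: str, realm: str) -> dict:
    """Build the main OpenID Connect URLs for a Keycloak realm.

    Assumes the standard /realms/<realm>/protocol/openid-connect layout.
    """
    base = f"{server_url.rstrip('/')}/realms/{realm}"
    oidc = f"{base}/protocol/openid-connect"
    return {
        "discovery": f"{base}/.well-known/openid-configuration",
        "token": f"{oidc}/token",
        "certs": f"{oidc}/certs",
        "userinfo": f"{oidc}/userinfo",
    }


if __name__ == "__main__":
    # Values taken from this post's setup; adjust them to your own server
    for name, url in oidc_endpoints("http://localhost:8080/auth", "local").items():
        print(f"{name}: {url}")
```

Opening the discovery URL in a browser is a quick way to check that the realm exists before wiring up the application.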
Obtain Client Credentials
- Go to the client details page of the local-api client and select the Credentials tab.
- Copy the client secret; it will be used by the FastAPI app.
Create a Test User in Keycloak
- Click the Users tab, then click the Create new user or Add user button.
- Fill in the form and click the Create button.
- On the user details page, click the Credentials tab and set a new password. Don't forget to disable the Temporary toggle so the password stays permanent.
Create settings.py
I'm going to use pydantic-settings to load env variables based on the runtime environment. If RUNTIME_ENV is set to local or is unset, the application looks for .local.env in the directory the app is started from; otherwise it loads .env. You must set the KEYCLOAK variables in the .local.env file. An example .local.env file would look like this.
RUNTIME_ENV=local
KEYCLOAK_SERVER_URL=http://localhost:8080/auth
KEYCLOAK_CLIENT_ID=local-api
KEYCLOAK_REALM_NAME=local
KEYCLOAK_CLIENT_SECRET_KEY=<secret that you get from keycloak>
import os

from pydantic_settings import BaseSettings, SettingsConfigDict


class CommonSettings(BaseSettings):
    # Fields shared by every environment; values come from env vars or the env file
    VERSION: str = "0.1.0"
    KEYCLOAK_SERVER_URL: str
    KEYCLOAK_CLIENT_ID: str
    KEYCLOAK_REALM_NAME: str
    KEYCLOAK_CLIENT_SECRET_KEY: str


class LocalSettings(CommonSettings):
    model_config = SettingsConfigDict(env_file='.local.env', env_file_encoding='utf-8')
    RUNTIME_ENV: str = "local"


class ProdSettings(CommonSettings):
    model_config = SettingsConfigDict(env_file='.env', env_file_encoding='utf-8')
    RUNTIME_ENV: str = "prod"


# Pick the settings class from RUNTIME_ENV; defaults to local when unset
runtime_env = os.environ.get("RUNTIME_ENV", "local")
settings = LocalSettings() if runtime_env == "local" else ProdSettings()
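To make the selection rule concrete, here is a small standard-library sketch that mirrors it and checks an env file's text for the four KEYCLOAK variables before the app starts. The helpers `env_file_for` and `missing_keys` are hypothetical names used only for this illustration; pydantic-settings does its own parsing and validation, so this is just a way to see the logic in isolation.

```python
from typing import List, Optional

REQUIRED_KEYS = (
    "KEYCLOAK_SERVER_URL",
    "KEYCLOAK_CLIENT_ID",
    "KEYCLOAK_REALM_NAME",
    "KEYCLOAK_CLIENT_SECRET_KEY",
)


def env_file_for(runtime_env: Optional[str]) -> str:
    """Mirror the rule above: 'local' or unset -> .local.env, anything else -> .env."""
    return ".local.env" if runtime_env in (None, "local") else ".env"


def missing_keys(env_text: str) -> List[str]:
    """Return the required KEYCLOAK_* names that have no value in KEY=VALUE text."""
    present = set()
    for line in env_text.splitlines():
        line = line.strip()
        # Skip blanks, comments and anything that is not KEY=VALUE
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if value.strip():
            present.add(key.strip())
    return [key for key in REQUIRED_KEYS if key not in present]


if __name__ == "__main__":
    sample = "RUNTIME_ENV=local\nKEYCLOAK_REALM_NAME=local\nKEYCLOAK_CLIENT_ID=local-api\n"
    print(env_file_for(None))
    print(missing_keys(sample))
```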
Create security.py
In this module, we initialize and configure the Keycloak client class that connects to the Keycloak server. The helper functions we use later will also live here.
from keycloak import KeycloakOpenID

from .settings import settings as s

# Shared OpenID Connect client for the configured realm
idp = KeycloakOpenID(
    server_url=s.KEYCLOAK_SERVER_URL,
    realm_name=s.KEYCLOAK_REALM_NAME,
    client_id=s.KEYCLOAK_CLIENT_ID,
    client_secret_key=s.KEYCLOAK_CLIENT_SECRET_KEY,
)
Create auth.py and Login endpoint
In this module we implement the login endpoint, which obtains a token from Keycloak using the credentials provided by the user. This is why Direct Access Grants must be enabled in the client capabilities: it allows exchanging a username and password for a token. We return the access and refresh tokens to the user.
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from keycloak.exceptions import KeycloakAuthenticationError, KeycloakError
from pydantic import BaseModel

from .security import idp
from .settings import settings as s


class OpenIdToken(BaseModel):
    token_type: str
    access_token: str
    refresh_token: str
    expires_in: int
    refresh_expires_in: int


app = FastAPI(title="Code Reference API",
              description="API for code reference",
              version=s.VERSION)


@app.post("/login")
async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]) -> OpenIdToken:
    try:
        # Password grant: exchange the user's credentials for tokens
        resp = idp.token(form.username, form.password, scope="openid profile email")
        return OpenIdToken(**resp)
    except KeycloakAuthenticationError as e:
        raise HTTPException(401, "Username or password is incorrect!") from e
    except KeycloakError as e:
        raise HTTPException(400, "Request malformed!") from e
At this point, we can log in through Keycloak. You should now be able to get an access token from the /login endpoint. Give it a try!
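If you prefer to try it from a script rather than a browser, the sketch below builds the form-encoded request that the /login endpoint expects. It uses only the standard library and does not send anything by itself. The helper name `build_login_request` and the sample credentials are placeholders, and it assumes the app is served at http://localhost:8000.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_login_request(base_url: str, username: str, password: str) -> Request:
    """Build a POST /login request with an OAuth2 password-form body."""
    body = urlencode({
        "grant_type": "password",
        "username": username,
        "password": password,
    })
    return Request(
        f"{base_url.rstrip('/')}/login",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_login_request("http://localhost:8000", "testuser", "testpass")
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
    # To actually send it (the app and Keycloak must be running):
    #   import json
    #   from urllib.request import urlopen
    #   tokens = json.load(urlopen(req))
```

The body is form-encoded rather than JSON because OAuth2PasswordRequestForm reads form fields.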
Protect an Endpoint and Validate Access Tokens
Now it's time to make JWT tokens mandatory for protected endpoints. Let's decode and validate JWT tokens.
Add the lines below to security.py
- oauth_token tells Swagger UI where to obtain the token from.
- decode_token calls Keycloak's decode_token function to decode and verify the token by fetching the public key from KeyCloak. (I'm going to show how to verify it without fetching the key every time.) The function also handles the exceptions and returns proper error messages.
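from fastapi import HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwcrypto.common import JWException
from jwcrypto.jws import InvalidJWSSignature
from jwcrypto.jwt import JWTExpired

# Tells Swagger UI to obtain tokens from the /login endpoint
oauth_token = OAuth2PasswordBearer("/login")


async def decode_token(token: str) -> dict:
    try:
        return await idp.a_decode_token(token, validate=True)
    # The specific exceptions must come before their base class JWException
    except JWTExpired as e:
        raise HTTPException(401, "Token has expired") from e
    except InvalidJWSSignature as e:
        raise HTTPException(400, "Token signature couldn't be verified") from e
    except JWException as e:
        raise HTTPException(400, "Token is malformed") from e
    except Exception as e:
        print(e)
        raise HTTPException(500, "Error while decoding token") from e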
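To see what decode_token is working with, it helps to look at the shape of a JWT: three base64url segments (header, claims, signature) joined by dots. The sketch below only peeks at the first two segments and deliberately does NOT verify the signature, so it is for debugging and learning only and must never replace the validation above. The helper names and the sample token are invented for this illustration.

```python
import base64
import json
from typing import Tuple


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def b64url_encode(data: bytes) -> str:
    """Encode bytes as unpadded base64url, the way JWT segments are written."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def peek_jwt(token: str) -> Tuple[dict, dict]:
    """Return (header, claims) WITHOUT verifying the signature."""
    header_b64, payload_b64, _signature = token.split(".")
    return json.loads(b64url_decode(header_b64)), json.loads(b64url_decode(payload_b64))


if __name__ == "__main__":
    # Hand-made sample with a fake signature, purely for demonstration
    sample = ".".join([
        b64url_encode(json.dumps({"alg": "RS256", "typ": "JWT"}).encode("utf-8")),
        b64url_encode(json.dumps({"preferred_username": "testuser", "exp": 1700000000}).encode("utf-8")),
        "fake-signature",
    ])
    header, claims = peek_jwt(sample)
    print(header["alg"], claims["preferred_username"], claims["exp"])
```

The `alg` value in the header is what tells a verifier which signing scheme to check, and `exp` is the claim behind the "Token has expired" error.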
Then we need a dependency function that makes the Authorization header required and injects the decoded token into the endpoint.
from typing import Annotated

from fastapi import Depends

from .security import oauth_token, decode_token


async def get_user(token: Annotated[str, Depends(oauth_token)]):
    # oauth_token extracts the bearer token from the Authorization header
    return await decode_token(token)


@app.post("/me")
async def me(user: Annotated[dict, Depends(get_user)]):
    # Return the decoded token claims of the authenticated user
    return user
When you visit the Swagger UI, you will see a lock icon next to the /me endpoint and an Authorize button in the top-right corner. Log in and try to send a request to the /me endpoint. You should see the decoded contents of the token issued by Keycloak.
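Outside Swagger UI, the same call is a request with an Authorization: Bearer header. The standard-library sketch below only builds that request. The helper name `build_me_request` is made up for this example, the token value is a placeholder, and the app is assumed to run at http://localhost:8000.

```python
from urllib.request import Request


def build_me_request(base_url: str, access_token: str) -> Request:
    """Build a POST /me request that carries the access token as a bearer token."""
    return Request(
        f"{base_url.rstrip('/')}/me",
        headers={"Authorization": f"Bearer {access_token}"},
        method="POST",
    )


if __name__ == "__main__":
    # Replace the placeholder with the access_token returned by /login
    req = build_me_request("http://localhost:8000", "<access token>")
    print(req.get_method(), req.full_url)
    print(req.get_header("Authorization"))
```

Sending the request without the header should be rejected with a 401 by the oauth_token dependency before decode_token is ever called.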
How to Verify without fetching key every time?
On the first startup, we can load the key and keep it in memory. When decoding a token, we use the key from memory instead of fetching it from Keycloak. This reduces both the load on Keycloak and the latency, but it has consequences, such as how to handle key rotation. Keep that in mind.
We're going to use the lifespan feature of FastAPI. We load the key on app startup and save it on the settings object to make it available everywhere.
Don't forget to add a JWT_KEY field to your settings class (for example, something like JWT_KEY: Any = None, since it is filled in at startup rather than read from the env file).
from contextlib import asynccontextmanager

from fastapi import FastAPI
from jwcrypto import jwk

from .security import idp
from .settings import settings as s


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Keycloak returns the bare base64 key, so wrap it in PEM markers
    key = (
        "-----BEGIN PUBLIC KEY-----\n"
        + await idp.a_public_key()
        + "\n-----END PUBLIC KEY-----"
    )
    # Runs once at startup; the parsed key stays in memory afterwards
    s.JWT_KEY = jwk.JWK.from_pem(key.encode("utf-8"))
    yield


app = FastAPI(version=s.VERSION,
              lifespan=lifespan)
Now we should pass the key to the Keycloak class so that it uses the cached key instead of fetching it. Find the decode_token function and change its return statement to:
return await idp.a_decode_token(token, validate=True, key=s.JWT_KEY)
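One possible way to soften the key-rotation concern mentioned earlier is to treat the in-memory key as a cache with a time-to-live, and to allow a forced reload when a signature check fails. The sketch below shows that idea with the standard library only. The `CachedKey` class is hypothetical and not part of FastAPI or python-keycloak, and the loader here is synchronous for simplicity, so in the app it would need adapting to the async a_public_key call.

```python
import time
from typing import Any, Callable, Optional


class CachedKey:
    """Keep a loaded key in memory and reload it after `ttl` seconds."""

    def __init__(
        self,
        loader: Callable[[], Any],
        ttl: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader  # fetches a fresh key, e.g. from Keycloak
        self._ttl = ttl
        self._clock = clock    # injectable so the behaviour can be tested
        self._key: Any = None
        self._loaded_at: Optional[float] = None

    def get(self, force: bool = False) -> Any:
        """Return the cached key, reloading it when expired or when forced."""
        now = self._clock()
        expired = self._loaded_at is None or now - self._loaded_at >= self._ttl
        if force or expired:
            self._key = self._loader()
            self._loaded_at = now
        return self._key


if __name__ == "__main__":
    fetches = []

    def fake_loader() -> str:
        # Stand-in for the real key fetch; counts how often it is called
        fetches.append(1)
        return f"key-v{len(fetches)}"

    cache = CachedKey(fake_loader, ttl=3600)
    cache.get()
    cache.get()             # served from memory, no second fetch
    cache.get(force=True)   # e.g. after a signature verification failure
    print(len(fetches))
```

The trade-off is the usual one for caches: a shorter ttl picks up rotated keys sooner but sends more requests to Keycloak.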