OAuth 2.0 and OpenID Connect - Delegated Authorization
Before you read any further, study this OAuth callback handler and predict the vulnerability:
from fastapi import FastAPI, Request
import httpx
app = FastAPI()
@app.get("/callback")
async def oauth_callback(request: Request):
code = request.query_params.get("code")
async with httpx.AsyncClient() as client:
response = await client.post(
"https://auth.example.com/token",
data={
"grant_type": "authorization_code",
"code": code,
"client_id": "my-app",
"client_secret": "hardcoded-secret-123",
"redirect_uri": request.query_params.get("redirect_uri"),
},
)
tokens = response.json()
return {"access_token": tokens["access_token"]}
There are at least four security issues in this code. By the end of this lesson, you will understand the OAuth 2.0 and OIDC protocols deeply enough to spot all of them and build a secure implementation.
What You Will Learn
- The OAuth 2.0 authorization framework and why it exists
- The four grant types and when each is appropriate
- The authorization code flow step by step
- Why PKCE is mandatory for public clients (SPAs, mobile apps)
- How OpenID Connect adds identity on top of OAuth 2.0
- How to implement OAuth with authlib in Python
- Keycloak OIDC integration with FastAPI
- Token exchange, scopes, and consent flows
- How to build a "Login with Google" flow
Prerequisites
- JWT structure and verification (Lesson 02)
- FastAPI routing and dependency injection (from Intermediate course)
- HTTP redirects and query parameters
pip install authlib httpx
Part 1 - Why OAuth 2.0 Exists
Before OAuth, if you wanted a third-party app to access your data, you gave it your password. The app stored your password and logged in as you. This was catastrophic for security:
- The app had full access to your account, not just what it needed
- You could not revoke the app's access without changing your password
- If the app was compromised, your password was exposed
OAuth 2.0 solves this by introducing delegated authorization: the user grants the app limited access via tokens, without sharing credentials.
Key Roles in OAuth 2.0
| Role | Description | Example |
|---|---|---|
| Resource Owner | The user who owns the data | You |
| Client | The application requesting access | EngineersOfAI web app |
| Authorization Server | Issues tokens after user consent | Keycloak, Google, Auth0 |
| Resource Server | The API that holds the protected data | EngineersOfAI API |
Part 2 - Grant Types: Choosing the Right Flow
Authorization Code Grant (with PKCE)
The standard flow for web applications and SPAs. The most secure option for user-facing applications:
Client Credentials Grant
For machine-to-machine communication where no user is involved:
import httpx
async def get_machine_token() -> str:
"""Obtain a token for service-to-service communication."""
async with httpx.AsyncClient() as client:
response = await client.post(
"https://auth.engineersofai.com/realms/main/protocol/openid-connect/token",
data={
"grant_type": "client_credentials",
"client_id": "course-sync-service",
"client_secret": os.environ["CLIENT_SECRET"],
"scope": "courses:read courses:write",
},
)
response.raise_for_status()
return response.json()["access_token"]
When to Use Which Grant Type
| Grant Type | Use Case | Client Type |
|---|---|---|
| Authorization Code + PKCE | Web apps, SPAs, mobile apps | Public or confidential |
| Client Credentials | Service-to-service, background jobs | Confidential (server) |
| Device Code | Smart TVs, CLI tools, IoT | Input-constrained devices |
| Refresh Token | Extend sessions without re-login | Any (with refresh token) |
Implicit grant and Resource Owner Password grant are deprecated in OAuth 2.1. The implicit grant exposes tokens in URL fragments. The password grant requires the client to handle user credentials directly. Always use Authorization Code + PKCE.
Part 3 - PKCE: Proof Key for Code Exchange
PKCE (pronounced "pixy") prevents authorization code interception attacks. It is mandatory for public clients (SPAs, mobile) and recommended for all clients:
import hashlib
import base64
import os
def generate_pkce_pair() -> tuple[str, str]:
"""Generate a PKCE code_verifier and code_challenge."""
# code_verifier: 43-128 character random string
code_verifier = base64.urlsafe_b64encode(os.urandom(32)).rstrip(b"=").decode()
# code_challenge: SHA-256 hash of the verifier, base64url-encoded
challenge_bytes = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(challenge_bytes).rstrip(b"=").decode()
return code_verifier, code_challenge
verifier, challenge = generate_pkce_pair()
print(f"Verifier: {verifier}") # Stored in session, sent during token exchange
print(f"Challenge: {challenge}") # Sent with the authorization request
# Step 1: Send code_challenge to /authorize
# Step 2: Send code_verifier to /token
# The auth server verifies: SHA256(code_verifier) == code_challenge
Without PKCE, an attacker who intercepts the authorization code (via a malicious redirect, browser extension, or shared device) can exchange it for tokens. With PKCE, the attacker also needs the code_verifier, which was never transmitted over the network during the authorization step.
Part 4 - OpenID Connect: Identity on Top of OAuth
OAuth 2.0 is an authorization framework - it tells you what the user can access, not who the user is. OpenID Connect (OIDC) adds an identity layer:
| OAuth 2.0 | OpenID Connect |
|---|---|
| Access token (opaque or JWT) | ID token (always a JWT) |
| "This token grants access to X" | "This token proves the user is Y" |
Scopes: read, write | Scopes: openid, profile, email |
| No standardized user info | /userinfo endpoint |
The ID Token
An ID token is a JWT that contains identity claims about the user:
import jwt
# Decode an ID token (after signature verification)
id_token_payload = {
"iss": "https://auth.engineersofai.com",
"sub": "user_456", # Unique user identifier
"aud": "engineersofai-web", # Client that requested the token
"exp": 1709500000,
"iat": 1709496400,
"nonce": "random-nonce-123", # Replay protection
"name": "AI Engineer",
"email_verified": True,
"preferred_username": "aiengr",
}
# The ID token is for the CLIENT, not the API
# Never send the ID token to an API as an access token
The UserInfo Endpoint
For additional user details beyond what the ID token contains:
import httpx
async def get_user_info(access_token: str) -> dict:
"""Fetch user profile from the OIDC userinfo endpoint."""
async with httpx.AsyncClient() as client:
response = await client.get(
"https://auth.engineersofai.com/realms/main/protocol/openid-connect/userinfo",
headers={"Authorization": f"Bearer {access_token}"},
)
response.raise_for_status()
return response.json()
# Returns: {"sub": "user_456", "name": "Amit", "email": "...", ...}
The ID token proves identity to the client application. The access token authorizes API requests. Never confuse the two. Do not send an ID token as a Bearer token to your API.
Part 5 - Implementing OAuth with authlib
authlib provides a clean, well-tested implementation of OAuth 2.0 and OIDC for Python:
import os
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
from authlib.integrations.starlette_client import OAuth
from starlette.middleware.sessions import SessionMiddleware
app = FastAPI()
app.add_middleware(
SessionMiddleware,
secret_key=os.environ["SESSION_SECRET"],
https_only=True,
same_site="lax",
)
oauth = OAuth()
oauth.register(
name="keycloak",
client_id=os.environ["KEYCLOAK_CLIENT_ID"],
client_secret=os.environ["KEYCLOAK_CLIENT_SECRET"],
server_metadata_url=(
"https://auth.engineersofai.com/realms/main"
"/.well-known/openid-configuration"
),
client_kwargs={"scope": "openid profile email"},
)
@app.get("/login")
async def login(request: Request):
"""Initiate the OAuth authorization code flow."""
redirect_uri = request.url_for("callback")
return await oauth.keycloak.authorize_redirect(request, redirect_uri)
@app.get("/callback")
async def callback(request: Request):
"""Handle the OAuth callback after user authentication."""
token = await oauth.keycloak.authorize_access_token(request)
# token contains: access_token, id_token, refresh_token, token_type
# authlib automatically validates the ID token signature and claims
userinfo = token.get("userinfo")
if not userinfo:
userinfo = await oauth.keycloak.userinfo(token=token)
# Store user info in session
request.session["user"] = {
"sub": userinfo["sub"],
"name": userinfo.get("name", ""),
"email": userinfo.get("email", ""),
}
return RedirectResponse(url="/dashboard")
@app.get("/logout")
async def logout(request: Request):
"""Clear the session and redirect to Keycloak logout."""
request.session.clear()
# Keycloak end-session endpoint
logout_url = (
"https://auth.engineersofai.com/realms/main"
"/protocol/openid-connect/logout"
f"?post_logout_redirect_uri={request.url_for('login')}"
f"&client_id={os.environ['KEYCLOAK_CLIENT_ID']}"
)
return RedirectResponse(url=logout_url)
@app.get("/dashboard")
async def dashboard(request: Request):
user = request.session.get("user")
if not user:
return RedirectResponse(url="/login")
return {"message": f"Welcome, {user['name']}!", "user": user}
Part 6 - Keycloak OIDC with FastAPI (API Backend)
For API-only backends (where the frontend handles the OAuth flow), you verify tokens issued by Keycloak:
import os
from typing import Annotated
import jwt
from jwt import PyJWKClient
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel
app = FastAPI()
security = HTTPBearer()
KEYCLOAK_REALM_URL = os.environ.get(
"KEYCLOAK_REALM_URL",
"https://auth.engineersofai.com/realms/main",
)
JWKS_URL = f"{KEYCLOAK_REALM_URL}/protocol/openid-connect/certs"
ISSUER = KEYCLOAK_REALM_URL
AUDIENCE = os.environ.get("JWT_AUDIENCE", "engineersofai-api")
jwks_client = PyJWKClient(JWKS_URL, cache_jwk_set=True, lifespan=3600)
class KeycloakUser(BaseModel):
sub: str
email: str | None = None
name: str | None = None
roles: list[str] = []
async def get_keycloak_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
) -> KeycloakUser:
"""Verify a Keycloak-issued JWT and extract user claims."""
token = credentials.credentials
try:
signing_key = jwks_client.get_signing_key_from_jwt(token)
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
audience=AUDIENCE,
issuer=ISSUER,
options={"require": ["exp", "iss", "sub"]},
)
except jwt.InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {e}",
headers={"WWW-Authenticate": "Bearer"},
)
return KeycloakUser(
sub=payload["sub"],
email=payload.get("email"),
name=payload.get("name"),
roles=payload.get("realm_access", {}).get("roles", []),
)
def require_scope(scope: str):
"""Dependency that checks the token has a required scope."""
async def checker(
user: Annotated[KeycloakUser, Depends(get_keycloak_user)],
) -> KeycloakUser:
# Keycloak puts scopes in the 'scope' claim as a space-separated string
# For role-based access, check realm_access.roles
if scope not in user.roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required scope: {scope}",
)
return user
return checker
@app.get("/api/courses")
async def get_courses(
user: Annotated[KeycloakUser, Depends(get_keycloak_user)],
):
if "paid-individual" in user.roles:
return {"courses": ["Python Foundation", "Python Intermediate", "AI Systems"]}
return {"courses": ["Python Foundation"]}
Part 7 - "Login with Google" Flow
Implementing Google OAuth is nearly identical to Keycloak because both follow the OIDC standard:
import os
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()
# Google OIDC registration
oauth.register(
name="google",
client_id=os.environ["GOOGLE_CLIENT_ID"],
client_secret=os.environ["GOOGLE_CLIENT_SECRET"],
server_metadata_url=(
"https://accounts.google.com/.well-known/openid-configuration"
),
client_kwargs={"scope": "openid profile email"},
)
@app.get("/login/google")
async def login_google(request: Request):
redirect_uri = request.url_for("google_callback")
return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get("/callback/google")
async def google_callback(request: Request):
token = await oauth.google.authorize_access_token(request)
userinfo = token.get("userinfo")
# Google userinfo includes:
# {
# "sub": "1234567890", # Google's unique user ID
# "email_verified": true,
# "name": "John Doe",
# "picture": "https://..."
# }
# Find or create user in your database
user = await find_or_create_user(
provider="google",
provider_id=userinfo["sub"],
email=userinfo["email"],
name=userinfo.get("name"),
)
# Issue your own JWT for subsequent API requests
access_token = issue_access_token(user.id, user.role)
return {"access_token": access_token, "token_type": "bearer"}
Multi-Provider Architecture
Part 8 - Scopes, Consent, and Token Exchange
Scopes
Scopes define what the access token is allowed to do. They are requested during authorization and enforced by the resource server:
# Common OIDC scopes
# openid - Required for OIDC. Returns an ID token
# profile - Access to name, picture, locale
# email - Access to email and email_verified
# offline_access - Returns a refresh token
# Custom scopes for your API
# courses:read - Read course data
# courses:write - Create/modify courses
# certificates:issue - Issue certificates
oauth.register(
name="keycloak",
# ...
client_kwargs={
"scope": "openid profile email courses:read certificates:issue"
},
)
Token Exchange (RFC 8693)
Token exchange allows a service to exchange one token for another with different permissions - useful in microservice architectures:
import httpx
async def exchange_token(
original_token: str,
target_audience: str,
) -> str:
"""Exchange an access token for one scoped to a different service."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{KEYCLOAK_REALM_URL}/protocol/openid-connect/token",
data={
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token": original_token,
"subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
"audience": target_audience,
"client_id": os.environ["CLIENT_ID"],
"client_secret": os.environ["CLIENT_SECRET"],
},
)
response.raise_for_status()
return response.json()["access_token"]
# Example: API Gateway receives a user token, exchanges it for a
# narrower-scoped token before forwarding to the course service
course_token = await exchange_token(
original_token=user_access_token,
target_audience="course-service",
)
Part 9 - Common OAuth Security Mistakes
Mistake 1: Not Validating the State Parameter
The state parameter prevents CSRF attacks on the callback endpoint:
# VULNERABLE - no state parameter
@app.get("/login")
async def login():
return RedirectResponse(
"https://auth.example.com/authorize"
"?response_type=code&client_id=myapp"
"&redirect_uri=https://myapp.com/callback"
# Missing: &state=random-csrf-token
)
# An attacker can trick a user into visiting a crafted callback URL
# that logs them into the attacker's account
# FIXED - authlib handles state automatically
@app.get("/login")
async def login(request: Request):
redirect_uri = request.url_for("callback")
# authlib generates and validates state automatically
return await oauth.keycloak.authorize_redirect(request, redirect_uri)
Mistake 2: Open Redirect via redirect_uri
# VULNERABLE - accepting redirect_uri from query params
redirect_uri = request.query_params.get("redirect_uri")
# Attacker sets redirect_uri=https://evil.com and steals the auth code
# FIXED - hardcode or whitelist redirect URIs
ALLOWED_REDIRECTS = {"https://engineersofai.com/callback"}
redirect_uri = request.url_for("callback")
if str(redirect_uri) not in ALLOWED_REDIRECTS:
raise HTTPException(status_code=400, detail="Invalid redirect URI")
Mistake 3: Storing Tokens in localStorage
// VULNERABLE - accessible to any XSS attack
localStorage.setItem("access_token", token);
// Any script on the page can read this
# FIXED - use httpOnly cookies (set from the backend)
from fastapi.responses import JSONResponse
response = JSONResponse({"message": "Logged in"})
response.set_cookie(
key="access_token",
value=access_token,
httponly=True, # Not accessible via JavaScript
secure=True, # Only sent over HTTPS
samesite="lax", # CSRF protection
max_age=900, # 15 minutes
)
return response
Never store tokens in localStorage or sessionStorage. They are accessible to any JavaScript running on the page, making them trivially exploitable via XSS. Use httpOnly cookies or keep tokens in memory only (with refresh via httpOnly cookie).
Part 10 - Returning to the Opening Puzzle
The original callback handler had four security issues:
-
Hardcoded client_secret - The secret is visible in source code and version control. It should be loaded from environment variables or a secrets manager.
-
Accepting
redirect_urifrom query parameters - An attacker can setredirect_uri=https://evil.comto intercept the authorization code. The redirect URI must be hardcoded or validated against a whitelist. -
No state parameter validation - Without state validation, the callback is vulnerable to CSRF attacks. An attacker can craft a URL that associates the victim's session with the attacker's account.
-
Returning the access token directly in the response body - The token should be stored in a server-side session or set as an httpOnly cookie, never exposed in a JSON response that could be intercepted or cached.
Key Takeaways
- OAuth 2.0 is for authorization (what can you access); OIDC adds authentication (who are you)
- Always use Authorization Code + PKCE, even for confidential clients
- Implicit grant and password grant are deprecated - do not use them
- Use the state parameter to prevent CSRF on the callback endpoint
- Hardcode or whitelist redirect URIs - never accept them from user input
- Store tokens in httpOnly cookies, not localStorage
- ID tokens prove identity to the client; access tokens authorize API requests - do not confuse them
- Use authlib for clean, well-tested OAuth/OIDC implementation in Python
- Keycloak, Google, and Auth0 all follow the same OIDC standard - switching providers requires minimal code changes
- Use token exchange for microservice-to-microservice authorization
Graded Practice Challenges
Level 1 - Identify the Vulnerability
Question 1: What is the security risk in this authorization URL?
https://auth.example.com/authorize?response_type=code&client_id=myapp
&redirect_uri=https://myapp.com/callback&scope=openid+profile
Answer
Missing the state parameter. Without it, the callback endpoint is vulnerable to CSRF attacks. An attacker can initiate an authorization flow, get the callback URL with their authorization code, and trick the victim into visiting it. The victim's session is then linked to the attacker's account. Also missing code_challenge for PKCE, which protects against authorization code interception.
Question 2: A developer stores the OAuth client_secret in a JavaScript SPA. Why is this wrong?
Answer
JavaScript in the browser is fully visible to the user and any browser extension. The client_secret is not secret in this context - anyone can extract it from the JavaScript source. SPAs are "public clients" and must use PKCE instead of client secrets. The client_secret should only be used by confidential clients (server-side applications) where the source code is not accessible to users.
Question 3: An application accepts both openid and offline_access scopes but never uses the refresh token. What is the risk?
Answer
By requesting offline_access, the application receives a refresh token that can obtain new access tokens indefinitely. If this refresh token is stolen (from server logs, database, or memory), the attacker has persistent access to the user's account until the refresh token is explicitly revoked. Only request offline_access if the application actually implements refresh token rotation and storage. Following the principle of least privilege, request only the scopes you need.
Level 2 - Fix the Vulnerability
This OAuth implementation has multiple security issues. Fix all of them:
from fastapi import FastAPI, Request
import httpx
import json
app = FastAPI()
CLIENT_SECRET = "my-secret-123"
@app.get("/login")
async def login():
return RedirectResponse(
"https://auth.example.com/authorize"
"?response_type=code"
"&client_id=myapp"
f"&redirect_uri=https://myapp.com/callback"
"&scope=openid profile email offline_access"
)
@app.get("/callback")
async def callback(request: Request):
code = request.query_params["code"]
redirect_uri = request.query_params.get("redirect_uri", "https://myapp.com/callback")
async with httpx.AsyncClient() as client:
resp = await client.post("https://auth.example.com/token", data={
"grant_type": "authorization_code",
"code": code,
"client_id": "myapp",
"client_secret": CLIENT_SECRET,
"redirect_uri": redirect_uri,
})
tokens = resp.json()
return tokens # Returns all tokens including refresh token to the browser
Solution
import os
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
from authlib.integrations.starlette_client import OAuth
from starlette.middleware.sessions import SessionMiddleware
app = FastAPI()
app.add_middleware(
SessionMiddleware,
secret_key=os.environ["SESSION_SECRET"],
https_only=True,
same_site="lax",
)
oauth = OAuth()
oauth.register(
name="provider",
client_id=os.environ["OAUTH_CLIENT_ID"], # Fix 1: from env
client_secret=os.environ["OAUTH_CLIENT_SECRET"], # Fix 1: from env
server_metadata_url="https://auth.example.com/.well-known/openid-configuration",
client_kwargs={"scope": "openid profile email"}, # Fix 2: no offline_access
)
@app.get("/login")
async def login(request: Request):
redirect_uri = request.url_for("callback")
# Fix 3: authlib handles state + PKCE automatically
return await oauth.provider.authorize_redirect(request, redirect_uri)
@app.get("/callback")
async def callback(request: Request):
# Fix 4: authlib validates state, exchanges code securely
token = await oauth.provider.authorize_access_token(request)
userinfo = token.get("userinfo")
# Fix 5: Store in server-side session, not returned to browser
request.session["user"] = {
"sub": userinfo["sub"],
"email": userinfo.get("email"),
}
# Fix 6: Redirect instead of returning tokens
return RedirectResponse(url="/dashboard")
Fixes: (1) Secrets from environment. (2) Removed unnecessary offline_access scope. (3) State and PKCE handled automatically. (4) Redirect URI is hardcoded, not from query params. (5) Tokens stored server-side. (6) Tokens never returned to the browser.
Level 3 - Design a Secure System
Design an authentication system for EngineersOfAI that supports:
- Login with Keycloak (primary), Google, and GitHub
- Role-based access:
free-user,paid-individual,org-member - A Docusaurus frontend (static SPA) and a FastAPI API backend
- Users who sign up via Google should be auto-created in Keycloak
- Course access is determined by role, stored in Keycloak
Document your architecture including: OAuth flow for each provider, how user accounts are linked across providers, where tokens are stored, and how role-based access control is enforced.
Design Hints
- Architecture: Keycloak acts as the sole identity provider. Google and GitHub are configured as Identity Providers within Keycloak (identity brokering). The frontend and API only talk to Keycloak.
- OAuth flow: The Docusaurus SPA uses Authorization Code + PKCE directly with Keycloak. The API verifies Keycloak-issued JWTs via JWKS.
- Account linking: Keycloak handles this natively. When a user logs in with Google for the first time, Keycloak creates a local user and links the Google identity. If an existing user later adds GitHub, both identities link to the same Keycloak user.
- Token storage: The SPA stores the access token in memory (JavaScript variable). The refresh token is stored in an httpOnly cookie managed by a backend-for-frontend (BFF) pattern, or the SPA uses Keycloak's silent refresh via iframe.
- Role enforcement: Roles (
free-user,paid-individual) are stored in Keycloak and included in the JWT'srealm_access.rolesclaim. The FastAPIrequire_roledependency checks this claim. - Auto-creation: Keycloak's "First Login Flow" for Google/GitHub automatically creates local users and assigns the
free-userrole by default.
What's Next
In the next lesson, Input Validation and Sanitization, you will learn how to treat every user input as potentially malicious - using Pydantic validators as security boundaries to prevent injection attacks, XSS, path traversal, and SSRF.
