Skip to main content

Python OAuth 2 and OIDC Practice Problems & Exercises

Practice: OAuth 2 and OIDC

11 problems4 Easy4 Medium3 Hard60–90 min
← Back to lesson

Easy

#1Build an Authorization URLEasy
oauth2authorization-urlstatescope

Build an OAuth 2.0 authorization URL for the authorization code flow.

from urllib.parse import urlencode, urlparse, parse_qs
import secrets

def build_auth_url(
auth_endpoint: str,
client_id: str,
redirect_uri: str,
scope: str,
state: str,
) -> str:
# return full authorization URL with all required parameters
pass

state = secrets.token_urlsafe(16)
url = build_auth_url(
auth_endpoint="https://auth.example.com/oauth/authorize",
client_id="myapp-client-id",
redirect_uri="https://myapp.com/callback",
scope="openid profile email",
state=state,
)

parsed = urlparse(url)
params = parse_qs(parsed.query)
print(f"URL contains response_type=code: {params.get('response_type') == ['code']}")
print(f"URL contains client_id: {'client_id' in params}")
print(f"URL contains state: {'state' in params}")
print(f"URL contains scope: {'scope' in params}")
Solution
from urllib.parse import urlencode, urlparse, parse_qs
import secrets

def build_auth_url(
auth_endpoint: str,
client_id: str,
redirect_uri: str,
scope: str,
state: str,
) -> str:
params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": scope,
"state": state,
}
return f"{auth_endpoint}?{urlencode(params)}"

state = secrets.token_urlsafe(16)
url = build_auth_url(
auth_endpoint="https://auth.example.com/oauth/authorize",
client_id="myapp-client-id",
redirect_uri="https://myapp.com/callback",
scope="openid profile email",
state=state,
)

parsed = urlparse(url)
params = parse_qs(parsed.query)
print(f"URL contains response_type=code: {params.get('response_type') == ['code']}")
print(f"URL contains client_id: {'client_id' in params}")
print(f"URL contains state: {'state' in params}")
print(f"URL contains scope: {'scope' in params}")

Authorization code flow steps:

  1. App generates random state, stores it in the session.
  2. App redirects user to the authorization URL.
  3. User logs in at the auth server and grants permission.
  4. Auth server redirects back to redirect_uri?code=AUTH_CODE&state=STATE.
  5. App verifies state matches the stored value (CSRF check).
  6. App exchanges code for access_token at the token endpoint (server-to-server).
Expected Output
URL contains response_type=code: True
URL contains client_id: True
URL contains state: True
URL contains scope: True
Hints

Hint 1: The authorization URL is the auth server endpoint with query parameters: response_type, client_id, redirect_uri, scope, state.

Hint 2: Use urllib.parse.urlencode() to build the query string.

Hint 3: The state parameter is a random value you generate — you store it in the session and verify it when the callback arrives.

#2State Parameter CSRF VerificationEasy
oauth2statecsrfsecurity

Implement state parameter CSRF protection for an OAuth 2.0 callback handler.

import secrets

def start_oauth_flow(session: dict) -> str:
"""Generate and store state; return authorization URL."""
state = secrets.token_urlsafe(32)
session["oauth_state"] = state
return f"https://auth.example.com/oauth/authorize?state={state}&response_type=code&client_id=app1"

def handle_callback(session: dict, callback_params: dict) -> str:
"""Verify state and return authorization code, or raise ValueError."""
pass

session = {}
auth_url = start_oauth_flow(session)

# Simulate valid callback
valid_state = session["oauth_state"]
print(f"Valid callback: {handle_callback(session, {'state': valid_state, 'code': 'AUTH123'}) == 'AUTH123'}")

# Simulate missing state
try:
handle_callback(session, {"code": "AUTH123"})
except ValueError as e:
print(f"Missing state: CSRF check failed: {e}")

# Simulate mismatched state
try:
handle_callback(session, {"state": "attacker-state", "code": "AUTH123"})
except ValueError as e:
print(f"Mismatched state: CSRF check failed: {e}")
Solution
import secrets
import hmac

def start_oauth_flow(session: dict) -> str:
state = secrets.token_urlsafe(32)
session["oauth_state"] = state
return f"https://auth.example.com/oauth/authorize?state={state}&response_type=code&client_id=app1"

def handle_callback(session: dict, callback_params: dict) -> str:
stored_state = session.get("oauth_state")
returned_state = callback_params.get("state")

if not returned_state:
raise ValueError("missing state")
if not stored_state:
raise ValueError("no state in session")
if not hmac.compare_digest(stored_state, returned_state):
raise ValueError("state mismatch")

# Clear state — single use
del session["oauth_state"]

code = callback_params.get("code")
if not code:
raise ValueError("missing authorization code")
return code

session = {}
auth_url = start_oauth_flow(session)

valid_state = session["oauth_state"]
print(f"Valid callback: {handle_callback(session, {'state': valid_state, 'code': 'AUTH123'}) == 'AUTH123'}")

session2 = {}
start_oauth_flow(session2)
try:
handle_callback(session2, {"code": "AUTH123"})
except ValueError as e:
print(f"Missing state: CSRF check failed: {e}")

session3 = {}
start_oauth_flow(session3)
try:
handle_callback(session3, {"state": "attacker-state", "code": "AUTH123"})
except ValueError as e:
print(f"Mismatched state: CSRF check failed: {e}")

Why CSRF matters in OAuth: Without state verification, an attacker can trick a victim into completing an OAuth flow that logs the victim into the attacker's account. The attacker initiates the flow, gets the authorization URL, then tricks the victim into visiting the callback URL with the attacker's authorization code — the victim's session gets bound to the attacker's account. The state parameter, tied to the victim's session, prevents this.

Expected Output
Valid callback: True
Missing state: CSRF check failed: missing state
Mismatched state: CSRF check failed: state mismatch
Hints

Hint 1: Store the generated state in the session before redirecting. On callback, compare the returned state with the stored value.

Hint 2: The state must be cryptographically random — use secrets.token_urlsafe().

Hint 3: Never proceed with the token exchange if the state is missing or does not match.

#3Scope Parser and EnforcerEasy
oauth2scopesauthorizationaccess-control

Implement a scope parser and has_scope(granted_scopes, required) checker.

def parse_scopes(scope_string: str) -> set:
# parse space-separated scope string into a set
pass

def has_scope(granted_scope_string: str, required: str) -> bool:
# check if a single required scope is in granted scopes
pass

def has_all_scopes(granted_scope_string: str, required: list) -> bool:
# check if all required scopes are granted
pass

granted = "read:profile write:posts read:feed"

print(f"read:profile granted: {has_scope(granted, 'read:profile')}")
print(f"write:posts granted: {has_scope(granted, 'write:posts')}")
print(f"delete:users granted: {has_scope(granted, 'delete:users')}")
print(f"Multiple scopes check: {has_all_scopes(granted, ['read:profile', 'write:posts'])}")
print(f"Empty scope check: {has_all_scopes(granted, ['read:profile', 'admin'])}")
Solution
def parse_scopes(scope_string: str) -> set:
if not scope_string:
return set()
return set(scope_string.strip().split())

def has_scope(granted_scope_string: str, required: str) -> bool:
return required in parse_scopes(granted_scope_string)

def has_all_scopes(granted_scope_string: str, required: list) -> bool:
granted = parse_scopes(granted_scope_string)
return set(required).issubset(granted)

granted = "read:profile write:posts read:feed"

print(f"read:profile granted: {has_scope(granted, 'read:profile')}")
print(f"write:posts granted: {has_scope(granted, 'write:posts')}")
print(f"delete:users granted: {has_scope(granted, 'delete:users')}")
print(f"Multiple scopes check: {has_all_scopes(granted, ['read:profile', 'write:posts'])}")
print(f"Empty scope check: {has_all_scopes(granted, ['read:profile', 'admin'])}")

Scope design best practices:

  • Use resource:action format: read:profile, write:posts, admin:users.
  • Request the minimum scopes needed — principle of least privilege.
  • Scopes in the access token are set at issuance time — they cannot be expanded without re-authorization.
  • Check scopes on every API call, not just at login — tokens can be reused across multiple endpoints.
Expected Output
read:profile granted: True
write:posts granted: True
delete:users granted: False
Multiple scopes check: True
Empty scope check: False
Hints

Hint 1: OAuth scopes are space-separated strings. Parse into a set for O(1) membership checks.

Hint 2: To require multiple scopes, check that all required scopes are a subset of granted scopes.

Hint 3: Use set.issubset() for multi-scope checks.

#4Client Credentials Flow SimulatorEasy
oauth2client-credentialsservice-to-servicetoken

Implement a client credentials token request and response parser (simulated without real HTTP).

import secrets
import time

def simulate_token_endpoint(
client_id: str,
client_secret: str,
grant_type: str,
scope: str,
registered_clients: dict,
) -> dict:
"""Simulates an OAuth2 token endpoint for client credentials flow."""
if grant_type != "client_credentials":
return {"error": "unsupported_grant_type"}
client = registered_clients.get(client_id)
if not client or client["secret"] != client_secret:
return {"error": "invalid_client"}
allowed_scopes = set(scope.split()) & set(client["scopes"])
return {
"access_token": secrets.token_urlsafe(32),
"token_type": "bearer",
"expires_in": 3600,
"scope": " ".join(allowed_scopes),
}

CLIENTS = {
"worker-service": {"secret": "super-secret-worker", "scopes": ["read:data", "write:events"]},
}

response = simulate_token_endpoint(
client_id="worker-service",
client_secret="super-secret-worker",
grant_type="client_credentials",
scope="read:data write:events",
registered_clients=CLIENTS,
)

print(f"Token type: {response.get('token_type')}")
print(f"Has access_token: {'access_token' in response}")
print(f"Expires in seconds: {response.get('expires_in')}")
print(f"Scope matches: {set(response.get('scope', '').split()) == {'read:data', 'write:events'}}")
Solution
import secrets
import time

def simulate_token_endpoint(
client_id: str,
client_secret: str,
grant_type: str,
scope: str,
registered_clients: dict,
) -> dict:
if grant_type != "client_credentials":
return {"error": "unsupported_grant_type"}
client = registered_clients.get(client_id)
if not client or client["secret"] != client_secret:
return {"error": "invalid_client"}
allowed_scopes = set(scope.split()) & set(client["scopes"])
return {
"access_token": secrets.token_urlsafe(32),
"token_type": "bearer",
"expires_in": 3600,
"scope": " ".join(allowed_scopes),
}

CLIENTS = {
"worker-service": {"secret": "super-secret-worker", "scopes": ["read:data", "write:events"]},
}

response = simulate_token_endpoint(
client_id="worker-service",
client_secret="super-secret-worker",
grant_type="client_credentials",
scope="read:data write:events",
registered_clients=CLIENTS,
)

print(f"Token type: {response.get('token_type')}")
print(f"Has access_token: {'access_token' in response}")
print(f"Expires in seconds: {response.get('expires_in')}")
print(f"Scope matches: {set(response.get('scope', '').split()) == {'read:data', 'write:events'}}")

Client credentials vs authorization code:

  • Client credentials: Machine-to-machine. No user. Client authenticates with its own credentials. Used for background jobs, microservices, and API integrations.
  • Authorization code: User-delegated access. User approves what scopes the client gets. Used when the application acts on behalf of a user.
  • Client credentials tokens should be short-lived (1 hour) — the client can always request a new one. No refresh token is needed or issued.
Expected Output
Token type: bearer
Has access_token: True
Expires in seconds: 3600
Scope matches: True
Hints

Hint 1: Client credentials flow: client sends client_id + client_secret + grant_type=client_credentials to the token endpoint.

Hint 2: The auth server returns an access_token (no refresh token — the client can always get a new one).

Hint 3: This flow is for machine-to-machine communication — no user is involved.


Medium

#5PKCE Code Verifier and ChallengeMedium
oauth2pkcecode-verifiercode-challengesha256

Implement PKCE (Proof Key for Code Exchange) code verifier and challenge generation and verification.

import secrets
import hashlib
import base64

def generate_code_verifier(length: int = 128) -> str:
# generate a cryptographically random verifier (unreserved chars)
pass

def generate_code_challenge(verifier: str) -> str:
# return BASE64URL(SHA256(verifier)) with S256 method
pass

def verify_pkce(verifier: str, challenge: str) -> bool:
# verify that challenge == BASE64URL(SHA256(verifier))
pass

verifier = generate_code_verifier(128)
challenge = generate_code_challenge(verifier)

print(f"Code verifier length: {len(verifier)}")
print(f"Code challenge is base64url SHA256 of verifier: {verify_pkce(verifier, challenge)}")
print(f"Challenge is URL-safe (no +/=): {'+' not in challenge and '/' not in challenge and '=' not in challenge}")
print(f"Verification passes: {verify_pkce(verifier, challenge)}")
Solution
import secrets
import hashlib
import base64
import re

def generate_code_verifier(length: int = 128) -> str:
# Use URL-safe base64 then trim to desired length — all chars are unreserved
raw = secrets.token_urlsafe(length)
return raw[:length]

def generate_code_challenge(verifier: str) -> str:
digest = hashlib.sha256(verifier.encode("ascii")).digest()
return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")

def verify_pkce(verifier: str, challenge: str) -> bool:
expected = generate_code_challenge(verifier)
# Use compare_digest even here for consistency
import hmac
return hmac.compare_digest(expected, challenge)

verifier = generate_code_verifier(128)
challenge = generate_code_challenge(verifier)

print(f"Code verifier length: {len(verifier)}")
print(f"Code challenge is base64url SHA256 of verifier: {verify_pkce(verifier, challenge)}")
print(f"Challenge is URL-safe (no +/=): {'+' not in challenge and '/' not in challenge and '=' not in challenge}")
print(f"Verification passes: {verify_pkce(verifier, challenge)}")

Why PKCE exists: In the original authorization code flow, a stolen authorization code could be exchanged for an access token. Mobile and SPA apps cannot keep a client_secret truly secret. PKCE solves this: the app generates a code_verifier before the flow and sends only its hash (code_challenge) in the authorization request. The auth server stores the challenge. At token exchange, the app sends the original code_verifier — the auth server recomputes the hash and verifies it matches. A stolen code cannot be exchanged without the original verifier.

Expected Output
Code verifier length: 128
Code challenge is base64url SHA256 of verifier: True
Challenge is URL-safe (no +/=): True
Verification passes: True
Hints

Hint 1: PKCE code_verifier is a random string of 43-128 characters from the unreserved character set.

Hint 2: code_challenge = BASE64URL(SHA256(code_verifier)) — using the S256 method.

Hint 3: The code_verifier is sent with the token exchange request. The auth server recomputes the challenge and compares.

#6Token Introspection EndpointMedium
oauth2token-introspectionrfc7662active

Implement a token introspection endpoint (RFC 7662) simulator.

import time
import secrets

# Simulated token store (in production this would be a database or cache)
TOKEN_STORE = {}

def issue_token(user_id: str, scope: str, ttl: int = 3600) -> str:
token = secrets.token_urlsafe(32)
TOKEN_STORE[token] = {
"sub": user_id,
"scope": scope,
"iat": int(time.time()),
"exp": int(time.time()) + ttl,
"client_id": "myapp",
}
return token

def introspect(token: str) -> dict:
"""RFC 7662 token introspection response."""
pass

active_token = issue_token("user42", "read:profile write:posts", ttl=3600)
expired_token = issue_token("user99", "read:profile", ttl=-60)

r1 = introspect(active_token)
r2 = introspect(expired_token)
r3 = introspect("nonexistent-token")

print(f"Active token: active={r1['active']}, sub={r1.get('sub')}, scope={r1.get('scope')}")
print(f"Expired token: active={r2['active']}")
print(f"Unknown token: active={r3['active']}")
Solution
import time
import secrets

TOKEN_STORE = {}

def issue_token(user_id: str, scope: str, ttl: int = 3600) -> str:
token = secrets.token_urlsafe(32)
TOKEN_STORE[token] = {
"sub": user_id,
"scope": scope,
"iat": int(time.time()),
"exp": int(time.time()) + ttl,
"client_id": "myapp",
}
return token

def introspect(token: str) -> dict:
entry = TOKEN_STORE.get(token)
if entry is None:
return {"active": False}
if time.time() > entry["exp"]:
return {"active": False}
return {
"active": True,
"sub": entry["sub"],
"scope": entry["scope"],
"client_id": entry["client_id"],
"iat": entry["iat"],
"exp": entry["exp"],
"token_type": "Bearer",
}

active_token = issue_token("user42", "read:profile write:posts", ttl=3600)
expired_token = issue_token("user99", "read:profile", ttl=-60)

r1 = introspect(active_token)
r2 = introspect(expired_token)
r3 = introspect("nonexistent-token")

print(f"Active token: active={r1['active']}, sub={r1.get('sub')}, scope={r1.get('scope')}")
print(f"Expired token: active={r2['active']}")
print(f"Unknown token: active={r3['active']}")

Self-contained JWT vs opaque token introspection:

  • Self-contained JWT: Resource server validates the signature locally — no network call to auth server. Fast, but tokens cannot be revoked until they expire.
  • Opaque token + introspection: Resource server calls the auth server on every request — slower, but supports instant revocation.
  • Many systems use short-lived JWTs (15 min) for performance and accept they can't be revoked before expiry. Revocation is handled by the refresh token layer.
Expected Output
Active token: active=True, sub=user42, scope=read:profile
Expired token: active=False
Unknown token: active=False
Hints

Hint 1: RFC 7662 defines token introspection: POST to /introspect with the token, returns {"active": true/false, ...claims}.

Hint 2: An inactive token returns only {"active": false} — do not include claims for invalid tokens.

Hint 3: The introspection endpoint is protected — only registered resource servers should call it.

#7OIDC ID Token ValidationMedium
oidcid-tokennonceclaimsopenid-connect

Implement OIDC ID token validation with nonce verification and required claim checks.

import base64, hashlib, hmac, json, time, secrets

class JWTError(Exception):
pass

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))

def create_id_token(claims: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(claims, separators=(",", ":")).encode())
sig = hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"

def validate_id_token(
token: str,
secret: bytes,
expected_iss: str,
expected_aud: str,
expected_nonce: str,
required_claims: list = None,
) -> dict:
pass

SECRET = b"oidc-secret"
nonce = secrets.token_urlsafe(16)
now = int(time.time())

valid_claims = {
"iss": "https://auth.example.com",
"sub": "user42",
"aud": "myapp-client-id",
"exp": now + 300,
"iat": now,
"nonce": nonce,
"email": "[email protected]",
"name": "Alice",
}

valid_token = create_id_token(valid_claims, SECRET)
no_nonce_token = create_id_token({k: v for k, v in valid_claims.items() if k != "nonce"}, SECRET)
wrong_nonce_token = create_id_token({**valid_claims, "nonce": "wrong"}, SECRET)
no_email_token = create_id_token({k: v for k, v in valid_claims.items() if k != "email"}, SECRET)

result = validate_id_token(valid_token, SECRET, "https://auth.example.com", "myapp-client-id", nonce, ["email"])
print(f"Valid ID token: sub={result['sub']}")

for tok, label, extra_nonce in [
(no_nonce_token, "Missing nonce", nonce),
(wrong_nonce_token, "Wrong nonce", nonce),
(no_email_token, "Missing email scope claim", nonce),
]:
try:
validate_id_token(tok, SECRET, "https://auth.example.com", "myapp-client-id", extra_nonce, ["email"])
except JWTError as e:
print(f"{label}: JWTError: {e}")
Solution
import base64, hashlib, hmac, json, time, secrets

class JWTError(Exception):
pass

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))

def create_id_token(claims: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(claims, separators=(",", ":")).encode())
sig = hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"

def validate_id_token(
token: str,
secret: bytes,
expected_iss: str,
expected_aud: str,
expected_nonce: str,
required_claims: list = None,
) -> dict:
try:
h_enc, p_enc, sig_enc = token.split(".")
except ValueError:
raise JWTError("malformed token")

expected_sig = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected_sig, b64url_decode(sig_enc)):
raise JWTError("invalid signature")

claims = json.loads(b64url_decode(p_enc))

if time.time() > claims.get("exp", 0):
raise JWTError("token expired")
if claims.get("iss") != expected_iss:
raise JWTError("invalid issuer")
if claims.get("aud") != expected_aud:
raise JWTError("invalid audience")

nonce = claims.get("nonce")
if not nonce:
raise JWTError("missing nonce")
if not hmac.compare_digest(nonce, expected_nonce):
raise JWTError("nonce mismatch")

if required_claims:
for claim in required_claims:
if claim not in claims:
raise JWTError(f"missing required claim: {claim}")

return claims

SECRET = b"oidc-secret"
nonce = secrets.token_urlsafe(16)
now = int(time.time())

valid_claims = {
"iss": "https://auth.example.com",
"sub": "user42",
"aud": "myapp-client-id",
"exp": now + 300,
"iat": now,
"nonce": nonce,
"email": "[email protected]",
"name": "Alice",
}

valid_token = create_id_token(valid_claims, SECRET)
no_nonce_token = create_id_token({k: v for k, v in valid_claims.items() if k != "nonce"}, SECRET)
wrong_nonce_token = create_id_token({**valid_claims, "nonce": "wrong"}, SECRET)
no_email_token = create_id_token({k: v for k, v in valid_claims.items() if k != "email"}, SECRET)

result = validate_id_token(valid_token, SECRET, "https://auth.example.com", "myapp-client-id", nonce, ["email"])
print(f"Valid ID token: sub={result['sub']}")

for tok, label, extra_nonce in [
(no_nonce_token, "Missing nonce", nonce),
(wrong_nonce_token, "Wrong nonce", nonce),
(no_email_token, "Missing email scope claim", nonce),
]:
try:
validate_id_token(tok, SECRET, "https://auth.example.com", "myapp-client-id", extra_nonce, ["email"])
except JWTError as e:
print(f"{label}: JWTError: {e}")

OIDC vs OAuth2: OAuth2 is an authorization protocol — it grants access. OIDC is an identity layer on top of OAuth2 — it authenticates users and returns an ID token. The ID token is a signed JWT containing user identity claims. The access token is used to call APIs. Never use an access token as proof of identity — use the ID token or the userinfo endpoint.

Expected Output
Valid ID token: sub=user42
Missing nonce: JWTError: missing nonce
Wrong nonce: JWTError: nonce mismatch
Missing email scope claim: JWTError: missing required claim: email
Hints

Hint 1: An OIDC ID token is a JWT with additional required claims: iss, sub, aud, exp, iat, and nonce.

Hint 2: The nonce claim prevents replay attacks — generate a fresh nonce for each auth request and verify it in the ID token.

Hint 3: The "email" claim is only present if the "email" scope was requested.

#8Token Exchange with Scope DowngradeMedium
oauth2token-exchangerfc8693scope-downgrade

Implement an OAuth 2.0 token exchange endpoint (RFC 8693) that supports scope downgrading.

import secrets

def token_exchange(
subject_token: str,
subject_token_type: str,
requested_scope: str,
token_store: dict,
) -> dict:
"""
Exchange a token for a new token with optionally narrower scopes.
Raises ValueError if requested scopes exceed original scopes.
"""
pass

TOKENS = {
"original-token-abc": {
"sub": "user42",
"scope": "read:profile write:posts admin:read",
"exp": 9999999999,
}
}

result = token_exchange(
subject_token="original-token-abc",
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
requested_scope="read:profile",
token_store=TOKENS,
)
print(f"Exchange returns new token: {'access_token' in result}")
print(f"Downgraded scope: {result.get('scope')}")

try:
token_exchange(
subject_token="original-token-abc",
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
requested_scope="read:profile delete:all",
token_store=TOKENS,
)
except ValueError as e:
print(f"Cannot upgrade scope: ValueError: {e}")
Solution
import secrets

def token_exchange(
subject_token: str,
subject_token_type: str,
requested_scope: str,
token_store: dict,
) -> dict:
entry = token_store.get(subject_token)
if not entry:
raise ValueError("invalid subject token")

original_scopes = set(entry["scope"].split())
requested_scopes = set(requested_scope.split())

if not requested_scopes.issubset(original_scopes):
raise ValueError("requested scopes exceed granted scopes")

new_token = secrets.token_urlsafe(32)
token_store[new_token] = {
"sub": entry["sub"],
"scope": " ".join(requested_scopes),
"exp": entry["exp"],
}
return {
"access_token": new_token,
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
"token_type": "Bearer",
"scope": " ".join(requested_scopes),
}

TOKENS = {
"original-token-abc": {
"sub": "user42",
"scope": "read:profile write:posts admin:read",
"exp": 9999999999,
}
}

result = token_exchange(
subject_token="original-token-abc",
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
requested_scope="read:profile",
token_store=TOKENS,
)
print(f"Exchange returns new token: {'access_token' in result}")
print(f"Downgraded scope: {result.get('scope')}")

try:
token_exchange(
subject_token="original-token-abc",
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
requested_scope="read:profile delete:all",
token_store=TOKENS,
)
except ValueError as e:
print(f"Cannot upgrade scope: ValueError: {e}")

Token exchange use cases:

  • A front-end service exchanges a user's token for a narrower-scoped token before passing it to a microservice (principle of least privilege in service meshes).
  • Impersonation: an admin system exchanges for a token scoped to a specific user.
  • Delegation: a background job gets a token bound to a specific task scope.
  • RFC 8693 is now supported by Keycloak, Auth0, and most enterprise identity providers.
Expected Output
Exchange returns new token: True
Downgraded scope: read:profile
Cannot upgrade scope: ValueError: requested scopes exceed granted scopes
Hints

Hint 1: RFC 8693 token exchange allows a service to get a new token on behalf of a user, with potentially narrower scopes.

Hint 2: The exchanged token must have scopes that are a strict subset of the original token scopes.

Hint 3: Scope upgrade (requesting more than the original) must be rejected.


Hard

#9Full Authorization Code + PKCE FlowHard
oauth2authorization-codepkcefull-flow

Implement the full OAuth 2.0 authorization code + PKCE flow end to end.

import hashlib, base64, secrets, time

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

# Shared state (simulates auth server database)
AUTH_CODES = {} # code -> {client_id, user_id, scope, code_challenge, used, expires}
TOKENS = {} # token -> {sub, scope, expires}

def generate_pkce():
"""Returns (code_verifier, code_challenge)"""
verifier = secrets.token_urlsafe(96)[:128]
challenge = b64url_encode(hashlib.sha256(verifier.encode()).digest())
return verifier, challenge

def authorization_endpoint(
client_id: str,
redirect_uri: str,
scope: str,
code_challenge: str,
code_challenge_method: str,
state: str,
# Simulated: user approves
user_id: str = "user42",
) -> str:
"""Simulates user approving the request; returns redirect URL with code."""
pass

def token_endpoint(
client_id: str,
code: str,
redirect_uri: str,
code_verifier: str,
) -> dict:
"""Exchange authorization code for access token, verifying PKCE."""
pass

# Full flow
verifier, challenge = generate_pkce()
state = secrets.token_urlsafe(16)
print("Step 1 - Auth URL built with code_challenge")

redirect = authorization_endpoint(
client_id="myapp",
redirect_uri="https://myapp.com/callback",
scope="openid profile",
code_challenge=challenge,
code_challenge_method="S256",
state=state,
)
from urllib.parse import urlparse, parse_qs
params = parse_qs(urlparse(redirect).query)
code = params["code"][0]
print(f"Step 2 - Authorization code issued")

token_response = token_endpoint("myapp", code, "https://myapp.com/callback", verifier)
print(f"Step 3 - Token issued: has access_token={('access_token' in token_response)}")

try:
token_endpoint("myapp", code, "https://myapp.com/callback", verifier)
except ValueError as e:
print(f"Step 4 - Code replay rejected: ValueError: {e}")
Solution
import hashlib, base64, secrets, time
from urllib.parse import urlparse, parse_qs, urlencode
import hmac

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

AUTH_CODES = {}
TOKENS = {}

def generate_pkce():
verifier = secrets.token_urlsafe(96)[:128]
challenge = b64url_encode(hashlib.sha256(verifier.encode()).digest())
return verifier, challenge

def authorization_endpoint(
client_id, redirect_uri, scope,
code_challenge, code_challenge_method,
state, user_id="user42",
) -> str:
code = secrets.token_urlsafe(32)
AUTH_CODES[code] = {
"client_id": client_id,
"user_id": user_id,
"scope": scope,
"redirect_uri": redirect_uri,
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
"used": False,
"expires": time.time() + 60,
}
params = urlencode({"code": code, "state": state})
return f"{redirect_uri}?{params}"

def token_endpoint(client_id, code, redirect_uri, code_verifier) -> dict:
entry = AUTH_CODES.get(code)
if not entry:
raise ValueError("invalid code")
if entry["used"]:
raise ValueError("code already used")
if time.time() > entry["expires"]:
raise ValueError("code expired")
if entry["client_id"] != client_id:
raise ValueError("client_id mismatch")
if entry["redirect_uri"] != redirect_uri:
raise ValueError("redirect_uri mismatch")

# Verify PKCE
computed = b64url_encode(hashlib.sha256(code_verifier.encode()).digest())
if not hmac.compare_digest(computed, entry["code_challenge"]):
raise ValueError("code_verifier mismatch")

entry["used"] = True
access_token = secrets.token_urlsafe(32)
TOKENS[access_token] = {"sub": entry["user_id"], "scope": entry["scope"]}
return {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": 3600,
"scope": entry["scope"],
}

verifier, challenge = generate_pkce()
state = secrets.token_urlsafe(16)
print("Step 1 - Auth URL built with code_challenge")

redirect = authorization_endpoint(
client_id="myapp",
redirect_uri="https://myapp.com/callback",
scope="openid profile",
code_challenge=challenge,
code_challenge_method="S256",
state=state,
)
params = parse_qs(urlparse(redirect).query)
code = params["code"][0]
print(f"Step 2 - Authorization code issued")

token_response = token_endpoint("myapp", code, "https://myapp.com/callback", verifier)
print(f"Step 3 - Token issued: has access_token={('access_token' in token_response)}")

try:
token_endpoint("myapp", code, "https://myapp.com/callback", verifier)
except ValueError as e:
print(f"Step 4 - Code replay rejected: ValueError: {e}")

Authorization code security properties:

  • Single-use codes prevent replay attacks.
  • Short TTL (60 seconds) limits the replay window.
  • PKCE binds the code to the client that initiated the flow — a stolen code is useless without the verifier.
  • redirect_uri binding prevents open redirect + code theft combinations.
Expected Output
Step 1 - Auth URL built with code_challenge
Step 2 - Authorization code issued
Step 3 - Token issued: has access_token=True
Step 4 - Code replay rejected: ValueError: code already used
Hints

Hint 1: Build the complete flow: generate verifier/challenge, build auth URL, simulate user approval, exchange code for token.

Hint 2: The authorization code is single-use — reject second uses.

Hint 3: At token exchange, recompute the code challenge from the submitted verifier and compare with the stored challenge.

#10Implicit Flow Deprecation AnalysisHard
oauth2implicit-flowdeprecatedsecurity-analysis

Analyze the implicit flow's security weaknesses and demonstrate why it was deprecated in OAuth 2.1.

from urllib.parse import urlparse, parse_qs, urlencode

def implicit_flow_redirect(
client_id: str,
redirect_uri: str,
scope: str,
access_token: str,
) -> str:
"""Simulate auth server redirect in implicit flow — token in URL fragment."""
params = urlencode({"access_token": access_token, "token_type": "bearer", "expires_in": 3600})
return f"{redirect_uri}#{params}" # Note: fragment (#), not query (?)

def analyze_implicit_flow_risks(redirect_url: str) -> dict:
"""Return a security analysis dict for the implicit flow redirect."""
pass

import secrets
redirect = implicit_flow_redirect(
client_id="spa-app",
redirect_uri="https://myapp.com/callback",
scope="read:profile",
access_token=secrets.token_urlsafe(32),
)

analysis = analyze_implicit_flow_risks(redirect)
print(f"Implicit flow token in fragment: {analysis['token_in_fragment']}")
print(f"Token visible in browser history: {analysis['browser_history_risk']} (security risk)")
print(f"No PKCE protection: {analysis['no_pkce']}")
print(f"Recommendation: {analysis['recommendation']}")
Solution
from urllib.parse import urlparse, parse_qs, urlencode, unquote
import secrets

def implicit_flow_redirect(client_id, redirect_uri, scope, access_token) -> str:
params = urlencode({"access_token": access_token, "token_type": "bearer", "expires_in": 3600})
return f"{redirect_uri}#{params}"

def analyze_implicit_flow_risks(redirect_url: str) -> dict:
parsed = urlparse(redirect_url)
fragment = parsed.fragment
fragment_params = parse_qs(fragment)

token_in_fragment = "access_token" in fragment_params
return {
"token_in_fragment": token_in_fragment,
"browser_history_risk": token_in_fragment, # fragments stored in history
"referer_leak_risk": token_in_fragment, # fragments can leak via Referer
"extension_risk": token_in_fragment, # browser extensions can read location.hash
"no_pkce": True, # implicit flow has no code exchange
"no_refresh_token": True, # implicit flow cannot issue refresh tokens
"recommendation": "use authorization code + PKCE instead",
}

redirect = implicit_flow_redirect(
client_id="spa-app",
redirect_uri="https://myapp.com/callback",
scope="read:profile",
access_token=secrets.token_urlsafe(32),
)

analysis = analyze_implicit_flow_risks(redirect)
print(f"Implicit flow token in fragment: {analysis['token_in_fragment']}")
print(f"Token visible in browser history: {analysis['browser_history_risk']} (security risk)")
print(f"No PKCE protection: {analysis['no_pkce']}")
print(f"Recommendation: {analysis['recommendation']}")

Why implicit flow was deprecated:

  • Token in URL fragment is stored in browser history — extractable by malicious browser extensions.
  • Fragment can leak via the Referer header if the page loads third-party resources.
  • No code_challenge mechanism — a stolen token cannot be invalidated without expiry.
  • OAuth 2.1 (draft) formally removes implicit flow. All SPAs should use authorization code + PKCE — modern browsers support CORS properly so the back-channel token exchange is not a problem.
Expected Output
Implicit flow token in fragment: True
Token visible in browser history: True (security risk)
No PKCE protection: True
Recommendation: use authorization code + PKCE instead
Hints

Hint 1: The implicit flow returns the access token directly in the URL fragment (#access_token=...) after user approval.

Hint 2: URL fragments are stored in browser history, can be logged by browser extensions, and leaked via Referer headers.

Hint 3: OAuth 2.1 removes implicit flow entirely — it is superseded by authorization code + PKCE for SPAs.

#11Dynamic Client Registration Security CheckHard
oauth2dynamic-registrationrfc7591redirect-uri-validation

Implement a secure dynamic client registration endpoint with redirect URI validation.

import secrets
import re

def register_client(
client_name: str,
redirect_uris: list,
grant_types: list,
allow_localhost: bool = False,
) -> dict:
"""
Register an OAuth2 client. Validates redirect_uris strictly.
Raises ValueError for invalid configurations.
"""
pass

# Valid registration
result = register_client(
client_name="My SPA",
redirect_uris=["https://myapp.com/callback"],
grant_types=["authorization_code"],
)
print(f"Valid registration: client_id assigned")

# Invalid cases
for uris, label in [
(["http://localhost:3000/callback"], "Localhost redirect blocked"),
(["https://myapp.com/*"], "Wildcard redirect blocked"),
(["http://myapp.com/callback"], "HTTP redirect blocked"),
]:
try:
register_client("Test App", uris, ["authorization_code"])
except ValueError as e:
print(f"{label}: ValueError: {e}")
Solution
import secrets
import re
from urllib.parse import urlparse

def register_client(
client_name: str,
redirect_uris: list,
grant_types: list,
allow_localhost: bool = False,
) -> dict:
allowed_grant_types = {"authorization_code", "client_credentials", "refresh_token"}
for gt in grant_types:
if gt not in allowed_grant_types:
raise ValueError(f"unsupported grant_type: {gt}")

for uri in redirect_uris:
parsed = urlparse(uri)

if "*" in uri:
raise ValueError("wildcard redirect_uri not allowed")

hostname = parsed.hostname or ""
if hostname in ("localhost", "127.0.0.1", "::1") and not allow_localhost:
raise ValueError("localhost redirect_uri not allowed in production")

if parsed.scheme != "https" and not (allow_localhost and hostname == "localhost"):
raise ValueError("https required for redirect_uri")

client_id = secrets.token_urlsafe(16)
client_secret = secrets.token_urlsafe(32)
return {
"client_id": client_id,
"client_secret": client_secret,
"client_name": client_name,
"redirect_uris": redirect_uris,
"grant_types": grant_types,
"token_endpoint_auth_method": "client_secret_basic",
}

result = register_client(
client_name="My SPA",
redirect_uris=["https://myapp.com/callback"],
grant_types=["authorization_code"],
)
print(f"Valid registration: client_id assigned")

for uris, label in [
(["http://localhost:3000/callback"], "Localhost redirect blocked"),
(["https://myapp.com/*"], "Wildcard redirect blocked"),
(["http://myapp.com/callback"], "HTTP redirect blocked"),
]:
try:
register_client("Test App", uris, ["authorization_code"])
except ValueError as e:
print(f"{label}: ValueError: {e}")

redirect_uri security: An open redirector vulnerability in OAuth allows an attacker to register a client with a redirect_uri pointing to their server, then trick a user into authorizing it — the authorization code gets sent to the attacker. Strict redirect_uri validation (exact match, no wildcards, https-only in production) is the primary defense. RFC 6749 requires exact string matching of redirect_uri at both registration and authorization time.

Expected Output
Valid registration: client_id assigned
Localhost redirect blocked: ValueError: localhost redirect_uri not allowed in production
Wildcard redirect blocked: ValueError: wildcard redirect_uri not allowed
HTTP redirect blocked: ValueError: https required for redirect_uri
Hints

Hint 1: RFC 7591 defines dynamic client registration — clients can register themselves programmatically.

Hint 2: redirect_uri validation is critical: reject localhost (except in dev), wildcards, and http:// URIs.

Hint 3: A malicious redirect_uri can exfiltrate authorization codes to an attacker-controlled server.

© 2026 EngineersOfAI. All rights reserved.