Python OAuth 2 and OIDC Practice Problems & Exercises
Practice: OAuth 2 and OIDC
← Back to lessonEasy
Build an OAuth 2.0 authorization URL for the authorization code flow.
from urllib.parse import urlencode, urlparse, parse_qs
import secrets
def build_auth_url(
auth_endpoint: str,
client_id: str,
redirect_uri: str,
scope: str,
state: str,
) -> str:
# return full authorization URL with all required parameters
pass
state = secrets.token_urlsafe(16)
url = build_auth_url(
auth_endpoint="https://auth.example.com/oauth/authorize",
client_id="myapp-client-id",
redirect_uri="https://myapp.com/callback",
scope="openid profile email",
state=state,
)
parsed = urlparse(url)
params = parse_qs(parsed.query)
print(f"URL contains response_type=code: {params.get('response_type') == ['code']}")
print(f"URL contains client_id: {'client_id' in params}")
print(f"URL contains state: {'state' in params}")
print(f"URL contains scope: {'scope' in params}")
Solution
from urllib.parse import urlencode, urlparse, parse_qs
import secrets
def build_auth_url(
auth_endpoint: str,
client_id: str,
redirect_uri: str,
scope: str,
state: str,
) -> str:
params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": scope,
"state": state,
}
return f"{auth_endpoint}?{urlencode(params)}"
state = secrets.token_urlsafe(16)
url = build_auth_url(
auth_endpoint="https://auth.example.com/oauth/authorize",
client_id="myapp-client-id",
redirect_uri="https://myapp.com/callback",
scope="openid profile email",
state=state,
)
parsed = urlparse(url)
params = parse_qs(parsed.query)
print(f"URL contains response_type=code: {params.get('response_type') == ['code']}")
print(f"URL contains client_id: {'client_id' in params}")
print(f"URL contains state: {'state' in params}")
print(f"URL contains scope: {'scope' in params}")
Authorization code flow steps:
- App generates random
state, stores it in the session. - App redirects user to the authorization URL.
- User logs in at the auth server and grants permission.
- Auth server redirects back to
redirect_uri?code=AUTH_CODE&state=STATE. - App verifies
statematches the stored value (CSRF check). - App exchanges
codeforaccess_tokenat the token endpoint (server-to-server).
Expected Output
URL contains response_type=code: True
URL contains client_id: True
URL contains state: True
URL contains scope: TrueHints
Hint 1: The authorization URL is the auth server endpoint with query parameters: response_type, client_id, redirect_uri, scope, state.
Hint 2: Use urllib.parse.urlencode() to build the query string.
Hint 3: The state parameter is a random value you generate — you store it in the session and verify it when the callback arrives.
Implement state parameter CSRF protection for an OAuth 2.0 callback handler.
import secrets
def start_oauth_flow(session: dict) -> str:
"""Generate and store state; return authorization URL."""
state = secrets.token_urlsafe(32)
session["oauth_state"] = state
return f"https://auth.example.com/oauth/authorize?state={state}&response_type=code&client_id=app1"
def handle_callback(session: dict, callback_params: dict) -> str:
"""Verify state and return authorization code, or raise ValueError."""
pass
session = {}
auth_url = start_oauth_flow(session)
# Simulate valid callback
valid_state = session["oauth_state"]
print(f"Valid callback: {handle_callback(session, {'state': valid_state, 'code': 'AUTH123'}) == 'AUTH123'}")
# Simulate missing state
try:
handle_callback(session, {"code": "AUTH123"})
except ValueError as e:
print(f"Missing state: CSRF check failed: {e}")
# Simulate mismatched state
try:
handle_callback(session, {"state": "attacker-state", "code": "AUTH123"})
except ValueError as e:
print(f"Mismatched state: CSRF check failed: {e}")
Solution
import secrets
import hmac
def start_oauth_flow(session: dict) -> str:
state = secrets.token_urlsafe(32)
session["oauth_state"] = state
return f"https://auth.example.com/oauth/authorize?state={state}&response_type=code&client_id=app1"
def handle_callback(session: dict, callback_params: dict) -> str:
stored_state = session.get("oauth_state")
returned_state = callback_params.get("state")
if not returned_state:
raise ValueError("missing state")
if not stored_state:
raise ValueError("no state in session")
if not hmac.compare_digest(stored_state, returned_state):
raise ValueError("state mismatch")
# Clear state — single use
del session["oauth_state"]
code = callback_params.get("code")
if not code:
raise ValueError("missing authorization code")
return code
session = {}
auth_url = start_oauth_flow(session)
valid_state = session["oauth_state"]
print(f"Valid callback: {handle_callback(session, {'state': valid_state, 'code': 'AUTH123'}) == 'AUTH123'}")
session2 = {}
start_oauth_flow(session2)
try:
handle_callback(session2, {"code": "AUTH123"})
except ValueError as e:
print(f"Missing state: CSRF check failed: {e}")
session3 = {}
start_oauth_flow(session3)
try:
handle_callback(session3, {"state": "attacker-state", "code": "AUTH123"})
except ValueError as e:
print(f"Mismatched state: CSRF check failed: {e}")
Why CSRF matters in OAuth: Without state verification, an attacker can trick a victim into completing an OAuth flow that logs the victim into the attacker's account. The attacker initiates the flow, gets the authorization URL, then tricks the victim into visiting the callback URL with the attacker's authorization code — the victim's session gets bound to the attacker's account. The state parameter, tied to the victim's session, prevents this.
Expected Output
Valid callback: True
Missing state: CSRF check failed: missing state
Mismatched state: CSRF check failed: state mismatchHints
Hint 1: Store the generated state in the session before redirecting. On callback, compare the returned state with the stored value.
Hint 2: The state must be cryptographically random — use secrets.token_urlsafe().
Hint 3: Never proceed with the token exchange if the state is missing or does not match.
Implement a scope parser and has_scope(granted_scopes, required) checker.
def parse_scopes(scope_string: str) -> set:
# parse space-separated scope string into a set
pass
def has_scope(granted_scope_string: str, required: str) -> bool:
# check if a single required scope is in granted scopes
pass
def has_all_scopes(granted_scope_string: str, required: list) -> bool:
# check if all required scopes are granted
pass
granted = "read:profile write:posts read:feed"
print(f"read:profile granted: {has_scope(granted, 'read:profile')}")
print(f"write:posts granted: {has_scope(granted, 'write:posts')}")
print(f"delete:users granted: {has_scope(granted, 'delete:users')}")
print(f"Multiple scopes check: {has_all_scopes(granted, ['read:profile', 'write:posts'])}")
print(f"Empty scope check: {has_all_scopes(granted, ['read:profile', 'admin'])}")
Solution
def parse_scopes(scope_string: str) -> set:
if not scope_string:
return set()
return set(scope_string.strip().split())
def has_scope(granted_scope_string: str, required: str) -> bool:
return required in parse_scopes(granted_scope_string)
def has_all_scopes(granted_scope_string: str, required: list) -> bool:
granted = parse_scopes(granted_scope_string)
return set(required).issubset(granted)
granted = "read:profile write:posts read:feed"
print(f"read:profile granted: {has_scope(granted, 'read:profile')}")
print(f"write:posts granted: {has_scope(granted, 'write:posts')}")
print(f"delete:users granted: {has_scope(granted, 'delete:users')}")
print(f"Multiple scopes check: {has_all_scopes(granted, ['read:profile', 'write:posts'])}")
print(f"Empty scope check: {has_all_scopes(granted, ['read:profile', 'admin'])}")
Scope design best practices:
- Use
resource:actionformat:read:profile,write:posts,admin:users. - Request the minimum scopes needed — principle of least privilege.
- Scopes in the access token are set at issuance time — they cannot be expanded without re-authorization.
- Check scopes on every API call, not just at login — tokens can be reused across multiple endpoints.
Expected Output
read:profile granted: True
write:posts granted: True
delete:users granted: False
Multiple scopes check: True
Empty scope check: FalseHints
Hint 1: OAuth scopes are space-separated strings. Parse into a set for O(1) membership checks.
Hint 2: To require multiple scopes, check that all required scopes are a subset of granted scopes.
Hint 3: Use set.issubset() for multi-scope checks.
Implement a client credentials token request and response parser (simulated without real HTTP).
import secrets
import time
def simulate_token_endpoint(
client_id: str,
client_secret: str,
grant_type: str,
scope: str,
registered_clients: dict,
) -> dict:
"""Simulates an OAuth2 token endpoint for client credentials flow."""
if grant_type != "client_credentials":
return {"error": "unsupported_grant_type"}
client = registered_clients.get(client_id)
if not client or client["secret"] != client_secret:
return {"error": "invalid_client"}
allowed_scopes = set(scope.split()) & set(client["scopes"])
return {
"access_token": secrets.token_urlsafe(32),
"token_type": "bearer",
"expires_in": 3600,
"scope": " ".join(allowed_scopes),
}
CLIENTS = {
"worker-service": {"secret": "super-secret-worker", "scopes": ["read:data", "write:events"]},
}
response = simulate_token_endpoint(
client_id="worker-service",
client_secret="super-secret-worker",
grant_type="client_credentials",
scope="read:data write:events",
registered_clients=CLIENTS,
)
print(f"Token type: {response.get('token_type')}")
print(f"Has access_token: {'access_token' in response}")
print(f"Expires in seconds: {response.get('expires_in')}")
print(f"Scope matches: {set(response.get('scope', '').split()) == {'read:data', 'write:events'}}")
Solution
import secrets
import time
def simulate_token_endpoint(
client_id: str,
client_secret: str,
grant_type: str,
scope: str,
registered_clients: dict,
) -> dict:
if grant_type != "client_credentials":
return {"error": "unsupported_grant_type"}
client = registered_clients.get(client_id)
if not client or client["secret"] != client_secret:
return {"error": "invalid_client"}
allowed_scopes = set(scope.split()) & set(client["scopes"])
return {
"access_token": secrets.token_urlsafe(32),
"token_type": "bearer",
"expires_in": 3600,
"scope": " ".join(allowed_scopes),
}
CLIENTS = {
"worker-service": {"secret": "super-secret-worker", "scopes": ["read:data", "write:events"]},
}
response = simulate_token_endpoint(
client_id="worker-service",
client_secret="super-secret-worker",
grant_type="client_credentials",
scope="read:data write:events",
registered_clients=CLIENTS,
)
print(f"Token type: {response.get('token_type')}")
print(f"Has access_token: {'access_token' in response}")
print(f"Expires in seconds: {response.get('expires_in')}")
print(f"Scope matches: {set(response.get('scope', '').split()) == {'read:data', 'write:events'}}")
Client credentials vs authorization code:
- Client credentials: Machine-to-machine. No user. Client authenticates with its own credentials. Used for background jobs, microservices, and API integrations.
- Authorization code: User-delegated access. User approves what scopes the client gets. Used when the application acts on behalf of a user.
- Client credentials tokens should be short-lived (1 hour) — the client can always request a new one. No refresh token is needed or issued.
Expected Output
Token type: bearer
Has access_token: True
Expires in seconds: 3600
Scope matches: TrueHints
Hint 1: Client credentials flow: client sends client_id + client_secret + grant_type=client_credentials to the token endpoint.
Hint 2: The auth server returns an access_token (no refresh token — the client can always get a new one).
Hint 3: This flow is for machine-to-machine communication — no user is involved.
Medium
Implement PKCE (Proof Key for Code Exchange) code verifier and challenge generation and verification.
import secrets
import hashlib
import base64
def generate_code_verifier(length: int = 128) -> str:
# generate a cryptographically random verifier (unreserved chars)
pass
def generate_code_challenge(verifier: str) -> str:
# return BASE64URL(SHA256(verifier)) with S256 method
pass
def verify_pkce(verifier: str, challenge: str) -> bool:
# verify that challenge == BASE64URL(SHA256(verifier))
pass
verifier = generate_code_verifier(128)
challenge = generate_code_challenge(verifier)
print(f"Code verifier length: {len(verifier)}")
print(f"Code challenge is base64url SHA256 of verifier: {verify_pkce(verifier, challenge)}")
print(f"Challenge is URL-safe (no +/=): {'+' not in challenge and '/' not in challenge and '=' not in challenge}")
print(f"Verification passes: {verify_pkce(verifier, challenge)}")
Solution
import secrets
import hashlib
import base64
import re
def generate_code_verifier(length: int = 128) -> str:
# Use URL-safe base64 then trim to desired length — all chars are unreserved
raw = secrets.token_urlsafe(length)
return raw[:length]
def generate_code_challenge(verifier: str) -> str:
digest = hashlib.sha256(verifier.encode("ascii")).digest()
return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
def verify_pkce(verifier: str, challenge: str) -> bool:
expected = generate_code_challenge(verifier)
# Use compare_digest even here for consistency
import hmac
return hmac.compare_digest(expected, challenge)
verifier = generate_code_verifier(128)
challenge = generate_code_challenge(verifier)
print(f"Code verifier length: {len(verifier)}")
print(f"Code challenge is base64url SHA256 of verifier: {verify_pkce(verifier, challenge)}")
print(f"Challenge is URL-safe (no +/=): {'+' not in challenge and '/' not in challenge and '=' not in challenge}")
print(f"Verification passes: {verify_pkce(verifier, challenge)}")
Why PKCE exists: In the original authorization code flow, a stolen authorization code could be exchanged for an access token. Mobile and SPA apps cannot keep a client_secret truly secret. PKCE solves this: the app generates a code_verifier before the flow and sends only its hash (code_challenge) in the authorization request. The auth server stores the challenge. At token exchange, the app sends the original code_verifier — the auth server recomputes the hash and verifies it matches. A stolen code cannot be exchanged without the original verifier.
Expected Output
Code verifier length: 128
Code challenge is base64url SHA256 of verifier: True
Challenge is URL-safe (no +/=): True
Verification passes: TrueHints
Hint 1: PKCE code_verifier is a random string of 43-128 characters from the unreserved character set.
Hint 2: code_challenge = BASE64URL(SHA256(code_verifier)) — using the S256 method.
Hint 3: The code_verifier is sent with the token exchange request. The auth server recomputes the challenge and compares.
Implement a token introspection endpoint (RFC 7662) simulator.
import time
import secrets
# Simulated token store (in production this would be a database or cache)
TOKEN_STORE = {}
def issue_token(user_id: str, scope: str, ttl: int = 3600) -> str:
token = secrets.token_urlsafe(32)
TOKEN_STORE[token] = {
"sub": user_id,
"scope": scope,
"iat": int(time.time()),
"exp": int(time.time()) + ttl,
"client_id": "myapp",
}
return token
def introspect(token: str) -> dict:
"""RFC 7662 token introspection response."""
pass
active_token = issue_token("user42", "read:profile write:posts", ttl=3600)
expired_token = issue_token("user99", "read:profile", ttl=-60)
r1 = introspect(active_token)
r2 = introspect(expired_token)
r3 = introspect("nonexistent-token")
print(f"Active token: active={r1['active']}, sub={r1.get('sub')}, scope={r1.get('scope')}")
print(f"Expired token: active={r2['active']}")
print(f"Unknown token: active={r3['active']}")
Solution
import time
import secrets
TOKEN_STORE = {}
def issue_token(user_id: str, scope: str, ttl: int = 3600) -> str:
token = secrets.token_urlsafe(32)
TOKEN_STORE[token] = {
"sub": user_id,
"scope": scope,
"iat": int(time.time()),
"exp": int(time.time()) + ttl,
"client_id": "myapp",
}
return token
def introspect(token: str) -> dict:
entry = TOKEN_STORE.get(token)
if entry is None:
return {"active": False}
if time.time() > entry["exp"]:
return {"active": False}
return {
"active": True,
"sub": entry["sub"],
"scope": entry["scope"],
"client_id": entry["client_id"],
"iat": entry["iat"],
"exp": entry["exp"],
"token_type": "Bearer",
}
active_token = issue_token("user42", "read:profile write:posts", ttl=3600)
expired_token = issue_token("user99", "read:profile", ttl=-60)
r1 = introspect(active_token)
r2 = introspect(expired_token)
r3 = introspect("nonexistent-token")
print(f"Active token: active={r1['active']}, sub={r1.get('sub')}, scope={r1.get('scope')}")
print(f"Expired token: active={r2['active']}")
print(f"Unknown token: active={r3['active']}")
Self-contained JWT vs opaque token introspection:
- Self-contained JWT: Resource server validates the signature locally — no network call to auth server. Fast, but tokens cannot be revoked until they expire.
- Opaque token + introspection: Resource server calls the auth server on every request — slower, but supports instant revocation.
- Many systems use short-lived JWTs (15 min) for performance and accept they can't be revoked before expiry. Revocation is handled by the refresh token layer.
Expected Output
Active token: active=True, sub=user42, scope=read:profile
Expired token: active=False
Unknown token: active=FalseHints
Hint 1: RFC 7662 defines token introspection: POST to /introspect with the token, returns {"active": true/false, ...claims}.
Hint 2: An inactive token returns only {"active": false} — do not include claims for invalid tokens.
Hint 3: The introspection endpoint is protected — only registered resource servers should call it.
Implement OIDC ID token validation with nonce verification and required claim checks.
import base64, hashlib, hmac, json, time, secrets
class JWTError(Exception):
pass
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
def create_id_token(claims: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(claims, separators=(",", ":")).encode())
sig = hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"
def validate_id_token(
token: str,
secret: bytes,
expected_iss: str,
expected_aud: str,
expected_nonce: str,
required_claims: list = None,
) -> dict:
pass
SECRET = b"oidc-secret"
nonce = secrets.token_urlsafe(16)
now = int(time.time())
valid_claims = {
"iss": "https://auth.example.com",
"sub": "user42",
"aud": "myapp-client-id",
"exp": now + 300,
"iat": now,
"nonce": nonce,
"name": "Alice",
}
valid_token = create_id_token(valid_claims, SECRET)
no_nonce_token = create_id_token({k: v for k, v in valid_claims.items() if k != "nonce"}, SECRET)
wrong_nonce_token = create_id_token({**valid_claims, "nonce": "wrong"}, SECRET)
no_email_token = create_id_token({k: v for k, v in valid_claims.items() if k != "email"}, SECRET)
result = validate_id_token(valid_token, SECRET, "https://auth.example.com", "myapp-client-id", nonce, ["email"])
print(f"Valid ID token: sub={result['sub']}")
for tok, label, extra_nonce in [
(no_nonce_token, "Missing nonce", nonce),
(wrong_nonce_token, "Wrong nonce", nonce),
(no_email_token, "Missing email scope claim", nonce),
]:
try:
validate_id_token(tok, SECRET, "https://auth.example.com", "myapp-client-id", extra_nonce, ["email"])
except JWTError as e:
print(f"{label}: JWTError: {e}")
Solution
import base64, hashlib, hmac, json, time, secrets
class JWTError(Exception):
pass
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
def create_id_token(claims: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(claims, separators=(",", ":")).encode())
sig = hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"
def validate_id_token(
token: str,
secret: bytes,
expected_iss: str,
expected_aud: str,
expected_nonce: str,
required_claims: list = None,
) -> dict:
try:
h_enc, p_enc, sig_enc = token.split(".")
except ValueError:
raise JWTError("malformed token")
expected_sig = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected_sig, b64url_decode(sig_enc)):
raise JWTError("invalid signature")
claims = json.loads(b64url_decode(p_enc))
if time.time() > claims.get("exp", 0):
raise JWTError("token expired")
if claims.get("iss") != expected_iss:
raise JWTError("invalid issuer")
if claims.get("aud") != expected_aud:
raise JWTError("invalid audience")
nonce = claims.get("nonce")
if not nonce:
raise JWTError("missing nonce")
if not hmac.compare_digest(nonce, expected_nonce):
raise JWTError("nonce mismatch")
if required_claims:
for claim in required_claims:
if claim not in claims:
raise JWTError(f"missing required claim: {claim}")
return claims
SECRET = b"oidc-secret"
nonce = secrets.token_urlsafe(16)
now = int(time.time())
valid_claims = {
"iss": "https://auth.example.com",
"sub": "user42",
"aud": "myapp-client-id",
"exp": now + 300,
"iat": now,
"nonce": nonce,
"name": "Alice",
}
valid_token = create_id_token(valid_claims, SECRET)
no_nonce_token = create_id_token({k: v for k, v in valid_claims.items() if k != "nonce"}, SECRET)
wrong_nonce_token = create_id_token({**valid_claims, "nonce": "wrong"}, SECRET)
no_email_token = create_id_token({k: v for k, v in valid_claims.items() if k != "email"}, SECRET)
result = validate_id_token(valid_token, SECRET, "https://auth.example.com", "myapp-client-id", nonce, ["email"])
print(f"Valid ID token: sub={result['sub']}")
for tok, label, extra_nonce in [
(no_nonce_token, "Missing nonce", nonce),
(wrong_nonce_token, "Wrong nonce", nonce),
(no_email_token, "Missing email scope claim", nonce),
]:
try:
validate_id_token(tok, SECRET, "https://auth.example.com", "myapp-client-id", extra_nonce, ["email"])
except JWTError as e:
print(f"{label}: JWTError: {e}")
OIDC vs OAuth2: OAuth2 is an authorization protocol — it grants access. OIDC is an identity layer on top of OAuth2 — it authenticates users and returns an ID token. The ID token is a signed JWT containing user identity claims. The access token is used to call APIs. Never use an access token as proof of identity — use the ID token or the userinfo endpoint.
Expected Output
Valid ID token: sub=user42
Missing nonce: JWTError: missing nonce
Wrong nonce: JWTError: nonce mismatch
Missing email scope claim: JWTError: missing required claim: emailHints
Hint 1: An OIDC ID token is a JWT with additional required claims: iss, sub, aud, exp, iat, and nonce.
Hint 2: The nonce claim prevents replay attacks — generate a fresh nonce for each auth request and verify it in the ID token.
Hint 3: The "email" claim is only present if the "email" scope was requested.
Implement an OAuth 2.0 token exchange endpoint (RFC 8693) that supports scope downgrading.
import secrets
def token_exchange(
subject_token: str,
subject_token_type: str,
requested_scope: str,
token_store: dict,
) -> dict:
"""
Exchange a token for a new token with optionally narrower scopes.
Raises ValueError if requested scopes exceed original scopes.
"""
pass
TOKENS = {
"original-token-abc": {
"sub": "user42",
"scope": "read:profile write:posts admin:read",
"exp": 9999999999,
}
}
result = token_exchange(
subject_token="original-token-abc",
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
requested_scope="read:profile",
token_store=TOKENS,
)
print(f"Exchange returns new token: {'access_token' in result}")
print(f"Downgraded scope: {result.get('scope')}")
try:
token_exchange(
subject_token="original-token-abc",
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
requested_scope="read:profile delete:all",
token_store=TOKENS,
)
except ValueError as e:
print(f"Cannot upgrade scope: ValueError: {e}")
Solution
import secrets
def token_exchange(
subject_token: str,
subject_token_type: str,
requested_scope: str,
token_store: dict,
) -> dict:
entry = token_store.get(subject_token)
if not entry:
raise ValueError("invalid subject token")
original_scopes = set(entry["scope"].split())
requested_scopes = set(requested_scope.split())
if not requested_scopes.issubset(original_scopes):
raise ValueError("requested scopes exceed granted scopes")
new_token = secrets.token_urlsafe(32)
token_store[new_token] = {
"sub": entry["sub"],
"scope": " ".join(requested_scopes),
"exp": entry["exp"],
}
return {
"access_token": new_token,
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
"token_type": "Bearer",
"scope": " ".join(requested_scopes),
}
TOKENS = {
"original-token-abc": {
"sub": "user42",
"scope": "read:profile write:posts admin:read",
"exp": 9999999999,
}
}
result = token_exchange(
subject_token="original-token-abc",
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
requested_scope="read:profile",
token_store=TOKENS,
)
print(f"Exchange returns new token: {'access_token' in result}")
print(f"Downgraded scope: {result.get('scope')}")
try:
token_exchange(
subject_token="original-token-abc",
subject_token_type="urn:ietf:params:oauth:token-type:access_token",
requested_scope="read:profile delete:all",
token_store=TOKENS,
)
except ValueError as e:
print(f"Cannot upgrade scope: ValueError: {e}")
Token exchange use cases:
- A front-end service exchanges a user's token for a narrower-scoped token before passing it to a microservice (principle of least privilege in service meshes).
- Impersonation: an admin system exchanges for a token scoped to a specific user.
- Delegation: a background job gets a token bound to a specific task scope.
- RFC 8693 is now supported by Keycloak, Auth0, and most enterprise identity providers.
Expected Output
Exchange returns new token: True
Downgraded scope: read:profile
Cannot upgrade scope: ValueError: requested scopes exceed granted scopesHints
Hint 1: RFC 8693 token exchange allows a service to get a new token on behalf of a user, with potentially narrower scopes.
Hint 2: The exchanged token must have scopes that are a strict subset of the original token scopes.
Hint 3: Scope upgrade (requesting more than the original) must be rejected.
Hard
Implement the full OAuth 2.0 authorization code + PKCE flow end to end.
import hashlib, base64, secrets, time
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
# Shared state (simulates auth server database)
AUTH_CODES = {} # code -> {client_id, user_id, scope, code_challenge, used, expires}
TOKENS = {} # token -> {sub, scope, expires}
def generate_pkce():
"""Returns (code_verifier, code_challenge)"""
verifier = secrets.token_urlsafe(96)[:128]
challenge = b64url_encode(hashlib.sha256(verifier.encode()).digest())
return verifier, challenge
def authorization_endpoint(
client_id: str,
redirect_uri: str,
scope: str,
code_challenge: str,
code_challenge_method: str,
state: str,
# Simulated: user approves
user_id: str = "user42",
) -> str:
"""Simulates user approving the request; returns redirect URL with code."""
pass
def token_endpoint(
client_id: str,
code: str,
redirect_uri: str,
code_verifier: str,
) -> dict:
"""Exchange authorization code for access token, verifying PKCE."""
pass
# Full flow
verifier, challenge = generate_pkce()
state = secrets.token_urlsafe(16)
print("Step 1 - Auth URL built with code_challenge")
redirect = authorization_endpoint(
client_id="myapp",
redirect_uri="https://myapp.com/callback",
scope="openid profile",
code_challenge=challenge,
code_challenge_method="S256",
state=state,
)
from urllib.parse import urlparse, parse_qs
params = parse_qs(urlparse(redirect).query)
code = params["code"][0]
print(f"Step 2 - Authorization code issued")
token_response = token_endpoint("myapp", code, "https://myapp.com/callback", verifier)
print(f"Step 3 - Token issued: has access_token={('access_token' in token_response)}")
try:
token_endpoint("myapp", code, "https://myapp.com/callback", verifier)
except ValueError as e:
print(f"Step 4 - Code replay rejected: ValueError: {e}")
Solution
import hashlib, base64, secrets, time
from urllib.parse import urlparse, parse_qs, urlencode
import hmac
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
AUTH_CODES = {}
TOKENS = {}
def generate_pkce():
verifier = secrets.token_urlsafe(96)[:128]
challenge = b64url_encode(hashlib.sha256(verifier.encode()).digest())
return verifier, challenge
def authorization_endpoint(
client_id, redirect_uri, scope,
code_challenge, code_challenge_method,
state, user_id="user42",
) -> str:
code = secrets.token_urlsafe(32)
AUTH_CODES[code] = {
"client_id": client_id,
"user_id": user_id,
"scope": scope,
"redirect_uri": redirect_uri,
"code_challenge": code_challenge,
"code_challenge_method": code_challenge_method,
"used": False,
"expires": time.time() + 60,
}
params = urlencode({"code": code, "state": state})
return f"{redirect_uri}?{params}"
def token_endpoint(client_id, code, redirect_uri, code_verifier) -> dict:
entry = AUTH_CODES.get(code)
if not entry:
raise ValueError("invalid code")
if entry["used"]:
raise ValueError("code already used")
if time.time() > entry["expires"]:
raise ValueError("code expired")
if entry["client_id"] != client_id:
raise ValueError("client_id mismatch")
if entry["redirect_uri"] != redirect_uri:
raise ValueError("redirect_uri mismatch")
# Verify PKCE
computed = b64url_encode(hashlib.sha256(code_verifier.encode()).digest())
if not hmac.compare_digest(computed, entry["code_challenge"]):
raise ValueError("code_verifier mismatch")
entry["used"] = True
access_token = secrets.token_urlsafe(32)
TOKENS[access_token] = {"sub": entry["user_id"], "scope": entry["scope"]}
return {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": 3600,
"scope": entry["scope"],
}
verifier, challenge = generate_pkce()
state = secrets.token_urlsafe(16)
print("Step 1 - Auth URL built with code_challenge")
redirect = authorization_endpoint(
client_id="myapp",
redirect_uri="https://myapp.com/callback",
scope="openid profile",
code_challenge=challenge,
code_challenge_method="S256",
state=state,
)
params = parse_qs(urlparse(redirect).query)
code = params["code"][0]
print(f"Step 2 - Authorization code issued")
token_response = token_endpoint("myapp", code, "https://myapp.com/callback", verifier)
print(f"Step 3 - Token issued: has access_token={('access_token' in token_response)}")
try:
token_endpoint("myapp", code, "https://myapp.com/callback", verifier)
except ValueError as e:
print(f"Step 4 - Code replay rejected: ValueError: {e}")
Authorization code security properties:
- Single-use codes prevent replay attacks.
- Short TTL (60 seconds) limits the replay window.
- PKCE binds the code to the client that initiated the flow — a stolen code is useless without the verifier.
- redirect_uri binding prevents open redirect + code theft combinations.
Expected Output
Step 1 - Auth URL built with code_challenge
Step 2 - Authorization code issued
Step 3 - Token issued: has access_token=True
Step 4 - Code replay rejected: ValueError: code already usedHints
Hint 1: Build the complete flow: generate verifier/challenge, build auth URL, simulate user approval, exchange code for token.
Hint 2: The authorization code is single-use — reject second uses.
Hint 3: At token exchange, recompute the code challenge from the submitted verifier and compare with the stored challenge.
Analyze the implicit flow's security weaknesses and demonstrate why it was deprecated in OAuth 2.1.
from urllib.parse import urlparse, parse_qs, urlencode
def implicit_flow_redirect(
client_id: str,
redirect_uri: str,
scope: str,
access_token: str,
) -> str:
"""Simulate auth server redirect in implicit flow — token in URL fragment."""
params = urlencode({"access_token": access_token, "token_type": "bearer", "expires_in": 3600})
return f"{redirect_uri}#{params}" # Note: fragment (#), not query (?)
def analyze_implicit_flow_risks(redirect_url: str) -> dict:
"""Return a security analysis dict for the implicit flow redirect."""
pass
import secrets
redirect = implicit_flow_redirect(
client_id="spa-app",
redirect_uri="https://myapp.com/callback",
scope="read:profile",
access_token=secrets.token_urlsafe(32),
)
analysis = analyze_implicit_flow_risks(redirect)
print(f"Implicit flow token in fragment: {analysis['token_in_fragment']}")
print(f"Token visible in browser history: {analysis['browser_history_risk']} (security risk)")
print(f"No PKCE protection: {analysis['no_pkce']}")
print(f"Recommendation: {analysis['recommendation']}")
Solution
from urllib.parse import urlparse, parse_qs, urlencode, unquote
import secrets
def implicit_flow_redirect(client_id, redirect_uri, scope, access_token) -> str:
params = urlencode({"access_token": access_token, "token_type": "bearer", "expires_in": 3600})
return f"{redirect_uri}#{params}"
def analyze_implicit_flow_risks(redirect_url: str) -> dict:
parsed = urlparse(redirect_url)
fragment = parsed.fragment
fragment_params = parse_qs(fragment)
token_in_fragment = "access_token" in fragment_params
return {
"token_in_fragment": token_in_fragment,
"browser_history_risk": token_in_fragment, # fragments stored in history
"referer_leak_risk": token_in_fragment, # fragments can leak via Referer
"extension_risk": token_in_fragment, # browser extensions can read location.hash
"no_pkce": True, # implicit flow has no code exchange
"no_refresh_token": True, # implicit flow cannot issue refresh tokens
"recommendation": "use authorization code + PKCE instead",
}
redirect = implicit_flow_redirect(
client_id="spa-app",
redirect_uri="https://myapp.com/callback",
scope="read:profile",
access_token=secrets.token_urlsafe(32),
)
analysis = analyze_implicit_flow_risks(redirect)
print(f"Implicit flow token in fragment: {analysis['token_in_fragment']}")
print(f"Token visible in browser history: {analysis['browser_history_risk']} (security risk)")
print(f"No PKCE protection: {analysis['no_pkce']}")
print(f"Recommendation: {analysis['recommendation']}")
Why implicit flow was deprecated:
- Token in URL fragment is stored in browser history — extractable by malicious browser extensions.
- Fragment can leak via the
Refererheader if the page loads third-party resources. - No
code_challengemechanism — a stolen token cannot be invalidated without expiry. - OAuth 2.1 (draft) formally removes implicit flow. All SPAs should use authorization code + PKCE — modern browsers support CORS properly so the back-channel token exchange is not a problem.
Expected Output
Implicit flow token in fragment: True
Token visible in browser history: True (security risk)
No PKCE protection: True
Recommendation: use authorization code + PKCE insteadHints
Hint 1: The implicit flow returns the access token directly in the URL fragment (#access_token=...) after user approval.
Hint 2: URL fragments are stored in browser history, can be logged by browser extensions, and leaked via Referer headers.
Hint 3: OAuth 2.1 removes implicit flow entirely — it is superseded by authorization code + PKCE for SPAs.
Implement a secure dynamic client registration endpoint with redirect URI validation.
import secrets
import re
def register_client(
client_name: str,
redirect_uris: list,
grant_types: list,
allow_localhost: bool = False,
) -> dict:
"""
Register an OAuth2 client. Validates redirect_uris strictly.
Raises ValueError for invalid configurations.
"""
pass
# Valid registration
result = register_client(
client_name="My SPA",
redirect_uris=["https://myapp.com/callback"],
grant_types=["authorization_code"],
)
print(f"Valid registration: client_id assigned")
# Invalid cases
for uris, label in [
(["http://localhost:3000/callback"], "Localhost redirect blocked"),
(["https://myapp.com/*"], "Wildcard redirect blocked"),
(["http://myapp.com/callback"], "HTTP redirect blocked"),
]:
try:
register_client("Test App", uris, ["authorization_code"])
except ValueError as e:
print(f"{label}: ValueError: {e}")
Solution
import secrets
import re
from urllib.parse import urlparse
def register_client(
client_name: str,
redirect_uris: list,
grant_types: list,
allow_localhost: bool = False,
) -> dict:
allowed_grant_types = {"authorization_code", "client_credentials", "refresh_token"}
for gt in grant_types:
if gt not in allowed_grant_types:
raise ValueError(f"unsupported grant_type: {gt}")
for uri in redirect_uris:
parsed = urlparse(uri)
if "*" in uri:
raise ValueError("wildcard redirect_uri not allowed")
hostname = parsed.hostname or ""
if hostname in ("localhost", "127.0.0.1", "::1") and not allow_localhost:
raise ValueError("localhost redirect_uri not allowed in production")
if parsed.scheme != "https" and not (allow_localhost and hostname == "localhost"):
raise ValueError("https required for redirect_uri")
client_id = secrets.token_urlsafe(16)
client_secret = secrets.token_urlsafe(32)
return {
"client_id": client_id,
"client_secret": client_secret,
"client_name": client_name,
"redirect_uris": redirect_uris,
"grant_types": grant_types,
"token_endpoint_auth_method": "client_secret_basic",
}
result = register_client(
client_name="My SPA",
redirect_uris=["https://myapp.com/callback"],
grant_types=["authorization_code"],
)
print(f"Valid registration: client_id assigned")
for uris, label in [
(["http://localhost:3000/callback"], "Localhost redirect blocked"),
(["https://myapp.com/*"], "Wildcard redirect blocked"),
(["http://myapp.com/callback"], "HTTP redirect blocked"),
]:
try:
register_client("Test App", uris, ["authorization_code"])
except ValueError as e:
print(f"{label}: ValueError: {e}")
redirect_uri security: An open redirector vulnerability in OAuth allows an attacker to register a client with a redirect_uri pointing to their server, then trick a user into authorizing it — the authorization code gets sent to the attacker. Strict redirect_uri validation (exact match, no wildcards, https-only in production) is the primary defense. RFC 6749 requires exact string matching of redirect_uri at both registration and authorization time.
Expected Output
Valid registration: client_id assigned
Localhost redirect blocked: ValueError: localhost redirect_uri not allowed in production
Wildcard redirect blocked: ValueError: wildcard redirect_uri not allowed
HTTP redirect blocked: ValueError: https required for redirect_uriHints
Hint 1: RFC 7591 defines dynamic client registration — clients can register themselves programmatically.
Hint 2: redirect_uri validation is critical: reject localhost (except in dev), wildcards, and http:// URIs.
Hint 3: A malicious redirect_uri can exfiltrate authorization codes to an attacker-controlled server.
