Python JWT Authentication Practice Problems & Exercises
Practice: JWT Authentication
← Back to lessonEasy
Decode the header and payload sections of a JWT string without verifying the signature.
import base64
import json
TOKEN = (
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)
def decode_jwt_part(part: str) -> dict:
# add padding if needed, base64url decode, then JSON parse
pass
header, payload, _ = TOKEN.split(".")
print(f"Header: {decode_jwt_part(header)}")
print(f"Payload: {decode_jwt_part(payload)}")
Solution
import base64
import json
TOKEN = (
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)
def decode_jwt_part(part: str) -> dict:
# Base64url padding: length must be a multiple of 4
padding = "=" * (4 - len(part) % 4)
decoded = base64.urlsafe_b64decode(part + padding)
return json.loads(decoded)
header, payload, _ = TOKEN.split(".")
print(f"Header: {decode_jwt_part(header)}")
print(f"Payload: {decode_jwt_part(payload)}")
JWT structure anatomy:
- Header:
{"alg": "HS256", "typ": "JWT"}— declares the signing algorithm. - Payload: Claims about the subject —
sub(subject),iat(issued at),exp(expiry), custom claims. - Signature:
HMAC_SHA256(base64url(header) + "." + base64url(payload), secret)or RSA/EC signature. - Critical: Never use a decoded JWT payload for authorization without first verifying the signature. The payload is only base64-encoded, not encrypted — it is readable by anyone.
Expected Output
Header: {'alg': 'HS256', 'typ': 'JWT'}
Payload: {'sub': '1234567890', 'name': 'John Doe', 'iat': 1516239022}Hints
Hint 1: A JWT is three base64url-encoded parts separated by dots: header.payload.signature.
Hint 2: Base64url differs from standard base64: uses - instead of + and _ instead of /. Use base64.urlsafe_b64decode().
Hint 3: Base64url strings may need padding to a multiple of 4 — append = characters before decoding.
Build a minimal HS256 JWT from a header dict, payload dict, and secret key — without using any JWT library.
import base64
import hashlib
import hmac
import json
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def create_jwt(payload: dict, secret: bytes) -> str:
# Build HS256 JWT: header.payload.signature
pass
secret = b"my-256-bit-secret"
payload = {"sub": "user123", "role": "admin", "iat": 1700000000}
token = create_jwt(payload, secret)
parts = token.split(".")
print(f"JWT has 3 parts: {len(parts) == 3}")
import json, base64
header_raw = base64.urlsafe_b64decode(parts[0] + "==")
payload_raw = base64.urlsafe_b64decode(parts[1] + "==")
print(f"Header matches: {json.loads(header_raw)['alg'] == 'HS256'}")
print(f"Payload matches: {json.loads(payload_raw)['sub'] == 'user123'}")
Solution
import base64
import hashlib
import hmac
import json
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def create_jwt(payload: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
header_enc = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
payload_enc = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
signing_input = f"{header_enc}.{payload_enc}"
sig_bytes = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
sig_enc = b64url_encode(sig_bytes)
return f"{signing_input}.{sig_enc}"
secret = b"my-256-bit-secret"
payload = {"sub": "user123", "role": "admin", "iat": 1700000000}
token = create_jwt(payload, secret)
parts = token.split(".")
print(f"JWT has 3 parts: {len(parts) == 3}")
header_raw = base64.urlsafe_b64decode(parts[0] + "==")
payload_raw = base64.urlsafe_b64decode(parts[1] + "==")
print(f"Header matches: {json.loads(header_raw)['alg'] == 'HS256'}")
print(f"Payload matches: {json.loads(payload_raw)['sub'] == 'user123'}")
Implementation details:
separators=(",", ":")removes whitespace from JSON — important because any whitespace change would break signature verification.- Strip padding (
rstrip(b"=")) from base64url output — RFC 7515 requires unpadded base64url. - The signature uses raw bytes (
digest()), not hex — hex would add unnecessary length. - In production, use
PyJWTorpython-jose— this exercise is for understanding the internals.
Expected Output
JWT has 3 parts: True
Header matches: True
Payload matches: TrueHints
Hint 1: Encode each part with base64.urlsafe_b64encode(), then strip trailing = padding.
Hint 2: The signing input is: base64url(header) + "." + base64url(payload).
Hint 3: Use hmac.new(secret, signing_input.encode(), hashlib.sha256).digest() — note digest() not hexdigest() for the signature bytes.
Write a check_expiry(payload: dict) -> str function that returns "valid", "EXPIRED", or "missing exp claim".
import time
def check_expiry(payload: dict) -> str:
# validate the exp claim
pass
now = int(time.time())
fresh = {"sub": "user1", "exp": now + 3600}
expired = {"sub": "user2", "exp": now - 60}
no_exp = {"sub": "user3"}
print(f"Fresh token: {check_expiry(fresh)}")
print(f"Expired token: {check_expiry(expired)}")
print(f"Token without exp: {check_expiry(no_exp)}")
Solution
import time
def check_expiry(payload: dict) -> str:
if "exp" not in payload:
return "missing exp claim"
if time.time() > payload["exp"]:
return "EXPIRED"
return "valid"
now = int(time.time())
fresh = {"sub": "user1", "exp": now + 3600}
expired = {"sub": "user2", "exp": now - 60}
no_exp = {"sub": "user3"}
print(f"Fresh token: {check_expiry(fresh)}")
print(f"Expired token: {check_expiry(expired)}")
print(f"Token without exp: {check_expiry(no_exp)}")
Claims you must validate:
- exp (expiry): Token must not be expired. Use
time.time()(UTC epoch seconds). - nbf (not before): Token must not be used before this time — prevents clock-skew attacks.
- iss (issuer): Token must come from the expected issuer. Prevents using a valid token from one service at another.
- aud (audience): Token must be intended for your service. Prevents token replay across services.
- Always validate ALL relevant claims — a valid signature with wrong issuer is still a security failure.
Expected Output
Fresh token: valid
Expired token: EXPIRED
Token without exp: missing exp claimHints
Hint 1: The exp claim is a Unix timestamp (seconds since epoch). Compare with time.time().
Hint 2: An expired token has exp < time.time().
Hint 3: A token with no exp claim should be rejected if your policy requires expiry.
Demonstrate the difference between HS256 (symmetric) and RS256 (asymmetric) key types.
import hmac, hashlib
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
# HS256 — symmetric
hs_secret = b"shared-secret-both-sides-need-this"
# RS256 — asymmetric
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
# Sign a message with HS256
def hs256_sign(message: bytes, secret: bytes) -> bytes:
return hmac.new(secret, message, hashlib.sha256).digest()
# Sign a message with RS256 private key
def rs256_sign(message: bytes, priv_key) -> bytes:
return priv_key.sign(message, padding.PKCS1v15(), hashes.SHA256())
# Verify RS256 with public key
def rs256_verify(message: bytes, sig: bytes, pub_key) -> bool:
try:
pub_key.verify(sig, message, padding.PKCS1v15(), hashes.SHA256())
return True
except Exception:
return False
msg = b"user_id=42"
hs_sig = hs256_sign(msg, hs_secret)
rs_sig = rs256_sign(msg, private_key)
print(f"HS256 uses symmetric key: {isinstance(hs_secret, bytes)}")
print(f"RS256 private key type: {type(private_key).__name__}")
print(f"RS256 public key type: {type(public_key).__name__}")
print(f"Only private key can sign: {rs256_verify(msg, rs_sig, public_key)}")
Solution
import hmac, hashlib
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
hs_secret = b"shared-secret-both-sides-need-this"
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
def hs256_sign(message: bytes, secret: bytes) -> bytes:
return hmac.new(secret, message, hashlib.sha256).digest()
def rs256_sign(message: bytes, priv_key) -> bytes:
return priv_key.sign(message, padding.PKCS1v15(), hashes.SHA256())
def rs256_verify(message: bytes, sig: bytes, pub_key) -> bool:
try:
pub_key.verify(sig, message, padding.PKCS1v15(), hashes.SHA256())
return True
except Exception:
return False
msg = b"user_id=42"
hs_sig = hs256_sign(msg, hs_secret)
rs_sig = rs256_sign(msg, private_key)
print(f"HS256 uses symmetric key: {isinstance(hs_secret, bytes)}")
print(f"RS256 private key type: {type(private_key).__name__}")
print(f"RS256 public key type: {type(public_key).__name__}")
print(f"Only private key can sign: {rs256_verify(msg, rs_sig, public_key)}")
When to use each:
- HS256: Single service or trusted internal services that share the secret. Simpler but requires distributing the secret securely.
- RS256: Multiple services (microservices, third-party). Only the auth server has the private key. Any service can verify tokens using the published JWKS (public key endpoint) without needing the secret.
- ES256 (ECDSA): Same asymmetric model as RS256 but shorter keys and signatures. Preferred for modern systems.
Expected Output
HS256 uses symmetric key: True
RS256 private key type: RSAPrivateKey
RS256 public key type: RSAPublicKey
Only private key can sign: TrueHints
Hint 1: HS256 uses a shared secret (bytes) — both signer and verifier need the same key.
Hint 2: RS256 uses RSA: sign with private key, verify with public key. Use cryptography library.
Hint 3: For RS256, generate a key pair with rsa.generate_private_key(65537, 2048).
Medium
Implement a verify_jwt(token, secret, expected_iss, expected_aud) function that validates signature and all standard claims.
import base64, hashlib, hmac, json, time
class JWTError(Exception):
pass
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def create_jwt(payload: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"
def verify_jwt(token: str, secret: bytes, expected_iss: str, expected_aud: str) -> dict:
# verify signature, exp, iss, aud — raise JWTError on failure
pass
SECRET = b"signing-secret"
now = int(time.time())
valid_payload = {"sub": "user42", "iss": "auth.example.com", "aud": "api.example.com", "exp": now + 3600}
good_token = create_jwt(valid_payload, SECRET)
exp_token = create_jwt({**valid_payload, "exp": now - 60}, SECRET)
iss_token = create_jwt({**valid_payload, "iss": "evil.com"}, SECRET)
aud_token = create_jwt({**valid_payload, "aud": "other-service"}, SECRET)
claims = verify_jwt(good_token, SECRET, "auth.example.com", "api.example.com")
print(f"Valid token claims: {{'sub': '{claims['sub']}', 'iss': '{claims['iss']}', 'aud': '{claims['aud']}'}}")
for tok, label in [(exp_token, "Expired"), (iss_token, "Wrong issuer"), (aud_token, "Wrong audience")]:
try:
verify_jwt(tok, SECRET, "auth.example.com", "api.example.com")
except JWTError as e:
print(f"{label}: JWTError: {e}")
Solution
import base64, hashlib, hmac, json, time
class JWTError(Exception):
pass
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def create_jwt(payload: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"
def verify_jwt(token: str, secret: bytes, expected_iss: str, expected_aud: str) -> dict:
try:
h_enc, p_enc, sig_enc = token.split(".")
except ValueError:
raise JWTError("malformed token")
header = json.loads(b64url_decode(h_enc))
if header.get("alg") != "HS256":
raise JWTError("unsupported algorithm")
expected_sig = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
actual_sig = b64url_decode(sig_enc)
if not hmac.compare_digest(expected_sig, actual_sig):
raise JWTError("invalid signature")
payload = json.loads(b64url_decode(p_enc))
if time.time() > payload.get("exp", 0):
raise JWTError("token expired")
if payload.get("iss") != expected_iss:
raise JWTError("invalid issuer")
if payload.get("aud") != expected_aud:
raise JWTError("invalid audience")
return payload
SECRET = b"signing-secret"
now = int(time.time())
valid_payload = {"sub": "user42", "iss": "auth.example.com", "aud": "api.example.com", "exp": now + 3600}
good_token = create_jwt(valid_payload, SECRET)
exp_token = create_jwt({**valid_payload, "exp": now - 60}, SECRET)
iss_token = create_jwt({**valid_payload, "iss": "evil.com"}, SECRET)
aud_token = create_jwt({**valid_payload, "aud": "other-service"}, SECRET)
claims = verify_jwt(good_token, SECRET, "auth.example.com", "api.example.com")
print(f"Valid token claims: {{'sub': '{claims['sub']}', 'iss': '{claims['iss']}', 'aud': '{claims['aud']}'}}")
for tok, label in [(exp_token, "Expired"), (iss_token, "Wrong issuer"), (aud_token, "Wrong audience")]:
try:
verify_jwt(tok, SECRET, "auth.example.com", "api.example.com")
except JWTError as e:
print(f"{label}: JWTError: {e}")
Validation order matters:
- Verify signature first — if the signature is invalid, don't bother parsing claims (prevents decoding attacker-crafted payloads).
- Check algorithm claim — reject anything that isn't your expected algorithm.
- Check expiry — fast reject for stale tokens.
- Check issuer and audience — prevent token reuse across services.
Expected Output
Valid token claims: {'sub': 'user42', 'iss': 'auth.example.com', 'aud': 'api.example.com'}
Expired: JWTError: token expired
Wrong issuer: JWTError: invalid issuer
Wrong audience: JWTError: invalid audienceHints
Hint 1: Verify the HMAC signature first — only then decode and validate claims.
Hint 2: Rebuild the expected signature from header + payload and compare with hmac.compare_digest().
Hint 3: Validate exp, iss, and aud — not just the signature.
Demonstrate the "none" algorithm attack and implement a secure verifier that blocks it.
import base64, json, hashlib, hmac
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
class JWTError(Exception):
pass
def craft_none_token(payload: dict) -> str:
# build a JWT with alg=none and empty signature
header = {"alg": "none", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
return f"{h}.{p}." # empty signature
def vulnerable_verify(token: str, secret: bytes) -> dict:
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
alg = header.get("alg", "")
if alg == "none":
return json.loads(b64url_decode(p_enc)) # BUG: no verification!
expected = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected, b64url_decode(sig_enc)):
raise JWTError("invalid signature")
return json.loads(b64url_decode(p_enc))
def secure_verify(token: str, secret: bytes, allowed_algs=("HS256",)) -> dict:
# fix the vulnerability — only accept explicitly allowed algorithms
pass
SECRET = b"server-secret"
evil_payload = {"sub": "attacker", "role": "admin", "exp": 9999999999}
none_token = craft_none_token(evil_payload)
result = vulnerable_verify(none_token, SECRET)
print(f"Crafted token accepted by vulnerable verifier: {result['role'] == 'admin'}")
try:
secure_verify(none_token, SECRET)
except JWTError as e:
print(f"Crafted token rejected by secure verifier: JWTError: {e}")
Solution
import base64, json, hashlib, hmac
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
class JWTError(Exception):
pass
def craft_none_token(payload: dict) -> str:
header = {"alg": "none", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
return f"{h}.{p}."
def vulnerable_verify(token: str, secret: bytes) -> dict:
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
alg = header.get("alg", "")
if alg == "none":
return json.loads(b64url_decode(p_enc))
expected = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected, b64url_decode(sig_enc)):
raise JWTError("invalid signature")
return json.loads(b64url_decode(p_enc))
def secure_verify(token: str, secret: bytes, allowed_algs=("HS256",)) -> dict:
try:
h_enc, p_enc, sig_enc = token.split(".")
except ValueError:
raise JWTError("malformed token")
header = json.loads(b64url_decode(h_enc))
alg = header.get("alg", "")
if alg not in allowed_algs:
raise JWTError(f"unsupported algorithm: {alg}")
expected = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected, b64url_decode(sig_enc)):
raise JWTError("invalid signature")
return json.loads(b64url_decode(p_enc))
SECRET = b"server-secret"
evil_payload = {"sub": "attacker", "role": "admin", "exp": 9999999999}
none_token = craft_none_token(evil_payload)
result = vulnerable_verify(none_token, SECRET)
print(f"Crafted token accepted by vulnerable verifier: {result['role'] == 'admin'}")
try:
secure_verify(none_token, SECRET)
except JWTError as e:
print(f"Crafted token rejected by secure verifier: JWTError: {e}")
CVE history: Multiple real JWT libraries (node-jsonwebtoken before 4.0, python-jwt before 3.3.4) were vulnerable to this attack. An attacker could craft a token with "alg": "none" and any payload they wanted, and the server would accept it. Always pass an explicit algorithms=["HS256"] argument to PyJWT.decode().
Expected Output
Crafted token accepted by vulnerable verifier: True
Crafted token rejected by secure verifier: JWTError: unsupported algorithm: noneHints
Hint 1: The "none" algorithm JWT has an empty signature — format is header.payload. (dot then nothing).
Hint 2: A vulnerable verifier skips signature verification when alg is "none".
Hint 3: The fix: always check that alg is in your explicit allowlist before verifying.
Demonstrate the RS256-to-HS256 algorithm confusion attack and the secure fix.
import base64, json, hashlib, hmac
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
class JWTError(Exception):
pass
# Simulate: server uses RS256, public key is known to attacker
PUBLIC_KEY_BYTES = b"this-is-the-public-key-bytes-attacker-knows"
def attacker_craft_hs256_with_pubkey(payload: dict) -> str:
# attacker signs with HS256 using the public key as the secret
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(PUBLIC_KEY_BYTES, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"
def confused_verifier(token: str, public_key_bytes: bytes) -> dict:
# BUG: reads alg from header and uses public_key_bytes as HMAC secret for HS256
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
alg = header["alg"]
if alg == "HS256": # uses public key as HMAC secret!
expected = hmac.new(public_key_bytes, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if hmac.compare_digest(expected, b64url_decode(sig_enc)):
return json.loads(b64url_decode(p_enc))
raise JWTError("verification failed")
def secure_verifier(token: str, public_key_bytes: bytes, expected_alg: str = "RS256") -> dict:
# fix: reject any token that doesn't use the pinned algorithm
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
if header.get("alg") != expected_alg:
raise JWTError("algorithm mismatch")
raise JWTError("RS256 verification requires real RSA key — algorithm mismatch caught")
evil_payload = {"sub": "attacker", "role": "superadmin"}
attack_token = attacker_craft_hs256_with_pubkey(evil_payload)
result = confused_verifier(attack_token, PUBLIC_KEY_BYTES)
print(f"Attack token accepted by confused verifier: {result['role'] == 'superadmin'}")
try:
secure_verifier(attack_token, PUBLIC_KEY_BYTES, expected_alg="RS256")
except JWTError as e:
print(f"Attack token rejected by secure verifier: JWTError: {e}")
Solution
import base64, json, hashlib, hmac
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
class JWTError(Exception):
pass
PUBLIC_KEY_BYTES = b"this-is-the-public-key-bytes-attacker-knows"
def attacker_craft_hs256_with_pubkey(payload: dict) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(PUBLIC_KEY_BYTES, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"
def confused_verifier(token: str, public_key_bytes: bytes) -> dict:
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
alg = header["alg"]
if alg == "HS256":
expected = hmac.new(public_key_bytes, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if hmac.compare_digest(expected, b64url_decode(sig_enc)):
return json.loads(b64url_decode(p_enc))
raise JWTError("verification failed")
def secure_verifier(token: str, public_key_bytes: bytes, expected_alg: str = "RS256") -> dict:
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
if header.get("alg") != expected_alg:
raise JWTError("algorithm mismatch")
raise JWTError("RS256 verification requires real RSA key — algorithm mismatch caught")
evil_payload = {"sub": "attacker", "role": "superadmin"}
attack_token = attacker_craft_hs256_with_pubkey(evil_payload)
result = confused_verifier(attack_token, PUBLIC_KEY_BYTES)
print(f"Attack token accepted by confused verifier: {result['role'] == 'superadmin'}")
try:
secure_verifier(attack_token, PUBLIC_KEY_BYTES, expected_alg="RS256")
except JWTError as e:
print(f"Attack token rejected by secure verifier: JWTError: {e}")
The attack mechanics: The RS256 public key is public — anyone can read it. If the server verifies HS256 tokens using the public key as the HMAC secret, and an attacker creates an HS256 token signed with that same public key, the confused verifier accepts it. PyJWT 2.x fixed this by requiring algorithms=["RS256"] — the alg header is never trusted alone.
Expected Output
Attack token accepted by confused verifier: True
Attack token rejected by secure verifier: JWTError: algorithm mismatchHints
Hint 1: In the RS256-to-HS256 confusion attack, the attacker signs a token with HS256 using the server's PUBLIC key as the HMAC secret.
Hint 2: A confused verifier that does not pin the expected algorithm sees "HS256" in the header and uses the public key as the HMAC secret — which the attacker also has access to.
Hint 3: The fix: the verifier must specify which algorithm it expects — never trust the alg header alone.
Implement a refresh token rotation system with reuse detection.
import secrets
import time
class TokenStore:
"""Simulates server-side refresh token storage."""
def __init__(self):
self._tokens = {} # token -> {user_id, family_id, used, expires}
self._revoked_families = set()
def issue(self, user_id: str, family_id: str = None) -> str:
token = secrets.token_urlsafe(32)
if family_id is None:
family_id = secrets.token_urlsafe(16)
self._tokens[token] = {
"user_id": user_id,
"family_id": family_id,
"used": False,
"expires": time.time() + 86400,
}
return token
def consume(self, token: str):
# returns (user_id, family_id) or raises ValueError
# also detects reuse and revokes family
pass
def is_family_revoked(self, family_id: str) -> bool:
return family_id in self._revoked_families
store = TokenStore()
# Login
family_id = secrets.token_urlsafe(16)
refresh1 = store.issue("user42", family_id)
print(f"Login: access=<token>, refresh={refresh1[:8]}...")
# Use refresh token once
user_id, fam = store.consume(refresh1)
refresh2 = store.issue(user_id, fam)
print(f"Refresh: new access token issued")
# Try to reuse refresh1 (should detect theft)
try:
store.consume(refresh1)
print("Old refresh token rejected: False")
except ValueError:
print("Old refresh token rejected: True")
# Family should now be revoked
print(f"Reuse detected — family revoked: {store.is_family_revoked(family_id)}")
Solution
import secrets
import time
class TokenStore:
def __init__(self):
self._tokens = {}
self._revoked_families = set()
def issue(self, user_id: str, family_id: str = None) -> str:
token = secrets.token_urlsafe(32)
if family_id is None:
family_id = secrets.token_urlsafe(16)
self._tokens[token] = {
"user_id": user_id,
"family_id": family_id,
"used": False,
"expires": time.time() + 86400,
}
return token
def consume(self, token: str):
entry = self._tokens.get(token)
if entry is None:
raise ValueError("unknown token")
if entry["family_id"] in self._revoked_families:
raise ValueError("family revoked")
if entry["used"]:
# Reuse detected — revoke the entire family
self._revoked_families.add(entry["family_id"])
raise ValueError("token reuse detected — family revoked")
if time.time() > entry["expires"]:
raise ValueError("token expired")
entry["used"] = True
return entry["user_id"], entry["family_id"]
def is_family_revoked(self, family_id: str) -> bool:
return family_id in self._revoked_families
store = TokenStore()
family_id = secrets.token_urlsafe(16)
refresh1 = store.issue("user42", family_id)
print(f"Login: access=<token>, refresh={refresh1[:8]}...")
user_id, fam = store.consume(refresh1)
refresh2 = store.issue(user_id, fam)
print(f"Refresh: new access token issued")
try:
store.consume(refresh1)
print("Old refresh token rejected: False")
except ValueError:
print("Old refresh token rejected: True")
print(f"Reuse detected — family revoked: {store.is_family_revoked(family_id)}")
Refresh token rotation security model:
- Each refresh token is single-use — after use, it is marked consumed and a new one is issued.
- Token families group all refresh tokens issued from a single login session.
- If an already-used token is presented again, it means either the server or client had the token stolen — the entire family is revoked, forcing re-login.
- Access tokens are short-lived JWTs (15 min). Refresh tokens are long-lived opaque server-side tokens (7–30 days).
Expected Output
Login: access=<token>, refresh=<token>
Refresh: new access token issued
Old refresh token rejected: True
Reuse detected — family revoked: TrueHints
Hint 1: A refresh token is a long-lived opaque token stored server-side. The access token is a short-lived JWT.
Hint 2: On each refresh, issue new access + refresh tokens and invalidate the old refresh token.
Hint 3: Token family tracking: if an already-used refresh token is presented again, revoke the entire family (sign of token theft).
Hard
Implement a JWKS cache that parses a JWKS document and selects the correct key based on the JWT kid header.
import json
import base64
# Simulated JWKS response (simplified — real JWKS has n, e fields for RSA)
JWKS = {
"keys": [
{"kty": "RSA", "kid": "key-2023-12", "alg": "RS256", "use": "sig", "n": "abc123", "e": "AQAB"},
{"kty": "RSA", "kid": "key-2024-01", "alg": "RS256", "use": "sig", "n": "def456", "e": "AQAB"},
]
}
class JWKSCache:
def __init__(self, jwks: dict):
# build internal index
pass
def get_key(self, kid: str) -> dict:
# return the JWK dict for kid, raise KeyError if not found
pass
@property
def key_ids(self):
pass
cache = JWKSCache(JWKS)
print(f"JWKS has {len(cache.key_ids)} keys")
key = cache.get_key("key-2024-01")
print(f"Selected key id: {key['kid']}")
print(f"Correct key selected for kid: {key['n'] == 'def456'}")
try:
cache.get_key("nonexistent-kid")
print("Unknown kid raises: False")
except KeyError:
print("Unknown kid raises: True")
Solution
import json
JWKS = {
"keys": [
{"kty": "RSA", "kid": "key-2023-12", "alg": "RS256", "use": "sig", "n": "abc123", "e": "AQAB"},
{"kty": "RSA", "kid": "key-2024-01", "alg": "RS256", "use": "sig", "n": "def456", "e": "AQAB"},
]
}
class JWKSCache:
def __init__(self, jwks: dict):
self._keys = {key["kid"]: key for key in jwks.get("keys", [])}
def get_key(self, kid: str) -> dict:
if kid not in self._keys:
raise KeyError(f"Unknown kid: {kid}")
return self._keys[kid]
@property
def key_ids(self):
return list(self._keys.keys())
cache = JWKSCache(JWKS)
print(f"JWKS has {len(cache.key_ids)} keys")
key = cache.get_key("key-2024-01")
print(f"Selected key id: {key['kid']}")
print(f"Correct key selected for kid: {key['n'] == 'def456'}")
try:
cache.get_key("nonexistent-kid")
print("Unknown kid raises: False")
except KeyError:
print("Unknown kid raises: True")
JWKS in production:
- Auth servers (Keycloak, Auth0, Okta) publish their JWKS at
/.well-known/jwks.json. - Cache the JWKS in memory with a TTL — refetch on cache miss or
kidnot found (allows key rotation without downtime). - Never disable
kidvalidation — an attacker could craft a token with akidpointing to a key they control if your verifier doesn't validate it. - The
use: "sig"field distinguishes signing keys from encryption keys — only accept keys withuse: "sig"for JWT verification.
Expected Output
JWKS has 2 keys
Selected key id: key-2024-01
Correct key selected for kid: True
Unknown kid raises: TrueHints
Hint 1: A JWKS (JSON Web Key Set) is a JSON object with a "keys" array. Each key has a "kid" (key ID).
Hint 2: The JWT header contains a "kid" field — use it to look up the correct verification key from the JWKS.
Hint 3: Build a dict mapping kid to key object for O(1) lookup.
Implement JWT revocation using a jti blacklist, simulating a Redis-like TTL store.
import hashlib, hmac, base64, json, time, secrets
class JTIBlacklist:
"""In-memory blacklist with TTL support (simulates Redis)."""
def __init__(self):
self._store = {} # jti -> expires_at
def add(self, jti: str, ttl_seconds: int) -> None:
self._store[jti] = time.time() + ttl_seconds
def is_revoked(self, jti: str) -> bool:
expires = self._store.get(jti)
if expires is None:
return False
if time.time() > expires:
del self._store[jti]
return False
return True
def cleanup_expired(self) -> int:
now = time.time()
expired = [k for k, v in self._store.items() if v < now]
for k in expired:
del self._store[k]
return len(expired)
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
SECRET = b"signing-secret"
blacklist = JTIBlacklist()
class JWTError(Exception):
pass
def create_jwt(payload: dict) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(SECRET, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"
def verify_jwt(token: str) -> dict:
h_enc, p_enc, sig_enc = token.split(".")
expected = hmac.new(SECRET, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected, b64url_decode(sig_enc)):
raise JWTError("invalid signature")
payload = json.loads(b64url_decode(p_enc))
if time.time() > payload.get("exp", 0):
raise JWTError("token expired")
jti = payload.get("jti")
if jti and blacklist.is_revoked(jti):
raise JWTError("token revoked")
return payload
def logout(token: str) -> None:
# add jti to blacklist with remaining TTL
pass
now = int(time.time())
payload = {"sub": "user42", "jti": secrets.token_urlsafe(16), "exp": now + 3600, "iat": now}
token = create_jwt(payload)
jti = json.loads(b64url_decode(token.split(".")[1]))["jti"]
print(f"Token issued with jti")
claims = verify_jwt(token)
print(f"Token valid before revoke: {claims['sub'] == 'user42'}")
logout(token)
try:
verify_jwt(token)
except JWTError as e:
print(f"Token invalid after revoke: JWTError: {e}")
# Simulate expired jti
blacklist.add("old-jti", ttl_seconds=-1)
cleaned = blacklist.cleanup_expired()
print(f"Expired jti cleaned up: {cleaned >= 1}")
Solution
import hashlib, hmac, base64, json, time, secrets
class JTIBlacklist:
def __init__(self):
self._store = {}
def add(self, jti: str, ttl_seconds: int) -> None:
self._store[jti] = time.time() + ttl_seconds
def is_revoked(self, jti: str) -> bool:
expires = self._store.get(jti)
if expires is None:
return False
if time.time() > expires:
del self._store[jti]
return False
return True
def cleanup_expired(self) -> int:
now = time.time()
expired = [k for k, v in self._store.items() if v < now]
for k in expired:
del self._store[k]
return len(expired)
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(part: str) -> bytes:
return base64.urlsafe_b64decode(part + "=" * (4 - len(part) % 4))
SECRET = b"signing-secret"
blacklist = JTIBlacklist()
class JWTError(Exception):
pass
def create_jwt(payload: dict) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(SECRET, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"
def verify_jwt(token: str) -> dict:
h_enc, p_enc, sig_enc = token.split(".")
expected = hmac.new(SECRET, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected, b64url_decode(sig_enc)):
raise JWTError("invalid signature")
payload = json.loads(b64url_decode(p_enc))
if time.time() > payload.get("exp", 0):
raise JWTError("token expired")
jti = payload.get("jti")
if jti and blacklist.is_revoked(jti):
raise JWTError("token revoked")
return payload
def logout(token: str) -> None:
payload = json.loads(b64url_decode(token.split(".")[1]))
jti = payload.get("jti")
exp = payload.get("exp", int(time.time()))
ttl = max(0, exp - int(time.time()))
if jti:
blacklist.add(jti, ttl)
now = int(time.time())
payload = {"sub": "user42", "jti": secrets.token_urlsafe(16), "exp": now + 3600, "iat": now}
token = create_jwt(payload)
jti = json.loads(b64url_decode(token.split(".")[1]))["jti"]
print(f"Token issued with jti")
claims = verify_jwt(token)
print(f"Token valid before revoke: {claims['sub'] == 'user42'}")
logout(token)
try:
verify_jwt(token)
except JWTError as e:
print(f"Token invalid after revoke: JWTError: {e}")
blacklist.add("old-jti", ttl_seconds=-1)
cleaned = blacklist.cleanup_expired()
print(f"Expired jti cleaned up: {cleaned >= 1}")
Blacklist TTL optimization: The blacklist entry only needs to live as long as the token would have been valid — once the token expires naturally, you don't need to check the blacklist. Setting TTL = remaining token lifetime keeps the blacklist small.
Expected Output
Token issued with jti
Token valid before revoke: True
Token invalid after revoke: JWTError: token revoked
Expired jti cleaned up: TrueHints
Hint 1: Add a "jti" (JWT ID) claim — a unique identifier per token — so you can blacklist individual tokens.
Hint 2: On logout, add the jti to the blacklist with TTL equal to the remaining token lifetime.
Hint 3: Before accepting a token, check the blacklist. If jti is present, reject.
Build a JWT audit logger that detects IP binding violations and rapid token reuse anomalies.
import time
from collections import defaultdict
class JWTAuditLog:
def __init__(self, rapid_reuse_window: float = 1.0):
self._log = []
self._jti_registry = {} # jti -> {first_ip, last_seen, use_count}
self._rapid_window = rapid_reuse_window
def record(self, jti: str, user_id: str, ip: str) -> list:
"""Record a token use. Returns list of anomaly strings (empty = clean)."""
now = time.time()
anomalies = []
if jti in self._jti_registry:
entry = self._jti_registry[jti]
# check ip binding
if entry["first_ip"] != ip:
anomalies.append("ip_mismatch")
# check rapid reuse
if now - entry["last_seen"] < self._rapid_window:
anomalies.append("rapid_reuse")
entry["use_count"] += 1
entry["last_seen"] = now
else:
self._jti_registry[jti] = {
"first_ip": ip,
"last_seen": now,
"use_count": 1,
"user_id": user_id,
}
self._log.append({
"ts": now, "jti": jti, "user_id": user_id,
"ip": ip, "anomalies": anomalies,
})
return anomalies
def get_log(self):
return list(self._log)
audit = JWTAuditLog(rapid_reuse_window=1.0)
jti = "test-jti-001"
print("Token issued for user42")
for use_num, ip, label in [
(1, "192.168.1.1", "First use from 192.168.1.1"),
(2, "192.168.1.1", "Second use from 192.168.1.1"),
(3, "10.0.0.99", "Use from different IP"),
]:
anomalies = audit.record(jti, "user42", ip)
status = f"ANOMALY: {', '.join(anomalies)}" if anomalies else "OK"
print(f"{label}: {status}")
# Rapid reuse simulation
jti2 = "test-jti-002"
audit.record(jti2, "user42", "192.168.1.1")
anomalies = audit.record(jti2, "user42", "192.168.1.1") # immediate reuse
status = f"ANOMALY: {', '.join(anomalies)}" if anomalies else "OK"
print(f"Replay within 1 second: {status}")
Solution
import time
class JWTAuditLog:
def __init__(self, rapid_reuse_window: float = 1.0):
self._log = []
self._jti_registry = {}
self._rapid_window = rapid_reuse_window
def record(self, jti: str, user_id: str, ip: str) -> list:
now = time.time()
anomalies = []
if jti in self._jti_registry:
entry = self._jti_registry[jti]
if entry["first_ip"] != ip:
anomalies.append("ip_mismatch")
if now - entry["last_seen"] < self._rapid_window:
anomalies.append("rapid_reuse")
entry["use_count"] += 1
entry["last_seen"] = now
else:
self._jti_registry[jti] = {
"first_ip": ip,
"last_seen": now,
"use_count": 1,
"user_id": user_id,
}
self._log.append({
"ts": now, "jti": jti, "user_id": user_id,
"ip": ip, "anomalies": anomalies,
})
return anomalies
def get_log(self):
return list(self._log)
audit = JWTAuditLog(rapid_reuse_window=1.0)
jti = "test-jti-001"
print("Token issued for user42")
for use_num, ip, label in [
(1, "192.168.1.1", "First use from 192.168.1.1"),
(2, "192.168.1.1", "Second use from 192.168.1.1"),
(3, "10.0.0.99", "Use from different IP"),
]:
anomalies = audit.record(jti, "user42", ip)
status = f"ANOMALY: {', '.join(anomalies)}" if anomalies else "OK"
print(f"{label}: {status}")
jti2 = "test-jti-002"
audit.record(jti2, "user42", "192.168.1.1")
anomalies = audit.record(jti2, "user42", "192.168.1.1")
status = f"ANOMALY: {', '.join(anomalies)}" if anomalies else "OK"
print(f"Replay within 1 second: {status}")
Production anomaly detection patterns:
- IP binding: Hard binding to IP is too strict for mobile users. Soft binding flags changes as anomalies without blocking — route to step-up authentication.
- Rapid reuse: A legitimate user will not use the same access token twice within milliseconds. Rapid reuse indicates token interception and replay.
- Use count: Access tokens used hundreds of times per minute from one user are suspicious — possible credential stuffing or shared token.
- Feed anomaly events into a SIEM (Splunk, Datadog Security) for real-time alerting.
Expected Output
Token issued for user42
First use from 192.168.1.1: OK
Second use from 192.168.1.1: OK
Use from different IP: ANOMALY: ip_mismatch
Replay within 1 second: ANOMALY: rapid_reuseHints
Hint 1: An audit log records every token verification attempt: timestamp, IP, user, outcome.
Hint 2: IP binding: if the same jti is used from a different IP, flag as anomalous.
Hint 3: Rapid reuse: if the same jti is used more than once within a short window, flag potential replay.
