Skip to main content

Python JWT Authentication Practice Problems & Exercises

Practice: JWT Authentication

11 problems · 4 Easy · 4 Medium · 3 Hard · 60–90 min
← Back to lesson

Easy

#1 Decode JWT Structure Without Verification (Easy)
Tags: jwt, base64url, structure, header, payload

Decode the header and payload sections of a JWT string without verifying the signature.

import base64
import json

TOKEN = (
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)

def decode_jwt_part(part: str) -> dict:
# add padding if needed, base64url decode, then JSON parse
pass

header, payload, _ = TOKEN.split(".")
print(f"Header: {decode_jwt_part(header)}")
print(f"Payload: {decode_jwt_part(payload)}")
Solution
import base64
import json

TOKEN = (
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
".SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
)

def decode_jwt_part(part: str) -> dict:
    """Decode one base64url-encoded JWT segment (header or payload) to a dict.

    The signature is not checked here, so the result must never be used for
    authorization decisions.

    Args:
        part: A single unpadded base64url segment of the token.

    Returns:
        The parsed JSON value of the segment.

    Raises:
        ValueError: If the segment is not valid base64url or not valid JSON
            (``binascii.Error`` and ``json.JSONDecodeError`` are subclasses).
    """
    # JWT segments are unpadded. ``-len % 4`` yields exactly the 0-3 "="
    # characters needed; ``4 - len % 4`` would append four when the length is
    # already a multiple of 4 and only works with lenient decoders.
    padding = "=" * (-len(part) % 4)
    decoded = base64.urlsafe_b64decode(part + padding)
    return json.loads(decoded)

header, payload, _ = TOKEN.split(".")
print(f"Header: {decode_jwt_part(header)}")
print(f"Payload: {decode_jwt_part(payload)}")

JWT structure anatomy:

  • Header: {"alg": "HS256", "typ": "JWT"} — declares the signing algorithm.
  • Payload: Claims about the subject — sub (subject), iat (issued at), exp (expiry), custom claims.
  • Signature: HMAC_SHA256(base64url(header) + "." + base64url(payload), secret) or RSA/EC signature.
  • Critical: Never use a decoded JWT payload for authorization without first verifying the signature. The payload is only base64-encoded, not encrypted — it is readable by anyone.
Expected Output
Header: {'alg': 'HS256', 'typ': 'JWT'}
Payload: {'sub': '1234567890', 'name': 'John Doe', 'iat': 1516239022}
Hints

Hint 1: A JWT is three base64url-encoded parts separated by dots: header.payload.signature.

Hint 2: Base64url differs from standard base64: uses - instead of + and _ instead of /. Use base64.urlsafe_b64decode().

Hint 3: Base64url strings may need padding to a multiple of 4 — append = characters before decoding.

#2 Build a JWT from Scratch (Easy)
Tags: jwt, hmac, base64url, signing

Build a minimal HS256 JWT from a header dict, payload dict, and secret key — without using any JWT library.

import base64
import hashlib
import hmac
import json

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def create_jwt(payload: dict, secret: bytes) -> str:
# Build HS256 JWT: header.payload.signature
pass

secret = b"my-256-bit-secret"
payload = {"sub": "user123", "role": "admin", "iat": 1700000000}

token = create_jwt(payload, secret)
parts = token.split(".")
print(f"JWT has 3 parts: {len(parts) == 3}")

import json, base64
header_raw = base64.urlsafe_b64decode(parts[0] + "==")
payload_raw = base64.urlsafe_b64decode(parts[1] + "==")
print(f"Header matches: {json.loads(header_raw)['alg'] == 'HS256'}")
print(f"Payload matches: {json.loads(payload_raw)['sub'] == 'user123'}")
Solution
import base64
import hashlib
import hmac
import json

def b64url_encode(data: bytes) -> str:
    """Return *data* as base64url text with the trailing "=" padding removed."""
    padded_text = base64.urlsafe_b64encode(data).decode()
    return padded_text.rstrip("=")

def create_jwt(payload: dict, secret: bytes) -> str:
    """Build an HS256 JWT of the form ``header.payload.signature``.

    Args:
        payload: Claims to embed; serialized as compact JSON.
        secret: HMAC key used to sign the token.

    Returns:
        The three base64url segments joined by dots.
    """

    def _segment(obj: dict) -> str:
        # Compact separators keep the serialized JSON free of whitespace.
        compact = json.dumps(obj, separators=(",", ":"))
        return b64url_encode(compact.encode())

    unsigned = ".".join((_segment({"alg": "HS256", "typ": "JWT"}), _segment(payload)))
    mac = hmac.new(secret, unsigned.encode(), hashlib.sha256)
    # The signature segment encodes the raw digest bytes, not a hex string.
    return unsigned + "." + b64url_encode(mac.digest())

secret = b"my-256-bit-secret"
payload = {"sub": "user123", "role": "admin", "iat": 1700000000}

token = create_jwt(payload, secret)
parts = token.split(".")
print(f"JWT has 3 parts: {len(parts) == 3}")

header_raw = base64.urlsafe_b64decode(parts[0] + "==")
payload_raw = base64.urlsafe_b64decode(parts[1] + "==")
print(f"Header matches: {json.loads(header_raw)['alg'] == 'HS256'}")
print(f"Payload matches: {json.loads(payload_raw)['sub'] == 'user123'}")

Implementation details:

  • separators=(",", ":") removes whitespace from JSON — important because any whitespace change would break signature verification.
  • Strip padding (rstrip(b"=")) from base64url output — RFC 7515 requires unpadded base64url.
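  • The signature is computed from raw bytes (digest()), not hex (hexdigest()) — JWS base64url-encodes the raw MAC, so a hex digest would be longer and would produce a token that other libraries reject.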
  • In production, use PyJWT or python-jose — this exercise is for understanding the internals.
Expected Output
JWT has 3 parts: True
Header matches: True
Payload matches: True
Hints

Hint 1: Encode each part with base64.urlsafe_b64encode(), then strip trailing = padding.

Hint 2: The signing input is: base64url(header) + "." + base64url(payload).

Hint 3: Use hmac.new(secret, signing_input.encode(), hashlib.sha256).digest() — note digest() not hexdigest() for the signature bytes.

#3 Validate JWT Expiry Claim (Easy)
Tags: jwt, exp, claims, expiry

Write a check_expiry(payload: dict) -> str function that returns "valid", "EXPIRED", or "missing exp claim".

import time

def check_expiry(payload: dict) -> str:
# validate the exp claim
pass

now = int(time.time())

fresh = {"sub": "user1", "exp": now + 3600}
expired = {"sub": "user2", "exp": now - 60}
no_exp = {"sub": "user3"}

print(f"Fresh token: {check_expiry(fresh)}")
print(f"Expired token: {check_expiry(expired)}")
print(f"Token without exp: {check_expiry(no_exp)}")
Solution
import time

def check_expiry(payload: dict) -> str:
    """Classify a payload by its ``exp`` claim.

    Returns:
        ``"missing exp claim"`` when the payload has no ``exp`` key,
        ``"EXPIRED"`` when the current time is past ``exp``,
        otherwise ``"valid"``.
    """
    try:
        expires_at = payload["exp"]
    except KeyError:
        return "missing exp claim"
    # exp is compared against time.time(), i.e. epoch seconds.
    return "EXPIRED" if time.time() > expires_at else "valid"

now = int(time.time())

fresh = {"sub": "user1", "exp": now + 3600}
expired = {"sub": "user2", "exp": now - 60}
no_exp = {"sub": "user3"}

print(f"Fresh token: {check_expiry(fresh)}")
print(f"Expired token: {check_expiry(expired)}")
print(f"Token without exp: {check_expiry(no_exp)}")

Claims you must validate:

  • exp (expiry): Token must not be expired. Use time.time() (UTC epoch seconds).
  • nbf (not before): Token must not be accepted before this time. Allow a few seconds of leeway so that small clock differences between servers do not reject otherwise valid tokens.
  • iss (issuer): Token must come from the expected issuer. Prevents using a valid token from one service at another.
  • aud (audience): Token must be intended for your service. Prevents token replay across services.
  • Always validate ALL relevant claims — a valid signature with wrong issuer is still a security failure.
Expected Output
Fresh token: valid
Expired token: EXPIRED
Token without exp: missing exp claim
Hints

Hint 1: The exp claim is a Unix timestamp (seconds since epoch). Compare with time.time().

Hint 2: An expired token has exp < time.time().

Hint 3: A token with no exp claim should be rejected if your policy requires expiry.

#4 HS256 vs RS256 — Key Types (Easy)
Tags: hs256, rs256, symmetric, asymmetric, key-types

Demonstrate the difference between HS256 (symmetric) and RS256 (asymmetric) key types.

import hmac, hashlib
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes

# HS256 — symmetric
hs_secret = b"shared-secret-both-sides-need-this"

# RS256 — asymmetric
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

# Sign a message with HS256
def hs256_sign(message: bytes, secret: bytes) -> bytes:
return hmac.new(secret, message, hashlib.sha256).digest()

# Sign a message with RS256 private key
def rs256_sign(message: bytes, priv_key) -> bytes:
return priv_key.sign(message, padding.PKCS1v15(), hashes.SHA256())

# Verify RS256 with public key
def rs256_verify(message: bytes, sig: bytes, pub_key) -> bool:
try:
pub_key.verify(sig, message, padding.PKCS1v15(), hashes.SHA256())
return True
except Exception:
return False

msg = b"user_id=42"
hs_sig = hs256_sign(msg, hs_secret)
rs_sig = rs256_sign(msg, private_key)

print(f"HS256 uses symmetric key: {isinstance(hs_secret, bytes)}")
print(f"RS256 private key type: {type(private_key).__name__}")
print(f"RS256 public key type: {type(public_key).__name__}")
print(f"Only private key can sign: {rs256_verify(msg, rs_sig, public_key)}")
Solution
import hmac, hashlib
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes

hs_secret = b"shared-secret-both-sides-need-this"

private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

def hs256_sign(message: bytes, secret: bytes) -> bytes:
    """Return the raw HMAC-SHA256 digest of *message* keyed with *secret*."""
    mac = hmac.new(secret, digestmod=hashlib.sha256)
    mac.update(message)
    return mac.digest()

def rs256_sign(message: bytes, priv_key) -> bytes:
    """Sign *message* with *priv_key* using PKCS#1 v1.5 padding and SHA-256."""
    scheme = padding.PKCS1v15()
    digest = hashes.SHA256()
    return priv_key.sign(message, scheme, digest)

def rs256_verify(message: bytes, sig: bytes, pub_key) -> bool:
    """Check an RSA PKCS#1 v1.5 / SHA-256 signature with the public key.

    Args:
        message: The bytes that were signed.
        sig: The signature to check.
        pub_key: Public key object exposing ``verify(sig, message, padding, algorithm)``.

    Returns:
        True when the signature matches, False when verification fails.

    Raises:
        Any exception other than ``InvalidSignature`` (for example a
        ``TypeError`` from wrong argument types) propagates, so programming
        errors are not silently reported as "bad signature".
    """
    # Imported locally so this block does not depend on a module-level import
    # that the surrounding snippet does not have.
    from cryptography.exceptions import InvalidSignature

    try:
        pub_key.verify(sig, message, padding.PKCS1v15(), hashes.SHA256())
    except InvalidSignature:
        return False
    return True

msg = b"user_id=42"
hs_sig = hs256_sign(msg, hs_secret)
rs_sig = rs256_sign(msg, private_key)

print(f"HS256 uses symmetric key: {isinstance(hs_secret, bytes)}")
print(f"RS256 private key type: {type(private_key).__name__}")
print(f"RS256 public key type: {type(public_key).__name__}")
print(f"Only private key can sign: {rs256_verify(msg, rs_sig, public_key)}")

When to use each:

  • HS256: Single service or trusted internal services that share the secret. Simpler but requires distributing the secret securely.
  • RS256: Multiple services (microservices, third-party). Only the auth server has the private key. Any service can verify tokens using the published JWKS (public key endpoint) without needing the secret.
  • ES256 (ECDSA): Same asymmetric model as RS256 but shorter keys and signatures. Preferred for modern systems.
Expected Output
HS256 uses symmetric key: True
RS256 private key type: RSAPrivateKey
RS256 public key type: RSAPublicKey
Only private key can sign: True
Hints

Hint 1: HS256 uses a shared secret (bytes) — both signer and verifier need the same key.

Hint 2: RS256 uses RSA: sign with private key, verify with public key. Use cryptography library.

Hint 3: For RS256, generate a key pair with rsa.generate_private_key(65537, 2048).


Medium

#5 Full JWT Verify with Claims Validation (Medium)
Tags: jwt, verify, claims, iss, aud, exp

Implement a verify_jwt(token, secret, expected_iss, expected_aud) function that validates signature and all standard claims.

import base64, hashlib, hmac, json, time

class JWTError(Exception):
pass

def b64url_decode(part: str) -> bytes:
    """Decode an unpadded base64url string to bytes.

    Raises:
        ValueError: If *part* cannot be decoded (``binascii.Error`` is a subclass).
    """
    # ``-len % 4`` adds exactly the 0-3 "=" needed; ``4 - len % 4`` would add
    # four when the length is already aligned, which only lenient decoders accept.
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def create_jwt(payload: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"

def verify_jwt(token: str, secret: bytes, expected_iss: str, expected_aud: str) -> dict:
# verify signature, exp, iss, aud — raise JWTError on failure
pass

SECRET = b"signing-secret"
now = int(time.time())

valid_payload = {"sub": "user42", "iss": "auth.example.com", "aud": "api.example.com", "exp": now + 3600}
good_token = create_jwt(valid_payload, SECRET)
exp_token = create_jwt({**valid_payload, "exp": now - 60}, SECRET)
iss_token = create_jwt({**valid_payload, "iss": "evil.com"}, SECRET)
aud_token = create_jwt({**valid_payload, "aud": "other-service"}, SECRET)

claims = verify_jwt(good_token, SECRET, "auth.example.com", "api.example.com")
print(f"Valid token claims: {{'sub': '{claims['sub']}', 'iss': '{claims['iss']}', 'aud': '{claims['aud']}'}}")
for tok, label in [(exp_token, "Expired"), (iss_token, "Wrong issuer"), (aud_token, "Wrong audience")]:
try:
verify_jwt(tok, SECRET, "auth.example.com", "api.example.com")
except JWTError as e:
print(f"{label}: JWTError: {e}")
Solution
import base64, hashlib, hmac, json, time

class JWTError(Exception):
pass

def b64url_decode(part: str) -> bytes:
    """Decode an unpadded base64url string to bytes.

    Raises:
        ValueError: If *part* cannot be decoded (``binascii.Error`` is a subclass).
    """
    # ``-len % 4`` adds exactly the 0-3 "=" needed; ``4 - len % 4`` would add
    # four when the length is already aligned, which only lenient decoders accept.
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def create_jwt(payload: dict, secret: bytes) -> str:
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(secret, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"

def verify_jwt(token: str, secret: bytes, expected_iss: str, expected_aud: str) -> dict:
    """Verify an HS256 JWT and return its payload.

    Checks run in this order: token shape, the pinned ``alg`` header, the
    HMAC-SHA256 signature, then the ``exp``, ``iss`` and ``aud`` claims.

    Args:
        token: Compact JWT string (``header.payload.signature``).
        secret: HMAC key the token must have been signed with.
        expected_iss: Required value of the ``iss`` claim.
        expected_aud: Required audience; ``aud`` may be that string or a list
            containing it.

    Returns:
        The decoded payload dict.

    Raises:
        JWTError: For every rejection: malformed token, unsupported algorithm,
            invalid signature, expired token, wrong issuer or wrong audience.
    """
    try:
        h_enc, p_enc, sig_enc = token.split(".")
    except ValueError as exc:
        raise JWTError("malformed token") from exc

    # Bad base64 / JSON surfaces as ValueError subclasses. Convert them so that
    # callers catching JWTError are not bypassed by a garbage token.
    try:
        header = json.loads(b64url_decode(h_enc))
    except ValueError as exc:
        raise JWTError("malformed token") from exc
    if not isinstance(header, dict):
        raise JWTError("malformed token")
    # The algorithm is pinned before any signature is computed.
    if header.get("alg") != "HS256":
        raise JWTError("unsupported algorithm")

    expected_sig = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
    try:
        actual_sig = b64url_decode(sig_enc)
    except ValueError as exc:
        raise JWTError("malformed token") from exc
    # Constant-time comparison.
    if not hmac.compare_digest(expected_sig, actual_sig):
        raise JWTError("invalid signature")

    try:
        payload = json.loads(b64url_decode(p_enc))
    except ValueError as exc:
        raise JWTError("malformed token") from exc
    if not isinstance(payload, dict):
        raise JWTError("malformed token")

    exp = payload.get("exp")
    # A missing exp was already rejected as expired; a non-numeric one now gets
    # the same treatment instead of raising TypeError.
    if not isinstance(exp, (int, float)) or time.time() > exp:
        raise JWTError("token expired")
    if payload.get("iss") != expected_iss:
        raise JWTError("invalid issuer")

    aud = payload.get("aud")
    # RFC 7519 allows aud to be a single string or an array of strings.
    aud_ok = expected_aud in aud if isinstance(aud, list) else aud == expected_aud
    if not aud_ok:
        raise JWTError("invalid audience")

    return payload

SECRET = b"signing-secret"
now = int(time.time())

valid_payload = {"sub": "user42", "iss": "auth.example.com", "aud": "api.example.com", "exp": now + 3600}
good_token = create_jwt(valid_payload, SECRET)
exp_token = create_jwt({**valid_payload, "exp": now - 60}, SECRET)
iss_token = create_jwt({**valid_payload, "iss": "evil.com"}, SECRET)
aud_token = create_jwt({**valid_payload, "aud": "other-service"}, SECRET)

claims = verify_jwt(good_token, SECRET, "auth.example.com", "api.example.com")
print(f"Valid token claims: {{'sub': '{claims['sub']}', 'iss': '{claims['iss']}', 'aud': '{claims['aud']}'}}")
for tok, label in [(exp_token, "Expired"), (iss_token, "Wrong issuer"), (aud_token, "Wrong audience")]:
try:
verify_jwt(tok, SECRET, "auth.example.com", "api.example.com")
except JWTError as e:
print(f"{label}: JWTError: {e}")

Validation order matters:

  1. Check the alg header first — reject anything that isn't your expected algorithm before computing any signature, so the token cannot choose how it gets verified.
  2. Verify the signature next — if the signature is invalid, don't bother parsing claims (prevents acting on attacker-crafted payloads).
  3. Check expiry — fast reject for stale tokens.
  4. Check issuer and audience — prevent token reuse across services.
Expected Output
Valid token claims: {'sub': 'user42', 'iss': 'auth.example.com', 'aud': 'api.example.com'}
Expired: JWTError: token expired
Wrong issuer: JWTError: invalid issuer
Wrong audience: JWTError: invalid audience
Hints

Hint 1: Verify the HMAC signature first — only then decode and validate claims.

Hint 2: Rebuild the expected signature from header + payload and compare with hmac.compare_digest().

Hint 3: Validate exp, iss, and aud — not just the signature.

#6 None Algorithm Vulnerability (Medium)
Tags: jwt, none-algorithm, vulnerability, security

Demonstrate the "none" algorithm attack and implement a secure verifier that blocks it.

import base64, json, hashlib, hmac

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(part: str) -> bytes:
    """Decode an unpadded base64url string to bytes.

    Raises:
        ValueError: If *part* cannot be decoded (``binascii.Error`` is a subclass).
    """
    # ``-len % 4`` adds exactly the 0-3 "=" needed; ``4 - len % 4`` would add
    # four when the length is already aligned, which only lenient decoders accept.
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))

class JWTError(Exception):
pass

def craft_none_token(payload: dict) -> str:
# build a JWT with alg=none and empty signature
header = {"alg": "none", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
return f"{h}.{p}." # empty signature

def vulnerable_verify(token: str, secret: bytes) -> dict:
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
alg = header.get("alg", "")
if alg == "none":
return json.loads(b64url_decode(p_enc)) # BUG: no verification!
expected = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected, b64url_decode(sig_enc)):
raise JWTError("invalid signature")
return json.loads(b64url_decode(p_enc))

def secure_verify(token: str, secret: bytes, allowed_algs=("HS256",)) -> dict:
# fix the vulnerability — only accept explicitly allowed algorithms
pass

SECRET = b"server-secret"
evil_payload = {"sub": "attacker", "role": "admin", "exp": 9999999999}
none_token = craft_none_token(evil_payload)

result = vulnerable_verify(none_token, SECRET)
print(f"Crafted token accepted by vulnerable verifier: {result['role'] == 'admin'}")
try:
secure_verify(none_token, SECRET)
except JWTError as e:
print(f"Crafted token rejected by secure verifier: JWTError: {e}")
Solution
import base64, json, hashlib, hmac

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(part: str) -> bytes:
    """Decode an unpadded base64url string to bytes.

    Raises:
        ValueError: If *part* cannot be decoded (``binascii.Error`` is a subclass).
    """
    # ``-len % 4`` adds exactly the 0-3 "=" needed; ``4 - len % 4`` would add
    # four when the length is already aligned, which only lenient decoders accept.
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))

class JWTError(Exception):
pass

def craft_none_token(payload: dict) -> str:
    """Build an unsigned token whose header declares ``alg: none``.

    The result has the form ``header.payload.``: the trailing dot is followed
    by an empty signature segment.
    """
    segments = [
        b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in ({"alg": "none", "typ": "JWT"}, payload)
    ]
    return ".".join(segments) + "."

def vulnerable_verify(token: str, secret: bytes) -> dict:
    """Deliberately INSECURE verifier used to demonstrate the ``alg: none`` attack.

    The HMAC-SHA256 check is skipped entirely whenever the token's own header
    says ``alg`` is ``"none"``. Do not copy this into real code.

    Raises:
        JWTError: When the signature is checked and does not match.
    """
    h_enc, p_enc, sig_enc = token.split(".")
    declared_alg = json.loads(b64url_decode(h_enc)).get("alg", "")
    if declared_alg != "none":
        mac = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hashlib.sha256)
        if not hmac.compare_digest(mac.digest(), b64url_decode(sig_enc)):
            raise JWTError("invalid signature")
    # Vulnerability: for alg == "none" we arrive here with nothing verified.
    return json.loads(b64url_decode(p_enc))

def secure_verify(token: str, secret: bytes, allowed_algs=("HS256",)) -> dict:
    """Verify an HMAC-signed JWT, accepting only explicitly allowed algorithms.

    Args:
        token: Compact JWT string (``header.payload.signature``).
        secret: HMAC key the token must have been signed with.
        allowed_algs: Algorithm names the caller accepts. Only HS256, HS384
            and HS512 can be verified here; anything else is rejected even if
            listed.

    Returns:
        The decoded payload.

    Raises:
        JWTError: For a malformed token, an algorithm that is not both allowed
            and supported (including ``none``), or a signature mismatch.
    """
    # Each supported alg maps to its own digest, so a declared HS512 token is
    # no longer checked with SHA-256.
    hmac_digests = {
        "HS256": hashlib.sha256,
        "HS384": hashlib.sha384,
        "HS512": hashlib.sha512,
    }
    # A bare string would turn ``alg in allowed_algs`` into a substring test,
    # letting an empty alg through.
    if isinstance(allowed_algs, str):
        allowed_algs = (allowed_algs,)

    try:
        h_enc, p_enc, sig_enc = token.split(".")
    except ValueError as exc:
        raise JWTError("malformed token") from exc

    # Bad base64 / JSON surfaces as ValueError subclasses. Convert them so that
    # callers catching JWTError are not bypassed.
    try:
        header = json.loads(b64url_decode(h_enc))
    except ValueError as exc:
        raise JWTError("malformed token") from exc
    if not isinstance(header, dict):
        raise JWTError("malformed token")

    alg = header.get("alg", "")
    if not isinstance(alg, str) or alg not in allowed_algs or alg not in hmac_digests:
        raise JWTError(f"unsupported algorithm: {alg}")

    expected = hmac.new(secret, f"{h_enc}.{p_enc}".encode(), hmac_digests[alg]).digest()
    try:
        presented = b64url_decode(sig_enc)
    except ValueError as exc:
        raise JWTError("malformed token") from exc
    if not hmac.compare_digest(expected, presented):
        raise JWTError("invalid signature")

    try:
        return json.loads(b64url_decode(p_enc))
    except ValueError as exc:
        raise JWTError("malformed token") from exc

SECRET = b"server-secret"
evil_payload = {"sub": "attacker", "role": "admin", "exp": 9999999999}
none_token = craft_none_token(evil_payload)

result = vulnerable_verify(none_token, SECRET)
print(f"Crafted token accepted by vulnerable verifier: {result['role'] == 'admin'}")
try:
secure_verify(none_token, SECRET)
except JWTError as e:
print(f"Crafted token rejected by secure verifier: JWTError: {e}")

CVE history: Multiple real JWT libraries (node-jsonwebtoken before 4.0, python-jwt before 3.3.4) were vulnerable to this attack. An attacker could craft a token with "alg": "none" and any payload they wanted, and the server would accept it. Always pass an explicit algorithms=["HS256"] argument to PyJWT.decode().

Expected Output
Crafted token accepted by vulnerable verifier: True
Crafted token rejected by secure verifier: JWTError: unsupported algorithm: none
Hints

Hint 1: The "none" algorithm JWT has an empty signature — format is header.payload. (dot then nothing).

Hint 2: A vulnerable verifier skips signature verification when alg is "none".

Hint 3: The fix: always check that alg is in your explicit allowlist before verifying.

#7 Algorithm Confusion Attack (RS256 to HS256) (Medium)
Tags: jwt, algorithm-confusion, rs256, hs256, attack

Demonstrate the RS256-to-HS256 algorithm confusion attack and the secure fix.

import base64, json, hashlib, hmac

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(part: str) -> bytes:
    """Decode an unpadded base64url string to bytes.

    Raises:
        ValueError: If *part* cannot be decoded (``binascii.Error`` is a subclass).
    """
    # ``-len % 4`` adds exactly the 0-3 "=" needed; ``4 - len % 4`` would add
    # four when the length is already aligned, which only lenient decoders accept.
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))

class JWTError(Exception):
pass

# Simulate: server uses RS256, public key is known to attacker
PUBLIC_KEY_BYTES = b"this-is-the-public-key-bytes-attacker-knows"

def attacker_craft_hs256_with_pubkey(payload: dict) -> str:
# attacker signs with HS256 using the public key as the secret
header = {"alg": "HS256", "typ": "JWT"}
h = b64url_encode(json.dumps(header, separators=(",", ":")).encode())
p = b64url_encode(json.dumps(payload, separators=(",", ":")).encode())
sig = hmac.new(PUBLIC_KEY_BYTES, f"{h}.{p}".encode(), hashlib.sha256).digest()
return f"{h}.{p}.{b64url_encode(sig)}"

def confused_verifier(token: str, public_key_bytes: bytes) -> dict:
# BUG: reads alg from header and uses public_key_bytes as HMAC secret for HS256
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
alg = header["alg"]
if alg == "HS256": # uses public key as HMAC secret!
expected = hmac.new(public_key_bytes, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
if hmac.compare_digest(expected, b64url_decode(sig_enc)):
return json.loads(b64url_decode(p_enc))
raise JWTError("verification failed")

def secure_verifier(token: str, public_key_bytes: bytes, expected_alg: str = "RS256") -> dict:
# fix: reject any token that doesn't use the pinned algorithm
h_enc, p_enc, sig_enc = token.split(".")
header = json.loads(b64url_decode(h_enc))
if header.get("alg") != expected_alg:
raise JWTError("algorithm mismatch")
raise JWTError("RS256 verification requires real RSA key — algorithm mismatch caught")

evil_payload = {"sub": "attacker", "role": "superadmin"}
attack_token = attacker_craft_hs256_with_pubkey(evil_payload)

result = confused_verifier(attack_token, PUBLIC_KEY_BYTES)
print(f"Attack token accepted by confused verifier: {result['role'] == 'superadmin'}")
try:
secure_verifier(attack_token, PUBLIC_KEY_BYTES, expected_alg="RS256")
except JWTError as e:
print(f"Attack token rejected by secure verifier: JWTError: {e}")
Solution
import base64, json, hashlib, hmac

def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(part: str) -> bytes:
    """Decode an unpadded base64url string to bytes.

    Raises:
        ValueError: If *part* cannot be decoded (``binascii.Error`` is a subclass).
    """
    # ``-len % 4`` adds exactly the 0-3 "=" needed; ``4 - len % 4`` would add
    # four when the length is already aligned, which only lenient decoders accept.
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))

class JWTError(Exception):
pass

PUBLIC_KEY_BYTES = b"this-is-the-public-key-bytes-attacker-knows"

def attacker_craft_hs256_with_pubkey(payload: dict) -> str:
    """Forge an HS256 token whose HMAC key is the module-level PUBLIC_KEY_BYTES.

    Models the attacker's side of the algorithm-confusion demo: the token is
    signed with HMAC-SHA256 using the public key bytes as the secret.
    """
    encoded = [
        b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in ({"alg": "HS256", "typ": "JWT"}, payload)
    ]
    unsigned = ".".join(encoded)
    mac = hmac.new(PUBLIC_KEY_BYTES, unsigned.encode(), hashlib.sha256)
    return f"{unsigned}.{b64url_encode(mac.digest())}"

def confused_verifier(token: str, public_key_bytes: bytes) -> dict:
    """Deliberately INSECURE verifier used to demonstrate algorithm confusion.

    It trusts the ``alg`` value from the token header and, for HS256, uses
    *public_key_bytes* as the HMAC secret. Do not copy this into real code.

    Raises:
        JWTError: When ``alg`` is not HS256 or the HMAC does not match.
        KeyError: When the header has no ``alg`` field.
    """
    h_enc, p_enc, sig_enc = token.split(".")
    declared_alg = json.loads(b64url_decode(h_enc))["alg"]
    if declared_alg != "HS256":
        raise JWTError("verification failed")

    # Vulnerability: the public key is used as a shared HMAC secret.
    mac = hmac.new(public_key_bytes, f"{h_enc}.{p_enc}".encode(), hashlib.sha256)
    if not hmac.compare_digest(mac.digest(), b64url_decode(sig_enc)):
        raise JWTError("verification failed")
    return json.loads(b64url_decode(p_enc))

def secure_verifier(token: str, public_key_bytes: bytes, expected_alg: str = "RS256") -> dict:
    """Reject any token whose header ``alg`` differs from the pinned algorithm.

    This is a simulation: no RSA verification is implemented, so the function
    never returns a payload. It always raises JWTError, with a message that
    depends on whether the declared algorithm matched *expected_alg*.

    Raises:
        JWTError: Always.
        ValueError: When the token does not consist of exactly three segments.
    """
    header_segment, _payload_segment, _signature_segment = token.split(".")
    declared_alg = json.loads(b64url_decode(header_segment)).get("alg")
    if declared_alg == expected_alg:
        raise JWTError("RS256 verification requires real RSA key — algorithm mismatch caught")
    raise JWTError("algorithm mismatch")

evil_payload = {"sub": "attacker", "role": "superadmin"}
attack_token = attacker_craft_hs256_with_pubkey(evil_payload)

result = confused_verifier(attack_token, PUBLIC_KEY_BYTES)
print(f"Attack token accepted by confused verifier: {result['role'] == 'superadmin'}")
try:
secure_verifier(attack_token, PUBLIC_KEY_BYTES, expected_alg="RS256")
except JWTError as e:
print(f"Attack token rejected by secure verifier: JWTError: {e}")

The attack mechanics: The RS256 public key is public — anyone can read it. If the server verifies HS256 tokens using the public key as the HMAC secret, and an attacker creates an HS256 token signed with that same public key, the confused verifier accepts it. PyJWT 2.x fixed this by requiring algorithms=["RS256"] — the alg header is never trusted alone.

Expected Output
Attack token accepted by confused verifier: True
Attack token rejected by secure verifier: JWTError: algorithm mismatch
Hints

Hint 1: In the RS256-to-HS256 confusion attack, the attacker signs a token with HS256 using the server's PUBLIC key as the HMAC secret.

Hint 2: A confused verifier that does not pin the expected algorithm sees "HS256" in the header and uses the public key as the HMAC secret — which the attacker also has access to.

Hint 3: The fix: the verifier must specify which algorithm it expects — never trust the alg header alone.

#8 Refresh Token Rotation Pattern (Medium)
Tags: jwt, refresh-token, rotation, token-family

Implement a refresh token rotation system with reuse detection.

import secrets
import time

class TokenStore:
"""Simulates server-side refresh token storage."""
def __init__(self):
self._tokens = {} # token -> {user_id, family_id, used, expires}
self._revoked_families = set()

def issue(self, user_id: str, family_id: str = None) -> str:
token = secrets.token_urlsafe(32)
if family_id is None:
family_id = secrets.token_urlsafe(16)
self._tokens[token] = {
"user_id": user_id,
"family_id": family_id,
"used": False,
"expires": time.time() + 86400,
}
return token

def consume(self, token: str):
# returns (user_id, family_id) or raises ValueError
# also detects reuse and revokes family
pass

def is_family_revoked(self, family_id: str) -> bool:
return family_id in self._revoked_families

store = TokenStore()

# Login
family_id = secrets.token_urlsafe(16)
refresh1 = store.issue("user42", family_id)
print(f"Login: access=<token>, refresh={refresh1[:8]}...")

# Use refresh token once
user_id, fam = store.consume(refresh1)
refresh2 = store.issue(user_id, fam)
print(f"Refresh: new access token issued")

# Try to reuse refresh1 (should detect theft)
try:
store.consume(refresh1)
print("Old refresh token rejected: False")
except ValueError:
print("Old refresh token rejected: True")

# Family should now be revoked
print(f"Reuse detected — family revoked: {store.is_family_revoked(family_id)}")
Solution
import secrets
import time

class TokenStore:
    """In-memory refresh-token store with single-use tokens and family revocation."""

    def __init__(self):
        # token -> {"user_id", "family_id", "used", "expires"}
        self._tokens = {}
        # family ids for which token reuse has been detected
        self._revoked_families = set()

    def issue(self, user_id: str, family_id: str = None) -> str:
        """Create and store a fresh refresh token valid for 86400 seconds.

        A new random family id is generated when *family_id* is None.
        """
        token = secrets.token_urlsafe(32)
        self._tokens[token] = {
            "user_id": user_id,
            "family_id": secrets.token_urlsafe(16) if family_id is None else family_id,
            "used": False,
            "expires": time.time() + 86400,
        }
        return token

    def consume(self, token: str):
        """Mark *token* as used and return ``(user_id, family_id)``.

        Raises:
            ValueError: If the token is unknown, its family is revoked, it was
                already used (which also revokes the whole family), or it has
                expired.
        """
        record = self._tokens.get(token)
        if record is None:
            raise ValueError("unknown token")

        family = record["family_id"]
        if self.is_family_revoked(family):
            raise ValueError("family revoked")
        if record["used"]:
            # A second presentation of a single-use token revokes every token
            # that shares its family.
            self._revoked_families.add(family)
            raise ValueError("token reuse detected — family revoked")
        if record["expires"] < time.time():
            raise ValueError("token expired")

        record["used"] = True
        return record["user_id"], family

    def is_family_revoked(self, family_id: str) -> bool:
        """Return True when *family_id* has been revoked."""
        return family_id in self._revoked_families

store = TokenStore()
family_id = secrets.token_urlsafe(16)
refresh1 = store.issue("user42", family_id)
print(f"Login: access=<token>, refresh={refresh1[:8]}...")

user_id, fam = store.consume(refresh1)
refresh2 = store.issue(user_id, fam)
print(f"Refresh: new access token issued")

try:
store.consume(refresh1)
print("Old refresh token rejected: False")
except ValueError:
print("Old refresh token rejected: True")

print(f"Reuse detected — family revoked: {store.is_family_revoked(family_id)}")

Refresh token rotation security model:

  • Each refresh token is single-use — after use, it is marked consumed and a new one is issued.
  • Token families group all refresh tokens issued from a single login session.
  • If an already-used token is presented again, either an attacker is replaying a stolen token or the legitimate client is presenting one the attacker already redeemed — the server cannot tell which, so the entire family is revoked, forcing re-login.
  • Access tokens are short-lived JWTs (15 min). Refresh tokens are long-lived opaque server-side tokens (7–30 days).
Expected Output
Login: access=<token>, refresh=<first 8 chars>...
Refresh: new access token issued
Old refresh token rejected: True
Reuse detected — family revoked: True
Hints

Hint 1: A refresh token is a long-lived opaque token stored server-side. The access token is a short-lived JWT.

Hint 2: On each refresh, issue new access + refresh tokens and invalidate the old refresh token.

Hint 3: Token family tracking: if an already-used refresh token is presented again, revoke the entire family (sign of token theft).


Hard

#9 JWKS Endpoint Parser and Key Selector (Hard)
Tags: jwks, kid, key-rotation, rs256, public-key

Implement a JWKS cache that parses a JWKS document and selects the correct key based on the JWT kid header.

import json
import base64

# Simulated JWKS response (simplified — real JWKS has n, e fields for RSA)
JWKS = {
"keys": [
{"kty": "RSA", "kid": "key-2023-12", "alg": "RS256", "use": "sig", "n": "abc123", "e": "AQAB"},
{"kty": "RSA", "kid": "key-2024-01", "alg": "RS256", "use": "sig", "n": "def456", "e": "AQAB"},
]
}

class JWKSCache:
def __init__(self, jwks: dict):
# build internal index
pass

def get_key(self, kid: str) -> dict:
# return the JWK dict for kid, raise KeyError if not found
pass

@property
def key_ids(self):
pass

cache = JWKSCache(JWKS)
print(f"JWKS has {len(cache.key_ids)} keys")

key = cache.get_key("key-2024-01")
print(f"Selected key id: {key['kid']}")
print(f"Correct key selected for kid: {key['n'] == 'def456'}")

try:
cache.get_key("nonexistent-kid")
print("Unknown kid raises: False")
except KeyError:
print("Unknown kid raises: True")
Solution
import json

JWKS = {
"keys": [
{"kty": "RSA", "kid": "key-2023-12", "alg": "RS256", "use": "sig", "n": "abc123", "e": "AQAB"},
{"kty": "RSA", "kid": "key-2024-01", "alg": "RS256", "use": "sig", "n": "def456", "e": "AQAB"},
]
}

class JWKSCache:
    """Index of a JWKS document keyed by each key's ``kid``."""

    def __init__(self, jwks: dict):
        """Build the ``kid`` -> JWK mapping from ``jwks["keys"]``.

        Entries that are not dicts or have no string ``kid`` cannot be selected
        by a token header, so they are skipped instead of aborting construction
        with a KeyError.
        """
        self._keys = {
            key["kid"]: key
            for key in jwks.get("keys", [])
            if isinstance(key, dict) and isinstance(key.get("kid"), str)
        }

    def get_key(self, kid: str) -> dict:
        """Return the JWK dict whose ``kid`` equals *kid*.

        Raises:
            KeyError: If no indexed key has that id.
        """
        try:
            return self._keys[kid]
        except KeyError:
            raise KeyError(f"Unknown kid: {kid}") from None

    @property
    def key_ids(self):
        """List of indexed key ids, in JWKS document order."""
        return list(self._keys)

cache = JWKSCache(JWKS)
print(f"JWKS has {len(cache.key_ids)} keys")

key = cache.get_key("key-2024-01")
print(f"Selected key id: {key['kid']}")
print(f"Correct key selected for kid: {key['n'] == 'def456'}")

try:
cache.get_key("nonexistent-kid")
print("Unknown kid raises: False")
except KeyError:
print("Unknown kid raises: True")

JWKS in production:

  • Auth servers (Keycloak, Auth0, Okta) publish their JWKS at /.well-known/jwks.json.
  • Cache the JWKS in memory with a TTL — refetch on cache miss or kid not found (allows key rotation without downtime).
  • Never disable kid validation — an attacker could craft a token with a kid pointing to a key they control if your verifier doesn't validate it.
  • The use: "sig" field distinguishes signing keys from encryption keys — only accept keys with use: "sig" for JWT verification.
Expected Output
JWKS has 2 keys
Selected key id: key-2024-01
Correct key selected for kid: True
Unknown kid raises: True
Hints

Hint 1: A JWKS (JSON Web Key Set) is a JSON object with a "keys" array. Each key has a "kid" (key ID).

Hint 2: The JWT header contains a "kid" field — use it to look up the correct verification key from the JWKS.

Hint 3: Build a dict mapping kid to key object for O(1) lookup.

#10 JWT Blacklist with Redis-Style Store (Hard)
Tags: jwt, blacklist, revocation, jti, logout

Implement JWT revocation using a jti blacklist, simulating a Redis-like TTL store.

import hashlib, hmac, base64, json, time, secrets

class JTIBlacklist:
    """In-memory blacklist with TTL support (simulates Redis)."""

    def __init__(self):
        # jti -> absolute expiry time in epoch seconds
        self._store = {}

    def add(self, jti: str, ttl_seconds: int) -> None:
        """Blacklist *jti* until *ttl_seconds* from now."""
        deadline = time.time() + ttl_seconds
        self._store[jti] = deadline

    def is_revoked(self, jti: str) -> bool:
        """Return True while *jti* is blacklisted and its entry has not expired.

        An entry found to be expired is removed as a side effect.
        """
        deadline = self._store.get(jti)
        if deadline is None:
            return False
        lapsed = time.time() > deadline
        if lapsed:
            del self._store[jti]
        return not lapsed

    def cleanup_expired(self) -> int:
        """Delete every expired entry and return how many were removed."""
        now = time.time()
        # Collect first: the dict must not change size while being iterated.
        stale = [jti for jti, deadline in self._store.items() if deadline < now]
        for jti in stale:
            del self._store[jti]
        return len(stale)

def b64url_encode(data: bytes) -> str:
    """Encode ``data`` as base64url text with the trailing ``=`` padding removed."""
    padded = base64.urlsafe_b64encode(data)
    return padded.decode().rstrip("=")

def b64url_decode(part: str) -> bytes:
    """Decode an unpadded base64url string to bytes.

    JWT segments omit the trailing ``=`` padding, so it is restored before
    decoding. ``-len(part) % 4`` gives exactly the number of missing pad
    characters (0 when the length is already a multiple of 4), whereas
    ``4 - len(part) % 4`` would append four superfluous ``=`` in that case.
    """
    padding = "=" * (-len(part) % 4)
    return base64.urlsafe_b64decode(part + padding)

# HMAC key used by create_jwt and verify_jwt.
# NOTE(review): hard-coded demo value — presumably real code loads this from configuration.
SECRET = b"signing-secret"
# Module-wide revocation store consulted by verify_jwt.
blacklist = JTIBlacklist()

class JWTError(Exception):
    """Raised when a token is rejected (bad signature, expired, or revoked)."""

def create_jwt(payload: dict) -> str:
    """Return a compact HS256 JWT for ``payload``, signed with the module-level SECRET."""

    def _segment(obj: dict) -> str:
        # Compact separators keep the serialized JSON free of whitespace.
        return b64url_encode(json.dumps(obj, separators=(",", ":")).encode())

    header_part = _segment({"alg": "HS256", "typ": "JWT"})
    payload_part = _segment(payload)
    signing_input = f"{header_part}.{payload_part}"
    mac = hmac.new(SECRET, signing_input.encode(), hashlib.sha256)
    return f"{signing_input}.{b64url_encode(mac.digest())}"

def verify_jwt(token: str) -> dict:
    """Validate ``token`` and return its claims.

    Checks, in order: the three-segment structure, the HMAC-SHA256 signature
    (computed with the module-level SECRET and compared in constant time),
    the ``exp`` claim, and the ``jti`` blacklist.

    Raises:
        JWTError: if the token is malformed, the signature does not match,
            the token is expired (a missing ``exp`` counts as expired), or
            its ``jti`` is blacklisted.
    """
    # Structural problems (wrong segment count, undecodable base64) surface as
    # ValueError subclasses; report them as JWTError so callers need only one
    # except clause for every kind of rejected token.
    try:
        h_enc, p_enc, sig_enc = token.split(".")
        signature = b64url_decode(sig_enc)
    except ValueError as err:
        raise JWTError("malformed token") from err

    expected = hmac.new(SECRET, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, signature):
        raise JWTError("invalid signature")

    try:
        payload = json.loads(b64url_decode(p_enc))
    except ValueError as err:
        raise JWTError("malformed payload") from err
    if not isinstance(payload, dict):
        raise JWTError("malformed payload")

    exp = payload.get("exp", 0)
    if not isinstance(exp, (int, float)):
        raise JWTError("invalid exp claim")
    if time.time() > exp:
        raise JWTError("token expired")

    jti = payload.get("jti")
    if jti and blacklist.is_revoked(jti):
        raise JWTError("token revoked")
    return payload

def logout(token: str) -> None:
    """Exercise stub: revoke ``token`` (currently does nothing)."""
    # add jti to blacklist with remaining TTL
    pass

# Issue a one-hour token that carries a random jti.
now = int(time.time())
payload = {
    "sub": "user42",
    "jti": secrets.token_urlsafe(16),
    "exp": now + 3600,
    "iat": now,
}
token = create_jwt(payload)
claims_segment = token.split(".")[1]
jti = json.loads(b64url_decode(claims_segment))["jti"]
print("Token issued with jti")

# A freshly issued token must verify.
claims = verify_jwt(token)
subject_matches = claims["sub"] == "user42"
print(f"Token valid before revoke: {subject_matches}")

# Once logout is implemented, the same token is rejected from here on.
logout(token)
try:
    verify_jwt(token)
except JWTError as err:
    print(f"Token invalid after revoke: JWTError: {err}")

# Simulate expired jti
blacklist.add("old-jti", ttl_seconds=-1)
cleaned = blacklist.cleanup_expired()
removed_any = cleaned >= 1
print(f"Expired jti cleaned up: {removed_any}")
Solution
import hashlib, hmac, base64, json, time, secrets

class JTIBlacklist:
    """In-memory jti blacklist whose entries lapse after a per-entry TTL."""

    def __init__(self):
        # Maps each revoked jti to the epoch time at which its entry lapses.
        self._store = {}

    def add(self, jti: str, ttl_seconds: int) -> None:
        """Blacklist ``jti`` for ``ttl_seconds`` seconds counted from now."""
        deadline = time.time() + ttl_seconds
        self._store[jti] = deadline

    def is_revoked(self, jti: str) -> bool:
        """Return True while ``jti`` has a live entry.

        A lapsed entry is removed on access and reported as not revoked.
        """
        deadline = self._store.get(jti)
        if deadline is None:
            return False
        if time.time() <= deadline:
            return True
        del self._store[jti]
        return False

    def cleanup_expired(self) -> int:
        """Drop every lapsed entry and return how many were dropped."""
        cutoff = time.time()
        # Collect first: the dict must not change size while being iterated.
        stale = [token_id for token_id, deadline in self._store.items() if deadline < cutoff]
        for token_id in stale:
            del self._store[token_id]
        return len(stale)

def b64url_encode(data: bytes) -> str:
    """Encode ``data`` as base64url text with the trailing ``=`` padding removed."""
    padded = base64.urlsafe_b64encode(data)
    return padded.decode().rstrip("=")

def b64url_decode(part: str) -> bytes:
    """Decode an unpadded base64url string to bytes.

    JWT segments omit the trailing ``=`` padding, so it is restored before
    decoding. ``-len(part) % 4`` gives exactly the number of missing pad
    characters (0 when the length is already a multiple of 4), whereas
    ``4 - len(part) % 4`` would append four superfluous ``=`` in that case.
    """
    padding = "=" * (-len(part) % 4)
    return base64.urlsafe_b64decode(part + padding)

# HMAC key used by create_jwt and verify_jwt.
# NOTE(review): hard-coded demo value — presumably real code loads this from configuration.
SECRET = b"signing-secret"
# Module-wide revocation store: logout adds to it, verify_jwt consults it.
blacklist = JTIBlacklist()

class JWTError(Exception):
    """Raised when a token is rejected (bad signature, expired, or revoked)."""

def create_jwt(payload: dict) -> str:
    """Return a compact HS256 JWT for ``payload``, signed with the module-level SECRET."""

    def _segment(obj: dict) -> str:
        # Compact separators keep the serialized JSON free of whitespace.
        return b64url_encode(json.dumps(obj, separators=(",", ":")).encode())

    header_part = _segment({"alg": "HS256", "typ": "JWT"})
    payload_part = _segment(payload)
    signing_input = f"{header_part}.{payload_part}"
    mac = hmac.new(SECRET, signing_input.encode(), hashlib.sha256)
    return f"{signing_input}.{b64url_encode(mac.digest())}"

def verify_jwt(token: str) -> dict:
    """Validate ``token`` and return its claims.

    Checks, in order: the three-segment structure, the HMAC-SHA256 signature
    (computed with the module-level SECRET and compared in constant time),
    the ``exp`` claim, and the ``jti`` blacklist.

    Raises:
        JWTError: if the token is malformed, the signature does not match,
            the token is expired (a missing ``exp`` counts as expired), or
            its ``jti`` is blacklisted.
    """
    # Structural problems (wrong segment count, undecodable base64) surface as
    # ValueError subclasses; report them as JWTError so callers need only one
    # except clause for every kind of rejected token.
    try:
        h_enc, p_enc, sig_enc = token.split(".")
        signature = b64url_decode(sig_enc)
    except ValueError as err:
        raise JWTError("malformed token") from err

    expected = hmac.new(SECRET, f"{h_enc}.{p_enc}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, signature):
        raise JWTError("invalid signature")

    try:
        payload = json.loads(b64url_decode(p_enc))
    except ValueError as err:
        raise JWTError("malformed payload") from err
    if not isinstance(payload, dict):
        raise JWTError("malformed payload")

    exp = payload.get("exp", 0)
    if not isinstance(exp, (int, float)):
        raise JWTError("invalid exp claim")
    if time.time() > exp:
        raise JWTError("token expired")

    jti = payload.get("jti")
    if jti and blacklist.is_revoked(jti):
        raise JWTError("token revoked")
    return payload

def logout(token: str) -> None:
    """Revoke ``token`` by blacklisting its jti for the rest of the token's lifetime.

    The payload is decoded without checking the signature. A token with no
    ``jti`` claim is left untouched.
    """
    claims_segment = token.split(".")[1]
    claims = json.loads(b64url_decode(claims_segment))
    token_id = claims.get("jti")
    # With no exp claim the remaining lifetime is treated as zero.
    expires_at = claims.get("exp", int(time.time()))
    remaining = max(0, expires_at - int(time.time()))
    if not token_id:
        return
    blacklist.add(token_id, remaining)

# Issue a one-hour token that carries a random jti.
now = int(time.time())
payload = {
    "sub": "user42",
    "jti": secrets.token_urlsafe(16),
    "exp": now + 3600,
    "iat": now,
}
token = create_jwt(payload)
claims_segment = token.split(".")[1]
jti = json.loads(b64url_decode(claims_segment))["jti"]
print("Token issued with jti")

# A freshly issued token must verify.
claims = verify_jwt(token)
subject_matches = claims["sub"] == "user42"
print(f"Token valid before revoke: {subject_matches}")

# After logout the very same token must be rejected.
logout(token)
try:
    verify_jwt(token)
except JWTError as err:
    print(f"Token invalid after revoke: JWTError: {err}")

# An entry added with a negative TTL has already lapsed, so cleanup removes it.
blacklist.add("old-jti", ttl_seconds=-1)
cleaned = blacklist.cleanup_expired()
removed_any = cleaned >= 1
print(f"Expired jti cleaned up: {removed_any}")

Blacklist TTL optimization: The blacklist entry only needs to live as long as the token would have been valid — once the token expires naturally, you don't need to check the blacklist. Setting TTL = remaining token lifetime keeps the blacklist small.

Expected Output
Token issued with jti
Token valid before revoke: True
Token invalid after revoke: JWTError: token revoked
Expired jti cleaned up: True
Hints

Hint 1: Add a "jti" (JWT ID) claim — a unique identifier per token — so you can blacklist individual tokens.

Hint 2: On logout, add the jti to the blacklist with TTL equal to the remaining token lifetime.

Hint 3: Before accepting a token, check the blacklist. If the token's jti is present in the blacklist, reject the token.

#11 JWT Audit Log and Anomaly Detection (Hard)
Tags: jwt, audit, anomaly, ip-binding, replay

Build a JWT audit logger that detects IP binding violations and rapid token reuse anomalies.

import time
from collections import defaultdict

class JWTAuditLog:
    """Audit trail of token uses that flags IP changes and rapid reuse per jti."""

    def __init__(self, rapid_reuse_window: float = 1.0):
        self._log = []
        # jti -> {first_ip, last_seen, use_count, user_id}
        self._jti_registry = {}
        self._rapid_window = rapid_reuse_window

    def record(self, jti: str, user_id: str, ip: str) -> list:
        """Record a token use. Returns list of anomaly strings (empty = clean)."""
        now = time.time()
        anomalies = []

        entry = self._jti_registry.get(jti)
        if entry is None:
            # First sighting of this jti: remember where it was first used.
            self._jti_registry[jti] = {
                "first_ip": ip,
                "last_seen": now,
                "use_count": 1,
                "user_id": user_id,
            }
        else:
            # IP binding: compare against the IP of the first use.
            if ip != entry["first_ip"]:
                anomalies.append("ip_mismatch")
            # Rapid reuse: time since the previous use of this jti.
            elapsed = now - entry["last_seen"]
            if elapsed < self._rapid_window:
                anomalies.append("rapid_reuse")
            entry["use_count"] += 1
            entry["last_seen"] = now

        event = {
            "ts": now,
            "jti": jti,
            "user_id": user_id,
            "ip": ip,
            "anomalies": anomalies,
        }
        self._log.append(event)
        return anomalies

    def get_log(self):
        """Return a shallow copy of the recorded events, oldest first."""
        return self._log[:]

audit = JWTAuditLog(rapid_reuse_window=1.0)
jti = "test-jti-001"

print("Token issued for user42")
# The uses below are recorded back-to-back, with no delay between them.
uses = [
    (1, "192.168.1.1", "First use from 192.168.1.1"),
    (2, "192.168.1.1", "Second use from 192.168.1.1"),
    (3, "10.0.0.99", "Use from different IP"),
]
for use_num, ip, label in uses:
    anomalies = audit.record(jti, "user42", ip)
    if anomalies:
        status = f"ANOMALY: {', '.join(anomalies)}"
    else:
        status = "OK"
    print(f"{label}: {status}")

# Rapid reuse simulation
jti2 = "test-jti-002"
audit.record(jti2, "user42", "192.168.1.1")
anomalies = audit.record(jti2, "user42", "192.168.1.1")  # immediate reuse
if anomalies:
    status = f"ANOMALY: {', '.join(anomalies)}"
else:
    status = "OK"
print(f"Replay within 1 second: {status}")
Solution
import time

class JWTAuditLog:
    """Audit trail of token uses that flags IP changes and rapid reuse per jti."""

    def __init__(self, rapid_reuse_window: float = 1.0):
        self._log = []
        # jti -> {first_ip, last_seen, use_count, user_id}
        self._jti_registry = {}
        self._rapid_window = rapid_reuse_window

    def record(self, jti: str, user_id: str, ip: str) -> list:
        """Log one use of ``jti`` and return the anomalies found (empty list = clean)."""
        now = time.time()
        anomalies = []

        entry = self._jti_registry.get(jti)
        if entry is None:
            # First sighting of this jti: remember where it was first used.
            self._jti_registry[jti] = {
                "first_ip": ip,
                "last_seen": now,
                "use_count": 1,
                "user_id": user_id,
            }
        else:
            # IP binding: compare against the IP of the first use.
            if ip != entry["first_ip"]:
                anomalies.append("ip_mismatch")
            # Rapid reuse: time since the previous use of this jti.
            elapsed = now - entry["last_seen"]
            if elapsed < self._rapid_window:
                anomalies.append("rapid_reuse")
            entry["use_count"] += 1
            entry["last_seen"] = now

        event = {
            "ts": now,
            "jti": jti,
            "user_id": user_id,
            "ip": ip,
            "anomalies": anomalies,
        }
        self._log.append(event)
        return anomalies

    def get_log(self):
        """Return a shallow copy of the recorded events, oldest first."""
        return self._log[:]

audit = JWTAuditLog(rapid_reuse_window=1.0)
jti = "test-jti-001"

print("Token issued for user42")
# The uses below are recorded back-to-back, with no delay between them.
uses = [
    (1, "192.168.1.1", "First use from 192.168.1.1"),
    (2, "192.168.1.1", "Second use from 192.168.1.1"),
    (3, "10.0.0.99", "Use from different IP"),
]
for use_num, ip, label in uses:
    anomalies = audit.record(jti, "user42", ip)
    if anomalies:
        status = f"ANOMALY: {', '.join(anomalies)}"
    else:
        status = "OK"
    print(f"{label}: {status}")

# Second token: two uses in immediate succession from the same IP.
jti2 = "test-jti-002"
audit.record(jti2, "user42", "192.168.1.1")
anomalies = audit.record(jti2, "user42", "192.168.1.1")
if anomalies:
    status = f"ANOMALY: {', '.join(anomalies)}"
else:
    status = "OK"
print(f"Replay within 1 second: {status}")

Production anomaly detection patterns:

  • IP binding: Hard binding to IP is too strict for mobile users. Soft binding flags changes as anomalies without blocking — route to step-up authentication.
  • Rapid reuse: A legitimate user will not use the same access token twice within milliseconds. Rapid reuse indicates token interception and replay.
  • Use count: Access tokens used hundreds of times per minute from one user are suspicious — possible credential stuffing or shared token.
  • Feed anomaly events into a SIEM (Splunk, Datadog Security) for real-time alerting.
Expected Output
Token issued for user42
First use from 192.168.1.1: OK
Second use from 192.168.1.1: ANOMALY: rapid_reuse
Use from different IP: ANOMALY: ip_mismatch, rapid_reuse
Replay within 1 second: ANOMALY: rapid_reuse
Hints

Hint 1: An audit log records every token verification attempt: timestamp, IP, user, outcome.

Hint 2: IP binding: if the same jti is used from a different IP, flag as anomalous.

Hint 3: Rapid reuse: if the same jti is used more than once within a short window, flag potential replay.

© 2026 EngineersOfAI. All rights reserved.