- Unify API: lib/api.ts uses /api/v1, inbox uses /api/inbox (rewrites) - Remove localhost refs: openapi, inbox page - Add rewrites: /api/inbox|tmc -> inbox-server, /api/v1 -> FastAPI - Add stub routes: knowledge/insights, recommendations, search, log-error - Transfer from PAPA: prompts (inspection, tmc), scripts, supabase, data/tmc-requests - Fix inbox-server: ORDER BY created_at, package.json - Remove redundant app/api/inbox/files route (rewrites handle it) - knowledge/ in gitignore (large PDFs) Co-authored-by: Cursor <cursoragent@cursor.com>
94 lines
2.8 KiB
Python
94 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from jose import jwt
|
|
from jose.exceptions import JWTError
|
|
|
|
from app.core.config import settings
|
|
|
|
# DEV bypass: включается ТОЛЬКО если ENABLE_DEV_AUTH=true
|
|
ENABLE_DEV_AUTH = os.getenv("ENABLE_DEV_AUTH", "false").strip().lower() == "true"
|
|
_DEV_TOKEN_NORM = (os.getenv("DEV_TOKEN") or "dev").strip().lower()
|
|
|
|
|
|
def _is_dev_token(t: str) -> bool:
|
|
if not ENABLE_DEV_AUTH:
|
|
return False
|
|
return (t or "").strip().lower() == _DEV_TOKEN_NORM
|
|
|
|
|
|
@dataclass
|
|
class TokenUser:
|
|
sub: str
|
|
display_name: str
|
|
email: str | None
|
|
role: str
|
|
org_id: str | None
|
|
|
|
|
|
class AuthError(Exception):
|
|
pass
|
|
|
|
|
|
_jwks_cache: dict[str, Any] | None = None
|
|
|
|
|
|
async def _get_jwks() -> dict[str, Any]:
|
|
global _jwks_cache
|
|
if _jwks_cache is not None:
|
|
return _jwks_cache
|
|
async with httpx.AsyncClient(timeout=settings.oidc_timeout_s if hasattr(settings, 'oidc_timeout_s') else 20) as client:
|
|
r = await client.get(settings.oidc_jwks_url)
|
|
r.raise_for_status()
|
|
_jwks_cache = r.json()
|
|
return _jwks_cache
|
|
|
|
|
|
async def decode_token(token: str) -> dict[str, Any]:
|
|
"""Validate and decode JWT.
|
|
|
|
Production: validate against OIDC JWKS from ASU TK-IB.
|
|
Dev: may accept HS256 using jwt_secret when ENABLE_DEV_AUTH=true.
|
|
"""
|
|
if _is_dev_token(token):
|
|
return {"sub": "dev", "name": "Dev User", "email": "dev@local", "role": "admin", "org_id": None}
|
|
|
|
if ENABLE_DEV_AUTH and hasattr(settings, 'allow_hs256_dev_tokens') and settings.allow_hs256_dev_tokens:
|
|
try:
|
|
return jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_alg], options={"verify_aud": False})
|
|
except JWTError:
|
|
pass
|
|
|
|
jwks = await _get_jwks()
|
|
try:
|
|
header = jwt.get_unverified_header(token)
|
|
kid = header.get("kid")
|
|
keys = jwks.get("keys", [])
|
|
key = next((k for k in keys if k.get("kid") == kid), None)
|
|
if not key:
|
|
raise AuthError("Unknown key id")
|
|
return jwt.decode(
|
|
token,
|
|
key,
|
|
algorithms=[header.get("alg", "RS256")],
|
|
issuer=settings.oidc_issuer,
|
|
audience=settings.oidc_audience,
|
|
)
|
|
except JWTError as e:
|
|
raise AuthError("Invalid token") from e
|
|
|
|
|
|
def token_to_user(claims: dict[str, Any]) -> TokenUser:
|
|
role = claims.get("role") or claims.get("realm_access", {}).get("roles", [None])[0] or "operator_user"
|
|
return TokenUser(
|
|
sub=str(claims.get("sub")),
|
|
display_name=str(claims.get("name") or claims.get("preferred_username") or claims.get("sub")),
|
|
email=claims.get("email"),
|
|
role=str(role),
|
|
org_id=claims.get("org_id"),
|
|
)
|