klg-asutk-app/backend/app/services/security.py
Yuriy 0150aba4f5 Consolidation: KLG ASUTK + PAPA integration
- Unify API: lib/api.ts uses /api/v1, inbox uses /api/inbox (rewrites)
- Remove localhost refs: openapi, inbox page
- Add rewrites: /api/inbox|tmc -> inbox-server, /api/v1 -> FastAPI
- Add stub routes: knowledge/insights, recommendations, search, log-error
- Transfer from PAPA: prompts (inspection, tmc), scripts, supabase, data/tmc-requests
- Fix inbox-server: ORDER BY created_at, package.json
- Remove redundant app/api/inbox/files route (rewrites handle it)
- knowledge/ in gitignore (large PDFs)

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-08 17:18:31 +03:00

94 lines
2.8 KiB
Python

from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Any
import httpx
from jose import jwt
from jose.exceptions import JWTError
from app.core.config import settings
# DEV bypass: enabled ONLY when ENABLE_DEV_AUTH=true (the value is stripped and
# compared case-insensitively).
ENABLE_DEV_AUTH = os.getenv("ENABLE_DEV_AUTH", "false").strip().lower() == "true"
# Normalised (stripped, lower-cased) dev token. Normalisation happens BEFORE the
# "dev" fallback so that an unset, empty or whitespace-only DEV_TOKEN can never
# collapse to "" -- an empty expected token would make an empty request token match.
_DEV_TOKEN_NORM = (os.getenv("DEV_TOKEN") or "").strip().lower() or "dev"
def _is_dev_token(t: str) -> bool:
    """Report whether ``t`` is the dev bypass token.

    Always ``False`` when ``ENABLE_DEV_AUTH`` is off. Otherwise ``t`` (a falsy
    value is treated as ``""``) is stripped, lower-cased and compared with
    ``_DEV_TOKEN_NORM``.
    """
    # Short-circuit keeps the token untouched when the dev bypass is disabled.
    return bool(ENABLE_DEV_AUTH) and (t or "").strip().lower() == _DEV_TOKEN_NORM
@dataclass
class TokenUser:
    """User identity extracted from decoded token claims."""

    # Value of the "sub" claim, stored as a string.
    sub: str
    # Human-readable name shown for the user.
    display_name: str
    # E-mail address, or None when the token carries none.
    email: str | None
    # Role name assigned to the user.
    role: str
    # Organisation identifier, or None when the token carries none.
    org_id: str | None
class AuthError(Exception):
    """Signals that a token could not be authenticated."""
# Module-level JWKS cache: None until the first successful download.
_jwks_cache: dict[str, Any] | None = None


async def _get_jwks() -> dict[str, Any]:
    """Return the JSON Web Key Set, downloading it on first use.

    The document is fetched from ``settings.oidc_jwks_url`` with a timeout of
    ``settings.oidc_timeout_s`` (20 when that setting is absent) and stored in
    the module-level ``_jwks_cache``. This function never invalidates the
    cache, so later calls return the stored document without any network
    access.

    Raises whatever ``httpx`` raises for transport failures or, via
    ``raise_for_status()``, for error status codes.
    """
    global _jwks_cache
    if _jwks_cache is None:
        request_timeout = getattr(settings, "oidc_timeout_s", 20)
        async with httpx.AsyncClient(timeout=request_timeout) as http:
            response = await http.get(settings.oidc_jwks_url)
            response.raise_for_status()
            _jwks_cache = response.json()
    return _jwks_cache
# Signature algorithms accepted on the OIDC/JWKS path (asymmetric only). The
# "alg" header is read from the still-unverified token, so it is checked against
# this allowlist instead of being trusted as-is.
_OIDC_ALLOWED_ALGS = frozenset({"RS256", "RS384", "RS512", "ES256", "ES384", "ES512"})


async def decode_token(token: str) -> dict[str, Any]:
    """Validate and decode a JWT, returning its claims.

    Resolution order:

    1. Dev token: when ``_is_dev_token(token)`` is true, a fixed set of admin
       claims is returned without any signature check.
    2. Dev HS256: when ``ENABLE_DEV_AUTH`` and ``settings.allow_hs256_dev_tokens``
       are both set, the token is decoded with ``settings.jwt_secret`` /
       ``settings.jwt_alg`` (audience not verified). A failure here is not
       fatal; validation falls through to step 3.
    3. OIDC: the token is verified against the JWKS key whose ``kid`` matches
       the token header, checking ``settings.oidc_issuer`` and
       ``settings.oidc_audience``.

    Raises:
        AuthError: the header algorithm is not an allowed asymmetric algorithm,
            no JWKS key matches the token's ``kid``, or jose rejects the token.
    """
    if _is_dev_token(token):
        return {"sub": "dev", "name": "Dev User", "email": "dev@local", "role": "admin", "org_id": None}

    if ENABLE_DEV_AUTH and getattr(settings, "allow_hs256_dev_tokens", False):
        try:
            return jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_alg], options={"verify_aud": False})
        except JWTError:
            # Deliberate best-effort: not a valid dev HS256 token, try OIDC next.
            pass

    jwks = await _get_jwks()
    try:
        header = jwt.get_unverified_header(token)

        # Never let the token pick an arbitrary verification algorithm.
        alg = header.get("alg", "RS256")
        if not isinstance(alg, str) or alg not in _OIDC_ALLOWED_ALGS:
            raise AuthError("Unsupported token algorithm")

        kid = header.get("kid")
        key = next((k for k in jwks.get("keys", []) if k.get("kid") == kid), None)
        if not key:
            raise AuthError("Unknown key id")

        return jwt.decode(
            token,
            key,
            algorithms=[alg],
            issuer=settings.oidc_issuer,
            audience=settings.oidc_audience,
        )
    except JWTError as e:
        raise AuthError("Invalid token") from e
def token_to_user(claims: dict[str, Any]) -> TokenUser:
    """Build a ``TokenUser`` from decoded token claims.

    Role resolution: the ``role`` claim if truthy, otherwise the first entry of
    ``realm_access.roles`` if truthy, otherwise ``"operator_user"``. A missing,
    null, malformed or empty ``realm_access`` / ``roles`` value falls back to
    the default role instead of raising.

    The display name is the first truthy value among ``name``,
    ``preferred_username`` and ``sub``. ``sub`` and the display name are passed
    through ``str()``, so a missing value becomes the string ``"None"``.
    """
    role = claims.get("role")
    if not role:
        realm_access = claims.get("realm_access")
        realm_roles = realm_access.get("roles") if isinstance(realm_access, dict) else None
        # Only index into a real, non-empty sequence of roles.
        if isinstance(realm_roles, (list, tuple)) and realm_roles:
            role = realm_roles[0]

    return TokenUser(
        sub=str(claims.get("sub")),
        display_name=str(claims.get("name") or claims.get("preferred_username") or claims.get("sub")),
        email=claims.get("email"),
        role=str(role or "operator_user"),
        org_id=claims.get("org_id"),
    )