klg-asutk-app/backend/app/integration/piv.py
Yuriy 0150aba4f5 Consolidation: KLG ASUTK + PAPA integration
- Unify API: lib/api.ts uses /api/v1, inbox uses /api/inbox (rewrites)
- Remove localhost refs: openapi, inbox page
- Add rewrites: /api/inbox|tmc -> inbox-server, /api/v1 -> FastAPI
- Add stub routes: knowledge/insights, recommendations, search, log-error
- Transfer from PAPA: prompts (inspection, tmc), scripts, supabase, data/tmc-requests
- Fix inbox-server: ORDER BY created_at, package.json
- Remove redundant app/api/inbox/files route (rewrites handle it)
- knowledge/ in gitignore (large PDFs)

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-08 17:18:31 +03:00

51 lines
1.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""П-ИВ integration stubs.
Соответствует требованиям ТЗ: использование П-ИВ АСУ ТК для интеграции и взаимодействия.
In the ASU TK variant, inbound/outbound data flows should go through П-ИВ.
This module provides a minimal HTTP client interface that can be swapped with
actual adapters when endpoint contracts are agreed.
Требуется уточнение согласно ТЗ:
- Форматы сообщений
- Расписания синхронизации
- ETL-pipeline
- Протоколирование
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import httpx
from app.core.config import settings
@dataclass
class PIVResult:
ok: bool
status_code: int
payload: dict[str, Any] | None
error: str | None = None
async def push_event(event_type: str, payload: dict[str, Any]) -> PIVResult:
"""Send an event to П-ИВ (prototype).
Production mapping examples:
- data upload logs
- status changes (application workflow)
"""
url = f"{settings.piv_base_url.rstrip('/')}/events"
body = {"type": event_type, "payload": payload}
try:
async with httpx.AsyncClient(timeout=settings.piv_timeout_s) as client:
r = await client.post(url, json=body)
if 200 <= r.status_code < 300:
return PIVResult(ok=True, status_code=r.status_code, payload=r.json() if r.content else None)
return PIVResult(ok=False, status_code=r.status_code, payload=None, error=r.text)
except Exception as e:
return PIVResult(ok=False, status_code=0, payload=None, error=str(e))