klg-asutk-app/backend/app/services/ws_manager.py
Yuriy fabe4fa72f feat(backend): 6 модулей для main/роутов + ws_notifications
- api/helpers: audit, is_authority, get_org_name, paginate_query, require_roles
- services/ws_manager: connect(ws, user_id, org_id), send_to_user, send_to_org, broadcast, make_notification(event, entity_type, entity_id, **extra)
- services/risk_scheduler: setup_scheduler (заглушка/APScheduler)
- services/email_service: email_service.send (заглушка)
- middleware/request_logger: RequestLoggerMiddleware
- core/rate_limit: RateLimitMiddleware (in-memory, RATE_LIMIT_PER_MINUTE, /health в обход)
- api/routes/ws_notifications: WebSocket /ws/notifications?user_id=&org_id=

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-14 23:21:57 +03:00

179 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WebSocket Connection Manager — real-time push для критических событий.
Поддерживает: connect(ws, user_id, org_id), send_to_user, send_to_org, broadcast.
Типы событий: ad_new_mandatory, defect_critical, life_limit_critical, wo_aog, wo_closed_crs и др.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Set
from fastapi import WebSocket
logger = logging.getLogger(__name__)
class ConnectionManager:
"""Управление WebSocket: по user_id и org_id, плюс room для обратной совместимости."""
def __init__(self):
self.active: Dict[str, Set[WebSocket]] = {} # room -> set of websockets
self._global: Set[WebSocket] = set()
# По user_id / org_id для send_to_user, send_to_org
self._connections: Dict[str, List[WebSocket]] = {} # user_id -> list[WebSocket]
self._org_users: Dict[str, Set[str]] = {} # org_id -> set[user_id]
async def connect(self, websocket: WebSocket, user_id: str | None = None, org_id: str | None = None, room: str = "global"):
await websocket.accept()
self._global.add(websocket)
self.active.setdefault(room, set()).add(websocket)
if user_id:
self._connections.setdefault(user_id, []).append(websocket)
if org_id:
self._org_users.setdefault(org_id, set()).add(user_id)
logger.info("WS connected: user_id=%s org_id=%s room=%s total=%d", user_id, org_id, room, len(self._global))
def disconnect(self, websocket: WebSocket, user_id: str | None = None, org_id: str | None = None, room: str = "global"):
self._global.discard(websocket)
if room in self.active:
self.active[room].discard(websocket)
if user_id and user_id in self._connections:
conns = self._connections[user_id]
if websocket in conns:
conns.remove(websocket)
if not conns:
del self._connections[user_id]
if org_id and org_id in self._org_users:
self._org_users[org_id].discard(user_id)
logger.info("WS disconnected: total=%d", len(self._global))
async def send_to_user(self, user_id: str, data: dict) -> None:
"""Отправить данные одному пользователю (всем его соединениям)."""
for ws in self._connections.get(user_id, []):
try:
await ws.send_text(json.dumps(data))
except Exception:
logger.warning("Failed to send WS to user %s", user_id)
async def send_to_org(self, org_id: str | None, data: dict) -> None:
"""Отправить данные всем пользователям организации."""
if not org_id:
return
for uid in self._org_users.get(org_id, set()):
if uid:
await self.send_to_user(uid, data)
async def broadcast(self, event_type_or_data: str | dict, data: dict | None = None, room: str = "global"):
"""Либо broadcast(data) — один dict для всех, либо broadcast(event_type, data, room) — по комнатам."""
if data is None:
# Один аргумент — payload dict, отправить всем (cert_applications, checklist_audits)
payload = event_type_or_data
if not isinstance(payload, dict):
payload = {"event": str(event_type_or_data)}
msg = json.dumps({**payload, "timestamp": datetime.now(timezone.utc).isoformat()})
disconnected = set()
for ws in self._global:
try:
await ws.send_text(msg)
except Exception:
disconnected.add(ws)
for ws in disconnected:
self._global.discard(ws)
if self._global:
logger.info("WS broadcast payload: sent=%d", len(self._global) - len(disconnected))
else:
# Старый формат: event_type, data, room
message = json.dumps({
"type": event_type_or_data,
"data": data,
"timestamp": datetime.now(timezone.utc).isoformat(),
})
targets = self.active.get(room, set()) | self._global
disconnected = set()
for ws in targets:
try:
await ws.send_text(message)
except Exception:
disconnected.add(ws)
for ws in disconnected:
self.disconnect(ws, room=room)
if targets:
logger.info("WS broadcast: type=%s room=%s sent=%d", event_type_or_data, room, len(targets) - len(disconnected))
async def send_personal(self, websocket: WebSocket, event_type: str, data: dict):
"""Отправить событие одному клиенту."""
try:
await websocket.send_text(json.dumps({
"type": event_type,
"data": data,
"timestamp": datetime.now(timezone.utc).isoformat(),
}))
except Exception:
self._global.discard(websocket)
# Singleton
ws_manager = ConnectionManager()
# === Helper functions for broadcasting from routes ===
async def notify_new_ad(ad_number: str, aircraft_types: list, compliance_type: str):
"""Уведомление о новой ДЛГ."""
if compliance_type == "mandatory":
await ws_manager.broadcast("ad_new_mandatory", {
"ad_number": ad_number,
"aircraft_types": aircraft_types,
"severity": "critical",
"message": f"⚠️ Новая обязательная ДЛГ: {ad_number}",
})
async def notify_critical_defect(aircraft_reg: str, description: str, defect_id: str):
"""Уведомление о критическом дефекте."""
await ws_manager.broadcast("defect_critical", {
"aircraft_reg": aircraft_reg,
"description": description[:100],
"defect_id": defect_id,
"severity": "critical",
"message": f"🔴 Критический дефект: {aircraft_reg}",
})
async def notify_wo_aog(wo_number: str, aircraft_reg: str):
"""Уведомление о наряде AOG."""
await ws_manager.broadcast("wo_aog", {
"wo_number": wo_number,
"aircraft_reg": aircraft_reg,
"severity": "critical",
"message": f"🔴 AOG: {aircraft_reg} — наряд {wo_number}",
})
async def notify_wo_closed(wo_number: str, aircraft_reg: str, crs_by: str):
"""Уведомление о закрытии наряда с CRS."""
await ws_manager.broadcast("wo_closed_crs", {
"wo_number": wo_number,
"aircraft_reg": aircraft_reg,
"crs_signed_by": crs_by,
"message": f"✅ CRS: {aircraft_reg} — наряд {wo_number}",
})
async def notify_life_limit_critical(component: str, serial: str, remaining: dict):
"""Уведомление об исчерпании ресурса."""
await ws_manager.broadcast("life_limit_critical", {
"component": component,
"serial_number": serial,
"remaining": remaining,
"severity": "critical",
"message": f"⏱️ КРИТИЧЕСКИЙ РЕСУРС: {component} S/N {serial}",
})
def make_notification(event: str, entity_type: str, entity_id: str, **extra: Any) -> dict:
"""Payload для отправки по WebSocket (broadcast / send_to_org)."""
return {"event": event, "entity_type": entity_type, "entity_id": entity_id, **extra}