refactor: legal package, personnel package, FGIS base_service, docs/SECURITY

- .gitignore: backend/venv/
- legal: routes/legal/ (base, handlers), legal_legacy.py
- personnel: routes/personnel/ re-exports personnel_plg
- FGIS: fgis/base_service.py, fgis_revs imports from fgis
- docs/SECURITY.md: security guide
- lib/logger.ts, logger-client.ts

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Yuriy 2026-02-14 21:37:46 +03:00
parent aa052763f6
commit d9dd6d66cd
13 changed files with 1590 additions and 23 deletions

3
.gitignore vendored
View File

@ -10,6 +10,7 @@ build/
dist/
# Python
backend/venv/
__pycache__/
*.py[cod]
*.egg-info/
@ -18,7 +19,7 @@ htmlcov/
.coverage
*.egg
# Environment & secrets (никогда не коммитить)
# Защита .env файлов и секретов (никогда не коммитить)
.env
.env.local
.env.development

View File

@ -0,0 +1,333 @@
"""
API маршруты интеграции с ФГИС РЭВС.
Правовые основания:
- ВК РФ ст. 33 Государственный реестр ГА ВС РФ
- ВК РФ ст. 36 Сертификат лётной годности
- ВК РФ ст. 37.2 Поддержание ЛГ (ФЗ-488)
- Приказ Росавиации 180-П ФГИС РЭВС
- ФАП-148 п.4.3 информирование ФАВТ о выполнении ДЛГ
- ФАП-128 обязательные донесения
"""
import logging
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from app.api.deps import get_db, get_current_user
from app.api.helpers import audit
from app.services.fgis import SyncDirection
from app.services.fgis_revs import fgis_client
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/fgis-revs", tags=["fgis-revs"])
# ===================================================================
# PULL — получение данных из ФГИС РЭВС
# ===================================================================
@router.get("/aircraft-registry")
def get_fgis_aircraft(
registration: Optional[str] = None,
user=Depends(get_current_user),
):
"""
Запрос реестра ВС из ФГИС РЭВС.
ВК РФ ст. 33: государственный реестр ГА ВС РФ.
"""
aircraft = fgis_client.pull_aircraft_registry(registration)
return {
"source": "ФГИС РЭВС",
"legal_basis": "ВК РФ ст. 33; Приказ Минтранса № 98",
"total": len(aircraft),
"items": [a.__dict__ for a in aircraft],
}
@router.get("/certificates")
def get_fgis_certificates(
registration: Optional[str] = None,
user=Depends(get_current_user),
):
"""
Запрос СЛГ из ФГИС РЭВС.
ВК РФ ст. 36: удостоверение (сертификат) лётной годности.
"""
certs = fgis_client.pull_certificates(registration)
return {
"source": "ФГИС РЭВС",
"legal_basis": "ВК РФ ст. 36",
"total": len(certs),
"items": [c.__dict__ for c in certs],
}
@router.get("/operators")
def get_fgis_operators(user=Depends(get_current_user)):
"""
Реестр эксплуатантов из ФГИС РЭВС.
ФАП-246: сертификация эксплуатантов.
"""
operators = fgis_client.pull_operators()
return {
"source": "ФГИС РЭВС",
"legal_basis": "ВК РФ ст. 8; ФАП-246",
"total": len(operators),
"items": [o.__dict__ for o in operators],
}
@router.get("/directives")
def get_fgis_directives(
since_days: int = Query(30, ge=1, le=365),
user=Depends(get_current_user),
):
"""
Директивы ЛГ из ФГИС РЭВС за последние N дней.
ВК РФ ст. 37: обязательные ДЛГ.
"""
directives = fgis_client.pull_directives()
return {
"source": "ФГИС РЭВС",
"legal_basis": "ВК РФ ст. 37; ФАП-148 п.4.3",
"total": len(directives),
"items": [d.__dict__ for d in directives],
}
@router.get("/maintenance-organizations")
def get_fgis_maint_orgs(user=Depends(get_current_user)):
"""Реестр организаций по ТО (ФАП-145) из ФГИС РЭВС."""
orgs = fgis_client.pull_maint_organizations()
return {
"source": "ФГИС РЭВС",
"legal_basis": "ФАП-145",
"total": len(orgs),
"items": [o.__dict__ for o in orgs],
}
# ===================================================================
# PUSH — отправка данных в ФГИС РЭВС
# ===================================================================
class ComplianceReport(BaseModel):
directive_number: str
aircraft_registration: str
compliance_date: str
work_order_number: str
crs_signed_by: str
notes: str = ""
@router.post("/push/compliance-report")
def push_compliance(
data: ComplianceReport,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""
Отправить отчёт о выполнении ДЛГ в ФГИС РЭВС.
ФАП-148 п.4.3: эксплуатант обязан информировать ФАВТ.
"""
result = fgis_client.push_compliance_report(data.dict())
audit(db, user, "fgis_push", "compliance_report",
description=f"ДЛГ {data.directive_number} → ФГИС РЭВС: {result.get('status', '?')}")
db.commit()
return {
"action": "push_compliance",
"legal_basis": "ФАП-148 п.4.3",
"result": result,
}
class MaintenanceReport(BaseModel):
work_order_number: str
aircraft_registration: str
work_type: str
completion_date: str
crs_signed_by: str
actual_manhours: float = 0
findings: str = ""
@router.post("/push/maintenance-report")
def push_maintenance(
data: MaintenanceReport,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""
Отправить данные о выполненном ТО (CRS) в ФГИС РЭВС.
ФАП-145 п.A.55: документация о выполненном ТО.
"""
result = fgis_client.push_maintenance_report(data.dict())
audit(db, user, "fgis_push", "maintenance_report",
description=f"WO {data.work_order_number} → ФГИС РЭВС: {result.get('status', '?')}")
db.commit()
return {"action": "push_maintenance", "legal_basis": "ФАП-145 п.A.55", "result": result}
class DefectReport(BaseModel):
aircraft_registration: str
defect_description: str
severity: str
ata_chapter: str = ""
discovered_during: str = ""
@router.post("/push/defect-report")
def push_defect(
data: DefectReport,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""
Обязательное донесение о дефекте в ФАВТ через ФГИС РЭВС.
ФАП-128: обязательные донесения о событиях с ВС.
"""
result = fgis_client.push_defect_report(data.dict())
audit(db, user, "fgis_push", "defect_report",
description=f"Дефект {data.aircraft_registration} → ФГИС РЭВС")
db.commit()
return {"action": "push_defect", "legal_basis": "ФАП-128", "result": result}
# ===================================================================
# SYNC — синхронизация реестров
# ===================================================================
@router.post("/sync/aircraft")
def sync_aircraft(
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""
Запустить синхронизацию реестра ВС с ФГИС РЭВС.
Фоновая задача результат доступен через /sync/status.
"""
result = fgis_client.sync_aircraft()
audit(db, user, "fgis_sync", "aircraft",
description=f"Sync aircraft: {result.status} ({result.records_synced}/{result.records_total})")
db.commit()
return {
"action": "sync_aircraft",
"legal_basis": "ВК РФ ст. 33",
"result": result.__dict__,
}
@router.post("/sync/certificates")
def sync_certificates(
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Синхронизация СЛГ с ФГИС РЭВС."""
result = fgis_client.sync_certificates()
audit(db, user, "fgis_sync", "certificates",
description=f"Sync certs: {result.status}")
db.commit()
return {"action": "sync_certificates", "result": result.__dict__}
@router.post("/sync/directives")
def sync_directives(
since_days: int = Query(30, ge=1, le=365),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""
Синхронизация директив ЛГ с ФГИС РЭВС.
Автоматически создаёт новые AD в системе.
"""
result = fgis_client.sync_directives(since_days)
audit(db, user, "fgis_sync", "directives",
description=f"Sync AD: {result.status} ({result.records_synced} synced)")
db.commit()
return {"action": "sync_directives", "legal_basis": "ВК РФ ст. 37", "result": result.__dict__}
@router.post("/sync/all")
def sync_all(
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""Полная синхронизация всех реестров с ФГИС РЭВС."""
results = {
"aircraft": fgis_client.sync_aircraft().__dict__,
"certificates": fgis_client.sync_certificates().__dict__,
"directives": fgis_client.sync_directives().__dict__,
}
audit(db, user, "fgis_sync", "all", description="Full ФГИС РЭВС sync")
db.commit()
return {"action": "sync_all", "results": results}
@router.get("/sync/status")
def sync_status(user=Depends(get_current_user)):
"""История и статус синхронизаций."""
log = fgis_client.get_sync_log()
return {
"total_syncs": len(log),
"last_sync": log[-1] if log else None,
"history": log[-20:],
}
# ===================================================================
# СМЭВ 3.0 — юридически значимый обмен
# ===================================================================
class SMEVRequest(BaseModel):
service_code: str = Field(..., description="FAVT-001..004")
data: dict = Field(default={})
@router.post("/smev/send")
def smev_send(
req: SMEVRequest,
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
"""
Отправить запрос через СМЭВ 3.0 (юридически значимый обмен).
Требуется УКЭП (ГОСТ Р 34.10-2012).
Сервисы:
- FAVT-001: Запрос данных из реестра ВС
- FAVT-002: Подача заявления на СЛГ
- FAVT-003: Уведомление о выполнении ДЛГ
- FAVT-004: Отчёт о ТО
"""
message_id = fgis_client.smev_send_request(req.service_code, req.data)
audit(db, user, "smev_send", req.service_code,
description=f"СМЭВ 3.0: {req.service_code}, msg_id={message_id}")
db.commit()
return {
"action": "smev_send",
"service_code": req.service_code,
"message_id": message_id,
"note": "Ответ будет доступен асинхронно через /smev/responses",
}
@router.get("/connection-status")
def connection_status(user=Depends(get_current_user)):
"""Статус подключения к ФГИС РЭВС и СМЭВ."""
return {
"fgis_revs": {
"url": fgis_client.config.BASE_URL,
"status": "mock_mode",
"note": "Тестовая среда — используются mock-данные. Для production: настроить сертификат ГОСТ.",
},
"smev_30": {
"url": fgis_client.config.SMEV_URL,
"status": "mock_mode",
"note": "Требуется УКЭП и регистрация в СМЭВ 3.0.",
},
"config": {
"org_id": fgis_client.config.ORG_ID or "(не задан)",
"cert_path": fgis_client.config.CERT_PATH,
"timeout": fgis_client.config.TIMEOUT,
"max_retries": fgis_client.config.MAX_RETRIES,
},
}

View File

@ -0,0 +1,4 @@
"""Модуль legal: юрисдикции, документы, ИИ-анализ. Роутер в base.py."""
from .base import router
__all__ = ["router"]

View File

@ -0,0 +1,385 @@
"""
Роутер legal: юрисдикции, документы, перекрёстные ссылки, комментарии, судебная практика, ИИ-анализ.
Обработчики бизнес-логики в handlers.py.
"""
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.api.helpers import paginate_query
from app.api.deps import get_current_user, require_roles
from app.db.session import get_db
from app.models import LegalDocument, CrossReference, LegalComment, JudicialPractice
from app.schemas.legal import (
JurisdictionCreate,
JurisdictionUpdate,
JurisdictionOut,
LegalDocumentCreate,
LegalDocumentUpdate,
LegalDocumentOut,
CrossReferenceCreate,
CrossReferenceOut,
LegalCommentCreate,
LegalCommentUpdate,
LegalCommentOut,
JudicialPracticeCreate,
JudicialPracticeUpdate,
JudicialPracticeOut,
AnalysisRequest,
AnalysisResponse,
)
from . import handlers
router = APIRouter(prefix="/legal", tags=["legal"])
# --- Jurisdictions ---
@router.get("/jurisdictions", response_model=list[JurisdictionOut])
def list_jurisdictions(
active_only: bool = Query(True),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return handlers.list_jurisdictions(db, active_only)
@router.post("/jurisdictions", response_model=JurisdictionOut, status_code=status.HTTP_201_CREATED)
def create_jurisdiction(
payload: JurisdictionCreate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin")),
):
return handlers.create_jurisdiction(db, payload)
@router.get("/jurisdictions/{jid}", response_model=JurisdictionOut)
def get_jurisdiction(jid: str, db: Session = Depends(get_db), user=Depends(get_current_user)):
return handlers.get_jurisdiction(db, jid)
@router.patch("/jurisdictions/{jid}", response_model=JurisdictionOut)
def update_jurisdiction(
jid: str,
payload: JurisdictionUpdate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin")),
):
return handlers.update_jurisdiction(db, jid, payload)
# --- Legal Documents ---
@router.get("/documents", response_model=list[LegalDocumentOut])
def list_legal_documents(
page: int = Query(1, ge=1), per_page: int = Query(25, ge=1, le=100),
jurisdiction_id: str | None = Query(None),
document_type: str | None = Query(None),
limit: int = Query(100, le=500),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
return handlers.list_legal_documents(db, page, per_page, jurisdiction_id, document_type)
@router.post("/documents", response_model=LegalDocumentOut, status_code=status.HTTP_201_CREATED)
def create_legal_document(
payload: LegalDocumentCreate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
return handlers.create_legal_document(db, payload)
@router.get("/documents/{doc_id}", response_model=LegalDocumentOut)
def get_legal_document(doc_id: str, db: Session = Depends(get_db), user=Depends(get_current_user)):
return handlers.get_legal_document(db, doc_id)
@router.patch("/documents/{doc_id}", response_model=LegalDocumentOut)
def update_legal_document(
doc_id: str,
payload: LegalDocumentUpdate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
return handlers.update_legal_document(db, doc_id, payload)
@router.get("/documents/{doc_id}/cross-references", response_model=list[CrossReferenceOut])
def list_document_cross_references(
doc_id: str,
direction: str = Query("outgoing", description="outgoing|incoming"),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
q = db.query(CrossReference)
if direction == "incoming":
q = q.filter(CrossReference.target_document_id == doc_id)
else:
q = q.filter(CrossReference.source_document_id == doc_id)
return q.limit(100).all()
@router.post("/cross-references", response_model=CrossReferenceOut, status_code=status.HTTP_201_CREATED)
def create_cross_reference(
payload: CrossReferenceCreate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
ref = CrossReference(**payload.model_dump())
db.add(ref)
db.commit()
db.refresh(ref)
return ref
# --- Legal Comments ---
@router.get("/comments", response_model=list[LegalCommentOut])
def list_legal_comments(
jurisdiction_id: str | None = Query(None),
document_id: str | None = Query(None),
limit: int = Query(100, le=500),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
q = db.query(LegalComment)
if jurisdiction_id:
q = q.filter(LegalComment.jurisdiction_id == jurisdiction_id)
if document_id:
q = q.filter(LegalComment.document_id == document_id)
return paginate_query(q.order_by(LegalComment.created_at.desc()), 1, limit)
@router.post("/comments", response_model=LegalCommentOut, status_code=status.HTTP_201_CREATED)
def create_legal_comment(
payload: LegalCommentCreate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
c = LegalComment(**payload.model_dump())
db.add(c)
db.commit()
db.refresh(c)
return c
@router.patch("/comments/{cid}", response_model=LegalCommentOut)
def update_legal_comment(
cid: str,
payload: LegalCommentUpdate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
c = db.get(LegalComment, cid)
if not c:
raise HTTPException(status_code=404, detail="Comment not found")
for k, v in payload.model_dump(exclude_unset=True).items():
setattr(c, k, v)
db.commit()
db.refresh(c)
return c
# --- Judicial Practice ---
@router.get("/judicial-practices", response_model=list[JudicialPracticeOut])
def list_judicial_practices(
jurisdiction_id: str | None = Query(None),
document_id: str | None = Query(None),
limit: int = Query(100, le=500),
db: Session = Depends(get_db),
user=Depends(get_current_user),
):
q = db.query(JudicialPractice)
if jurisdiction_id:
q = q.filter(JudicialPractice.jurisdiction_id == jurisdiction_id)
if document_id:
q = q.filter(JudicialPractice.document_id == document_id)
return q.order_by(
JudicialPractice.decision_date.is_(None),
JudicialPractice.decision_date.desc(),
JudicialPractice.created_at.desc(),
).limit(limit).all()
@router.post("/judicial-practices", response_model=JudicialPracticeOut, status_code=status.HTTP_201_CREATED)
def create_judicial_practice(
payload: JudicialPracticeCreate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
p = JudicialPractice(**payload.model_dump())
db.add(p)
db.commit()
db.refresh(p)
return p
@router.patch("/judicial-practices/{pid}", response_model=JudicialPracticeOut)
def update_judicial_practice(
pid: str,
payload: JudicialPracticeUpdate,
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
p = db.get(JudicialPractice, pid)
if not p:
raise HTTPException(status_code=404, detail="Judicial practice not found")
for k, v in payload.model_dump(exclude_unset=True).items():
setattr(p, k, v)
db.commit()
db.refresh(p)
return p
# --- ИИ-анализ ---
@router.post("/analyze", response_model=AnalysisResponse)
def analyze_document(
payload: AnalysisRequest,
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
return handlers.run_analysis(db, payload)
@router.post("/documents/{doc_id}/analyze", response_model=AnalysisResponse)
def analyze_existing_document(
doc_id: str,
skip_agents: list[str] | None = Query(None),
save_cross_references: bool = Query(True),
db: Session = Depends(get_db),
user=Depends(require_roles("admin", "authority_inspector")),
):
d = db.get(LegalDocument, doc_id)
if not d:
raise HTTPException(status_code=404, detail="Document not found")
orch = LegalAnalysisOrchestrator(db=db)
out = orch.run(
document_id=doc_id,
jurisdiction_id=d.jurisdiction_id,
title=d.title,
content=d.content,
existing_document_type=d.document_type,
skip_agents=skip_agents,
save_cross_references=save_cross_references,
)
d.document_type = out["document_type"]
d.analysis_json = out.get("analysis_json")
d.compliance_notes = out.get("compliance_notes")
db.commit()
db.refresh(d)
return AnalysisResponse(
document_type=out["document_type"],
analysis_json=out.get("analysis_json"),
compliance_notes=out.get("compliance_notes"),
results=out.get("results", {}),
)
# --- Справочные эндпоинты (ФАП, национальная система, матрица соответствия) ---
FAP_ADDITIONAL = {
"ФАП-148": {
"full_name": "Требования к эксплуатантам гражданских воздушных судов по обеспечению поддержания лётной годности",
"document": "Приказ Минтранса России от 23.06.2003 № 148",
"status": "Действует",
"scope": [
"Обязанности эксплуатанта по ПЛГ",
"Программа ТО воздушного судна",
"Контроль за выполнением директив лётной годности",
"Ведение эксплуатационной документации",
"Учёт наработки агрегатов и компонентов",
"Контроль назначенных ресурсов и сроков службы",
],
"relevance_to_system": "Базовый документ для модулей: Лётная годность, ТО, Чек-листы, Риски",
},
"ФАП-149": {
"full_name": "Требования к электросветотехническому обеспечению полётов",
"document": "Приказ Минтранса России от 23.06.2003 № 149",
"status": "Действует",
"scope": [
"Нормы электросветотехнического обеспечения на аэродромах",
"Требования к светосигнальному оборудованию",
"Контроль технического состояния электросветотехнических средств",
"Периодичность проверок и ТО",
],
"relevance_to_system": "Учитывается при аудитах аэродромной инфраструктуры и чек-листах",
},
"ФАП-10": {
"full_name": "Сертификационные требования к эксплуатантам коммерческой гражданской авиации",
"document": "Приказ Минтранса России от 04.02.2003 № 10 (ФАП-246 от 13.08.2015 — актуальная редакция)",
"status": "Заменён ФАП-246, но ряд положений действует",
"scope": [
"Организационная структура эксплуатанта",
"Требования к руководящему персоналу",
"Система управления безопасностью полётов",
"Программа подготовки авиационного персонала",
"Требования к парку ВС",
],
"relevance_to_system": "Базис для модуля Сертификация эксплуатантов (заявки, организации)",
},
}
NATIONAL_PLG_FRAMEWORK = {
"presidential_order": {
"name": "Поручение Президента РФ Пр-1379 от 17.07.2019, п.2 пп.«в»",
"subject": "Гармонизация условий поддержания лётной годности ВС",
"requirements": [
"Создание национальной системы поддержания лётной годности",
"Гармонизация требований с международными стандартами (ICAO, EASA)",
"Обеспечение непрерывности контроля технического состояния ВС",
],
},
"fz_488": {
"name": "Федеральный закон от 30.12.2021 № 488-ФЗ",
"subject": "Введение статьи 37.2 ВК РФ «Поддержание лётной годности»",
"article_37_2": {
"text": "Поддержание лётной годности ВС — комплекс мер по обеспечению соответствия ВС "
"требованиям к лётной годности и поддержанию его безопасной эксплуатации",
"obligations": [
"Эксплуатант обязан обеспечивать ПЛГ",
"ФАВТ осуществляет государственный контроль за ПЛГ",
"Организация по ТО должна иметь сертификат",
],
},
},
"tz_asu_tk": {
"name": "Техническое задание на АСУ ТК «Контроль летной годности ВС»",
"approved_by": "Утверждено заместителем министра транспорта РФ 24.07.2022",
"scope": [
"Автоматизация процессов контроля лётной годности",
"Учёт воздушных судов и их технического состояния",
"Контроль выполнения программ ТО",
"Управление сертификационными процедурами",
"Мониторинг рисков безопасности полётов",
"Интеграция с системами ФАВТ и эксплуатантов",
],
},
}
@router.get("/fap-additional", tags=["legal"])
def get_additional_fap():
return FAP_ADDITIONAL
@router.get("/national-plg-framework", tags=["legal"])
def get_national_plg_framework():
return NATIONAL_PLG_FRAMEWORK
@router.get("/compliance-matrix", tags=["legal"])
def get_compliance_matrix():
return {
"system": "КЛГ АСУ ТК v16",
"developer": "АО «REFLY»",
"matrix": [
{"num": 1, "document": "Воздушный кодекс РФ (60-ФЗ)", "articles": "ст. 8, 35, 36, 37, 37.2",
"modules": ["Панель ФАВТ", "Сертификация", "Лётная годность", "Реестр ВС"], "status": "implemented"},
{"num": 2, "document": "ФАП-21 (Часть 21)", "articles": "Приказ №184", "modules": ["Сертификация АТ", "Организации"], "status": "implemented"},
{"num": 19, "document": "ТЗ АСУ ТК", "articles": "Утв. 24.07.2022", "modules": ["Все модули системы"], "status": "implemented"},
],
}

View File

@ -0,0 +1,122 @@
"""
Обработчики бизнес-логики для модуля legal (юрисдикции, документы, комментарии, судебная практика, ИИ-анализ).
"""
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.models import Jurisdiction, LegalDocument, CrossReference, LegalComment, JudicialPractice
from app.schemas.legal import (
JurisdictionCreate,
JurisdictionUpdate,
LegalDocumentCreate,
LegalDocumentUpdate,
CrossReferenceCreate,
LegalCommentCreate,
LegalCommentUpdate,
JudicialPracticeCreate,
JudicialPracticeUpdate,
AnalysisRequest,
AnalysisResponse,
)
from app.services.legal_agents import LegalAnalysisOrchestrator
from app.api.helpers import paginate_query
# --- Jurisdictions ---
def list_jurisdictions(db: Session, active_only: bool):
q = db.query(Jurisdiction)
if active_only:
q = q.filter(Jurisdiction.is_active.is_(True))
return q.order_by(Jurisdiction.code).all()
def create_jurisdiction(db: Session, payload: JurisdictionCreate):
j = Jurisdiction(**payload.model_dump())
db.add(j)
db.commit()
db.refresh(j)
return j
def get_jurisdiction(db: Session, jid: str):
j = db.get(Jurisdiction, jid)
if not j:
raise HTTPException(status_code=404, detail="Jurisdiction not found")
return j
def update_jurisdiction(db: Session, jid: str, payload: JurisdictionUpdate):
j = db.get(Jurisdiction, jid)
if not j:
raise HTTPException(status_code=404, detail="Jurisdiction not found")
for k, v in payload.model_dump(exclude_unset=True).items():
setattr(j, k, v)
db.commit()
db.refresh(j)
return j
# --- Legal Documents ---
def list_legal_documents(db: Session, page: int, per_page: int, jurisdiction_id: str | None, document_type: str | None):
q = db.query(LegalDocument)
if jurisdiction_id:
q = q.filter(LegalDocument.jurisdiction_id == jurisdiction_id)
if document_type:
q = q.filter(LegalDocument.document_type == document_type)
q = q.order_by(LegalDocument.created_at.desc())
return paginate_query(q, page, per_page)
def create_legal_document(db: Session, payload: LegalDocumentCreate):
d = LegalDocument(**payload.model_dump())
db.add(d)
db.commit()
db.refresh(d)
return d
def get_legal_document(db: Session, doc_id: str):
d = db.get(LegalDocument, doc_id)
if not d:
raise HTTPException(status_code=404, detail="Document not found")
return d
def update_legal_document(db: Session, doc_id: str, payload: LegalDocumentUpdate):
d = db.get(LegalDocument, doc_id)
if not d:
raise HTTPException(status_code=404, detail="Document not found")
for k, v in payload.model_dump(exclude_unset=True).items():
setattr(d, k, v)
db.commit()
db.refresh(d)
return d
# --- Analyze ---
def run_analysis(db: Session, payload: AnalysisRequest):
orch = LegalAnalysisOrchestrator(db=db)
out = orch.run(
document_id=payload.document_id,
jurisdiction_id=payload.jurisdiction_id,
title=payload.title,
content=payload.content,
skip_agents=payload.skip_agents,
save_cross_references=payload.save_cross_references,
)
if payload.document_id and out.get("document_type"):
d = db.get(LegalDocument, payload.document_id)
if d:
d.document_type = out["document_type"]
d.analysis_json = out.get("analysis_json")
d.compliance_notes = out.get("compliance_notes")
db.commit()
return AnalysisResponse(
document_type=out["document_type"],
analysis_json=out.get("analysis_json"),
compliance_notes=out.get("compliance_notes"),
results=out.get("results", {}),
)

View File

@ -0,0 +1,8 @@
"""
Структура модуля персонала ПЛГ (сертификация, аттестация, повышение квалификации).
Роуты пока подключены через personnel_plg; при рефакторинге перенести сюда.
"""
# В будущем: from .router import router
from ..personnel_plg import router
__all__ = ["router"]

View File

@ -0,0 +1,24 @@
"""ФГИС РЭВС: конфигурация и модели в base_service, клиент в fgis_revs."""
from .base_service import (
FGISConfig,
SyncDirection,
SyncStatus,
FGISAircraft,
FGISCertificate,
FGISOperator,
FGISDirective,
FGISMaintOrg,
SyncResult,
)
__all__ = [
"FGISConfig",
"SyncDirection",
"SyncStatus",
"FGISAircraft",
"FGISCertificate",
"FGISOperator",
"FGISDirective",
"FGISMaintOrg",
"SyncResult",
]

View File

@ -0,0 +1,133 @@
"""
Базовые типы и конфигурация ФГИС РЭВС.
Клиент и методы работы с API в fgis_revs.py.
"""
from typing import List
from dataclasses import dataclass, field
from enum import Enum
class FGISConfig:
"""Параметры подключения к ФГИС РЭВС."""
BASE_URL: str = "https://fgis-revs-test.favt.gov.ru/api/v2"
SMEV_URL: str = "https://smev3-n0.test.gosuslugi.ru:7500/smev/v1.2/ws"
CERT_PATH: str = "/etc/ssl/fgis/client.pem"
KEY_PATH: str = "/etc/ssl/fgis/client.key"
CA_PATH: str = "/etc/ssl/fgis/ca-bundle.pem"
TIMEOUT: int = 30
ORG_ID: str = ""
API_KEY: str = ""
MAX_RETRIES: int = 3
RETRY_DELAY: int = 5
class SyncDirection(Enum):
PULL = "pull"
PUSH = "push"
BIDIRECT = "bidirect"
class SyncStatus(Enum):
SUCCESS = "success"
PARTIAL = "partial"
FAILED = "failed"
PENDING = "pending"
@dataclass
class FGISAircraft:
"""Воздушное судно в реестре ФГИС РЭВС."""
registration: str
serial_number: str
aircraft_type: str
icao_code: str = ""
manufacturer: str = ""
year_manufactured: int = 0
max_takeoff_weight: float = 0
owner: str = ""
operator: str = ""
operator_certificate: str = ""
base_airport: str = ""
status: str = "active"
registration_date: str = ""
deregistration_date: str = ""
fgis_id: str = ""
last_sync: str = ""
@dataclass
class FGISCertificate:
"""Сертификат лётной годности из ФГИС РЭВС."""
certificate_number: str
aircraft_registration: str
certificate_type: str
issue_date: str
expiry_date: str
issuing_authority: str = "ФАВТ"
category: str = ""
noise_certificate: str = ""
limitations: List[str] = field(default_factory=list)
status: str = "valid"
fgis_id: str = ""
@dataclass
class FGISOperator:
"""Эксплуатант в реестре ФГИС РЭВС."""
certificate_number: str
name: str
legal_address: str = ""
actual_address: str = ""
inn: str = ""
ogrn: str = ""
issue_date: str = ""
expiry_date: str = ""
aircraft_types: List[str] = field(default_factory=list)
fleet_count: int = 0
restrictions: List[str] = field(default_factory=list)
status: str = "active"
fgis_id: str = ""
@dataclass
class FGISDirective:
"""Директива ЛГ из ФГИС РЭВС."""
number: str
title: str
issuing_authority: str = "ФАВТ"
effective_date: str = ""
aircraft_types: List[str] = field(default_factory=list)
ata_chapter: str = ""
compliance_type: str = "mandatory"
description: str = ""
supersedes: str = ""
fgis_id: str = ""
@dataclass
class FGISMaintOrg:
"""Организация по ТО (ФАП-145) из ФГИС РЭВС."""
certificate_number: str
name: str
approval_scope: List[str] = field(default_factory=list)
issue_date: str = ""
expiry_date: str = ""
status: str = "active"
fgis_id: str = ""
@dataclass
class SyncResult:
"""Результат синхронизации."""
direction: str
entity_type: str
status: str
records_total: int = 0
records_synced: int = 0
records_created: int = 0
records_updated: int = 0
records_failed: int = 0
errors: List[str] = field(default_factory=list)
started_at: str = ""
completed_at: str = ""
duration_seconds: float = 0

View File

@ -0,0 +1,500 @@
"""
Интеграция с ФГИС РЭВС Федеральная государственная информационная система
Реестра эксплуатантов воздушных судов.
Правовые основания:
- ВК РФ ст. 33 Государственный реестр ГА ВС РФ
- ВК РФ ст. 36 Допуск ВС к эксплуатации
- ВК РФ ст. 37.2 Поддержание ЛГ (ФЗ-488)
- Приказ Минтранса 98 от 02.07.2007 порядок ведения Гос. реестра
- Приказ Росавиации 180-П от 09.03.2017 ФГИС РЭВС
- ФАП-10/246 сертификация эксплуатантов
Протокол: REST API / SOAP / СМЭВ 3.0 (Система межведомственного электронного взаимодействия).
В production используется сертификат ГОСТ Р 34.10-2012 (УКЭП).
Конфигурация и модели данных вынесены в app.services.fgis.base_service.
"""
import logging
import uuid
import xml.etree.ElementTree as ET
from dataclasses import asdict
from datetime import datetime, timezone, timedelta
from typing import Optional, List
from app.services.fgis import (
FGISConfig,
SyncDirection,
SyncStatus,
FGISAircraft,
FGISCertificate,
FGISOperator,
FGISDirective,
FGISMaintOrg,
SyncResult,
)
logger = logging.getLogger(__name__)
# ===================================================================
# КЛИЕНТ ФГИС РЭВС
# ===================================================================
class FGISREVSClient:
"""
Клиент для взаимодействия с ФГИС РЭВС.
Поддерживает два протокола:
1. REST API для оперативных запросов
2. СМЭВ 3.0 (SOAP) для юридически значимого обмена
В тестовой среде используется mock-режим.
"""
def __init__(self, config: Optional[FGISConfig] = None):
self.config = config or FGISConfig()
self._session = None
self._smev_client = None
self._sync_log: List[SyncResult] = []
# --- REST API методы ---
def _make_request(self, method: str, endpoint: str, data: dict = None) -> dict:
"""HTTP запрос к REST API ФГИС РЭВС."""
import httpx
url = f"{self.config.BASE_URL}/{endpoint}"
headers = {
"Authorization": f"Bearer {self.config.API_KEY}",
"Content-Type": "application/json",
"X-Organization-ID": self.config.ORG_ID,
"X-Request-ID": str(uuid.uuid4()),
}
try:
with httpx.Client(
cert=(self.config.CERT_PATH, self.config.KEY_PATH),
verify=self.config.CA_PATH,
timeout=self.config.TIMEOUT,
) as client:
if method == "GET":
resp = client.get(url, headers=headers, params=data)
elif method == "POST":
resp = client.post(url, headers=headers, json=data)
elif method == "PUT":
resp = client.put(url, headers=headers, json=data)
else:
raise ValueError(f"Unsupported method: {method}")
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error("ФГИС РЭВС request failed: %s %s%s", method, endpoint, str(e))
raise
# --- PULL: Получение данных ---
def pull_aircraft_registry(self, registration: str = None) -> List[FGISAircraft]:
"""
Получить реестр ВС из ФГИС РЭВС.
ВК РФ ст. 33: государственный реестр ГА ВС РФ.
"""
params = {}
if registration:
params["registration"] = registration
try:
data = self._make_request("GET", "registry/aircraft", params)
return [FGISAircraft(**item) for item in data.get("items", [])]
except Exception:
logger.warning("ФГИС РЭВС unavailable — using mock data")
return self._mock_aircraft_registry(registration)
def pull_certificates(self, registration: str = None) -> List[FGISCertificate]:
"""
Получить СЛГ из ФГИС РЭВС.
ВК РФ ст. 36: удостоверение (сертификат) лётной годности.
"""
params = {}
if registration:
params["aircraft_registration"] = registration
try:
data = self._make_request("GET", "certificates/airworthiness", params)
return [FGISCertificate(**item) for item in data.get("items", [])]
except Exception:
logger.warning("ФГИС РЭВС unavailable — using mock certificates")
return self._mock_certificates(registration)
def pull_operators(self) -> List[FGISOperator]:
"""Получить реестр эксплуатантов."""
try:
data = self._make_request("GET", "registry/operators")
return [FGISOperator(**item) for item in data.get("items", [])]
except Exception:
return self._mock_operators()
def pull_directives(self, since: str = None) -> List[FGISDirective]:
"""
Получить директивы ЛГ из ФГИС РЭВС.
ВК РФ ст. 37: обязательные для выполнения ДЛГ.
"""
params = {}
if since:
params["effective_after"] = since
try:
data = self._make_request("GET", "directives", params)
return [FGISDirective(**item) for item in data.get("items", [])]
except Exception:
return self._mock_directives()
def pull_maint_organizations(self) -> List[FGISMaintOrg]:
"""Получить реестр организаций по ТО (ФАП-145)."""
try:
data = self._make_request("GET", "registry/maintenance-organizations")
return [FGISMaintOrg(**item) for item in data.get("items", [])]
except Exception:
return self._mock_maint_orgs()
# --- PUSH: Отправка данных ---
def push_compliance_report(self, report: dict) -> dict:
"""
Отправить отчёт о выполнении ДЛГ в ФГИС РЭВС.
ФАП-148 п.4.3: эксплуатант обязан информировать ФАВТ о выполнении ДЛГ.
"""
payload = {
"report_type": "ad_compliance",
"organization_id": self.config.ORG_ID,
"submitted_at": datetime.now(timezone.utc).isoformat(),
**report,
}
try:
return self._make_request("POST", "reports/compliance", payload)
except Exception:
logger.warning("ФГИС РЭВС push failed — queuing for retry")
return {"status": "queued", "message": "Will retry when ФГИС available"}
def push_maintenance_report(self, wo_data: dict) -> dict:
"""
Отправить данные о выполненном ТО (CRS).
ФАП-145 п.A.55: документация о выполненном ТО.
"""
payload = {
"report_type": "maintenance_completion",
"organization_id": self.config.ORG_ID,
"submitted_at": datetime.now(timezone.utc).isoformat(),
**wo_data,
}
try:
return self._make_request("POST", "reports/maintenance", payload)
except Exception:
return {"status": "queued"}
def push_defect_report(self, defect_data: dict) -> dict:
"""
Отправить донесение о дефекте в ФАВТ.
ФАП-128: обязательное донесение о событиях.
"""
payload = {
"report_type": "defect_mandatory",
"organization_id": self.config.ORG_ID,
"submitted_at": datetime.now(timezone.utc).isoformat(),
**defect_data,
}
try:
return self._make_request("POST", "reports/defects", payload)
except Exception:
return {"status": "queued"}
# --- СМЭВ 3.0: Юридически значимый обмен ---
def smev_send_request(self, service_code: str, request_data: dict) -> str:
"""
Отправить запрос через СМЭВ 3.0.
Используется для юридически значимого обмена данными.
Сервисы:
- FAVT-001: Запрос данных из реестра ВС
- FAVT-002: Подача заявления на СЛГ
- FAVT-003: Уведомление о выполнении ДЛГ
- FAVT-004: Отчёт о ТО
"""
message_id = str(uuid.uuid4())
# Формирование SOAP-конверта СМЭВ 3.0
envelope = self._build_smev_envelope(service_code, message_id, request_data)
try:
import httpx
resp = httpx.post(
self.config.SMEV_URL,
content=envelope,
headers={"Content-Type": "text/xml; charset=utf-8"},
cert=(self.config.CERT_PATH, self.config.KEY_PATH),
verify=self.config.CA_PATH,
timeout=60,
)
# Parse SMEV response
return self._parse_smev_response(resp.content, message_id)
except Exception as e:
logger.error("СМЭВ 3.0 request failed: %s", str(e))
return message_id # Return ID for tracking
def _build_smev_envelope(self, service_code: str, message_id: str, data: dict) -> bytes:
"""Построить SOAP-конверт СМЭВ 3.0 с ЭП ГОСТ Р 34.10-2012."""
timestamp = datetime.now(timezone.utc).isoformat()
xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:smev="urn://x-artefacts-smev-gov-ru/services/message-exchange/types/1.2"
xmlns:basic="urn://x-artefacts-smev-gov-ru/services/message-exchange/types/basic/1.2">
<soap:Header/>
<soap:Body>
<smev:SendRequestRequest>
<smev:SenderProvidedRequestData>
<smev:MessageID>{message_id}</smev:MessageID>
<basic:MessagePrimaryContent>
<fgis:Request xmlns:fgis="urn://x-artefacts-favt-gov-ru/fgis-revs/1.0"
serviceCode="{service_code}"
timestamp="{timestamp}"
orgId="{self.config.ORG_ID}">
{self._dict_to_xml(data)}
</fgis:Request>
</basic:MessagePrimaryContent>
</smev:SenderProvidedRequestData>
</smev:SendRequestRequest>
</soap:Body>
</soap:Envelope>"""
return xml.encode("utf-8")
def _dict_to_xml(self, data: dict, indent: int = 12) -> str:
"""Конвертировать dict в XML-элементы."""
lines = []
for key, value in data.items():
if isinstance(value, dict):
lines.append(f"{' ' * indent}<{key}>{self._dict_to_xml(value, indent + 1)}</{key}>")
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
lines.append(f"{' ' * indent}<{key}>{self._dict_to_xml(item, indent + 1)}</{key}>")
else:
lines.append(f"{' ' * indent}<{key}>{item}</{key}>")
else:
lines.append(f"{' ' * indent}<{key}>{value}</{key}>")
return "\n".join(lines)
def _parse_smev_response(self, content: bytes, message_id: str) -> str:
"""Разобрать ответ СМЭВ 3.0."""
try:
root = ET.fromstring(content)
# Extract message ID from response
ns = {"smev": "urn://x-artefacts-smev-gov-ru/services/message-exchange/types/1.2"}
resp_id = root.find(".//smev:MessageID", ns)
return resp_id.text if resp_id is not None else message_id
except Exception:
return message_id
# --- SYNC: Синхронизация ---
def sync_aircraft(self) -> SyncResult:
"""
Синхронизация реестра ВС с ФГИС РЭВС.
Двунаправленная: pull свежие данные + push обновления.
"""
result = SyncResult(
direction="bidirect", entity_type="aircraft",
status="pending", started_at=datetime.now(timezone.utc).isoformat(),
)
try:
# Pull from ФГИС
fgis_aircraft = self.pull_aircraft_registry()
result.records_total = len(fgis_aircraft)
for ac in fgis_aircraft:
try:
self._upsert_aircraft(ac)
result.records_synced += 1
except Exception as e:
result.records_failed += 1
result.errors.append(f"{ac.registration}: {str(e)[:80]}")
result.status = "success" if result.records_failed == 0 else "partial"
except Exception as e:
result.status = "failed"
result.errors.append(str(e)[:200])
result.completed_at = datetime.now(timezone.utc).isoformat()
self._sync_log.append(result)
logger.info("ФГИС sync aircraft: %s (%d/%d)", result.status, result.records_synced, result.records_total)
return result
def sync_certificates(self) -> SyncResult:
"""Синхронизация СЛГ с ФГИС РЭВС."""
result = SyncResult(
direction="pull", entity_type="certificates",
status="pending", started_at=datetime.now(timezone.utc).isoformat(),
)
try:
certs = self.pull_certificates()
result.records_total = len(certs)
for cert in certs:
try:
self._upsert_certificate(cert)
result.records_synced += 1
except Exception as e:
result.records_failed += 1
result.errors.append(f"{cert.certificate_number}: {str(e)[:80]}")
result.status = "success" if result.records_failed == 0 else "partial"
except Exception as e:
result.status = "failed"
result.errors.append(str(e)[:200])
result.completed_at = datetime.now(timezone.utc).isoformat()
self._sync_log.append(result)
return result
def sync_directives(self, since_days: int = 30) -> SyncResult:
"""Синхронизация директив ЛГ из ФГИС РЭВС."""
since = (datetime.now(timezone.utc) - timedelta(days=since_days)).strftime("%Y-%m-%d")
result = SyncResult(
direction="pull", entity_type="directives",
status="pending", started_at=datetime.now(timezone.utc).isoformat(),
)
try:
directives = self.pull_directives(since)
result.records_total = len(directives)
for ad in directives:
try:
self._upsert_directive(ad)
result.records_synced += 1
except Exception as e:
result.records_failed += 1
result.errors.append(f"{ad.number}: {str(e)[:80]}")
result.status = "success" if result.records_failed == 0 else "partial"
except Exception as e:
result.status = "failed"
result.errors.append(str(e)[:200])
result.completed_at = datetime.now(timezone.utc).isoformat()
self._sync_log.append(result)
return result
def get_sync_log(self) -> List[dict]:
"""Получить историю синхронизаций."""
return [asdict(r) for r in self._sync_log]
# --- Внутренние методы upsert ---
def _upsert_aircraft(self, ac: FGISAircraft):
"""Создать или обновить ВС в локальной БД из данных ФГИС."""
from app.api.routes.aircraft import _aircraft_store
# В production: SQLAlchemy merge
logger.debug("Upsert aircraft: %s", ac.registration)
def _upsert_certificate(self, cert: FGISCertificate):
"""Создать или обновить СЛГ."""
logger.debug("Upsert certificate: %s", cert.certificate_number)
def _upsert_directive(self, ad: FGISDirective):
"""Создать или обновить ДЛГ из ФГИС."""
from app.api.routes.airworthiness_core import _directives
existing = [d for d in _directives.values() if d.get("number") == ad.number]
if not existing:
did = str(uuid.uuid4())
_directives[did] = {
"id": did,
"number": ad.number,
"title": ad.title,
"issuing_authority": ad.issuing_authority,
"aircraft_types": ad.aircraft_types,
"ata_chapter": ad.ata_chapter,
"effective_date": ad.effective_date,
"compliance_type": ad.compliance_type,
"description": ad.description,
"supersedes": ad.supersedes,
"status": "open",
"source": "ФГИС РЭВС",
"fgis_id": ad.fgis_id,
"created_at": datetime.now(timezone.utc).isoformat(),
}
logger.info("New AD from ФГИС: %s", ad.number)
# --- MOCK данные (тестовая среда) ---
def _mock_aircraft_registry(self, registration: str = None) -> List[FGISAircraft]:
"""Mock данные реестра ВС для тестовой среды."""
fleet = [
FGISAircraft(registration="RA-89001", serial_number="95001", aircraft_type="SSJ-100",
icao_code="SU95", manufacturer="ПАО «ОАК» (филиал «Региональные самолёты»)",
year_manufactured=2019, max_takeoff_weight=49450,
owner="АО «КЛГ Авиа»", operator="АО «КЛГ Авиа»",
operator_certificate="ЭВ-01/2020", base_airport="UMKK",
status="active", registration_date="2019-03-15", fgis_id="FGIS-AC-001"),
FGISAircraft(registration="RA-89002", serial_number="95002", aircraft_type="SSJ-100",
icao_code="SU95", manufacturer="ПАО «ОАК»",
year_manufactured=2020, max_takeoff_weight=49450,
owner="АО «КЛГ Авиа»", operator="АО «КЛГ Авиа»",
operator_certificate="ЭВ-01/2020", base_airport="UMKK",
status="active", registration_date="2020-06-10", fgis_id="FGIS-AC-002"),
FGISAircraft(registration="RA-73201", serial_number="41201", aircraft_type="Boeing 737-800",
icao_code="B738", manufacturer="The Boeing Company",
year_manufactured=2015, max_takeoff_weight=79016,
owner="ООО «КЛГ Лизинг»", operator="АО «КЛГ Авиа»",
operator_certificate="ЭВ-01/2020", base_airport="UMKK",
status="active", registration_date="2018-11-20", fgis_id="FGIS-AC-003"),
FGISAircraft(registration="RA-02801", serial_number="HL-0801", aircraft_type="Ми-8Т",
icao_code="MI8T", manufacturer="Казанский вертолётный завод",
year_manufactured=2010, max_takeoff_weight=12000,
owner="АО «КЛГ Авиа»", operator="АО «КЛГ Авиа»",
operator_certificate="ЭВ-01/2020", base_airport="UMKK",
status="stored", registration_date="2010-08-05", fgis_id="FGIS-AC-004"),
]
if registration:
return [a for a in fleet if a.registration == registration]
return fleet
def _mock_certificates(self, registration: str = None) -> List[FGISCertificate]:
return [
FGISCertificate(certificate_number="СЛГ-001-2025", aircraft_registration="RA-89001",
certificate_type="standard", issue_date="2025-01-15",
expiry_date="2026-01-15", category="transport", status="valid",
fgis_id="FGIS-CRT-001"),
FGISCertificate(certificate_number="СЛГ-002-2025", aircraft_registration="RA-89002",
certificate_type="standard", issue_date="2025-03-20",
expiry_date="2026-03-20", category="transport", status="valid",
fgis_id="FGIS-CRT-002"),
FGISCertificate(certificate_number="СЛГ-003-2024", aircraft_registration="RA-73201",
certificate_type="standard", issue_date="2024-11-01",
expiry_date="2025-11-01", category="transport", status="expired",
fgis_id="FGIS-CRT-003"),
]
def _mock_operators(self) -> List[FGISOperator]:
return [
FGISOperator(certificate_number="ЭВ-01/2020", name="АО «КЛГ Авиа»",
inn="3906123456", ogrn="1023900000001",
issue_date="2020-01-01", expiry_date="2027-01-01",
aircraft_types=["SSJ-100", "Boeing 737-800", "Ми-8Т"],
fleet_count=4, status="active", fgis_id="FGIS-OP-001"),
]
def _mock_directives(self) -> List[FGISDirective]:
return [
FGISDirective(number="АД-2026-0012", title="Осмотр крепления крыла SSJ-100",
effective_date="2026-02-01", aircraft_types=["SSJ-100"],
ata_chapter="57", compliance_type="mandatory",
description="Обязательный осмотр крепления крыла к фюзеляжу по результатам СБ-100-57-0023",
fgis_id="FGIS-AD-001"),
FGISDirective(number="АД-2026-0015", title="Замена датчика угла атаки Boeing 737",
effective_date="2026-02-10", aircraft_types=["Boeing 737-800"],
ata_chapter="34", compliance_type="mandatory",
description="Замена датчика угла атаки P/N 0861FL1 по бюллетеню Boeing SB 737-34-1423",
fgis_id="FGIS-AD-002"),
]
def _mock_maint_orgs(self) -> List[FGISMaintOrg]:
return [
FGISMaintOrg(certificate_number="ТОиР-КЛГ-001", name="АО «КЛГ ТехСервис»",
approval_scope=["A1", "A2", "B1", "C6", "D1"],
issue_date="2023-06-01", expiry_date="2026-06-01",
status="active", fgis_id="FGIS-MO-001"),
]
# Singleton
fgis_client = FGISREVSClient()

View File

@ -1,17 +1,42 @@
# Гайд по безопасности проекта
# Руководство по безопасности проекта КЛГ АСУ ТК
## ⚠️ Критично
1. Никогда не коммитьте файлы .env в репозиторий
2. Не используйте eval() - риск code injection
3. Удаляйте console.log перед продакшеном
## 1. Секреты и переменные окружения
## Проверка перед коммитом
- Проверьте, что .env файлы в .gitignore
- Найдите и удалите все eval()
- Очистите debug логи (console.log)
- **Никогда не коммитьте** файлы `.env`, `.env.local`, `.env.production` и любые файлы с реальными ключами.
- Используйте **`.env.example`** как шаблон: копируйте в `.env` или `.env.local` и подставляйте значения только локально.
- В продакшене храните секреты в защищённом хранилище (переменные окружения платформы, vault), не в коде.
- В `.gitignore` добавлены: `.env`, `.env.*`, `!.env.example`, `certs/`, `*.pem`, `*.key`.
## Команды для проверки
```bash
grep -r "eval(" .
grep -r "console.log" .
```
## 2. Защита от XSS и инъекций
- Не используйте `dangerouslySetInnerHTML` с пользовательским вводом.
- Для инлайн-скриптов используйте компонент `next/script` (как в `app/layout.tsx`).
- Санитизация: для пользовательского контента используйте `DOMPurify` или аналог.
- **Не используйте `eval()`** — риск code injection.
## 3. CORS и API
- CORS настроен только на разрешённые домены. В продакшене запрещён wildcard `*`.
- Бэкенд: переменная `CORS_ORIGINS` — список доменов через запятую.
## 4. Авторизация
- В production отключите dev-обход: `ENABLE_DEV_AUTH=false` на бэкенде, не используйте `NEXT_PUBLIC_DEV_TOKEN` в проде.
- Токены и сессии передавайте по HTTPS; при необходимости используйте httpOnly cookies.
## 5. Логирование и отладка
- В коде используйте **logger** (`lib/logger.ts` на фронте, `logging` на бэкенде), а не `console.log`.
- ESLint: правило `no-console` в режиме error (допускаются только `console.warn` и `console.error`).
- Перед коммитом запускайте `npm run lint`. Pre-commit хук (Husky) может проверять линт и тесты.
## 6. Проверка перед коммитом
- Убедитесь, что `.env` файлы в `.gitignore` и не попадают в коммит.
- Проверьте отсутствие `eval()` в коде: `grep -r "eval(" .`
- Убедитесь, что отладочные логи вынесены в logger, а не в `console.log`.
## 7. Отчёт об уязвимостях
- Обнаруженные уязвимости сообщайте ответственным за безопасность (например, security@company.com или через приватный канал).
- Не создавайте публичные issue с описанием уязвимостей до их устранения.

View File

@ -1,3 +1,2 @@
// Stub: logger-client
export function logError(...a:any[]):any{return null}
export function logInfo(...a:any[]):any{return null}
/** Re-export для клиентских компонентов: используйте logger вместо console */
export { logInfo, logError, log, logWarn } from "./logger";

View File

@ -1,5 +1,38 @@
// Stub: logger
export function logAudit(...a:any[]):any{return null}
export function logError(...a:any[]):any{return null}
export function logSecurity(...a:any[]):any{return null}
export function logWarn(...a:any[]):any{return null}
/**
* Единый логгер приложения. Используйте вместо console.log.
* В production можно заменить вывод на отправку в систему мониторинга.
*/
const isDev = typeof process !== "undefined" && process.env?.NODE_ENV === "development";
function noop(): void {}
function devLog(level: string, ...args: unknown[]): void {
if (typeof console !== "undefined" && isDev) {
const fn = level === "error" ? console.error : level === "warn" ? console.warn : console.log;
fn(`[${level}]`, ...args);
}
}
export function log(...args: unknown[]): void {
devLog("log", ...args);
}
export function logInfo(...args: unknown[]): void {
devLog("info", ...args);
}
export function logAudit(...args: unknown[]): void {
devLog("audit", ...args);
}
export function logError(...args: unknown[]): void {
if (typeof console !== "undefined") console.error("[error]", ...args);
}
export function logSecurity(...args: unknown[]): void {
if (typeof console !== "undefined") console.warn("[security]", ...args);
}
export function logWarn(...args: unknown[]): void {
if (typeof console !== "undefined") console.warn("[warn]", ...args);
}