klg-asutk-app/docs/STREAMING_ARCHITECTURE.md
Yuriy 0150aba4f5 Consolidation: KLG ASUTK + PAPA integration
- Unify API: lib/api.ts uses /api/v1, inbox uses /api/inbox (rewrites)
- Remove localhost refs: openapi, inbox page
- Add rewrites: /api/inbox|tmc -> inbox-server, /api/v1 -> FastAPI
- Add stub routes: knowledge/insights, recommendations, search, log-error
- Transfer from PAPA: prompts (inspection, tmc), scripts, supabase, data/tmc-requests
- Fix inbox-server: ORDER BY created_at, package.json
- Remove redundant app/api/inbox/files route (rewrites handle it)
- knowledge/ in gitignore (large PDFs)

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-08 17:18:31 +03:00

8.5 KiB
Raw Permalink Blame History

Streaming-First Архитектура

Обзор

Система использует streaming-first архитектуру для real-time обработки данных:

  • Redpanda - современная замена Kafka
  • Apache Flink - потоковая обработка
  • RisingWave - streaming database
  • FastAPI + Pydantic v2 - backend API

Архитектура

┌─────────────────────────────────────────────────────────┐
│                    Application Layer                      │
│              (Next.js Frontend)                          │
└──────────────────────┬────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────────┐
│                  FastAPI Backend                          │
│              (Pydantic v2 Models)                         │
└──────────────────────┬────────────────────────────────────┘
                       │
        ┌──────────────┼──────────────┐
        │              │              │
        ▼              ▼              ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│  Redpanda    │ │    Flink     │ │ RisingWave   │
│  (Events)    │ │ (Processing) │ │  (Storage)   │
└──────────────┘ └──────────────┘ └──────────────┘

Компоненты

Redpanda

Назначение: Потоковая платформа для событий

Использование:

  • Публикация событий изменений данных
  • Подписка на события для обработки
  • Kafka-совместимый API

Примеры:

import { publishMessage } from '@/lib/streaming/redpanda-client';

// Публикация события
await publishMessage('aircraft-events', {
  eventType: 'created',
  aircraftId: 123,
  registrationNumber: 'RA-12345',
  timestamp: new Date(),
});

Назначение: Потоковая обработка данных

Использование:

  • Обработка событий в реальном времени
  • Агрегация данных
  • Окна времени (tumbling, sliding)
  • Состояния (stateful processing)

Примеры:

# Flink SQL для агрегации
CREATE TABLE aircraft_events (
    registration_number STRING,
    event_type STRING,
    timestamp TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'aircraft-events',
    'format' = 'json'
);

# Агрегация по окнам
SELECT 
    registration_number,
    COUNT(*) as event_count,
    TUMBLE_START(timestamp, INTERVAL '1' HOUR) as window_start
FROM aircraft_events
GROUP BY registration_number, TUMBLE(timestamp, INTERVAL '1' HOUR);

RisingWave

Назначение: Streaming database для real-time аналитики

Использование:

  • Materialized views на streaming данных
  • Real-time запросы
  • Источники из Redpanda
  • Автоматическое обновление views

Примеры:

-- Создание source из Redpanda
CREATE SOURCE aircraft_events_source
WITH (
    connector = 'kafka',
    topic = 'aircraft-events',
    properties.bootstrap.server = 'localhost:19092'
)
FORMAT PLAIN ENCODE JSON;

-- Создание materialized view
CREATE MATERIALIZED VIEW aircraft_stats AS
SELECT 
    registration_number,
    COUNT(*) as total_events,
    MAX(timestamp) as last_event
FROM aircraft_events_source
GROUP BY registration_number;

-- Запрос к view (real-time)
SELECT * FROM aircraft_stats;

FastAPI Backend

Назначение: REST API с Pydantic v2

Особенности:

  • Type-safe модели данных
  • Автоматическая валидация
  • Async/await поддержка
  • OpenAPI документация

Примеры:

from pydantic import BaseModel, Field
from fastapi import FastAPI

class AircraftCreate(BaseModel):
    registration_number: str = Field(..., min_length=1)
    aircraft_type: Optional[str] = None

@app.post("/aircraft")
async def create_aircraft(aircraft: AircraftCreate):
    # Автоматическая валидация через Pydantic
    return await create_aircraft_in_db(aircraft)

Поток данных

  1. Создание/обновление данных → FastAPI Backend
  2. Публикация события → Redpanda
  3. Обработка события → Apache Flink
  4. Сохранение результата → RisingWave
  5. Запрос данных → RisingWave Materialized Views

API Endpoints

Streaming

  • POST /api/v1/streaming/events - Публикация события
  • GET /api/v1/streaming/views/{view_name} - Получение данных из view
  • GET /api/v1/streaming/health - Проверка здоровья

Aircraft

  • GET /api/v1/aircraft - Список ВС
  • POST /api/v1/aircraft - Создание ВС (публикует событие)
  • PUT /api/v1/aircraft/{id} - Обновление ВС (публикует событие)

Конфигурация

Переменные окружения

# Redpanda
REDPANDA_BROKERS=localhost:19092
REDPANDA_CLIENT_ID=klg-backend

# RisingWave
RISINGWAVE_URL=postgresql://root:risingwave@localhost:4566/dev

# Flink
FLINK_REST_URL=http://localhost:8081

Мониторинг

Redpanda

RisingWave

Запуск

# Запуск всех сервисов
docker-compose up -d

# Запуск FastAPI backend
cd backend
uvicorn app.main:app --reload

# Запуск Flink job
flink run -c AircraftStreamingJob aircraft-streaming-job.jar

Примеры использования

Публикация события при создании ВС

# В FastAPI endpoint
@app.post("/aircraft")
async def create_aircraft(aircraft: AircraftCreate):
    # Создание в БД
    created = await db.create_aircraft(aircraft)
    
    # Публикация события
    await publish_event('aircraft-events', {
        'event_type': 'created',
        'aircraft_id': created.id,
        'registration_number': created.registration_number,
        'timestamp': datetime.now(),
    })
    
    return created
# Flink обрабатывает событие и агрегирует данные
# Результат сохраняется в RisingWave materialized view

Запрос real-time статистики

// Запрос к RisingWave materialized view
const stats = await queryMaterializedView('aircraft_stats', {
  registration_number: 'RA-12345'
});

Преимущества

  1. Real-time обработка - данные обрабатываются мгновенно
  2. Масштабируемость - горизонтальное масштабирование
  3. Надёжность - гарантия доставки сообщений
  4. Производительность - оптимизированная обработка потоков
  5. Гибкость - легко добавлять новые обработчики

Troubleshooting

Redpanda не запускается

docker-compose logs redpanda

Проверьте логи Flink: http://localhost:8081

RisingWave не подключается

Проверьте переменную окружения RISINGWAVE_URL