- Unify API: lib/api.ts uses /api/v1, inbox uses /api/inbox (rewrites) - Remove localhost refs: openapi, inbox page - Add rewrites: /api/inbox|tmc -> inbox-server, /api/v1 -> FastAPI - Add stub routes: knowledge/insights, recommendations, search, log-error - Transfer from PAPA: prompts (inspection, tmc), scripts, supabase, data/tmc-requests - Fix inbox-server: ORDER BY created_at, package.json - Remove redundant app/api/inbox/files route (rewrites handle it) - knowledge/ in gitignore (large PDFs) Co-authored-by: Cursor <cursoragent@cursor.com>
8.5 KiB
8.5 KiB
Streaming-First Архитектура
Обзор
Система использует streaming-first архитектуру для real-time обработки данных:
- Redpanda - современная замена Kafka
- Apache Flink - потоковая обработка
- RisingWave - streaming database
- FastAPI + Pydantic v2 - backend API
Архитектура
┌─────────────────────────────────────────────────────────┐
│ Application Layer │
│ (Next.js Frontend) │
└──────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ FastAPI Backend │
│ (Pydantic v2 Models) │
└──────────────────────┬────────────────────────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Redpanda │ │ Flink │ │ RisingWave │
│ (Events) │ │ (Processing) │ │ (Storage) │
└──────────────┘ └──────────────┘ └──────────────┘
Компоненты
Redpanda
Назначение: Потоковая платформа для событий
Использование:
- Публикация событий изменений данных
- Подписка на события для обработки
- Kafka-совместимый API
Примеры:
import { publishMessage } from '@/lib/streaming/redpanda-client';
// Публикация события
await publishMessage('aircraft-events', {
eventType: 'created',
aircraftId: 123,
registrationNumber: 'RA-12345',
timestamp: new Date(),
});
Apache Flink
Назначение: Потоковая обработка данных
Использование:
- Обработка событий в реальном времени
- Агрегация данных
- Окна времени (tumbling, sliding)
- Состояния (stateful processing)
Примеры:
# Flink SQL для агрегации
CREATE TABLE aircraft_events (
registration_number STRING,
event_type STRING,
timestamp TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'aircraft-events',
'format' = 'json'
);
# Агрегация по окнам
SELECT
registration_number,
COUNT(*) as event_count,
TUMBLE_START(timestamp, INTERVAL '1' HOUR) as window_start
FROM aircraft_events
GROUP BY registration_number, TUMBLE(timestamp, INTERVAL '1' HOUR);
RisingWave
Назначение: Streaming database для real-time аналитики
Использование:
- Materialized views на streaming данных
- Real-time запросы
- Источники из Redpanda
- Автоматическое обновление views
Примеры:
-- Создание source из Redpanda
CREATE SOURCE aircraft_events_source
WITH (
connector = 'kafka',
topic = 'aircraft-events',
properties.bootstrap.server = 'localhost:19092'
)
FORMAT PLAIN ENCODE JSON;
-- Создание materialized view
CREATE MATERIALIZED VIEW aircraft_stats AS
SELECT
registration_number,
COUNT(*) as total_events,
MAX(timestamp) as last_event
FROM aircraft_events_source
GROUP BY registration_number;
-- Запрос к view (real-time)
SELECT * FROM aircraft_stats;
FastAPI Backend
Назначение: REST API с Pydantic v2
Особенности:
- Type-safe модели данных
- Автоматическая валидация
- Async/await поддержка
- OpenAPI документация
Примеры:
from pydantic import BaseModel, Field
from fastapi import FastAPI
class AircraftCreate(BaseModel):
registration_number: str = Field(..., min_length=1)
aircraft_type: Optional[str] = None
@app.post("/aircraft")
async def create_aircraft(aircraft: AircraftCreate):
# Автоматическая валидация через Pydantic
return await create_aircraft_in_db(aircraft)
Поток данных
- Создание/обновление данных → FastAPI Backend
- Публикация события → Redpanda
- Обработка события → Apache Flink
- Сохранение результата → RisingWave
- Запрос данных → RisingWave Materialized Views
API Endpoints
Streaming
POST /api/v1/streaming/events- Публикация событияGET /api/v1/streaming/views/{view_name}- Получение данных из viewGET /api/v1/streaming/health- Проверка здоровья
Aircraft
GET /api/v1/aircraft- Список ВСPOST /api/v1/aircraft- Создание ВС (публикует событие)PUT /api/v1/aircraft/{id}- Обновление ВС (публикует событие)
Конфигурация
Переменные окружения
# Redpanda
REDPANDA_BROKERS=localhost:19092
REDPANDA_CLIENT_ID=klg-backend
# RisingWave
RISINGWAVE_URL=postgresql://root:risingwave@localhost:4566/dev
# Flink
FLINK_REST_URL=http://localhost:8081
Мониторинг
Redpanda
- Admin UI: http://localhost:19644
- Schema Registry: http://localhost:18081
Flink
- Web UI: http://localhost:8081
RisingWave
- Prometheus: http://localhost:5691/metrics
- Grafana: http://localhost:5692
Запуск
# Запуск всех сервисов
docker-compose up -d
# Запуск FastAPI backend
cd backend
uvicorn app.main:app --reload
# Запуск Flink job
flink run -c AircraftStreamingJob aircraft-streaming-job.jar
Примеры использования
Публикация события при создании ВС
# В FastAPI endpoint
@app.post("/aircraft")
async def create_aircraft(aircraft: AircraftCreate):
# Создание в БД
created = await db.create_aircraft(aircraft)
# Публикация события
await publish_event('aircraft-events', {
'event_type': 'created',
'aircraft_id': created.id,
'registration_number': created.registration_number,
'timestamp': datetime.now(),
})
return created
Обработка события во Flink
# Flink обрабатывает событие и агрегирует данные
# Результат сохраняется в RisingWave materialized view
Запрос real-time статистики
// Запрос к RisingWave materialized view
const stats = await queryMaterializedView('aircraft_stats', {
registration_number: 'RA-12345'
});
Преимущества
- Real-time обработка - данные обрабатываются мгновенно
- Масштабируемость - горизонтальное масштабирование
- Надёжность - гарантия доставки сообщений
- Производительность - оптимизированная обработка потоков
- Гибкость - легко добавлять новые обработчики
Troubleshooting
Redpanda не запускается
docker-compose logs redpanda
Flink job не запускается
Проверьте логи Flink: http://localhost:8081
RisingWave не подключается
Проверьте переменную окружения RISINGWAVE_URL