AI [Перевод] Шаблоны проектирования поглощения данных-01: пакетная обработка, потоковая передача и CDC — практическое руководство

AI

Редактор
Регистрация
23 Август 2023
Сообщения
3 709
Лучшие ответы
0
Реакции
0
Баллы
243
Offline
#1
Привет, Хаброжители!

От выбора подхода к поглощению данных может зависеть успех или провал работы конвейера данных

Получив опыт создания конвейеров данных, которые ежедневно обрабатывают сотни миллионов записей, я понял, что именно на уровне поглощения данных решается успех или провал большинства проектов в области инженерии данных. Если здесь допустить ошибку, то придется месяцами бороться с проблемами, возникающими с производительностью, качеством данных и недовольством заинтересованных сторон. Если все сделать правильно, то ваш конвейер станет надежной основой для принятия важных бизнес-решений.

Сегодня я разберу три основных шаблона поглощения — пакетный, потоковый и захват изменений данных (CDC) — на материале реальных примеров, фрагментов кода и схем принятия решений, чтобы помочь вам сделать правильный выбор.

Решение о поглощении данных: дело не в технологии, а в требованиях

Прежде чем перейти к рассмотрению паттернов, давайте развеем распространенное заблуждение: выбор между пакетной обработкой, потоковой обработкой и CDC заключается не в том, что «лучше», а в том, что решает вашу конкретную проблему.

Я видел, как разрабатываются сложные потоковые конвейеры для данных, которые обновляются раз в день, а другие команды боролись с пакетной обработкой, когда пользователям требовалась информация практически в режиме реального времени. Я также видел, как команды выполняли полное извлечение таблиц, когда CDC было бы в 100 раз эффективнее. Все эти ошибки могут дорого обходиться.

Паттерн 1: Пакетная обработка – надежный рабочий инструмент 📦

Что такое пакетная обработка?

Пакетная обработка собирает и обрабатывает данные через определенные промежутки времени — ежечасно, ежедневно, еженедельно или по запросу. Представьте себе грузовик, который забирает посылки в установленное время и доставляет их все сразу. Когда использовать пакетную обработку

Идеально для:


  • Загрузки и заполнения исторических данных


  • Отчетности и аналитики с требованиями к ежедневной/ежечасной актуальности


  • Рабочих нагрузок, чувствительных к стоимости (пакетная обработка обычно в 5–10 раз дешевле потоковой)


  • Источников данных, которые обновляются периодически (ежедневные отчеты о продажах, ежемесячные финансовые данные)


  • Сценариев с высокой пропускной способностью, где допустимы небольшие задержки

    Не подходит для:


  • Дашбордов, работающих в режиме реального времени (требуется задержка менее одной минуты)


  • Обнаружения мошенничества или оповещения о безопасности


  • Мониторинга датчиков IoT


  • Отслеживания поведения пользователей в режиме реального времени

Ключевые особенности пакетной обработки

1. Срабатывания по времени: запуск по расписанию (cron-выражения, Airflow DAGs)

2. Высокая пропускная способность: возможность эффективной обработки миллионов записей за один раз

3. Простое восстановление: неудачные пакеты можно легко повторить или переработать

4. Предсказуемые затраты: ресурсы запускаются, обрабатывают данные и выключаются

Пример из реальной жизни: ежедневный анализ заказов в электронной коммерции

Допустим, вы создаете аналитический конвейер для платформы электронной коммерции. Заинтересованные стороны нуждаются в ежедневных отчетах о заказах, доходах и поведении клиентов, но им не нужны обновления в режиме реального времени.

Архитектура:



Лучшие практики пакетной обработки

1. Идемпотентность критически важна: ваши batch-задачи должны давать один и тот же результат при повторном запуске с теми же входными данными.

2. Внедрение контрольных точек и восстановления

3. Важность стратегии разбиения на разделы

4. Мониторинг производительности пакетов


Паттерн 2: Потоковая обработка — движок реального времени

Когда использовать потоковую обработку?

Потоковая обработка непрерывно обрабатывает данные по мере их поступления с минимальной задержкой. Представьте себе конвейерную ленту, по которой данные постоянно движутся и сразу же обрабатываются.

Когда использовать потоковую обработку

Идеально для:


  • Дашбордов и мониторинга в реальном времени


  • Обнаружения мошенничества и предупреждений о безопасности


  • Данных датчиков IoT и телеметрии устройств


  • Отслеживания активности пользователей и персонализации


  • Архитектур, управляемых событиями


  • Захвата изменений данных (CDC) из баз данных

    Не подходит для:


  • Загрузки больших объемов исторических данных (используйте пакетную обработку для заполнения)


  • Проектов с ограниченным бюджетом и нестрогими требованиями к задержке


  • Источников данных, которые обновляются нечасто

Ключевые особенности потоковой обработки

1. Событийно-ориентированный подход: обработка данных сразу после их поступления

2. Низкая задержка: время обработки от долей секунды до долей минуты

3. Отказоустойчивость: встроенная система контрольных точек и семантика «точно один раз»

4. Требует обработки с учетом состояния: управление окнами, агрегациями и дедупликацией

Пример из реальной жизни: потоковая передача телеметрических данных с устройств IoT

Представьте, что вы обрабатываете телеметрические данные с миллионов устройств IoT, которые каждые 30 секунд отправляют показатели работоспособности. Вам необходимо обнаруживать аномалии, запускать оповещения и обновлять панели мониторинга в режиме реального времени.

Архитектура:



Лучшие практики потоковой обработки

1. Всегда используйте контрольные точки

Контрольная точка обеспечивает однократную обработку и восстановление после сбоев:

# Хорошо: указано местоположение контрольной точки
query = stream.writeStream \
.option("checkpointLocation", "s3://checkpoints/my-stream/") \
.start()
# Плохо: нет контрольной точки — при перезапуске будет потеряно состояние
query = stream.writeStream.start() # ❌ Не делайте так!

2. Внедрение водяных знаков для запоздалых данных

Водяные знаки помогают управлять запоздалыми событиями и предотвращают неограниченный рост состояния:

# Обработка событий, поступающих с опозданием до 10 минут
stream_with_watermark = stream \
.withWatermark("event_timestamp", "10 minutes")

# Агрегации с водяными знаками автоматически удаляют старое состояние
aggregated = stream_with_watermark \
.groupBy(window("event_timestamp", "5 minutes"), "device_id") \
.agg(avg("metric").alias("avg_metric"))

3. Выберите правильный интервал запуска

# Микропакет (наиболее распространенный): обработка каждые 30 секунд
.trigger(processingTime="30 seconds")

# Непрерывные (низкая задержка): задержка ~1 мс, но ограниченные операции
.trigger(continuous="1 second")# Доступно сейчас: однократная обработка доступных данных (как пакетная)
.trigger(availableNow=True)# По умолчанию: обработка как можно быстрее (не рекомендуется для продуктива)
.trigger()

4. Мониторинг работоспособности потока

5. Работа со эволюцией схемы


Паттерн 3: Захват изменений данных (CDC) — интеллектуальная синхронизация 🔄

Что такое CDC?

Захват изменений данных (CDC) фиксирует только изменения (вставки, обновления, удаления) из исходных систем, а не извлекает целые наборы данных. Это похоже на получение сравнения изменений, а не на повторное чтение всей книги.

Когда использовать CDC

Идеально для:


  • Репликации баз данных в режиме, близком к реальному времени.


  • Минимизации нагрузки на исходные системы (особенно на продуктивные базы данных).


  • Отслеживания исторических изменений и ведения контрольных журналов


  • Синхронизации данных между операционными и аналитическими системами


  • Архитектуры с событийным источником данных


  • Сокращения затрат на передачу данных (перемещение только измененных данных)

    Не подходит для:


  • Первоначальной загрузки данных (используйте пакетную обработку для заполнения, затем переключитесь на CDC)


  • Источников без возможности отслеживания изменений


  • Простых файловых источников данных


  • Систем, в которых полное обновление дешевле, чем инкрементальное наполнение

Ключевые особенности CDC

1. Получение данных на основе журналов: считывает журналы транзакций базы данных (binlog, WAL, redo logs)

2. Инкрементальный характер: обрабатывает только изменения, а не полные таблицы

3. Сохраняет историю исходной системы: фиксирует тип операции (INSERT/UPDATE/DELETE)

Низкое воздействие на исходную систему: минимальная нагрузка на производительность продуктивных баз данных

Как работает CDC: три подхода

1. CDC на основе журналов (наиболее эффективный) 📝

Прямое чтение журналов транзакций базы данных без запроса таблиц:

# Пример: Debezium читает бинарный журнал MySQL
{
"before": null, # Данные до изменения (null для INSERT)
"after": { # Данные после изменения
"order_id": 12345,
"customer_id": 789,
"order_total": 149.99,
"status": "completed"
},
"op": "c", # Операция: c=создание, u=обновление, d=удаление
"ts_ms": 1704067200000,
"source": {
"db": "ecommerce",
"table": "orders"
}
}

Инстркменты: Debezium, AWS DMS, Oracle GoldenGate, Maxwell, Striim

Плюсы:

Плюсы:


  • Отсутствие нагрузки на исходную базу данных


  • Захват всех изменений в режиме реального времени


  • Отсутствие необходимости в изменении схемы

    Минусы:


  • Требуется доступ к журналу базы данных и его хранение


  • Сложная настройка и мониторинг


  • Различные форматы журналов для разных баз данных

    2. CDC на основе триггеров 🎯

    Триггеры базы данных записывают изменения в отдельные таблицы отслеживания т:

    Плюсы:


  • Проще в реализации


  • Работает с любой базой данных, поддерживающей триггеры


  • Возможна настройка кастомной логики

    Минусы:


  • Влияние на производительность исходной базы данных


  • Накладные расходы на обслуживание триггеров


  • Возможность сбоев тригерров

3. CDC на основе запросов (отслеживание временных меток/версий) 🕒

Отслеживает изменения с помощью столбцов временных меток или версий:

# Отслеживание последней обработанной временной метки
LAST_WATERMARK = "2024-01-15 10:00:00"
# З
query = f"""
SELECT *
FROM orders
WHERE updated_at > '{LAST_WATERMARK}'
ORDER BY updated_at
"""# После обработки обновляет водяной знак
NEW_WATERMARK = max(df['updated_at'])

Плюсы:


  • Простота реализации


  • Не требует специальных функций базы данных


  • Работает с большинством источников данных

    Минусы:


  • Не может распознавать удаления (если не существует столбец soft-delete)


  • Требует наличия столбца updated_at во всех таблицах


  • Проблемы с разницей во времени в распределенных системах


  • Пропускает изменения, если updated_at не изменяется

Пример из реальной жизни: синхронизация заказов в электронной коммерции с помощью CDC

Сценарий: синхронизация данных о заказах из продуктивной базы PostgreSQL с аналитическим озером данных с минимальной задержкой и минимальным воздействием на исходную систему.

Архитектура:



Лучшие практики CDC

1. Работа с эволюцией схемы: системы CDC должны корректно обрабатывать изменения исходной схемы:

2. Обработка эволюции схемы: системы CDC должны корректно обрабатывать изменения исходной схемы:

3. Мониторинг задержки CDC: отслеживание отставания конвейера CDC от источника:

4. Обработка начальной загрузки + переход на CDC: начало с пакетной загрузки, затем переход на CDC:

5. Идемпотентная обработка CDC: обеспечение того, что повторная обработка одних и тех же событий CDC дает одинаковые результаты

Размышления о производительности CDC

Оптимизация исходной базы данных:

-- Для CDC на основе журналов убедитесь, что срок хранения журналов достаточен
-- MySQL
SET GLOBAL binlog_expire_logs_seconds = 604800; -- 7 days

-- PostgreSQL
ALTER SYSTEM SET wal_keep_size = '10GB';

Пакетные события CDC:

# Обрабатывайте события CDC микропакетами для повышения эффективности
.trigger(processingTime="30 seconds") # Instead of continuous
.option("maxOffsetsPerTrigger", 10000) # Limit events per batch

Дедупликация событий CDC:

# Обработка дубликатов сообщений CDC (доставка «хотя бы один раз»)
deduplicated_cdc = parsed_cdc \
.withWatermark("change_timestamp", "5 minutes") \
.dropDuplicates(["order_id", "change_timestamp", "operation"])

Пакетная обработка, потоковая обработка и CDC: матрица принятия решений

Вот практичная схема для выбора между пакетной обработкой, потоковой обработкой и CDC:

Фактор

Batch

Streaming

CDC

Требование к задержке (latency)

Часы — дни​

Секунды — минуты​

Минуты — секунды​

Объём данных

Высокий (терабайты за пакет)​

Непрерывный поток (гигабайты в час)​

Только изменения (мегабайты в час)​

Нагрузка на источник данных

Высокая (полные сканирования)​

Н/Д (источник — события)​

Низкая (чтение логов)​

Стоимость

Ниже (в 5–10 раз дешевле)​

Выше (постоянно работающая инфраструктура)​

Средняя (обработка логов)​

Сложность

Проще (без состояния)​

Сложно (со состоянием)​

Умеренная (парсинг логов)​

Типовые сценарии

Отчётность, обучение ML​

Мониторинг, алерты​

Синхронизация БД, аудит​

Актуальность данных

По расписанию​

Почти в реальном времени​

Почти в реальном времени​

Обработка удалений

Да (полное обновление)​

Зависит от источника​

Да (если на основе логов)​

Первичная загрузка

Нативно поддерживается​

Требуется добавление данных​

Требуется сначала пакетная обработка​



Дерево принятия решения



Гибридный паттерн: лучшее из двух миров

На практике в большинстве продуктивных систем используются несколько паттернов одновременно:

Паттерн 1: Пакетная обработка + CDC (Наиболее распространенный)

Начальная загрузка: пакетная обработка (исторические данные, 2+ года)

Постоянная синхронизация: CDC (только ежедневные изменения)

Аналитика: беспрепятственный запрос обоих через Delta Lake

Пример использования: синхронизация продуктивной базы данных с хранилищем данных


  • День 1: пакетная загрузка 500 ГБ исторических заказов


  • День 2+: CDC фиксирует только измененные записи (~500 МБ/день)


  • Результат: в 1000 раз меньший объем перемещаемых данных, минимальное воздействие на исходную систему

Паттерн 2: CDC + Потоковые агрегации

База данных → CDC → Kafka → Потоковые агрегации → Дашборды в реальном времени

Bronze/Silver/Gold Delta Tables

Пример использования: аналитика заказов в реальном времени

Пакетный уровень: Исторические агрегации (ежедневно)
Потоковый уровень: Обновления в реальном времени (последние 24 часа)
Уровень обслуживания: Объединение обоих представлений для запросов

Паттерн 3: Пакетная обработка + Потоковая обработка (Архитектура Lambda)

Batch Layer: Исторические агрегаты (ежедневные)
Stream Layer: Потоковый слой: Реальные обновления (за последние 24 часа)
Serving Layer: Объединение обоих представлений для запросов

Пример использования: рекомендательные движки


  • Пакетный: Обучение моделей на основе исторического поведения пользователей (еженедельно)


  • Потоковый: Захват кликов в реальном времени для немедленной персонализации


  • Обслуживание: Объединение пакетных прогнозов + сигналов в реальном времени

    Вывод: выбирайте с умом, внедряйте грамотно

    Выбор между пакетной, потоковой и CDC-обработкой не является бинарным — это спектр, основанный на ваших конкретных требованиях:


  • Если вы не уверены, начните с пакетной обработки. Она проще, дешевле и решает 80 % случаев использования.


  • Добавьте CDC при инкрементной синхронизации баз данных — это в 10–100 раз эффективнее пакетного ввода для рабочих нагрузок с большим количеством изменений.


  • Переходите к потоковой обработке только в том случае, если требования к задержке менее одной минуты оправдывают дополнительную сложность и затраты.


  • Используйте гибридные подходы для комплексных платформ данных, которым требуется историческая полнота и аналитика в реальном времени.

Ключевые выводы

Пакетная обработка: запланированная, высокопроизводительная, экономичная — идеально подходит для аналитики и отчетности
Потоковая обработка: непрерывная, с низкой задержкой, сложная — необходима для мониторинга в реальном времени
CDC-обработка: инкрементная, эффективная, практически в реальном времени — идеально подходит для синхронизации баз данных
Гибридные паттерны: сочетание всех трех для комплексных платформ данных. (наиболее распространенные в практике)
Всегда внедряйте: идемпотентность, контроль, мониторинг и проверку качества данных
Оптимизируйте под свои конкретные требования к задержке, стоимости и сложности.

Справочник по выбору паттернов

1. Синхронизация базы данных объемом 100 ГБ, которая изменяется на 1 % ежедневно → CDC (на основе журналов)

2. Ежедневный отчет о продажах, допустимая задержка 24 часа → Пакетная обработка (по расписанию)

3. Данные датчиков IoT, требуются оповещения в режиме реального времени → Потоковая обработка (Kafka)

4. Финансовые транзакции, требуется аудиторский след → CDC (сохранение истории)

5. Большой исторический набор данных для обучения ML → Пакетная обработка (однократная загрузка)

6. Поток кликов пользователей для персонализации в реальном времени → Потоковая обработка (основанная на событиях)

7. Репликация базы данных с несколькими таблицами → CDC (Debezium + Kafka)

8. Данные API, обновляемые каждые 6 часов → Пакетная обработка (опрос API)

Помните: лучший паттерн поглощения — это тот, который соответствует вашим требованиям без излишней инженерной проработки. Я видел, как команды тратили месяцы на создание потоковых CDC-конвейеров для данных, которые обновляются раз в неделю, а другие боролись с ежедневными пакетными извлечениями, когда CDC могло бы сэкономить 95% времени и затрат на обработку.

Начните с простого, измеряйте все и развивайте свою архитектуру по мере изменения требований.

Каков ваш опыт в использовании этих моделей поглощения? Сталкивались ли вы с трудностями при внедрении какой-либо из них? Поделитесь своими мыслями в комментариях!
 
Яндекс.Метрика Рейтинг@Mail.ru
Сверху Снизу