- Регистрация
- 23 Август 2023
- Сообщения
- 3 600
- Лучшие ответы
- 0
- Реакции
- 0
- Баллы
- 243
Offline
До того как я начал кодить, я думал, что у разработчиков на любую задачу есть какое-то одно решение — бест-практика. Достаточно почитать SO или, вот теперь, пообщаться с нейронкой и — вуаля — задача сделана. Но, начав погружаться в литературу по дизайну систем, я стал чаще встречать тему про компромиссы. У задач нет единственных способов решения. Сейчас я рад, что на практике стали попадаться ситуации, где нет одного решения, но есть цели и ограничения.
Задача
Недавно в одной задаче нужно было построчно валидировать XLSX-файл и результат записывать в БД. Делать вставку построчно — может быть дорого. Если в файле тысячи строк, нужно будет делать тысячи insert-ов. Это было моим mvp-решением в виду простоты.
Файлы у нас 100 - 20 000 строк. Тестовый стенд у нас с виртуалками на 8 ГБ оперативы. дальше я его решил отрефачить реализацию на вставку батчами — пачками данных, по 500–1000 строк за транзакцию. Таких транзакций может быть пара десятков Postgres и стенд спокойно это переварят.
Пара нюансов про него и код ниже:
Я не архитектор и не разработчик. Я пишу утилиты для моей команды QA.
В примерах опускаю работу с библиотекой openpyxl или pandas. Это де-факто стандарты на Python для работы с XLSX, но в посте они роли не играют.
Процедуру обработки батча можно делать через генераторы с yield. В них, кроме фикстур для авто-тестов, я не очень силён, поэтому не использую.
Код в примерах синхронный, так как он выполняется внутри джобы воркера библиотеки rq, а она синхронная.
upd. в комментариях справедливо указывают, что батч-метод хрупкий: одна невалидная строка в икселе или проблема с ключем при вставке и вставка пачки ломается. Мы его все же в статье рассмотрим, это тейку про «всему свое место и время» наоборот даже на пользу.
Итак, наполняем батч:
batch: list[dict[str, Any]] = []
for sheet_name, rows in data.items(): # вытаскиваем название листа и строки файла
for row in rows: # итерируемся по каждой строке
insert_obj = FileRowsInsertSchema(
job_id=job_id,
sheet_name=sheet_name,
row_num=row.row_num,
row_data=row.row_data,
status=row.row_status,
errors=row.errors,
)
batch.append(insert_obj.model_dump()) # собираем список отвалидированных словарей для вставки
Из данных XLSX-файла берём листы и строки каждого листа, кастим строки в Pydantic-модель и дампаем их в словари. Получаем список словарей, готовый к вставке в таблицу БД.
Дилемма
Дальше я столкнулся с интересной дилеммой: в какой момент формировать батч на вставку? Основных вариантов было два: во время транзакции и до. Сначала я сделал первый.
Вариант один: инсертить чанками внутри транзакции
Открываем транзакцию
Наполняем список отвалидированными данными
По мере наполнения чанка, вставляем его в БД
Очищаем батч и повторяем для следующе��о чанка
После завершения цикла не забываем вставить остаток, размер которого меньше чанка
try:
with SessionLocal() as db, db.begin():
# помечаем джоб как VALIDATING
stmt = select(FileJobsModel).where(FileJobsModel.job_id == job_id)
job = db.execute(stmt).scalar_one_or_none()
job.status = JobStatusEnum.VALIDATING.value
# наполняем батч внутри транзакции
batch: list[dict[str, Any]] = []
for sheet_name, rows in data.items():
for row in rows:
insert_obj = FileRowsInsertSchema(
job_id=job_id,
sheet_name=sheet_name,
row_num=row.row_num,
row_data=row.row_data,
status=row.row_status,
errors=row.errors,
)
batch.append(insert_obj.model_dump())
# как только батч наполнен, вставляем в БД и чистим батч
if len(batch) >= config.db.batch_size:
db.execute(insert(FileRowsModel).values(batch))
batch.clear()
# когда чанки закончились, заливаем в БД остаток
if batch:
db.execute(insert(FileRowsModel).values(batch))
except SQLAlchemyError as e:
pass
Плюсы. Во-первых, он простой и читаемый. Это то, как мне без опыта создания чанковой вставки этот алгоритм и пришел в голову. Во-вторых, у способа низкий риск out-of-memory: мы управляем размером списка, регулярно его очищаем. Подходит для нод с ограниченными ресурсами, например для тестовых стендов.
Минус — потенциально долгая транзакция: мы делаем валидацию, сборку батча и проверки внутри одной транзакции, держим локи и соединение с БД дольше. А также смешение зон ответственности. Код транзакции отвечает за валидацию данных.
К этому варианту пришел пообщавшись с нейронкой.
Собираем полный список данных заранее
Открываем транзакцию
Внутри неё нарезаем чанки и вставляем в БД
try:
# собираем batch до транзакции (код выше)
# ...
with SessionLocal() as db, db.begin(): # открываем сессию и транзакцию
stmt = select(FileJobsModel).where(FileJobsModel.job_id == job_id)
job = db.execute(stmt).scalar_one_or_none()
job.status = JobStatusEnum.VALIDATING.value
# вставка чанков
chunk_size = config.db.chunk_size # размер чанка из конфига
for i in range(0, len(batch), chunk_size):
chunk = batch[i : i + chunk_size]
db.execute(insert(FileRowsModel).values(chunk))
except SQLAlchemyError as e:
pass
Плюс — разделение зон ответственности: сбор и валидация данных выполняются до транзакции, код взаимодействия с БД только нарезает и инсертит. Транзакции короче, локи и конкуренция за коннекты ниже.
Минус — память O. Если файл содержит десятки тысяч строк и много столбцов/листов, на слабой ноде может случиться OOM. Нужны надёжные политики ретраев, хелс-чеки и обработка рестартов воркера.
На проекте у меня файлы будут на несколько тысяч строк и сервис выполняет функцию утилиты для тестирования. А вот коннекты до БД могут быть нужны другим ребятам на стенде, сервис выполняет и другие функции. Поэтому второй вариант с точки зрения чистоты реализации и достаточных ресурсов обработать список в памяти подошел мне больше. Остановился на нем.
Итого
На вскидку можно предложить и третий, гибридный вариант: наполняем батч, достигаем размера чанка, открываем тразакцию, коммитим, повторяем. А если углубляться в возможности драйверов SQLAlchemy и Postgres, то можно еще и делать массовую вставку с помощью COPY из отвалидированного временного CSV или использовать bulk_insert_mappings. Но так глубоко я не копал и моя задача этого не требует. В общем, вариантов решения задачи много.
Примеры выше — это только те решения, на которые хватило моего опыта. У меня не высоконагруженный сервис. Оба варианта реализации работают. Но если бы я сталкивался с нефункциональными требованиями по железу, времени отклика, огромными данными, конкуренцией за ресурсы, масштабированием, роллбэками, кэшированием, идемпотентностью и прочим, компромиссы становились бы еще весомее.
Тема компромиссов меня завораживает. Мне нравится, что в работе не приходится мыслить чёрно-белым, квадратным. Задачи поощряют думать над следующим шагом, аргументировать выбор. Что-то мы закладываем в наши решения на старте, на что-то реагируем, что-то делаем для души. С послед��твиями живем мы, наш проект, наши юзеры. Аргументы в пользу решений документируем с помощью ADR, о них спорим, их отстаиваем. В сухом остатке, наше решение – это не выбор правильного против неправильного. Это просто выбор.