AI Сервис очереди, или как подогнать код под все случаи

AI

Редактор
Регистрация
23 Август 2023
Сообщения
2 819
Лучшие ответы
0
Реакции
0
Баллы
51
Offline
#1
Привет! С вами снова писатель-программист из компании Simpl Group (да, без e).
Совсем недавно я выступала на нашем внутреннем Meet Up — уже 6-м, между прочим, — и рассказала своим коллегам занимательную историю, которую поведаю сегодня и вам. Не про ведьм и демонов, конечно, как в моей книге. А про цирк — цифровой цирк, в котором задачи прыгают через обручи, катаются на велосипедах и не падают.
Или, по крайней мере, мы стараемся, чтобы не падали.

(К слову, книгу тоже можете почитать: "Пороки", Ingini)

Представим ситуацию


У вас есть очередь, которая умеет выполнять только один трюк — например, отправлять задачи в расчёт. Всё, как в старом цирке: один артист, один номер.

Но однажды появляется новая задача. И за ней ещё одна. И вот уже зрители хотят видеть не только прыжки через обруч, но и медведя на велосипеде, жонглёров, акробатов и даже кибердракона с машинным обучением.

Значит, мы должны перестроить всё шоу. Так, чтобы:


  • Добавлять новых артистов минимальными усилиями;


  • Не перестраивать манеж каждый раз;


  • Всё работало: надёжно, масштабируемо и эффектно.
Кто в нашем цирке — участники шоу


Представим, что у нас есть две задачи:
Одна — это лошадки, прыгающие через обруч;
Вторая — это мишка на велосипеде, проезжающий то в одну, то в другую сторону.

Что есть общего в этих двух выступлениях?


  • Контроллеры — дрессировщики, подающиеся голосом: "Вперёд!".


  • Репозитории — реквизиторы, вытаскивающие снаряжение из склада (БД).


  • Очереди — манеж, где артисты ждут, пока их объявят.


  • Экзекьютор — тёмный коридор, ведущий от кулис к свету рампы.


  • Менеджеры — двери между закулисьем и сценой.


  • Другие сервисы — собственно сцена, где и происходит номер.

Но есть нюанс:


  • Лошадки прыгают по одной.


  • Медведи катаются по трое.


  • Кто-то выходит через Kafka, кто-то — через HTTP.

По сути мы имеем парочку небольших различий, для которых будет неверно создавать второй сервис или писать почти аналогичный код.
Поэтому представим, что кулисы и дрессировщики у мишек и лошадок одни и те же, а двери, актеры и сцены разные.

Как устроен наш манеж — Архитектура


Теперь, когда мы поняли, кто у нас выступает, давайте посмотрим, как работает наш цирк изнутри. Что там в кулисах, и почему никто не спотыкается?
Идея проста:
Сделать единый цирк, куда можно легко впустить любого нового артиста: хоть медведя, хоть жонглёра, ну и да, кибердракона тоже.

В базе


У каждой задачи есть два слоя костюма:


  • Общий: ID, тип, статус, время постановки, ошибки.


  • Специфичный: параметры конкретного артиста.

Получается:

task -- общий склад задач
task_{taskType}_parameters -- гардероб для костюмов
В коде


Самое главное наше оружие — абстрактный дженерик класс на всё, что скорее всего будет использовано не только для одного типа задач.

Покажу вам, как может выглядеть примерный код.

1. Модельки


public interface ITaskParameters { }
public interface ITaskDto { }

public record TaskOneParameters(int Value) : ITaskParameters;
public record TaskTwoParameters(string Data) : ITaskParameters;

public record TaskOneDto(int Value) : ITaskDto;
public record TaskTwoDto(string Data) : ITaskDto;

public class QueueTask where TParam : ITaskParameters
{
public QueueTask(TParam parameters)
{
Parameters = parameters;
TaskInfo = new QueueTaskInfo();
}

public TParam Parameters { get; }
public QueueTaskInfo TaskInfo { get; }
}

public class QueueTaskInfo
{
public Guid Id { get; set; }
public DateTime QueueTime { get; set; }
public QueueTaskStatus Status { get; set; }
public QueueTaskType Type { get; set; }
}

public enum QueueTaskStatus
{
ReadyForExecution,
InProgress,
Completed,
Failed
}

public enum QueueTaskType
{
TaskOne,
TaskTwo
}
2. Контроллеры



/// <summary>
/// Базовый контроллер для постановки задач в очередь
/// </summary>
[Route("api/[controller]")]
[ApiController]
public abstract class AbstractQueueTasksController : ControllerBase
where TParam : ITaskParameters
{
protected AbstractQueueTasksController(IMediator mediator)
{
_mediator = mediator;
}

protected IMediator _mediator { get; }

/// <summary>
/// Общий метод для всех типов задач
/// </summary>
[HttpGet("GetTasks")]
public Task<...> GetAsync(CancellationToken cancellationToken = default)
{
return _mediator.Send(new AbstractGetQueueTasksRequest(), cancellationToken);
}
}

/// <summary>
/// Контроллер для задач типа "TaskOne"
/// </summary>
public class TaskOneController : AbstractQueueTasksController
{
public TaskOneController(IMediator mediator) : base(mediator) { }

/// <summary>
/// Постановка задачи, которая пришла из другого сервиса, а значит дажнные уже обработаны
/// </summary>
[HttpPost("Enqueue")]
public Task EnqueueAsync(TaskOneDto dto, CancellationToken cancellationToken = default)
{
return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken);
}
}

/// <summary>
/// Контроллер для задач типа "TaskTwo"
/// </summary>
public class TaskTwoController : AbstractQueueTasksController
{
public TaskTwoController(IMediator mediator) : base(mediator) { }

/// <summary>
/// Постановка задачи, которая пришла с фронта
/// </summary>
[HttpPost("Enqueue")]
public Task EnqueueAsync(TaskTwoDto dto, CancellationToken cancellationToken = default)
{
... // тут какая-то обратка и валидация данных
return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken);
}
}
3. Команда и обработчик



/// <summary>
/// Команда постановки задачи в очередь
/// </summary>
public class EnqueueTaskCommand : IRequest
where TDto : ITaskDto
{
public EnqueueTaskCommand(TDto dto) => TaskDto = dto;

public TDto TaskDto { get; }
}

/// <summary>
/// Базовый обработчик постановки задач
/// </summary>
public abstract class EnqueueTaskCommandHandler : IRequestHandler&gt;
where TParam : ITaskParameters
where TDto : ITaskDto
{
private readonly AbstractDataflowQueue _queue;

protected EnqueueTaskCommandHandler(AbstractDataflowQueue queue)
{
_queue = queue;
}

public async Task Handle(EnqueueTaskCommand request, CancellationToken cancellationToken)
{
if (request is null)
throw new ArgumentNullException(nameof(request));

var param = Map(request.TaskDto);
await _queue.EnqueueAsync(param, cancellationToken);
}

/// <summary>
/// Просто какая-то работа с данными
/// </summary>
protected abstract TParam Map(TDto dto);
}
4. Очередь


public abstract class AbstractDataflowQueue
where TParam : ITaskParameters
{
private readonly SemaphoreSlim _locker = new(1, 1); // Нужен для защиты от одновременной постановки нескольких задач

protected AbstractQueueTaskRepository _repository { get; }
protected AbstractBackgroundExecutingTask _executor { get; }

protected AbstractDataflowQueue(
AbstractBackgroundExecutingTask executor,
AbstractQueueTaskRepository repository)
{
_executor = executor;
_repository = repository;
}

public async Task EnqueueAsync(QueueTask item, CancellationToken cancellationToken = default)
{
if (item is null)
throw new ArgumentNullException(nameof(item));

await _locker.WaitAsync(cancellationToken);
try
{
item.TaskInfo.QueueTime = DateTime.Now;
item.TaskInfo.Status = QueueTaskStatus.ReadyForExecution;

await _repository.SaveAsync(item, cancellationToken);
await _executor.TrySendQueueTask(item.Id);
}
finally
{
_locker.Release();
}
}
}
5. Экзекьютер


/// <summary>
/// Базовый экзекьютер: достаёт задачу из очереди и отправляет её в менеджер
/// </summary>
public abstract class AbstractBackgroundExecutingTask
where TParam : ITaskParameters
{
protected AbstractBackgroundExecutingTask(
IManager manager,
AbstractQueueTaskRepository repository,
int defaultMaxParallelism = 1)
{
_manager = manager;
_repository = repository;

var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = defaultMaxParallelism,
BoundedCapacity = DataflowBlockOptions.Unbounded
};

_block = new ActionBlock(HandleAsync, options);
}

protected IManager _manager { get; }
protected AbstractQueueTaskRepository _repository { get; }
protected ActionBlock _block { get; }

public bool TrySendQueueTask(Guid taskId)
{
return _block.Post(taskId);
}

private async Task HandleAsync(Guid taskId)
{
var task = await _repository.GetTask(taskId);
if (task == null) return;

task.TaskInfo.Status = QueueTaskStatus.InProgress;
await _manager.TransferTask(task);

task.TaskInfo.Status = QueueTaskStatus.Completed;
await _repository.UpdateAsync(task);
}
}

/// <summary>
/// Лошадки прыгают по одной
/// </summary>
public class TaskOneExecutor : AbstractBackgroundExecutingTask
{
public TaskOneExecutor(
IManager manager,
AbstractQueueTaskRepository repository)
: base(manager, repository, defaultMaxParallelism: 1) { }
}

/// <summary>
/// Медведи катаются втроём
/// </summary>
public class TaskTwoExecutor : AbstractBackgroundExecutingTask
{
public TaskTwoExecutor(
IManager manager,
AbstractQueueTaskRepository repository)
: base(manager, repository, defaultMaxParallelism: 3) { }
}
6. Репозиторий


public abstract class AbstractQueueTaskRepository
where TParam : ITaskParameters
{
// Простое хранилище в памяти
protected readonly Dictionary<Guid, QueueTask<TParam>> _storage = new();

public virtual Task SaveAsync(QueueTask task, CancellationToken cancellationToken = default)
{
_storage[task.TaskInfo.Id] = task;
return Task.CompletedTask;
}

public virtual Task UpdateAsync(QueueTask task, CancellationToken cancellationToken = default)
{
if (_storage.ContainsKey(task.TaskInfo.Id))
{
_storage[task.TaskInfo.Id] = task;
}
return Task.CompletedTask;
}

public virtual QueueTask? GetTask(Guid taskId)
{
_storage.TryGetValue(taskId, out var task);
return task;
}

...
}

+реализации, сохранение в бд и другая логика

7. Менеджеры


public interface IManager
where TParam : ITaskParameters
{
Task TransferTask(QueueTask task);
}

/// <summary>
/// Тут у нас кафка
/// </summary>
public class TaskOneManager : IManager
{
private readonly ITaskOneProducer _producer;
private readonly ITaskOneConsumer _consumer;

public TaskOneManager(
ITaskOneProducer producer,
ITaskOneConsumer consumer)
{
_producer = producer;
_consumer = consumer;
}

public async Task TransferTask(QueueTask queueTask)
{
// Отправка задачи через продюсера
await _producer.PublishAsync(queueTask);

// Ожидаем результат через консюмера
await _consumer.GetResult(queueTask.TaskInfo.Id);
}
}

/// <summary>
/// Тут у нас Refit клиент
/// </summary>
public class TaskTwoManager : IManager
{
private readonly ITaskTwoClient _client;

public TaskTwoManager(ITaskTwoClient client)
{
_client = client;
}

public async Task TransferTask(QueueTask task)
{
await _client.SendTaskTwoAsync(task);
}
}

Разумеется, код самый примитивный, который просто показывает, как можно сделать.

И не забудьте зарегистрировать реализации как синглтон объекты (иначе вся ваша очередь потеряется). Только Менеджеры можно сделать Transient.

Итоговая архитектура:


Как бы мы приручили разношёрстных артистов — расширяемость


Теперь представим, что завтра к нам заходят:


  • Слоны, которые будут делать запросы по SOAP.


  • Пингвины, которые будут танцевать параллельно в 10 потоков. Наша архитектура говорит: "Да не вопрос". Вот как мы добавляем нового зверя в наш цирк:

В базу:


  • Новая таблица параметров.

В код:


  • Реализация абстрактного контроллера, необходимых команд и запросов, модельки


  • Реализация репозитория, очереди, экзекьютора и менеджера.


  • DI-регистрация в Program.cs. (то есть подписываем, что наши животные могут пользоваться любыми нашими рельсами)

И всё. Весь путь — по накатанной. Никто не мешает мишкам, лошадям и слонам выступать одновременно.

Слова автора


Спасибо большое, что прочитали статью мини-мидла. Надеюсь, что вам понравились метафоры) Моим коллегам на Meet Up очень понравились! А там, между прочим, не только разработчики были, но и аналитики, тестеры и даже медийщики!
Если вам интересны такие мероприятия, то заглядывайте к нам. Возможно мы даже скоро выйдем на более глобальный уровень с нашим митапом)

Ну и, конечно же, если вам есть что сказать, то милости прошу в комментарии. Я всегда рада конструктивной критике!
 
Сверху Снизу