AI TitanRt — реактивный рантайм для реального времени (и не только HFT)

AI

Редактор
Регистрация
23 Август 2023
Сообщения
3 045
Лучшие ответы
0
Реакции
0
Баллы
51
Offline
#1


Сегодня решил собрать воедино то, что я знаю о разработке высокочастотных систем в связке с Rust. Из кусков кода, по сусекам, что называется, по репозиториям наскреб и склеил с помощью готопоты достойную к вашему вниманию либу. Библа позволит сохранить время всем, кто стремиться одновременно и к скорости, и гибкости. Планирую сам активно юзать, чтобы перейти со старой асинхронной торговой инфры в истинные треды с закосом под ультра. И развивать либу под брендом TitanRt. А если сообществу зайдет, так вообще мотивации прибавиться.

Итак, TitanRt - typed reactive runtime для построения реактивных, низколатентных систем на Rust.

Если упростить: это минималистичная основа для приложений, которые живут в цикле событий, где важны:


  • миллисекунды (а иногда и наносекунды)


  • предсказуемая обратная нагрузка (back-pressure)


  • чёткий контроль жизненного цикла модели и её воркеров


  • максимальная гибкость в области более высокоуровневой разработки


  • возможность прибить всё красиво и быстро, а не висеть на zombie-тредах
Зачем ещё один рантайм?


Rust уже имеет Tokio, Actix и прочих асинхронных гигантов. Но они решают другую задачу — высокоуровневый async/await, HTTP-сервисы, очереди.

TitanRt сделан для систем реального времени:


  • торговые движки (HFT/market making)


  • анализ рыночных потоков


  • телеметрия и алертинг


  • системы с жёстким контролем задержки

То есть там, где:


  • нельзя позволить себе лишний аллокатор


  • хочется контролировать ядро CPU, на котором работает поток


  • нужны строго типизированные каналы между моделью и воркерами
Архитектура


Вместо гигантского фреймворка TitanRt — это всего пара простых идей:


  1. Model-first:
    Ваша бизнес-логика = BaseModel.
    Она сама создаёт коннекторы и стримы, сама ими управляет, хотя не детерминирует это поведение. Вы можете управлять извне через Control Plane слой.


  2. Connector / Stream layer:
    Коннектор = фабрика стримов.
    Стрим = воркер-тред с типизированными каналами и вкуснятиной в виду StateCell<T>, где под капотом arc-swap. Со стримом можно общаться разными способами: через ваш типизированный Action, который вы отправляете через ваш типизированный Tx<Action>, а так же через типизированный hook, в который приходит сырое событие стрима, ваш Rx<Event> и StateCell<T> - здесь вы выбираете, что делать с данными, чтобы оркестрировать этим цикле модели.


  3. Runtime:
    Небольшой управляющий поток, который гоняет команды: Start, Stop,Restart,HotReload, Shutdown. Ну и саму модель, на которой вызывается ваш импл execute() или event() - если вы вдруг гоняете события извне рантайма.
    В общем и целом, рантайм не знает про ваши протоколы и бизнес-логику — только управляет жизненным циклом модели.

Не совсем детальная, но какая-никакая схема:

┌──────────────────────────┐
│ Runtime │
│ Start/Stop/Restart/... │
└───────────▲─────────────┘

┌─────────┴─────────┐
│ Model │
│ owns connectors │
└─────────┬─────────┘

┌───────────▼───────────┐
│ Connector(s) │
│ spawn Stream(s) │
└───────────┬───────────┘

Actions ───►│ │◄─── Events

Stream

Как это выглядит в коде


Минимальная модель:

use titanrt::model::{BaseModel, ExecutionResult, StopKind, StopState};
use titanrt::utils::CancelToken;
use anyhow::Result;

#[derive(Clone, Debug)]
struct MyConfig {
greeting: String,
}

#[derive(Clone, Debug)]
struct MyEvent(String);

#[derive(Clone)]
struct MyOutputTx; // no-op

impl titanrt::io::base::BaseTx for MyOutputTx {
type EventType = String;
fn try_send(&mut self, v: String) -> Result<(), titanrt::error::SendError<String>> {
println!("OUT: {v}");
Ok(())
}
fn send(
&mut self,
v: String,
_: &CancelToken,
_: Option<std::time::Duration>,
) -> Result<(), titanrt::error::SendError<String>> {
self.try_send(v)
}
}

struct MyModel {
cfg: MyConfig,
out: MyOutputTx,
}

impl BaseModel for MyModel {
type Config = MyConfig;
type OutputTx = MyOutputTx;
type Event = MyEvent;
type Ctx = ();

fn initialize(
_ctx: (),
config: MyConfig,
_core_id: Option<usize>,
output_tx: MyOutputTx,
_cancel: CancelToken,
) -> Result<Self> {
Ok(Self { cfg: config, out: output_tx })
}

fn execute(&mut self) -> ExecutionResult {
let _ = self.out.try_send(format!("{}!", self.cfg.greeting));
ExecutionResult::Relax
}

fn on_event(&mut self, event: MyEvent) {
let _ = self.out.try_send(format!("event: {}", event.0));
}

fn stop(&mut self, _kind: StopKind) -> StopState {
StopState::Done
}
}


А потом — просто запускаем:

fn main() -> Result<()> {
let cfg = titanrt::config::RuntimeConfig {
init_model_on_start: true,
core_id: None,
max_inputs_pending: Some(1024),
max_inputs_drain: Some(64),
stop_model_timeout: Some(5),
};

let rt = titanrt::runtime::Runtime::<MyModel>::spawn(
cfg,
NullModelCtx,
MyConfig { greeting: "Hello, TitanRt".into() },
MyOutputTx,
)?;

rt.run_blocking()?;
Ok(())
}

Чем это отличается от Tokio?


  • Без async/await: здесь честные треды, без скрытого планировщика. Тем не менее, планируется добавить опциональный токио рантайм из коробки.


  • CPU pinning: можно закрепить поток за ядром. Хочется в будущих версиях добавить настройку планировщика и приоритетность.


  • Typed I/O: никакого Any, всё компилируется статически. Строго.


  • Минимум зависимостей: crossbeam, arc-swap,ahash, ringbuf, немного serde.
Когда использовать TitanRt


  • Нужно максимально предсказуемое поведение под нагрузкой.


  • Ваш код живёт в бесконечном цикле: обработка стакана, телеметрии, сигналов.


  • Вы пишете HFT-движок, рынок данных, RT-алертинг или даже какую-нибудь игровую сетевую петлю.

А если нужен HTTP, gRPC, база и веб — берите Tokio/Actix, TitanRt не про это.

Репозиторий и документация

Итог


TitanRt — это скелет для реактивных систем, где важна латентность и контроль.
Вы пишете модель и коннекторы → модель создаёт коннекторы и стримы → стримы гоняют события → рантайм следит за жизненным циклом.

Просто, предсказуемо и типобезопасно.


Плюшка в виде простого yellowstone-grpc стрима:


pub struct CompositeConnector {
pub(crate) config: CompositeConfig,
pub(crate) cancel_token: CancelToken,
pub(crate) core_stats: Option<Arc<CoreStats>>,
}

impl BaseConnector for CompositeConnector {
type Config = CompositeConfig;

fn init(
config: Self::Config,
cancel_token: CancelToken,
reserved_core_ids: Option<Vec<usize>>,
) -> anyhow::Result<Self> {
let core_stats = if config.with_core_stats {
Some(CoreStats::new(
config.default_max_cores,
config.specific_cores.clone(),
reserved_core_ids.unwrap_or_default(),
)?)
} else {
None
};

Ok(Self {
config,
cancel_token,
core_stats,
})
}

fn name(&self) -> impl AsRef<str> + Display {
"CompositeConnector"
}

fn config(&self) -> &Self::Config {
&self.config
}

fn cancel_token(&self) -> &CancelToken {
&self.cancel_token
}

fn cores_stats(&self) -> Option<Arc<CoreStats>> {
self.core_stats.clone()
}
}

#[derive(Clone)]
pub enum GeyserAction {
Subscribe(SubscribeRequest),
UnsubscribeAll,
}

#[derive(Clone, Debug)]
pub struct GeyserGrpcDescriptor {
pub endpoint: String,
pub auth_token: Option<String>,
pub max_pending_actions: Option<usize>,
pub max_pending_events: Option<usize>,
pub core_pick_policy: CorePickPolicy,
pub subscription: Option<SubscribeRequest>,
}

impl GeyserGrpcDescriptor {
pub fn new(
endpoint: String,
auth_token: Option<String>,
max_pending_actions: Option<usize>,
max_pending_events: Option<usize>,
core_pick_policy: CorePickPolicy,
) -> Self {
Self {
endpoint,
auth_token,
max_pending_actions,
max_pending_events,
core_pick_policy,
subscription: None,
}
}

pub fn with_subscription(mut self, sub: SubscribeRequest) -> Self {
self.subscription = Some(sub);
self
}
}

impl StreamDescriptor for GeyserGrpcDescriptor {
fn venue(&self) -> impl Venue {
Venues::Solana
}

fn kind(&self) -> impl Kind {
"YellowstoneGrpc"
}

fn max_pending_actions(&self) -> Option<usize> {
self.max_pending_actions
}

fn max_pending_events(&self) -> Option<usize> {
self.max_pending_events
}

fn core_pick_policy(&self) -> Option<CorePickPolicy> {
Some(self.core_pick_policy)
}

fn health_at_start(&self) -> bool {
false
}
}

#[derive(Clone, Debug)]
pub enum GeyserEvent {
Raw(UpdateOneof),
}

impl<E, S> StreamSpawner<GeyserGrpcDescriptor, E, S> for CompositeConector
where
S: StateMarker,
E: BaseTx + TxPairExt,
{
}

impl<E, S> StreamRunner<GeyserGrpcDescriptor, E, S> for CompositeConector
where
S: StateMarker,
E: BaseTx,
{
type Config = ();
type ActionTx = RingSender<GeyserAction>;
type RawEvent = GeyserEvent;
type Hook = fn(&Self::RawEvent, &mut E, &StateCell<S>);

fn build_config(&mut self, _desc: &GeyserGrpcDescriptor) -> anyhow::Result<Self::Config> {
Ok(())
}

fn run(
mut ctx: RuntimeCtx<GeyserGrpcDescriptor, Self, E, S>,
hook: Self::Hook,
) -> StreamResult<()> {
// Однопоточный рантайм ТОЛЬКО внутри run (внешний API остаётся синхронным)
let rt = Builder::new_current_thread()
.enable_time()
.enable_io()
.build()
.map_err(|e| StreamError::Unknown(anyhow!(e)))?;

let mut rng = SmallRng::from_os_rng();
let mut backoff_ms: u64 = 50;
let backoff_max_ms: u64 = 5_000;
let backoff_mul: f64 = 1.7;

if !ctx.desc.health_at_start() {
ctx.health.set(true);
}

rt.block_on(async move {
'reconnect: loop {
if ctx.cancel.is_cancelled() { break Ok(()); }

if backoff_ms > 50 {
let j = rng.random_range(0..(backoff_ms / 5 + 1));
tokio::time::sleep(Duration::from_millis(j)).await;
}

let mut client = match GeyserGrpcClient::build_from_shared(ctx.desc.endpoint.clone())
.map_err(|e| StreamError::Unknown(anyhow!(e)))?
.x_token(ctx.desc.auth_token.clone())
.map_err(|e| StreamError::Unknown(anyhow!(e)))?
.tls_config(ClientTlsConfig::new().with_native_roots())
.map_err(|e| StreamError::Unknown(anyhow!(e)))?
.connect().await
{
Ok(c) => c,
Err(e) => {
tracing::warn!("grpc connect error: {e}");
let sleep = backoff_ms.min(backoff_max_ms);
tokio::time::sleep(Duration::from_millis(sleep)).await;
backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
continue 'reconnect;
}
};

let (mut tx, mut rx) = match client.subscribe().await {
Ok(x) => x,
Err(e) => {
tracing::warn!("subscribe call error: {e}");
let sleep = backoff_ms.min(backoff_max_ms);
tokio::time::sleep(Duration::from_millis(sleep)).await;
backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
continue 'reconnect;
}
};

if let Some(req) = ctx.desc.subscription.as_ref() {
if let Err(e) = tx.send(req.clone()).await {
tracing::warn!("send initial subreq: {e}");
}
}

ctx.health.set(true);

backoff_ms = 50;

let mut tick = tokio::time::interval(Duration::from_millis(1));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

'session: loop {
tokio::select! {
biased;

m = rx.next() => {
match m {
Some(Ok(update)) => {
match update.update_oneof {

Some(UpdateOneof::Ping(_)) => {
tracing::debug!("ping update");
let _ = tx.send(SubscribeRequest {
ping: Some(SubscribeRequestPing {id: 0}),
..Default::default()
}).await;
}
Some(u) => {
// А вот и хук из модели: модель сама решает, что делать
hook(&GeyserEvent::Raw(u), &mut ctx.event_tx, &ctx.state);
}
None => {}
}
}
Some(Err(status)) => {
tracing::warn!("grpc stream error: {status}");
ctx.health.set(false);
break 'session;
}
None => {
tracing::warn!("grpc stream ended");
ctx.health.set(false);
break 'session;
}
}
}

_ = tick.tick() => {
let mut sent = 0usize;

Проверим команды из модели
while let Ok(a) = ctx.action_rx.try_recv() {
match a {
GeyserAction::Subscribe(req) => {
match tx.send(req).await {
Ok(_) => {
sent += 1;
if sent >= 4096 { break; }
}
Err(e) => {
tracing::warn!("subreq send failed: {e}");
break 'session;
}
}
},
GeyserAction::UnsubscribeAll => {}
}
}
}
}
if ctx.cancel.is_cancelled() { break 'reconnect Ok(()); }
}

let sleep = backoff_ms.min(backoff_max_ms);
tokio::time::sleep(Duration::from_millis(sleep)).await;
backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
}
})?;

Ok(())
}
}


Ну что, делаем SolanaTxStream для отправки лидерам?
 
Сверху Снизу