Решил я как-то выгрузить пару лет переписки из Telegram в Apache Doris на своем компе. Зачем? Я тестирую Doris как единую систему хранения и поиска по всем личным данным: сообщениям из мессенджеров, ChatGPT, записям встреч и почте.
Первый запуск был болезненным: загрузка занимала почти 2 часа. После небольшой оптимизации пайплайна тот же сценарий дал другой результат: 206 400 сообщений за 5 секунд, то есть примерно 41 280 сообщений в секунду. Для контекста: каждое сообщение я грузил как JSON-массив.
В этот момент я подумал: «Окей, а что именно внутри Doris делает такую разницу между “2 часа” и “5 секунд”?».
И полез в исходники.
Но моего знания C++ не хватило, поэтому ассистировал мне GPT-5.3 Codex, Gemini и Opus 4.6
Часть 1: А что вообще такое Stream Load?
HTTP PUT вместо SQL, и почему это принципиально
В моей школьности нас учили загружать данные в базу через SQL.
Пишешь INSERT INTO ... VALUES(...), жмёшь Enter, ждёшь. Или, если данных много, используешь LOAD DATA INFILE или какой-нибудь bulk insert.
В Apache Doris есть другой путь: Stream Load. Это когда вы отправляете данные прямо через HTTP PUT как будто загружаете файл на сервер. Без SQL. Без парсинга запросов. Без оптимизатора.
curl --location-trusted -u root: \ -H "format:csv" \ -H "column_separator:," \ -T data.csv \ http://fe_host:8030/api/my_db/my_table/_stream_load
Один HTTP-запрос и данные в базе. Синхронно. С JSON-ответом о результате.
Три способа загрузки: кто есть кто
Прежде чем нырять в исходники, давайте разберёмся, какие вообще есть способы загрузки данных в Doris и чем они отличаются:
Stream Load | Broker Load | INSERT INTO | |
|---|---|---|---|
Протокол | HTTP PUT | Thrift RPC (через Broker) | MySQL Protocol |
Режим | Син��ронный | Асинхронный | Синхронный |
Источник | Локальные файлы, потоки | HDFS, S3, облачное хранилище | SQL-запрос, подзапрос |
Рекомендуемый объём | До 10 GB | До сотен GB | Мелкие батчи |
SQL parsing | Нет | Нет | Да, полный цикл |
Основной use case | Real-time загрузка, ETL | Массовый batch import | Интерактивные вставки |
Ключевая мысль: Stream Load убирает SQL parsing из критического пути. Никакого лексера, парсера, анализатора запросов, оптимизатора. Данные идут напрямую из HTTP в движок записи.
Это как разница между тем, чтобы отправить посылку через приёмное отделение почты (заполни бланк, встань в очередь, покажи паспорт) и просто закинуть её в грузовик, который уже стоит у двери.
Часть 2: 14 шагов HTTP-запроса
Что происходит, когда вы нажимаете Enter после curl
Вот полная схема того, что происходит от момента отправки HTTP-запроса до получения ответа:
Клиент ──HTTP PUT──> FE ──307 Redirect──> Coordinator BE │ Begin Transaction (Thrift RPC → FE) │ Get Import Plan (Thrift RPC → FE) │ StreamLoadPipe (буферизованное чтение) │ ┌────────┴────────┐ ▼ ▼ Executor BE #1 Executor BE #2 │ │ MemTable MemTable │ │ Segment Segment (на диск) (на диск) │ │ └────────┬─────────┘ │ Commit Transaction (→ FE) │ Publish Version (FE → Executor BE) │ Result JSON ──> Клиент
Давайте пройдёмся по шагам. Открываем be/src/http/action/stream_load.cpp:
Шаг 1-2: Клиент → FE.
Клиент отправляет HTTP PUT на Frontend. FE парсит заголовки (база, таблица, label, формат), проверяет аутентификацию. Если всё ок — делает HTTP 307 Redirect на один из Backend-узлов. Этот BE становится Coordinator данной загрузки.
Шаг 3-4: Coordinator BE парсит заголовки. В методе on_header создаётся StreamLoadContext — объект, который будет жить на протяжении всей загрузки:
// stream_load.cpp — on_header() std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); ctx->load_type = TLoadType::MANUL_LOAD; ctx->load_src_type = TLoadSourceType::RAW; url_decode(req->param(HTTP_DB_KEY), &ctx->db); url_decode(req->param(HTTP_TABLE_KEY), &ctx->table); ctx->label = req->header(HTTP_LABEL_KEY);
Обратите внимание: StreamLoadContext это не какой-то легковесный DTO. Это полноценный объект с таймингами каждого этапа: begin_txn_cost_nanos, stream_load_put_cost_nanos, write_data_cost_nanos, commit_and_publish_txn_cost_nanos.
Doris измеряет кажд��й шаг загрузки с точностью до наносекунды.
Шаг 5-7: Транзакция и план.
Coordinator BE отправляет два Thrift RPC запроса к FE:
Begin Transaction FE открывает транзакцию, присваивает Transaction ID
Get Import Plan FE генерирует план импорта и возвращает его
// stream_load.cpp — _process_put() RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, [&request, ctx](FrontendServiceConnection& client) { client->streamLoadPut(ctx->put_result, request); }));
Шаг 8: StreamLoadPipe.
Вот тут начинается самое интересное. Coordinator создаёт StreamLoadPipe — буферизированный канал для потоковой передачи данных. Подробный разбор в Часть 3: Streaming Pipeline — данные не ждут (ниже), а вот минимальный фрагмент из исходников:
// be/src/io/fs/stream_load_pipe.h static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; // 4 MB class StreamLoadPipe : public MessageBodySink, public FileReader { public: StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes, size_t min_chunk_size = 64 * 1024, int64_t total_length = -1, bool use_proto = false); };
Шаг 9-11: Распределение и запись.
Coordinator BE режет поток на батчи и отправляет их по BRPC (нутренний RPC-фреймворк) на Executor BE. Там батчи сначала пишутся в MemTable (RAM), а затем фоново сбрасываются на диск в Segment-файлы.
Шаг 12-14: Коммит.
После записи всех данных Coordinator отправляет Commit Transaction на FE. FE проверяет, что большинство реплик записано успешно, и отправляет Publish Version на все Executor BE. Данные становятся видимыми.
FE — это диспетчер аэропорта. Он решает, куда направить самолёт (данные), проверяет документы (аутентификация), выдаёт разрешение на посадку (транзакция).
А BE — это грузчики на лётном поле: они принимают груз и раскладывают его по складам (таблетам).
Три метрики, которые расскажут всё
В самом начале stream_load.cpp определяются три метрики:
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::REQUESTS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
Количество запросов, длительность, текущая параллельная нагрузка.
Если у вас в продакшене Doris следите за streaming_load_current_processing. Рекомендация из документации: не более 128 одновременных Stream Load на один BE (параметр webserver_num_workers).
Больше и BE может начать подвисать.
Часть 3: Streaming Pipeline — данные не ждут
Почему Stream Load не "disk-first"
Классический подход к загрузке данных «сначала сохрани на диск, потом обработай».
В batch-сценариях Broker Load обычно даёт более высокий latency до видимости данных и менее «поточную» модель, чем Stream Load (особенно на небольших частых батчах).
Stream Load работает иначе.
Данные обрабатываются по мере поступления.
Пока HTTP-клиент отправляет следующую порцию CSV, предыдущая уже распарсена и летит на Executor BE.
Сердце этого механизма StreamLoadPipe. Откроем io/fs/stream_load_pipe.h:
static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; // 4 MB class StreamLoadPipe : public MessageBodySink, public FileReader { public: StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes, size_t min_chunk_size = 64 * 1024, // 64 KB int64_t total_length = -1, bool use_proto = false); // ... private: size_t _buffered_bytes; size_t _max_buffered_bytes; // 4 MB по умолчанию size_t _min_chunk_size; // 64 KB по умолчанию std::deque<ByteBufferPtr> _buf_queue; std::condition_variable _put_cond; std::condition_variable _get_cond; };
Что здесь происходит?
kMaxPipeBufferedBytes = 4MBмаксимальный размер буфера. Когда буфер заполнен, HTTP-поток блокируется (backpressure) через_put_cond.min_chunk_size = 64KBминимальный размер чанка. Данные не передаются дальше, пока не накопится хотя бы 64 КБ. Это снижает overhead от слишком частых переключений._buf_queueочередь изByteBuffer'ов. Классический producer-consumer паттерн с condition variables.
Поток данных внутри Coordinator BE
HTTP chunks ──→ StreamLoadPipe ──→ BrokerScanNode ──→ OlapTableSink ──→ BRPC ──→ Executor BE (4 MB буфер) (парсинг CSV/ (распределение (сетевая JSON → Block) по таблетам) передача)
BrokerScanNode читает данные из StreamLoadPipe батчами и парсит их в формат vectorized::Block — колоночное представление данных в Doris.
OlapTableSink определяет, в какой Partition и Tablet попадает каждая строка (по PartitionKey и DistributionKey), и отправляет данные на соответствующие Executor BE через BRPC.
Ключевой инсайт: пока Coordinator BE читает HTTP-поток и парсит CSV, Executor BE уже пишут предыдущие батчи в свои MemTable'ы.
Это конвейер, а не последовательная обработка.
Представьте завод по разливу воды: пока одна бутылка наполняется, предыдущая уже закупоривается, а та, что перед ней, уже едет по конвейеру к упаковке. Никто не ждёт, пока все бутылки наполнятся, чтобы начать закупоривать.
Часть 4: Иерархия записи — от LoadChannel до MemTable
Самое интересное: как данные попадают на диск
(Если вы дочитали до сюда — вы точно GPT или крутой человек с остствуем деффицита внимания. Наливайте чай, дальше будет ещё интереснее.)
Когда данные прилетают на Executor BE через BRPC, начинается многоуровневая запись.
Здесь архитектура Doris хорошо декомпозирована: каждый слой в цепочке записи отвечает за одну задачу, а вместе они дают потоковую запись с backpressure и асинхронным flush без блокировки ingest (практически это значит: при пике входящих данных система «притормаживает» приём, а не падает; при этом новые батчи продолжают приниматься, пока предыдущие фоново сбрасываются на диск).
Откроем исходники и посмотрим на иерархию:
LoadChannelMgr └── LoadChannel (1 на Stream Load задачу на данном BE) └── TabletsChannel (1 на Index — таблица + её Materialized View) └── DeltaWriter (1 на каждый Tablet) └── MemTable (in-memory буфер для записи) └── Segment (файл на диске, ≤256 MB)
Если сжать до одного абзаца:
LoadChannelдержит запись текущего load на конкретном BE;TabletsChannelраскладывает строки по нужным tablet/index;DeltaWriterпишет в конкретный tablet и управляет жизненным цикломMemTable.
Ключевой фрагмент из olap/delta_writer.h:
// Writer for a particular (load, index, tablet). class BaseDeltaWriter { public: virtual Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) = 0; virtual Status close() = 0; virtual Status build_rowset(); int64_t mem_consumption(MemType mem); Status wait_flush(); int64_t tablet_id() const { return _req.tablet_id; } int64_t txn_id() const { return _req.txn_id; } protected: std::unique_ptr<BaseRowsetBuilder> _rowset_builder; std::shared_ptr<MemTableWriter> _memtable_writer; };
Важно: метод write принимает vectorized::Block (батч колонок), а не по одной строке. Это один из источников высокой скорости записи.
MemTable: где живут данные до flush
MemTable — это in-memory буфер перед диском. Из olap/memtable.h:
class MemTable { public: // Вставка батча строк (векторизованная!) Status insert(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs); void shrink_memtable_by_agg(); // Агрегация прямо в памяти bool need_flush() const; Status to_block(std::unique_ptr<vectorized::Block>* res); size_t memory_usage() const { return _mem_tracker->consumption(); } private: vectorized::MutableBlock _input_mutable_block; // Входные данные vectorized::MutableBlock _output_mutable_block; // Отсортированный результат vectorized::Arena _arena; // Аллокатор для строковых данных size_t _sort(); // Сортировка по ключу template <bool is_final> void _aggregate(); // Агрегация для Aggregate/Unique моделей std::unique_ptr<DorisVector<std::shared_ptr<RowInBlock>>> _row_in_blocks; };
Три вещи, которые важны для понимания производительности:
Колоночный батч в памяти (
vectorized::MutableBlock) вместо row-by-row.Сортировка и агрегация в RAM (
_sort,_aggregate) до записи на диск.Специализированный аллокатор (
vectorized::Arena) + статистика (MemTableStat) для контроля памяти и таймингов.
Async Flush: конвейер не останавливается
Ключевая оптимизация: когда MemTable заполняется (в типичных конфигурациях около 200 MB), запись не останавливается.
Старая MemTable уходит в очередь на flush, новая сразу принимает данные, а MemtableFlushExecutor фоново пишет на диск через RowsetWriter -> SegmentWriter.
// Три состояния MemTable: enum MemType { ACTIVE = 0, // Принимает данные WRITE_FINISHED = 1, // Ждёт в очереди на flush FLUSH = 2 // Сбрасывается на диск };
Практический итог: Segment ограничен ~256 MB, а один Stream Load на tablet формирует Rowset из одного или нескольких Segment-файлов.
Это как конвейер на заводе: пока один ящик упаковывают (flush на диск), следующий уже наполняется (write в MemTable), а третий уже запечатан и едет на склад (завершённый Segment). Никто не стоит и не ждёт.
Практические выводы: как получить скорость Stream Load и не устроить себе пожар
Мы разобрали 4 уровня архитектуры от HTTP-запроса до Segment-файла на диске.
Теперь давайте переведём это в конкретные рекомендации.
1. Убедитесь, что вы реально в streaming path
Stream Load может быть потоковым, но если формат или режим не поддерживает стриминг, BE сначала сохранит тело запроса во временный файл и только потом начнёт выполнение это уже disk-first и совсем другая латентность.
В коде это буквально одна строка:
// stream_load.cpp — _process_put() ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format);
Если скорость «не похожа на Doris», проверьте формат и заголовки.
Каждый Stream Load возвращает JSON с разбивкой по этапам — это не логи и не C-код, а обычный HTTP-ответ:
{ "Status": "Success", "NumberTotalRows": 1000000, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106 }
Посмотрите на ReadDataTimeMs, WriteDataTimeMs, CommitAndPublishTimeMs они покажут, где именно время уходит.
2. Для JSON не верьте в магию — выбирайте режим парсинга осознанно
NDJSON (по одному объекту на строку): используйте
read_json_by_line— это даёт самую предсказуемую потоковую обработку.JSON-массив: нужен
strip_outer_array, иначе можно получить не тот режим парсинга.Chunked transfer + JSON: внутри
StreamLoadPipeесть сценарии, где JSON нужно прочитать целиком до парсинга. Это явно отмечено в коде:
// stream_load_pipe.h // When importing JSON data and using chunked transfer encoding, // the data needs to be completely read before it can be parsed. bool _is_chunked_transfer = false;
3. Конкурентность: держите под контролем
В best practices прямо рекомендуют держать Stream Load concurrency на один BE < 128 (это упирается в webserver_num_workers).
Высокая конкурентность «съедает» webserver-потоки и роняет производительность; на совсем больших значениях возможны подвисания процесса.
Стартуйте с 16–64 параллельных загрузок (или меньше на ноутбуке) и увеличивайте, пока не упрётесь в CPU/IO.
4. Размер батча важнее красивых слов про HTTP
Большие батчи уменьшают транзакционный и метаданный overhead и дают более крупные сегменты. Stream Load официально позиционируется как метод для файлов до 10 GB (больше — дробить).
Ориентир «несколько MB и выше» почти всегда лучше, чем «много запросов по 50 KB».
5. Настройка MemTable = баланс между мелкими файлами и таймаутами
Порог flush управляется write_buffer_size (дефолт в Doris 3.x — 100 MB).
В документации прямо говорится: маленький порог — много мелких файлов; слишком большой — риск RPC timeout.
Как это увидеть на практике (без исходников, обычным SQL и метриками):
-- Compaction Score по tablet'ам конкретной таблицы SHOW TABLET FROM your_table; -- Смотреть колонки: VersionCount, RowCount, DataSize -- Если VersionCount растёт и не снижается — compaction не справляется -- Максимальный compaction score по кластеру SHOW PROC '/statistic'; -- Столбец MaxCompactionScore — если > 100, уже стоит разбираться
Плюс метрики для Prometheus/Grafana:
doris_fe_max_tablet_compaction_score— максимальный compaction score по всем BE (P0-метрика)doris_be_tablet_base_max_compaction_score— то же самое, но с каждого BEdoris_be_compaction_bytes_total— объём данных, прошедших через compaction
Если видите тонны мелких сегментов и давление compaction, то увеличивайте write_buffer_size аккуратно (NetEase для лог-сценариев ставят до 1 GB) и следите за таймаутами и IO.
6. Что мерить, чтобы не гадать
Stream Load на BE измеряет этапы и возвращает их в JSON-ответе, плюс есть метрики текущей нагрузки (streaming_load_current_processing).
Практика: если «медленно», первым делом разделите проблему на три зоны:
Сеть / приём данных —
ReceiveDataCostMs,ReadDataCostMsЗапись —
WriteDataCostMs(flush в MemTable и на диск)Commit / Publish —
CommitAndPublishCostMs(метаданные, реплики)
Это разделение сразу покажет, виноват ли сетевой слой, движок записи или координация транзакций.
Что дальше
В этой статье мы разобрали путь данных от curl до Segment-файла: HTTP-пайплайн, StreamLoadPipe с backpressure, иерархию записи LoadChannel -> DeltaWriter -> MemTable и async flush.
Но есть ещё три механизма, которые радикально влияют на скорость в конкретных сценариях:
Group Commit — как Doris объединяет тысячи мелких загрузок в одну транзакцию (и почему без этого на high-frequency ingest вы получите ошибку -235 вместо результата).
Pipeline Execution Engine — как push-модель и векторизация ускоряют путь записи.
Memtable Forwarding — оптимизация, которая убирает лишние encode/decode и даёт ускорение до 2.8x.
Разберём их во второй части, если меня выпустят из Песочницы.
Короче, если тема зашла дайте знать в комментариях, какой из трёх механизмов интересен больше всего.
