Pull to refresh
71.32
Слёрм
Учебный центр для тех, кто работает в IT

Действительно ли Apache Kafka сохраняет правильный порядок сообщений?

Reading time6 min
Views9.4K
Original author: Francesco Tisiot

Нам говорят, что Apache Kafka сохраняет порядок сообщений для каждой темы/раздела, но насколько это правда? В этой статье мы проанализируем несколько реальных сценариев, в которых слепое принятие этой догмы, может привести к неожиданным и ошибочным последовательностям сообщений.

Базовый сценарий: один производитель

Начнём с базового сценария: один производитель отправляет сообщения в топик Apache Kafka с одним разделом последовательно, одно за другим.

В этой базовой ситуации, согласно известной мантре, мы должны ожидать всегда правильного порядка. Но так ли это? Смотря как!

Нестабильная сеть

В идеальном мире сценарий с одним производителем всегда должен приводить к правильному порядку. Но наш мир не идеален! Различные сетевые пути, ошибки и задержки могут привести к задержке или потере сообщения.

Давайте представим ситуацию ниже: один продюсер отправляет в топик три сообщения:

  • Сообщение 1 по какой-то причине находит длинный сетевой маршрут к Apache Kafka

  • Сообщение 2 находит самый быстрый сетевой маршрут к Apache Kafka

  • Сообщение 3 теряется в сети

Даже в этом базовом сценарии только с одним производителем мы могли бы получить неожиданную серию сообщений в теме. Конечный результат в теме Kafka покажет только два сохраняемых события с неожиданным порядком 2, 1.

С точки зрения Apache Kafka это может быть правильный порядок. Тема — это всего лишь журнал информации, и Apache Kafka будет записывать сообщения в журнал в зависимости от того, когда он «чувствует» приход нового события. Он основан на времени приема Kafka, а не на времени создания сообщения (время события).

Подтверждения и повторы

Но не все потеряно! Если мы посмотрим в библиотеки (например, aiokafka), то заметим, что у нас есть способы обеспечить правильную доставку сообщений.

Прежде всего, чтобы избежать проблемы с сообщением 3 в приведенном выше сценарии, мы могли бы определить правильный механизм подтверждения. Параметр acks у производителя позволяет нам определить, какое подтверждение получения сообщения мы хотим получить от Apache Kafka.

Установка этого параметра в 1 гарантирует, что мы получим подтверждение от основного брокера, ответственного за тему (и раздел). Установка этого параметра all гарантирует, что мы получим подтверждение только в том случае, если и первичный, и реплики правильно сохранят сообщение. Это избавит нас от проблем, когда только первичный получает сообщение и падает, прежде чем распространить его на реплики.

После того как мы установили ack, мы должны установить возможность повторной отправки сообщения, если мы не получим надлежащее подтверждение. В отличие от других библиотек (одной из них является kafka-python), aiokafka автоматически повторяет попытку отправки сообщения до тех пор, пока не будет превышено время ожидания (установленное параметром request_timeout_ms).

С подтверждением и автоматическими повторными попытками мы должны решить проблему с сообщением 3. При первой отправке производитель не получит ack, поэтому по истечении интервала retry_backoff_ms он отправит сообщение 3 снова.

Максимальное количество flight_request

Однако, если вы внимательно посмотрите на конечный результат в теме Apache Kafka, полученный порядок неверен: мы отправили 1,2,3 и получили 2,1,3в теме... как это исправить?

Старый метод (доступный в kafka-python) заключался в том, чтобы установить максимальное количество flight_request на одно соединение: это то количество сообщений, которому мы позволяем висеть «в воздухе» одновременно без подтверждения. Чем больше сообщений мы запускаем одновременно, тем больше риск получить сообщения не по порядку.

При использовании kafka-python, если нам абсолютно необходимо было иметь определенный порядок в теме, мы были вынуждены ограничить max_in_flight_requests_per_connection до 1. Предположим, что мы установили минимальный ack параметр как 1 и ожидали подтверждения каждого отдельного сообщения (или пакета сообщений, если размер сообщения меньше размера пакета) перед отправкой следующего.

Абсолютная правильность запроса, подтверждения и повторных попыток достигается ценой пропускной способности. Чем меньшему количеству сообщений мы позволяем находиться «в воздухе» одновременно, тем больше подтверждений нам нужно получить, тем меньше общих сообщений мы можем доставить в Kafka за определенный период времени.

Идемпотентные производители

Чтобы преодолеть строгую сериализацию отправки одного сообщения за раз и ожидания подтверждения, мы можем определить идемпотентных производителей. При использовании идемпотентного производителя каждое сообщение маркируется идентификатором производителя и серийным номером (последовательность, сохраняемая для каждого раздела). Этот составленный ID затем отправляется брокеру вместе с сообщением.

Брокер отслеживает серийный номер для каждого производителя и темы/раздела. Когда приходит новое сообщение, брокер проверяет составленный ID, и если в рамках одного производителя значение равно предыдущему номеру + 1, то новое сообщение подтверждается, в противном случае оно отклоняется. Это обеспечивает гарантию глобального упорядочивания сообщений, позволяя увеличить количество запросов в полете на одно соединение (максимум 5 для Java клиента).

Усложняем: с несколькими производителями

До сих пор мы представляли базовый сценарий только с одним производителем, но реальность Apache Kafka такова, что часто производителей несколько. О каких мелочах нужно знать, если мы хотим быть уверены в конечном результате?

Разные локации, разная задержка

Опять же, сеть неодинакова, и у нескольких производителей, расположенных в очень удаленных местах, может быть разная задержка. Это означает, что порядок может отличаться от порядка, основанного на времени события.

К сожалению, задержки между разными точками на Земле исправить не можем, поэтому нам придется принять этот сценарий.

Пакетирование, дополнительная переменная

Чтобы добиться более высокой пропускной способности, мы можем группировать сообщения. При пакетной обработке мы отправляем сообщения «группами», сводя к минимуму общее количество вызовов и увеличивая соотношение полезной нагрузки к общему размеру сообщения. Но при этом мы снова можем изменить порядок событий. Сообщения в Apache Kafka будут храниться для каждого пакета в зависимости от времени приема пакета. Таким образом, порядок сообщений будет правильным для каждого пакета, но в разных пакетах могут быть разные упорядоченные сообщения.

Теперь, когда есть разные задержки и пакетная обработка, кажется, что наше глобальное предположение об упорядочевании провалится... Итак, почему мы утверждаем, что можем управлять событиями по порядку?

Спаситель: время события

Мы убедились, что первоначальное предположение о том, что Kafka сохраняет порядок сообщений, не на 100% верно. Порядок сообщений зависит от времени приема Kafka, а не от времени генерации события. Но что, если нам очень важен порядок, основанный на времени события?

Ну, мы не можем решить проблему на стороне производства, но мы можем сделать это на стороне потребителя. Все наиболее распространенные инструменты, работающие с Apache Kafka, имеют возможность определять, какое поле использовать в качестве времени события, включая Kafka Streams, Kafka Connect с выделенным преобразованием одиночных сообщений (SMT) для извлечения меток времени и Apache Flink.

Потребители, если они правильно определены, смогут перетасовать порядок сообщений, поступающих из определенной темы Apache Kafka. Давайте проанализируем пример Apache Flink ниже:

CREATE TABLE CPU_IN (
    hostname STRING,
    cpu STRING,
    usage DOUBLE,
    occurred_at BIGINT,
    time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
    )
WITH (
   'connector' = 'kafka',
   'properties.bootstrap.servers' = '',
   'topic' = 'cpu_load_stats_real',
   'value.format' = 'json',
   'scan.startup.mode' = 'earliest-offset'
)

В приведенном выше определении таблицы Apache Flink мы можем заметить:

  • occurred_at: поле определено в исходной теме Apache Kafka во времени unix (тип данных — BIGINT).

  • time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3): преобразует время unix в временную метку Flink.

  • WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND определяет новое time_ltz поле (вычисленное из occurred_at) как время события и определяет порог позднего прибытия событий с максимальной задержкой 10 секунд.

После того как указанная выше таблица определена, time_ltz поле можно использовать для правильного упорядочивания событий и определения окон агрегации, гарантируя, что все события в пределах допустимой задержки включены в расчеты.

INTERVAL '10' SECOND определяет задержку конвейера данных и является штрафом, который нам необходимо включить, чтобы обеспечить правильный прием поздно прибывающих событий. Обратите внимание, однако, что пропускная способность не влияет. У нас может быть столько сообщений в нашем конвейере, сколько мы хотим, но мы «ждем 10 секунд» перед вычислением любого окончательного KPI, чтобы убедиться, что мы включили в картину все события в определенный период времени.

Альтернативный подход, который работает только в том случае, если события содержат полное состояние, состоит в том, чтобы сохранить для определенного ключа (hostname и cpu в приведенном выше примере) максимальное время события, достигнутое на данный момент, и принимать изменения только в том случае, если новое время события больше, чем установленный максимум.

Подведем итоги

Концепция упорядочивания в Kafka может быть сложной, даже если мы включаем только одну тему с одним разделом. В этом посте рассказывается о нескольких распространенных ситуациях, которые могут привести к неожиданному порядку событий. К счастью, такие опции, как ограничение количества передаваемых сообщений или использование идемпотентных производителей, могут помочь добиться упорядочения, соответствующего ожиданиям. В случае нескольких производителей и непредсказуемости сетевой задержки нам остается только возможность исправить общий порядок на стороне потребителя путем правильной обработки времени события, которое необходимо указать в полезной нагрузке.


Если вы хотите глубже разобраться в Apache Kafka, приходите на курс «Apache Kafka для разработчиков».

Вы узнаете типовые шаблоны проектирования, сделаете свое приложение надежнее, получите опыт разработки нескольких приложений, использующих Kafka.

Старт потока — 14 июля.

Tags:
Hubs:
Total votes 17: ↑10 and ↓7+3
Comments24

Articles

Information

Website
slurm.io
Registered
Founded
Employees
51–100 employees
Location
Россия
Representative
Антон Скобин