что такое kafka и как она работает

Apache Kafka: что это и как работает

Про Apache Kafka хоть раз слышали почти все опытные разработчики серверных приложений. Им пользуются компании, которые гоняют в своих системах очень большие объемы данных, считается, что простым разработчикам он не нужен.

Давайте посмотрим, что такое Apache Kafka, как он работает и кому пригодится.

Нервная система бэкенда: зачем нужен Apache Kafka

Современные серверные приложения сложны, многоярусны и включают множество компонентов и сервисов. Архитекторы программного обеспечения выделяют в отдельные модули все, что можно: рассылку SMS, системы сбора статистики, подсистемы авторизации.

Зачем? Во-первых, можно разбить огромные тяжелые задачи на маленькие кусочки и решать частями. Во-вторых, это позволяет распределить нагрузку и добавить отказоустойчивости.

Но таким распределенным системам нужно как-то передавать данные между собой. В этом месте на сцене появляются системы обмена сообщениями (брокеры сообщений, диспетчеры сообщений). Это некая разветвленная система труб, в которую с одного конца можно бросить, например, контейнер с сообщениями, а с другого конца его кто-то получит и прочитает. К таким системам относят и Apache Kafka.

Система коммуникаций между сервисами, если она грамотно выстроена, позволяет компонентам ставить друг другу задачи, сообщать об изменениях в системе и уведомлять заинтересованные части логики приложения о своих состояниях.

Почему установка Apache Kafka — лучший выбор

Как и любая дополнительная система, механизм обмена сообщениями добавляет сложности и создает ряд дополнительных проблем в обслуживании вашего приложения:

Для устранения этих проблем и был создан Apache Kafka — сверхнадежная сверхмасштабируемая сверхгибкая система обмена сообщениями внутри бэкенд-приложений.

Описание Apache Kafka

Брокер сообщений Kafka — распределенная система. Его серверы объединяются в кластеры. Хранение и пересылка сообщений идет параллельно на разных серверах, а это дает большую надежность и отказоустойчивость. Даже при выходе из строя нескольких машин, сообщения все еще будут пересылаться и обрабатываться.

Также сервис легко масштабируется горизонтально. То есть, для наращивания мощности Apache Kafka достаточно вводить в строй дополнительные серверы. Это самый простой в реализации способ масштабирования систем, но подходит он не для всех. Например, с базами данных такой подход не работает — непонятно, что делать с записями в таблицах на новых серверах. Kafka же изначально заточен под взрывной рост производительности.

Еще один плюс — консистентность данных. Записи в Apache Kafka хранятся в виде журнала коммитов. Это выглядит как очередь сообщений, в которую можно добавлять записи, а вот удалять или модицифировать — нет. Такой подход дает огромную надежность и простоту изменения любых состояний — всегда понятно, что, как и в какой последовательности менялось.

Apache Kafka: обзор возможностей

Итак, у нас есть крутая, масштабируемая, производительная и гибкая система передачи сообщений. И вот как мы можем ее использовать:

Источник

Kafka и микросервисы: обзор

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Всем привет. В этой статье я расскажу, почему мы в Авито девять месяцев назад выбрали Kafka, и что она из себя представляет. Поделюсь одним из кейсов использования — брокер сообщений. И напоследок поговорим о том, какие плюсы мы получили от применения подхода Kafka as a Service.

Проблема

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Для начала немного контекста. Некоторое время назад мы начали уходить от монолитной архитектуры, и сейчас в Авито уже несколько сотен различных сервисов. Они имеют свои хранилища, свой стек технологий и отвечают за свою часть бизнес-логики.

Одна из проблем с большим числом сервисов — коммуникации. Сервис А часто хочет узнать информацию, которой располагает сервис Б. В этом случае сервис А обращается к сервису Б через синхронный API. Сервис В хочет знать, что происходит у сервисов Г и Д, а те, в свою очередь, интересуются сервисами А и Б. Когда таких «любопытных» сервисов становится много, связи между ними превращаются в запутанный клубок.

При этом в любой момент сервис А может стать недоступен. И что делать в этом случае сервису Б и всем остальным завязанным на него сервисам? А если для выполнения бизнес-операции необходимо совершить цепочку последовательных синхронных вызовов, вероятность отказа всей операции становится еще выше (и она тем выше, чем длиннее эта цепочка).

Выбор технологии

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Окей, проблемы понятны. Устранить их можно, сделав централизованную систему обмена сообщениями между сервисами. Теперь каждому из сервисов достаточно знать только про эту систему обмена сообщениями. В дополнение сама система должна быть отказоустойчивой и горизонтально масштабируемой, а также в случае аварий копить в себе буфер обращения для последующей их обработки.

Давайте теперь выберем технологию, на которой будет реализована доставка сообщений. Для этого сперва поймем, чего мы от нее ожидаем:

Также нам критически важно было выбрать максимально масштабируемую и надежную систему с высокой пропускной способностью (не менее 100k сообщений по несколько килобайт в секунду).

На этом этапе мы распрощались с RabbitMQ (сложно сохранять стабильным на высоких rps), PGQ от SkyTools (недостаточно быстрый и плохо масштабируемый) и NSQ (не персистентный). Все эти технологии у нас в компании используются, но под решаемую задачу они не подошли.

Далее мы начали смотреть на новые для нас технологии — Apache Kafka, Apache Pulsar и NATS Streaming.

Первым отбросили Pulsar. Мы решили, что Kafka и Pulsar — довольно похожие между собой решения. И несмотря на то, что Pulsar проверен крупными компаниями, новее и предлагает более низкую latency (в теории), мы решили из этих двух оставить Kafka, как de facto стандарт для таких задач. Вероятно, мы вернемся к Apache Pulsar в будущем.

И вот остались два кандидата: NATS Streaming и Apache Kafka. Мы довольно подробно изучили оба решения, и оба они подошли под задачу. Но в итоге мы побоялись относительной молодости NATS Streaming (и того, что один из основных разработчиков, Tyler Treat, решил уйти из проекта и начать свой собственный — Liftbridge). При этом Clustering режим NATS Streaming не давал возможности сильного горизонтального масштабирования (вероятно, это уже не проблема после добавления partitioning режима в 2017 году).

Тем не менее, NATS Streaming – крутая технология, написанная на Go и имеющая поддержку Cloud Native Computing Foundation. В отличие от Apache Kafka, ей не нужен Zookeeper для работы (возможно, скоро можно будет сказать то же самое и о Kafka), так как внутри она реализует RAFT. При этом NATS Streaming проще в администрировании. Мы не исключаем, что в дальнейшем ещё вернемся к этой технологии.

И всё-таки на сегодняшний день нашим победителем стала Apache Kafka. На наших тестах она показала себя достаточно быстрой (более миллиона сообщений в секунду на чтение и на запись при объеме сообщений 1 килобайт), достаточно надежной, хорошо масштабируемой и проверенной опытом в проде крупными компаниями. Кроме этого, Kafka поддерживает как минимум несколько крупных коммерческих компаний (мы, например, пользуемся Confluent версией), а также Kafka имеет развитую экосистему.

Обзор Kafka

Перед тем как начать, сразу порекомендую отличную книгу — «Kafka: The Definitive Guide» (есть и в русском переводе, но термины немного ломают мозг). В ней можно найти информацию, необходимую для базового понимания Kafka и даже немного больше. Сама документация от Apache и блог от Confluent также отлично написаны и легко читаются.

Итак, давайте посмотрим на то, как устроена Kafka с высоты птичьего полета. Базовая топология Kafka состоит из producer, consumer, broker и zookeeper.

Broker

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

За хранение ваших данных отвечает брокер (broker). Все данные хранятся в бинарном виде, и брокер мало знает про то, что они из себя представляют, и какова их структура.

Каждый логический тип событий обычно находится в своем отдельном топике (topic). Например, событие создания объявления может попадать в топик item.created, а событие его изменения — в item.changed. Топики можно рассматривать как классификаторы событий. На уровне топика можно задать такие конфигурационные параметры, как:

В свою очередь, каждый топик разбивается на одну и более партицию (partition). Именно в партиции в итоге попадают события. Если в кластере более одного брокера, то партиции будут распределены по всем брокерам равномерно (насколько это возможно), что позволит масштабировать нагрузку на запись и чтение в один топик сразу на несколько брокеров.

На диске данные для каждой партиции хранятся в виде файлов сегментов, по умолчанию равных одному гигабайту (контролируется через log.segment.bytes). Важная особенность — удаление данных из партиций (при срабатывании retention) происходит как раз сегментами (нельзя удалить одно событие из партиции, можно удалить только целый сегмент, причем только неактивный).

Zookeeper

Zookeeper выполняет роль хранилища метаданных и координатора. Именно он способен сказать, живы ли брокеры (посмотреть на это глазами zookeeper можно через zookeeper-shell командой ls /brokers/ids ), какой из брокеров является контроллером ( get /controller ), находятся ли партиции в синхронном состоянии со своими репликами ( get /brokers/topics/topic_name/partitions/partition_number/state ). Также именно к zookeeper сперва пойдут producer и consumer, чтобы узнать, на каком брокере какие топики и партиции хранятся. В случаях, когда для топика задан replication factor больше 1, zookeeper укажет, какие партиции являются лидерами (в них будет производиться запись и из них же будет идти чтение). В случае падения брокера именно в zookeeper будет записана информация о новых лидер-партициях (с версии 1.1.0 асинхронно, и это важно).

В более старых версиях Kafka zookeeper отвечал и за хранение оффсетов, но сейчас они хранятся в специальном топике __consumer_offsets на брокере (хотя вы можете по-прежнему использовать zookeeper для этих целей).

Самым простым способом превратить ваши данные в тыкву является как раз потеря информации с zookeeper. В таком сценарии понять, что и откуда нужно читать, будет очень сложно.

Producer

Producer — это чаще всего сервис, осуществляющий непосредственную запись данных в Apache Kafka. Producer выбирает topic, в котором будут храниться его тематические сообщения, и начинает записывать в него информацию. Например, producer’ом может быть сервис объявлений. В таком случае он будет отправлять в тематические топики такие события, как «объявление создано», «объявление обновлено», «объявление удалено» и т.д. Каждое событие при этом представляет собой пару ключ-значение.

По умолчанию все события распределяются по партициям топика round-robin`ом, если ключ не задан (теряя упорядоченность), и через MurmurHash (ключ), если ключ присутствует (упорядоченность в рамках одной партиции).

Здесь сразу стоит отметить, что Kafka гарантирует порядок событий только в рамках одной партиции. Но на самом деле часто это не является проблемой. Например, можно гарантированно добавлять все изменения одного и того же объявления в одну партицию (тем самым сохраняя порядок этих изменений в рамках объявления). Также можно передавать порядковый номер в одном из полей события.

Consumer

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Consumer отвечает за получение данных из Apache Kafka. Если вернуться к примеру выше, consumer’ом может быть сервис модерации. Этот сервис будет подписан на топик сервиса объявлений, и при появлении нового объявления будет получать его и анализировать на соответствие некоторым заданным политикам.

Apache Kafka запоминает, какие последние события получил consumer (для этого используется служебный топик __consumer__offsets ), тем самым гарантируя, что при успешном чтении consumer не получит одно и то же сообщение дважды. Тем не менее, если использовать опцию enable.auto.commit = true и полностью отдать работу по отслеживанию положения consumer’а в топике на откуп Кафке, можно потерять данные. В продакшен коде чаще всего положение консьюмера контролируется вручную (разработчик управляет моментом, когда обязательно должен произойти commit прочитанного события).

В тех случаях, когда одного consumer недостаточно (например, поток новых событий очень большой), можно добавить еще несколько consumer, связав их вместе в consumer group. Consumer group логически представляет из себя точно такой же consumer, но с распределением данных между участниками группы. Это позволяет каждому из участников взять свою долю сообщений, тем самым масштабируя скорость чтения.

Результаты тестирования

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Здесь не буду писать много пояснительного текста, просто поделюсь полученными результатами. Тестирование проводилось на 3 физических машинах (12 CPU, 384GB RAM, 15k SAS DISK, 10GBit/s Net), брокеры и zookeeper были развернуты в lxc.

Тестирование производительности

В ходе тестирования были получены следующие результаты.

Тестирование отказоустойчивости

В ходе тестирования были получены следующие результаты (3 брокера, 3 zookeeper).

Kafka as a service

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Мы убедились, что Kafka — отличная технология, которая позволяет решить поставленную перед нами задачу (реализацию брокера сообщений). Тем не менее, мы решили запретить сервисам напрямую обращаться к Kafka и закрыли ее сверху сервисом data-bus. Зачем мы это сделали? На самом деле есть целых несколько причин.

Data-bus забрал на себя все задачи, связанные с интеграцией с Kafka (реализация и настройка consumer’ов и producer’ов, мониторинг, алертинг, логирование, масштабирование и т.д.). Таким образом, интеграция с брокером сообщений происходит максимально просто.

Data-bus позволил абстрагироваться от конкретного языка или библиотеки для работы с Kafka.

Data-bus позволил другим сервисам абстрагироваться от слоя хранения. Может быть, в какой-то момент мы поменяем Kafka на Pulsar, и при этом никто ничего не заметит (все сервисы знают только про API data-bus).

Data-bus взял на себя валидацию схем событий.

С помощью data-bus реализована аутентификация.

Под прикрытием data-bus мы можем без даунтайма, незаметно обновлять версии Kafka, централизованно вести конфигурации producer’ов, consumer’ов, брокеров и т.д.

Data-bus позволил добавлять необходимые нам фичи, которых нет в Kafka (такие как аудит топиков, контроль за аномалиями в кластере, создание DLQ и т.д.).

Data-bus позволяет реализовать failover централизованно для всех сервисов.

На данный момент для начала отправки событий в брокер сообщений достаточно подключить небольшую библиотеку в код своего сервиса. Это всё. У вас появляется возможность писать, читать и масштабироваться одной строчкой кода. Вся реализация скрыта от вас, наружу торчит только несколько ручек типа размера батча. Под капотом сервис data-bus поднимает в Kubernetes нужное количество инстансов producer’ов и consumer’ов и подкладывает им нужную конфигурацию, но все это для вашего сервиса прозрачно.

Конечно, серебряной пули не бывает, и у такого подхода есть свои ограничения.

В нашем случае плюсы перевесили минусы, и решение прикрыть брокер сообщений отдельным сервисом оправдалось. За год эксплуатации у нас не было никаких серьезных аварий и проблем.

Источник

Apache Kafka для чайников

Данная статья будет полезной тем, кто только начал знакомиться с микросервисной архитектурой и с сервисом Apache Kafka. Материал не претендует на подробный туториал, но поможет быстро начать работу с данной технологией. Я расскажу о том, как установить и настроить Kafka на Windows 10. Также мы создадим проект, используя Intellij IDEA и Spring Boot.

Зачем?

Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться. С Kafka всё обстоит точно также. Опишем ситуацию, в которой данная технология будет полезной. Если у вас монолитная архитектура приложения, то разумеется, никакая Kafka вам не нужна. Всё меняется с переходом на микросервисы. По сути, каждый микросервис – это отдельная программа, выполняющая ту или иную функцию, и которая может быть запущена независимо от других микросервисов. Микросервисы можно сравнить с сотрудниками в офисе, которые сидят за отдельными столами и независимо от коллег решают свою задачу. Работа такого распределённого коллектива немыслима без централизованной координации. Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы между собой. Именно эту проблему и призвана решить Apache Kafka для микросервисов.

Apache Kafka является брокером сообщений. С его помощью микросервисы могут взаимодействовать друг с другом, посылая и получая важную информацию. Возникает вопрос, почему не использовать для этих целей обычный POST – reqest, в теле которого можно передать нужные данные и таким же образом получить ответ? У такого подхода есть ряд очевидных минусов. Например, продюсер (сервис, отправляющий сообщение) может отправить данные только в виде response’а в ответ на запрос консьюмера (сервиса, получающего данные). Допустим, консьюмер отправляет POST – запрос, и продюсер отвечает на него. В это время консьюмер по каким-то причинам не может принять полученный ответ. Что будет с данными? Они будут потеряны. Консьюмеру снова придётся отправлять запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, и продюсер всё ещё готов принять request.

Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами. Не лишним будет напомнить, что бесперебойный и удобный обмен данными – одна из ключевых проблем, которую необходимо решить для обеспечения устойчивой работы микросервисной архитектуры.

Установка и настройка ZooKeeper и Apache Kafka на Windows 10

Первое, что надо знать для начала работы — это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это распределенный сервис конфигурирования и синхронизации, и это всё, что нам нужно знать о нём в данном контексте. Мы должны скачать, настроить и запустить его перед тем, как начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлен и настроен JRE.

Извлекаем из скаченного архива ZooKeeper`а файлы в какую-нибудь папку на диске.
В папке zookeeper с номером версии, находим папку conf и в ней файл “zoo_sample.cfg”.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Копируем его и меняем название копии на “zoo.cfg”. Открываем файл-копию и находим в нём строчку dataDir=/tmp/zookeeper. Прописываем в данной строчке полный путь к нашей папке zookeeper-х.х.х. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0
что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает
Теперь добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin;

Запускаем командную строку и пишем команду:

Если всё сделано правильно, вы увидите примерно следующее.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Это означает, что ZooKeeper стартанул нормально. Переходим непосредственно к установке и настройке сервера Apache Kafka. Скачиваем свежую версию с официального сайта и извлекаем содержимое архива: kafka.apache.org/downloads

В папке с Kafka находим папку config, в ней находим файл server.properties и открываем его.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Находим строку log.dirs= /tmp/kafka-logs и указываем в ней путь, куда Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

В этой же папке редактируем файл zookeeper.properties. Строчку dataDir=/tmp/zookeeper меняем на dataDir=c:/kafka/zookeeper-data, не забывая при этом, после имени диска указывать путь к своей папке с Kafka. Если вы всё сделали правильно, можно запускать ZooKeeper и Kafka.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Для кого-то может оказаться неприятной неожиданностью, что никакого GUI для управления Kafka нет. Возможно, это потому, что сервис рассчитан на суровых нёрдов, работающих исключительно с консолью. Так или иначе, для запуска кафки нам потребуется командная строка.

Сначала надо запустить ZooKeeper. В папке с кафкой находим папку bin/windows, в ней находим файл для запуска сервиса zookeeper-server-start.bat, кликаем по нему. Ничего не происходит? Так и должно быть. Открываем в этой папке консоль и пишем:

Опять не работает? Это норма. Всё потому что zookeeper-server-start.bat для своей работы требует параметры, прописанные в файле zookeeper.properties, который, как мы помним, лежит в папке config. Пишем в консоль:

Теперь всё должно стартануть нормально.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Ещё раз открываем консоль в этой папке (ZooKeeper не закрывать!) и запускаем kafka:

Для того, чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник со следующим содержимым:

Строка timeout 10 нужна для того, чтобы задать паузу между запуском zookeeper и kafka. Если вы всё сделали правильно, при клике на батник должны открыться две консоли с запущенным zookeeper и kafka.Теперь мы можем прямо из командной строки создать продюсера сообщений и консьюмера с нужными параметрами. Но, на практике это может понадобиться разве что для тестирования сервиса. Гораздо больше нас будет интересовать, как работать с kafka из IDEA.

Работа с kafka из IDEA

Мы напишем максимально простое приложение, которое одновременно будет и продюсером и консьюмером сообщения, а затем добавим в него полезные фичи. Создадим новый спринг-проект. Удобнее всего делать это с помощью спринг-инициалайзера. Добавляем зависимости org.springframework.kafka и spring-boot-starter-web

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

В итоге файл pom.xml должен выглядеть так:

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

В принципе, наш продюсер готов. Всё что осталось сделать – это вызвать у него метод send(). Имеется несколько перегруженных вариантов данного метода. Мы используем в нашем проекте вариант с 3 параметрами — send(String topic, K key, V data). Так как KafkaTemplate типизирован String-ом, то ключ и данные в методе send будут являться строкой. Первым параметром указывается топик, то есть тема, в которую будут отправляться сообщения, и на которую могут подписываться консьюмеры, чтобы их получать. Если топик, указанный в методе send не существует, он будет создан автоматически. Полный текст класса выглядит так.

Контроллер мапится на localhost:8080/msg, в теле запроса передаётся ключ и само сообщений.

Отправитель сообщений готов, теперь создадим слушателя. Spring так же позволяет cделать это без особых усилий. Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которой можно указать только топик, который будет слушаться. В нашем случае это выглядит так.

У самого метода, помеченного аннотацией, можно указать один принимаемый параметр, имеющий тип сообщения, передаваемого продюсером.

Класс, в котором будет создаваться консьюмер необходимо пометить аннотацией @EnableKafka.

Так же в файле настроек application.property необходимо указать параметр консьюмера groupe-id. Если этого не сделать, приложение не запустится. Параметр имеет тип String и может быть любым.

Наш простейший кафка-проект готов. У нас есть отправитель и получатель сообщений. Осталось только запустить. Для начала запускаем ZooKeeper и Kafka с помощью батника, который мы написали ранее, затем запускаем наше приложение. Отправлять запрос удобнее всего с помощью Postman. В теле запроса не забываем указывать параметры msgId и msg.
что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает
Если мы видим в IDEA такую картину, значит всё работает: продюсер отправил сообщение, консьюмер получил его и вывел в консоль.
что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Усложняем проект

Реальные проекты с использованием Kafka конечно же сложнее, чем тот, который мы создали. Теперь, когда мы разобрались с базовыми функциями сервиса, рассмотрим, какие дополнительные возможности он предоставляет. Для начала усовершенствуем продюсера.

Если вы открывали метод send(), то могли заметить, что у всех его вариантов есть возвращаемое значение ListenableFuture >. Сейчас мы не будем подробно рассматривать возможности данного интерфейса. Здесь будет достаточно сказать, что он нужен для просмотра результата отправки сообщения.

Метод addCallback() принимает два параметра – SuccessCallback и FailureCallback. Оба они являются функциональными интерфейсами. Из названия можно понять, что метод первого будет вызван в результате успешной отправки сообщения, второго – в результате ошибки.Теперь, если мы запустим проект, то увидим на консоли примерно следующее:

Посмотрим ещё раз внимательно на нашего продюсера. Интересно, что будет если в качестве ключа будет не String, а, допустим, Long, а в качестве передаваемого сообщения и того хуже – какая-нибудь сложная DTO? Попробуем для начала изменить ключ на числовое значение…

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Если мы укажем в продюсере в качестве ключа Long, то приложение нормально запуститься, но при попытке отправить сообщение будет выброшен ClassCastException и будет сообщено, что класс Long не может быть приведён к классу String.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

В методе producerConfigs() создаём мапу с конфигурациями и в качестве сериализатора для ключа указываем LongSerializer.class. Запускаем, отправляем запрос из Postman и видим, что теперь всё работает, как надо: продюсер отправляет сообщение, а консьюмер принимает его.

Теперь изменим тип передаваемого значения. Что если у нас не стандартный класс из библиотеки Java, а какой-нибудь кастомный DTO. Допустим такой.

Для отправки DTO в качестве сообщения, нужно внести некоторые изменения в конфигурацию продюсера. В качестве сериализатора значения сообщения укажем JsonSerializer.class и не забудем везде изменить тип String на UserDto.

Отправим сообщение. В консоль будет выведена следующая строка:

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Теперь займёмся усложнением консьюмера. До этого наш метод public void msgListener(String msg), помеченный аннотацией @KafkaListener(topics=«msg») в качестве параметра принимал String и выводил его на консоль. Как быть, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или партицию? В этом случае тип передаваемого значения необходимо изменить.

Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Мы видим, что вместо ключа на консоль выводятся какие-то кракозябры. Это потому, что для десериализации ключа по умолчанию используется StringDeserializer, и если мы хотим, чтобы ключ в целочисленном формате корректно отображался, мы должны изменить его на LongDeserializer. Для настройки консьюмера в пакете config создадим класс KafkaConsumerConfig.

что такое kafka и как она работает. Смотреть фото что такое kafka и как она работает. Смотреть картинку что такое kafka и как она работает. Картинка про что такое kafka и как она работает. Фото что такое kafka и как она работает

Видим, что теперь ключ отображается как надо, а это значит, что всё работает. Конечно, возможности Apache Kafka далеко выходят за пределы тех, что описаны в данной статье, однако, надеюсь, прочитав её, вы составите представление о данном сервисе и, самое главное, сможете начать работу с ним.

Мойте руки чаще, носите маски, не выходите без необходимости на улицу, и будьте здоровы.

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *