Установка и работа с Kafka на Linux

Используемые термины: Kafka, Zookeeper, Linux.
В нашей инструкции мы рассмотрим примеры:
- Установки и настройке Kafka.
- Конфигурирование, а также запуск сервисов zookeeper и kafka.
- Выполнение тестовых запросов к брокеру сообщений.
Так как установка Kafka выполняется путем распаковки бинарника, инструкция написана для использования на различных дистрибутивах Linux, например, Ubuntu, Rocky, Astra, РЕД ОС.
Наше ознакомление с программным продуктом разобьем на разделы:
Предварительная настройка системы
Установка и настройка Kafka
Инсталляция OpenJava
Загрузка и распаковка Kafka
Настройка systemd для автозапуска сервиса
Отправка и чтение сообщений брокеру для теста системы
Подготовка системы
Перед тем, как приступить к установке Kafka, выполним предварительную настройку системы. Рассмотрим варианты для дистрибутивов на основе deb и rpm.
Установка служебных пакетов
Для работы нам понадобятся некоторые утилиты:
- curl — отправка http-запросов, в том числе, для загрузки файлов.
- tar — распаковка и создание архивов.
- wget — загрузка файлов по сети.
Установим их заранее.
а) Для систем на основе DEB:
apt update
apt install curl tar wget
б) Для систем на основе RPM:
yum install curl tar wget
Настройка брандмауэра
Для работы с кафкой по сети необходимо открыть порт 9092. В зависимости от утилиты управления брандмауэром наши команды будут отличаться.
а) Для Iptables (как правило, для deb):
iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
Для сохранения правил используем утилиту iptables-persistent:
apt install iptables-persistent
netfilter-persistent save
б) Для Firewalld (как правило, для rpm):
firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --permanent --reload
Установка Kafka
Как упоминалось выше, установка программного продукта выполняется путем распаковки бинарника, а также настройки и установки необходимых компонентов. Итого, мы должны:
- Установить JDK.
- Распаковать бинарник Kafka.
- Внести правки в конфигурационный файл.
- Создать юнит-файлы для запуска сервисов zookeeper и kafka.
Приступим.
Установка OpenJDK
В зависимости от типа Linux наши команды будут немного отличаться.
а) Для систем на основе DEB:
apt install default-jdk
б) Для систем на основе RPM:
yum install java-11-openjdk-devel
OpenJDK установлен.
Смотрим версию java:
java -version
Мы должны увидеть что-то на подобие:
openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment ...
Установка и настройка Kafka
Переходим на страницу загрузки Kafka и копируем ссылку на скачиваем приложения:
Используя скопированную ссылку, скачиваем бинарник:
wget https://downloads.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
Создадим каталог, куда установим кафку:
mkdir /opt/kafka
Распакуем скачанный архив в созданный каталог:
tar zxf kafka_*.tgz -C /opt/kafka --strip 1
Интернет сообщество сразу рекомендует внести небольшую правку в конфигурационный файл. Откроем его:
vi /opt/kafka/config/server.properties
Добавим строку:
delete.topic.enable = true
* данная директива разрешает ручное удаление темы из кафки.
Настройка запуска kafka
Осталось настроить запуск кафки в качестве сервиса. Мы создадим отдельного пользователя и создадим два юнита systemd — один для zookeeper, второй для kafka.
Для создания пользователя вводим:
useradd -r -c 'Kafka broker user service' kafka
Назначим владельцем созданного пользователя для каталога кафки:
chown -R kafka:kafka /opt/kafka
Создаем первый юнит-файл:
vi /etc/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper Service
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
Создаем второй файл для кафки:
vi /etc/systemd/system/kafka.service
[Unit]
Description=Kafka Service
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1'
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
Перечитываем конфигурацию systemd, чтобы подхватить изменения:
systemctl daemon-reload
Разрешаем автозапуск сервисов zookeeper и kafka:
systemctl enable zookeeper kafka
Стартуем кафку:
systemctl start kafka
Обратите внимание, нам достаточно запустить службу kafka — благодаря настроенной зависимости, zookeeper стартует автоматически.
Проверить, что нужный нам сервис запустился и работаешь на порту 9092:
ss -tunlp | grep :9092
Тестовый обмен сообщениями
Попробуем немного научиться работать с кафкой и проверить, что сервис работает. Мы создадим тему для сообщений и отправим текст Hello, World from Kafka.
Нам понадобиться три скрипта, которые идут в комплекте с кафкой:
- kafka-topics.sh — создает тему, куда будем отправлять сообщение.
- kafka-console-producer.sh — создает обращение издателя, который отправляет сообщение.
- kafka-console-consumer.sh — формирует запрос к брокеру и получает сообщение.
И так, первой командой мы создаем тему:
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Test
* где:
- /opt/kafka — путь, куда была установлена нами кафка.
- bootstrap-server localhost:9092 — адрес хоста kafka. Предполагается, что мы запускаем нашу команду на том же сервере, где ее и развернули.
- replication-factor — количество реплик журнала сообщений.
- partitions — количество разделов в теме.
- topic Test — в нашем примере мы создадим тему с названием Test.
Теперь отправляем сообщение брокеру:
echo "Hello, World from Kafka" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Test
* в данном примере мы отправляем в наш сервер сообщение Hello, World from Kafka.
Попробуем достать сообщение. Выполняем команду:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Test --from-beginning
* опция from-beginning позволяет увидеть все сообщения, которые были отправлены в брокер до создания подписчика (отправки запроса на чтения данных из кафки).
Мы должны увидеть:
Hello, World from Kafka
При этом мы подключимся к серверу в интерактивном режиме. Не спешим выходить. Откроем вторую сессию SSH и еще раз введем:
echo "Hello, World from Kafka again" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Test
Вернемся к предыдущей сессии SSH и мы должны увидеть:
Hello, World from Kafka
Hello, World from Kafka again
Можно считать, что программа минимум выполнена — Kafka установлена и работает.