Установка и работа с Kafka на Linux

Обновлено и опубликовано Опубликовано:

Используемые термины: KafkaZookeeper, Linux.

В нашей инструкции мы рассмотрим примеры:

  • Установки и настройке Kafka.
  • Конфигурирование, а также запуск сервисов zookeeper и kafka.
  • Выполнение тестовых запросов к брокеру сообщений.

Так как установка Kafka выполняется путем распаковки бинарника, инструкция написана для использования на различных дистрибутивах Linux, например, Ubuntu, Rocky, Astra, РЕД ОС.

Наше ознакомление с программным продуктом разобьем на разделы:

Подготовка системы

Перед тем, как приступить к установке Kafka, выполним предварительную настройку системы. Рассмотрим варианты для дистрибутивов на основе deb и rpm.

Установка служебных пакетов

Для работы нам понадобятся некоторые утилиты:

  • curl — отправка http-запросов, в том числе, для загрузки файлов.
  • tar — распаковка и создание архивов.
  • wget — загрузка файлов по сети.

Установим их заранее.

а) Для систем на основе DEB:

apt update

apt install curl tar wget

б) Для систем на основе RPM:

yum install curl tar wget

Настройка брандмауэра

Для работы с кафкой по сети необходимо открыть порт 9092. В зависимости от утилиты управления брандмауэром наши команды будут отличаться.

а) Для Iptables (как правило, для deb):

iptables -I INPUT -p tcp --dport 9092 -j ACCEPT

Для сохранения правил используем утилиту iptables-persistent:

apt install iptables-persistent

netfilter-persistent save

б) Для Firewalld (как правило, для rpm):

firewall-cmd --permanent --add-port=9092/tcp

firewall-cmd --permanent --reload

Установка Kafka

Как упоминалось выше, установка программного продукта выполняется путем распаковки бинарника, а также настройки и установки необходимых компонентов. Итого, мы должны:

  1. Установить JDK.
  2. Распаковать бинарник Kafka.
  3. Внести правки в конфигурационный файл.
  4. Создать юнит-файлы для запуска сервисов zookeeper и kafka.

Приступим.

Установка OpenJDK

В зависимости от типа Linux наши команды будут немного отличаться.

а) Для систем на основе DEB:

apt install default-jdk

б) Для систем на основе RPM:

yum install java-11-openjdk-devel

OpenJDK установлен.

Смотрим версию java:

java -version

Мы должны увидеть что-то на подобие:

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment ...

Установка и настройка Kafka

Переходим на страницу загрузки Kafka и копируем ссылку на скачиваем приложения:

Копируем ссылку для загрузки архива kafka

Используя скопированную ссылку, скачиваем бинарник:

wget https://downloads.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz

Создадим каталог, куда установим кафку:

mkdir /opt/kafka

Распакуем скачанный архив в созданный каталог:

tar zxf kafka_*.tgz -C /opt/kafka --strip 1

Интернет сообщество сразу рекомендует внести небольшую правку в конфигурационный файл. Откроем его:

vi /opt/kafka/config/server.properties

Добавим строку:

delete.topic.enable = true

* данная директива разрешает ручное удаление темы из кафки.

Настройка запуска kafka

Осталось настроить запуск кафки в качестве сервиса. Мы создадим отдельного пользователя и создадим два юнита systemd — один для zookeeper, второй для kafka.

Для создания пользователя вводим:

useradd -r -c 'Kafka broker user service' kafka

Назначим владельцем созданного пользователя для каталога кафки:

chown -R kafka:kafka /opt/kafka

Создаем первый юнит-файл:

vi /etc/systemd/system/zookeeper.service

[Unit]
Description=Zookeeper Service
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure

[Install]
WantedBy=multi-user.target

Создаем второй файл для кафки:

vi /etc/systemd/system/kafka.service

[Unit]
Description=Kafka Service
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1'
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure

[Install]
WantedBy=multi-user.target

Перечитываем конфигурацию systemd, чтобы подхватить изменения:

systemctl daemon-reload

Разрешаем автозапуск сервисов zookeeper и kafka:

systemctl enable zookeeper kafka

Стартуем кафку:

systemctl start kafka

Обратите внимание, нам достаточно запустить службу kafka — благодаря настроенной зависимости, zookeeper стартует автоматически.

Проверить, что нужный нам сервис запустился и работаешь на порту 9092:

ss -tunlp | grep :9092

Тестовый обмен сообщениями

Попробуем немного научиться работать с кафкой и проверить, что сервис работает. Мы создадим тему для сообщений и отправим текст Hello, World from Kafka.

Нам понадобиться три скрипта, которые идут в комплекте с кафкой:

  • kafka-topics.sh — создает тему, куда будем отправлять сообщение.
  • kafka-console-producer.sh — создает обращение издателя, который отправляет сообщение.
  • kafka-console-consumer.sh — формирует запрос к брокеру и получает сообщение.

И так, первой командой мы создаем тему:

/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Test

* где:

  • /opt/kafka — путь, куда была установлена нами кафка.
  • bootstrap-server localhost:9092 — адрес хоста kafka. Предполагается, что мы запускаем нашу команду на том же сервере, где ее и развернули.
  • replication-factor — количество реплик журнала сообщений.
  • partitions — количество разделов в теме.
  • topic Test — в нашем примере мы создадим тему с названием Test.

Теперь отправляем сообщение брокеру:

echo "Hello, World from Kafka" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Test

* в данном примере мы отправляем в наш сервер сообщение Hello, World from Kafka.

Попробуем достать сообщение. Выполняем команду:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Test --from-beginning

* опция from-beginning позволяет увидеть все сообщения, которые были отправлены в брокер до создания подписчика (отправки запроса на чтения данных из кафки).

Мы должны увидеть:

Hello, World from Kafka

При этом мы подключимся к серверу в интерактивном режиме. Не спешим выходить. Откроем вторую сессию SSH и еще раз введем:

echo "Hello, World from Kafka again" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Test

Вернемся к предыдущей сессии SSH и мы должны увидеть:

Hello, World from Kafka
Hello, World from Kafka again

Можно считать, что программа минимум выполнена — Kafka установлена и работает.

# DevOps # Linux
Дмитрий Моск — частный мастер
Была ли полезна вам эта инструкция?

Да            Нет