※当サイトの記事には、広告・プロモーションが含まれます。

Spring BootでApache Kafkaとメッセージのやり取りをしてみる

nazology.net

⇧ amazing...

Queueを扱うシステム

そも、Queueとは?

In computer science, queue is a collection of entities that are maintained in a sequence and can be modified by the addition of entities at one end of the sequence and the removal of entities from the other end of the sequence. By convention, the end of the sequence at which elements are added is called the back, tail, or rear of the queue, and the end at which elements are removed is called the head or front of the queue, analogously to the words used when people line up to wait for goods or services.

https://en.wikipedia.org/wiki/Queue_(abstract_data_type)

Representation of a FIFO (first in, first out) queue

https://en.wikipedia.org/wiki/Queue_(abstract_data_type)

⇧ entityのcollectionであると。う~む、分からん。

で、Javaで利用できるQueueを扱えるシステムは、

⇧ 様々なものがありますと。

Apache Kafkaとは?

公式のドキュメントによると、

kafka.apache.org

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

https://kafka.apache.org/

⇧ 分散イベントストリーミングプラットフォームですと、う~む、全く分からん...

⇧ Topicなるものが中心となってそうなんだけど、

aws.plainenglish.io

⇧ 全体像としては、上図がイメージしやすいかと。

Spring BootでApache Kafkaを使ってみる

とりあえず、使ってみますか。

kagamihoge.hatenablog.com

⇧ 上記サイト様を参考にさせていただいたところ、Apache KafkaのKafka Clusterってのを管理するのにApache ZooKeeperってのが必要らしい。

イメージ的には、Javaを使う場合に限らないと思うけど、

本番環境ではサーバーが3つ必要になる感じなんかな。

ちなみに、ZooKeeperさん

zookeeper.apache.org

ZooKeeper is a distributed, open-source coordination service for distributed applications. It exposes a simple set of primitives that distributed applications can build upon to implement higher level services for synchronization, configuration maintenance, and groups and naming. It is designed to be easy to program to, and uses a data model styled after the familiar directory tree structure of file systems. It runs in Java and has bindings for both Java and C.

https://zookeeper.apache.org/doc/current/zookeeperOver.html

JavaC言語で稼働してる模様。

ZooKeeper用にサーバーが必要なので、仮想マシンかコンテナを用意せざるを得ないってことかと。

ちなみに、

developer.confluent.io

⇧ ZooKeeperの代わりに、KRaftってのを利用する方法もあるらしい、と言うか、

The Kafka community plans to deprecate ZooKeeper in the next release (3.4) and then remove it entirely in version 4.0.

https://developer.confluent.io/learn/kraft/

⇧ ZooKeeperが廃止される予定なんだとか。

ってことは、新規で導入するんであればKRaftってものを使っていった方が良いってことですかね。

今回は、WSL 2(Windows SubSystem for Linux 2)のUbuntuApache Kafkaをインストールしようと思います。

今回の件とは関係ないですが、

ts0818.hatenablog.com

⇧ 上記の記事の時のように、固定IPアドレスを設定して起動することにします。

Apache Kafkaをダウンロード。

wget https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz    

展開(解凍)します。

tar kafka_2.13-3.3.1.tgz    

Apache Kafkaを動かすには、Java 8以上をインストールしてる必要があるようです、インストールされとらんかった...

Javaのインストールを行うも、

インストール失敗で、apt-get updateしろって言われたのでする。

再度、Javaのインストール。

Javaがインストールできたので、Apache Kafkaを起動する。

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"    
 bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties    
bin/kafka-server-start.sh config/kraft/server.properties    

Apache Kafkaのサーバーが起動した模様。

あとは、

developer.mamezou-tech.com

⇧ topicまでは作成しておく必要があるようなので、コマンドプロンプトをもう一つ起ち上げて、WSL 2のUbuntuにログインして、topicを作成。

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

で、WSL 2(Windows SubSystem for Linux 2)だけの問題なのか、

docs.conduktor.io

stackoverflow.com

Apache Kafkaの設定ファイルを編集するか、Windows側でプロキシのポートフォワードする必要があるらしい。

今回は、Windows側でプロキシのポートフォワードする方法で対応しました。

ここまで、用意できたら、Java側からメッセージが送れるようになるらしいので、Java側の準備を進めます。

Eclipseを起動して、「ファイル(F)」>「新規(N)」>「Spring スターター・プロジェクト(Spring Initializr)」を選択。

「名前」を適当に入力し、ビルドツールにGradleを使うことにするので、「タイプ:」を「Gradle - Groovy」にして、「次へ(N)>」を押下。

「Spring for Apache Kafka」の依存関係を追加で。

で、プロジェクトが作成されたらば、Apache KafkaにJavaのオブジェクトとかを送りたい場合は、jackson-databindの依存関係を追加する必要があるらしいので追加。

■/kafka-example/build.gradle

plugins {
	id 'java'
	id 'org.springframework.boot' version '2.7.8-SNAPSHOT'
	id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
	maven { url 'https://repo.spring.io/milestone' }
	maven { url 'https://repo.spring.io/snapshot' }
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.kafka:spring-kafka'
	implementation 'com.fasterxml.jackson.core:jackson-databind'
	compileOnly 'org.projectlombok:lombok'
	developmentOnly 'org.springframework.boot:spring-boot-devtools'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
}

tasks.named('test') {
	useJUnitPlatform()
}

そしたらば、application.propertiesに設定を追加。

■/kafka-example/src/main/resources/application.properties

spring.kafka.bootstrap-servers=10.255.255.16:9092
spring.kafka.consumer.group-id=myGroup

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
    

IPアドレスは、ご自身の環境に合わせてください。DockerでコンテナとかでApache Kafkaを動かしてる場合は、localhostでもつながるらしいけど。

ファイルの配置などは以下のような感じになりました。

■/kafka-example/src/main/java/com/example/demo/dto/SampleDto.java

package com.example.demo.dto;

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleDto implements Serializable {
  private String id;
  private String text;
}

■/kafka-example/src/main/java/com/example/demo/publisher/SamplePublisher.java

package com.example.demo.publisher;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.example.demo.dto.SampleDto;

import lombok.AllArgsConstructor;

@Component
@AllArgsConstructor
public class SamplePublisher {
	KafkaTemplate<String, SampleDto> kafkaTemplate;
	
	public void publish(SampleDto sampleDto) {
		kafkaTemplate.send("sample-topic", sampleDto);
	}
}

■/kafka-example/src/main/java/com/example/demo/consumer/SampleConsumer.java

package com.example.demo.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.example.demo.dto.SampleDto;

@Component
public class SampleConsumer {

	@KafkaListener(topics = "sample-topic")
	public void recieveMessage(SampleDto sampleDto) {
		System.out.println(sampleDto);
	}
}

■/kafka-example/src/main/java/com/example/demo/controller/SampleController.java

package com.example.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.demo.dto.SampleDto;
import com.example.demo.publisher.SamplePublisher;

@RestController
@RequestMapping("sample")
public class SampleController {

	@Autowired
	private SamplePublisher samplePublisher;
	
	@GetMapping
	public String sample() {
		SampleDto sampleDto = new SampleDto("000000000001", "kafka test");
		samplePublisher.publish(sampleDto);
		return "OK";
		
	}
}

で、保存して、Spring Bootを実行で。

ブラウザで、Controllerクラスのエンドポイントにアクセス。

Spring BootのアプリケーションからApache Kafka側にメッセージが送られ、

Spring Boot側でApache Kafka側からの応答を受け取れました。

というわけで、Spring BootとApache Kafkaでメッセージのやり取りができました。

ただ、イマイチ、Apache Kafkaの上手い活用方法が分からんですな...

毎度モヤモヤ感が半端ない...

今回はこのへんで。