个性化阅读
专注于IT技术分析

在java中创建kafka生产者(producer)

本文概述

在上一节中,我们学习了创建Kafka项目的基本步骤。现在,在用Java创建Kafka生产者之前,我们需要定义基本的Project依赖项。在我们的项目中,将需要两个依赖项:

  1. 卡夫卡依赖
  2. 记录依存关系,即SLF4J Logger。

设置依赖关系需要执行以下步骤:

步骤1:构建工具Maven包含一个“ pom.xml”文件。 “ pom.xml”是默认XML文件,其中包含有关GroupID,ArtifactID以及Version值的所有信息。用户需要在“ pom.xml”文件中定义所有必要的项目依赖项。转到“ pom.xml”文件。

步骤2:首先,我们需要定义Kafka依赖关系。创建一个'<dependencies> … </ dependencies>’块,在其中定义所需的依赖项。

步骤3:现在,打开网络浏览器并搜索“ Kafka Maven”,如下所示:

单击突出显示的链接,然后选择“ Apache Kafka,Kafka-Clients”存储库。下面的快照中显示了一个示例:

步骤4:根据系统上下载的Kafka版本选择存储库版本。例如,在本教程中,我们使用的是“ Apache Kafka 2.3.0”。因此,我们需要存储库版本2.3.0(突出显示的版本)。

步骤5:点击存储库版本后,将打开一个新窗口。从那里复制依赖关系代码。

由于我们正在使用Maven,因此请复制Maven代码。如果用户使用Gradle,请复制Gradle编写的代码。

步骤6:将复制的代码粘贴到'<dependencies> … </ dependencies>’块中,如下所示:

如果版本号显示为红色,则表示用户错过了启用“自动导入”选项的权限。如果是这样,请转到“视图”>“工具Windows”>“ Maven”。 Maven项目窗口将出现在屏幕的右侧。单击出现在此处的“刷新”按钮。这将启用丢失的自动导入Maven项目。如果颜色变为黑色,则表示已下载缺少的依赖项。用户可以继续下一步。

步骤7:现在,打开Web浏览器并搜索“ SL4J Simple”,然后打开以下快照中所示的突出显示的链接:

一堆存储库将出现。单击适当的存储库。

要了解适当的存储库,请查看Maven项目窗口,并在“依赖关系”下查看slf4j版本。

单击适当的版本并复制代码,然后将其粘贴在Kafka依赖项下方的“ pom.xml”文件中,如下所示:

注意:在代码中添加注释或删除 test 标记行。因为此作用域标签为该依赖项定义了一个有限的作用域,并且我们对所有代码都需要此依赖项,所以不应限制该作用域。

现在,我们已经设置了所有必需的依赖项。让我们尝试“简单的Hello World”示例。

首先,创建一个名为“ com.firstgroupapp.aktutorial”的Java包,并在其下方创建一个Java类。创建Java软件包时,请遵循软件包命名约定。最后,创建“ hello world”程序。

执行’producer1.java’文件后,输出成功显示为’Hello World’。这说明IntelliJ IDEA的成功工作。

创建Java生产者

基本上,创建Java生产者有四个步骤,如前所述:

  1. 创建生产者属性
  2. 创建生产者
  3. 创建生产者记录
  4. 发送数据。

创建生产者属性

Apache Kafka提供了用于创建生产者的各种Kafka属性。要了解每个资源,请访问Apache的官方网站,即“ https://kafka.apache.org”。移至Kafka>文档>配置>生产者配置。

在那里,用户可以了解Apache Kafka提供的所有生产者属性。在这里,我们将讨论所需的属性,例如:

  1. bootstrap.servers:这是端口对的列表,用于建立与Kafka集群的初始连接。用户只能将引导服务器用于建立初始连接。该服务器以host:port,host:port等形式存在。
  2. key.serializer:它是密钥的一种Serializer类,用于实现“ org.apache.kafka.common.serialization.Serializer”接口。
  3. value.serializer:这是一种Serializer类,它实现“ org.apache.kafka.common.serialization.Serializer”接口。

现在,让我们看看IntelliJ IDEA中生产者属性的实现。

当我们创建属性时,它将“ java.util.Properties”导入到代码中。

这样,完成了创建生产者属性的第一步。

创建生产者

要创建一个Kafka生产者,我们只需要创建一个KafkaProducer对象。

KafkaProducer的对象可以创建为:

KafkaProducer<String, String> first_producer = new KafkaProducer<String, String>(properties);

在这里,“ first_producer”是我们选择的生产者的名称。用户可以据此进行选择。

让我们在下面的快照中看到:

创建生产者记录

为了将数据发送到Kafka,用户需要创建一个ProducerRecord。这是因为所有生产者都位于生产者记录内。生产者在此处指定主题名称以及要传递给Kafka的消息。

可以通过以下方式创建ProducerRecord:

ProducerRecord<String, String> record=new ProducerRecord<String, String>("my_first", "Hye Kafka");

在这里,“记录”是用于创建生产者记录的名称,“ my_first”是主题名称,“ Hye Kafka”是消息。用户可以据此进行选择。

让我们在下面的快照中看到:

发送数据

现在,用户已准备好将数据发送到Kafka。生产者只需要按以下方式调用ProducerRecord的对象:

first_producer.send(record);

让我们在下面的快照中看到:

要了解上述代码的输出,请使用以下命令在CLI上打开“ kafka-console-consumer”:

‘kafka-console-consumer -bootstrap-server 127.0.0.1:9092 -topic my_first -group first_app’

生产者产生的数据是异步的。因此,需要两个附加函数,即flush()和close()(如上图所示)。 flush()将强制生成所有数据,而close()将停止生产器。如果不执行这些功能,则数据将永远不会发送到Kafka,并且消费者将无法读取它。

下面将使用者控制台上的代码输出显示为:

在终端上,用户可以看到各种日志文件。航站楼的最后一行说,卡夫卡生产商已经关闭。因此,该消息将异步显示在使用者控制台上。


赞(0)
未经允许不得转载:srcmini » 在java中创建kafka生产者(producer)

评论 抢沙发

评论前必须登录!