个性化阅读
专注于IT技术分析

kafka消费者组cli

本文概述

通常,Kafka消费者属于特定的消费者组。使用者组基本上代表了应用程序的名称。为了使用使用者组中的消息,使用了“ -group”命令。

让我们看看消费者将如何消费来自Kafka主题的消息:

步骤1:打开Windows命令提示符。

步骤2:使用“ -group”命令,例如:“ kafka-console-consumer -bootstrap-server localhost:9092 -topic -group <group_name>”。给小组起个名字。按回车。

在上面的快照中,组的名称为“ first_app”。可以看到没有显示任何消息,因为没有为此主题生成新消息。如果将使用“ -from-beginning”命令,则将显示所有先前的消息。

步骤3:要查看一些新消息,请从生产者控制台生成一些即时消息(如上一节所述)。

因此,生产者产生的新消息可以在消费者的控制台中看到。

第4步:但是,只有一个消费者在该组中读取数据。让我们创建更多的消费者以了解消费者群体的力量。为此,请打开一个新终端并键入与以下命令完全相同的使用者命令:

‘kafka-console-consumer.bat-引导服务器127.0.0.1:9092 –topic <主题名称> –group <组名称>”。

在上面的快照中,很明显生产者正在将数据发送到Kafka主题。两个使用者正在使用消息。查看消息的顺序。由于为“ myfirst”主题创建了三个分区(前面已经讨论过),因此仅按该顺序拆分消息。

我们可以在同一组中进一步创建更多使用者,每个使用者将根据分区数使用消息。尝试自己更好地理解。

注意:组ID应该相同,然后只有消息将在使用者之间分配。

但是,如果终止任何使用者,则将分区重新分配给活动使用者,并且这些活动使用者将接收到消息。

因此,以这种方式,消费者组中的各个消费者都消费了来自Kafka主题的消息。

带Key的消费者

生产者在数据中附加了键值后,它将被存储到该指定分区。如果未指定键值,则数据将移动到任何分区。因此,当消费者使用密钥读取消息时,如果未指定密钥,则该消息将显示为空。要使用来自Kafka主题的消息,需要使用’print.key’和’key.seperator’。使用的命令是:

‘kafka-console-consumer -bootstrap-server localhost:9092 -topic <topic_name> –from-beginning -property print.key = true -property key.seperator =,’

使用上述命令,使用者可以使用指定的键读取数据。

有关消费者组的更多信息

‘–from-beginning’命令

该命令用于从头开始阅读消息(前面已经讨论过)。因此,在使用者组中使用它会产生以下输出:

可以注意到,新的使用者组“ second_app”用于从头开始读取消息。如果再执行一次同一命令,它将不会显示任何输出。这是因为偏移量是在Apache Kafka中提交的。因此,一旦消费者组读取了所有直到写入的消息,下一次它将仅读取新消息。

例如,在下面的快照中,当再次使用“ -from-beginning”命令时,仅读取新消息。这是因为所有先前的消息仅在较早时使用。

‘kafka-consumer-groups’命令

此命令提供了整个文档,以列出所有组,描述组,删除使用者信息或重置使用者组偏移量。

它需要引导服务器,客户端才能在使用者组上执行不同的功能。

列出消费者群体

“ -list”命令用于列出Kafka群集中可用的使用者组数量。该命令用作:

‘kafka-consumer-groups.bat -bootstrap-server localhost:9092 -list’。

快照如下所示,其中存在三个消费者组。

描述消费者群

“ –describe”命令用于描述使用者组。该命令用作:

‘kafka-consumer-groups.bat-引导服务器-本地主机:9092-描述组<group_name>’

此命令描述是否存在任何活动的使用者,当前偏移值,滞后值为0-表示该使用者已读取所有数据。

重置偏移

偏移在Apache Kafka中提交。因此,如果用户想再次阅读消息,则需要重置偏移值。 “ Kafka-consumer-groups”命令提供了一个重置​​偏移量的选项。重置偏移值意味着定义用户要从其再次读取消息的位置。它一次仅支持一个消费者组,并且该组不应有任何活动实例。

重置偏移量时,用户需要选择三个参数:

  1. 执行选项
  2. 重置规格
  3. 范围

有两个执行选项可用:

‘-dry-run’:这是默认执行选项。此选项用于计划需要重置的偏移量。

-execute’:此选项用于更新偏移值。

有以下可用的重置规格:

‘-to-datetime’:根据从datetime开始的偏移量重置偏移量。使用的格式为:“ YYYY-MM-DDTHH:mm:SS.sss”。

‘–to-estally’:将偏移量重置为最早的偏移量。

‘–to-latest’:将偏移量重置为最新的偏移量。

‘–shift-‘:通过将当前偏移值偏移’n’来重置偏移。 “ n”的值可以为正或负。

‘–from-file’:将偏移量重置为CSV文件中定义的值。

‘–to-current’:将偏移量重置为当前偏移量。

可以定义两个范围:

‘-all-topics’:重置组中所有可用主题的偏移值。

‘-topics’:仅重置指定主题的偏移值。用户需要指定主题名称以重置偏移值。

让我们尝试看看:

1)使用“-至最早”命令

在上面的快照中,偏移量被重置为新偏移量0。这是因为使用了“ -to-最早”命令,该命令已将偏移量值重置为0。

2)使用-shift-by命令

在第一个快照中,偏移值从“ 0”移到“ 2”。在第二个中,偏移值从“ 2”转换为“ -1”。

注意:要将偏移值移为正数,没有必要使用”符号。默认情况下,仅将其视为正数。


赞(0)
未经允许不得转载:srcmini » kafka消费者组cli

评论 抢沙发

评论前必须登录!