Make sure you are in the Kafka folder:

cd <path to Kafka folder>

The examples below use localhost, but they work with any server as long as the right port is open. The default Kafka port is 9092; you can change it in config/server.properties.

1- Kafka topic commands

1.1- Create new topic

bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic topic1 \
--create

More options:

  • replication-factor: the number of copies kept of each partition. Must be <= the number of brokers in your Kafka cluster.
  • partitions: the number of partitions for this topic. More partitions allow more parallel reads and writes, but don't create too many: each partition adds overhead and slows the brokers down.

If replication-factor > 1 and partitions > 1, each broker is the leader for some of the partitions.

--replication-factor 2 \
--partitions 2 \

A good choice for a basic Kafka cluster is 3 broker servers with --replication-factor 3.

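Note: When a producer sends a message to a topic that does not exist, the topic is created automatically with the default settings from config/server.properties (this is the auto.create.topics.enable behavior, which is on by default). Those defaults are rarely what you want, so you should create the topic by hand first.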
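
The replication-factor rule above can be checked before the command is run. The following is a minimal sketch in JavaScript, not a Kafka API: buildCreateTopicCommand and its brokerCount parameter are hypothetical names, and it only assembles the flags already shown in this section.

```javascript
// Hypothetical helper (not part of Kafka): assembles the create-topic command
// from the flags shown above and rejects an impossible replication factor.
function buildCreateTopicCommand({ topic, brokerCount, replicationFactor = 1, partitions = 1, bootstrapServer = 'localhost:9092' }) {
    // replication-factor must be <= the number of brokers in the cluster
    if (replicationFactor > brokerCount) {
        throw new Error(`replication-factor ${replicationFactor} > ${brokerCount} broker(s)`)
    }
    return [
        'bin/kafka-topics.sh',
        `--bootstrap-server ${bootstrapServer}`,
        `--topic ${topic}`,
        '--create',
        `--replication-factor ${replicationFactor}`,
        `--partitions ${partitions}`,
    ].join(' ')
}

console.log(buildCreateTopicCommand({ topic: 'topic1', brokerCount: 3, replicationFactor: 3, partitions: 2 }))
```

Failing early like this is only a convenience; the broker rejects an oversized replication factor anyway.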

1.2- Check topic information

bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic topic1 \
--describe

result:

Topic: topic1 TopicId: Mf6o9PJnQgmCQ9_URwuCtw PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1

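The output above is a table with these fields:

  • Partition: partition ID (integer starting at 0)
  • Leader: ID of the broker that currently leads this partition (integer)
  • Replicas: list of IDs of the brokers that hold a copy of this partition
  • Isr: list of IDs of the brokers whose copy is in sync with the leader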
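
To make the fields concrete, here is a minimal JavaScript sketch that reads one partition line of the describe output. parsePartitionLine is a hypothetical helper, and it assumes the "Key: value" layout shown in the result above; other Kafka versions may print the line differently.

```javascript
// Parses one partition line of `kafka-topics.sh --describe` output,
// assuming the "Key: value" layout shown in the result above.
function parsePartitionLine(line) {
    // Returns the token that follows "<name>:" or null if the field is absent.
    const field = (name) => {
        const match = line.match(new RegExp(`${name}:\\s*(\\S+)`))
        return match ? match[1] : null
    }
    const toIds = (csv) => (csv ? csv.split(',').map(Number) : [])
    const replicas = toIds(field('Replicas'))
    const isr = toIds(field('Isr'))
    return {
        partition: Number(field('Partition')),
        leader: Number(field('Leader')),
        replicas,
        isr,
        // Fewer in-sync replicas than assigned replicas: some copy is behind or offline.
        underReplicated: isr.length < replicas.length,
    }
}

console.log(parsePartitionLine('Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1'))
```

A partition whose Isr list is shorter than its Replicas list is worth a closer look, since one of its copies is not keeping up.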

1.3- Delete topic

bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic topic1 \
--delete

If the delete fails, you can stop the servers and delete the 2 data directories (look for log.dirs in config/server.properties and dataDir in config/zookeeper.properties). Be careful: this removes all data, not just this topic.

1.4- Alter number of partitions on Topic

Change to 5 partitions (the partition count can only be increased, never decreased):

bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--alter --topic myTopicName \
--partitions 5

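Then, if you want your producer to send messages to all 5 partitions in turn:

// Define once (producer, key and message come from your own app):
const myTopicName = 'myTopicName'
const myPartitions = [0, 1, 2, 3, 4]
let pointer = 0

// For each message (inside an async function):
await producer.send({
    topic: myTopicName,
    messages: [
        { key: key, value: JSON.stringify(message), headers: { time: '' + Date.now() }, partition: myPartitions[pointer] }
    ],
})
// Advance to the next partition, wrapping back to the first one after the last
pointer = (pointer + 1) % myPartitions.length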

All consumers in the same group ID then rebalance automatically.
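
As a rough picture of what a rebalance ends up with, the sketch below deals 5 partitions out to the consumers of one group. It is a simplified illustration only: Kafka's real assignment strategies are more involved, and assignPartitions, consumerA and consumerB are hypothetical names.

```javascript
// Simplified illustration only, not Kafka's actual assignor:
// partitions are dealt out to the group's consumers one at a time, in turn.
function assignPartitions(partitions, consumers) {
    const assignment = new Map(consumers.map((c) => [c, []]))
    partitions.forEach((p, i) => {
        assignment.get(consumers[i % consumers.length]).push(p)
    })
    return assignment
}

// Two consumers share the 5 partitions; each partition has exactly one owner.
console.log(assignPartitions([0, 1, 2, 3, 4], ['consumerA', 'consumerB']))
```

The point to take away is that each partition is read by only one consumer of the group at a time, so more consumers than partitions leaves some consumers idle.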

2- Write the events (send messages into the topic)

bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic1

Then type your messages; each line is sent as one message. Press Ctrl+C to exit.

3- Read the events (read messages from the topic)

bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic1

More options:

  • from-beginning: read all messages in the topic, starting from the oldest one
  • group: read as a member of the named consumer group, continuing from where the group left off (only messages the group has not read yet). Once the group has committed offsets, --from-beginning has no effect.
  • max-messages: number of messages to read before exiting
  • partition: read only the given partition. This works on the topic alone, not together with a group.

--from-beginning \
--group mygroup1 \
--max-messages 10 \
--partition 0 \

4- Consumer group

4.1- Create a consumer group

A group is created automatically by the read command in section 3 when you pass --group.

4.2- List all groups

bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list

4.3- Check consumer-group status:

bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group myGroupName

result:

GROUP        TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST         CLIENT-ID
myGroupName  topicName  0          25889           25889           0    consumerId   /10.5.11.18  clientId
myGroupName  topicName  1          -               0               -    consumerId   /10.5.11.18  clientId
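
The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET, that is, how many messages the group still has to read on that partition. A minimal JavaScript sketch of that arithmetic follows; parseGroupRow is a hypothetical helper, it assumes the whitespace-separated column order shown above, and it treats "-" as "no committed offset yet", which is an assumption about this output.

```javascript
// Parses one data row of `kafka-consumer-groups.sh --describe` output,
// assuming the whitespace-separated column order shown above.
function parseGroupRow(row) {
    const [group, topic, partition, current, logEnd] = row.trim().split(/\s+/)
    // Assumption: "-" means the group has no committed offset on this partition yet.
    const currentOffset = current === '-' ? null : Number(current)
    const logEndOffset = Number(logEnd)
    return {
        group,
        topic,
        partition: Number(partition),
        currentOffset,
        logEndOffset,
        // LAG = LOG-END-OFFSET - CURRENT-OFFSET (unknown without a committed offset)
        lag: currentOffset === null ? null : logEndOffset - currentOffset,
    }
}

console.log(parseGroupRow('myGroupName topicName 0 25889 25889 0 consumerId /10.5.11.18 clientId'))
```

A lag that keeps growing between two runs of the describe command suggests the consumers are slower than the producers.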

4.4- Change offset:

Move the current offset from 4.3 back by 999 (rewind).

First, stop every running client in myGroupName (your apps), because offsets can only be reset while the group has no active members. Then run:

bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--reset-offsets --shift-by -999 \
--topic myTopic \
--group myGroupName \
--execute

Or reset to a datetime (ISO format, UTC+0):

bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--reset-offsets --to-datetime "2020-02-26T21:00:00.000+0000" \
--topic myTopic \
--group myGroupName \
--execute

Some other options:

  • --to-earliest
  • --to-latest
  • If the topic has many partitions, you can specify which ones: --topic myTopic:0,1,2
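
For intuition, the arithmetic behind --shift-by on a single partition can be sketched as below. shiftOffset is a hypothetical helper, not part of Kafka, and the clamping to the partition's earliest and latest offsets is an assumption about how the tool treats out-of-range targets.

```javascript
// Sketch of the --shift-by arithmetic for one partition.
// Assumption: a target outside [earliestOffset, latestOffset] is clamped to that range.
function shiftOffset(currentOffset, shiftBy, earliestOffset, latestOffset) {
    const target = currentOffset + shiftBy
    return Math.min(Math.max(target, earliestOffset), latestOffset)
}

// Partition 0 from section 4.3, moved back by 999
console.log(shiftOffset(25889, -999, 0, 25889)) // 24890
```

After the reset, the group re-reads the messages between the new offset and the old one the next time its consumers start.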

<to be continued…>
