Make sure you are in the Kafka folder:
cd <path to Kafka folder>
The examples below use localhost, but they work against any server whose Kafka port is open. The default Kafka port is 9092; you can change it in config/server.properties.
1- Kafka topic commands
1.1- Create a new topic
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic topic1 \
--create
More options (append them to the command):
- replication-factor: the number of copies kept of each partition. It must be <= the number of brokers in your Kafka cluster.
- partitions: the number of partitions for this topic. More partitions allow more parallelism, because more consumers in a group can read at the same time. Don't create too many, though: every partition adds overhead on the brokers.
If replication-factor > 1 and partitions > 1, partition leadership is spread across the brokers, so each broker leads some of the partitions.
--replication-factor 2 \
--partitions 2
A good choice for a basic Kafka cluster is 3 brokers with --replication-factor 3.
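Note: when a producer sends a message to a topic that does not exist, the broker creates the topic automatically with the defaults from config/server.properties (this is the behavior when auto.create.topics.enable is true, which is the default). Those defaults are rarely what you want, so it is better to create topics by hand.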
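The rule that replication-factor cannot exceed the number of brokers is easy to check before running the create command. Below is a minimal sketch; checkTopicSettings and its parameter names are hypothetical helpers for illustration, not part of Kafka or of any client library.

```javascript
// Hypothetical pre-flight check for the options passed to kafka-topics.sh --create.
// Returns a list of problems; an empty list means the settings look valid.
function checkTopicSettings({ brokers, partitions, replicationFactor }) {
  const problems = [];
  if (!Number.isInteger(partitions) || partitions < 1) {
    problems.push('partitions must be an integer >= 1');
  }
  if (!Number.isInteger(replicationFactor) || replicationFactor < 1) {
    problems.push('replication-factor must be an integer >= 1');
  } else if (replicationFactor > brokers) {
    // Each copy of a partition must live on a different broker.
    problems.push(`replication-factor ${replicationFactor} exceeds ${brokers} broker(s)`);
  }
  return problems;
}

// 3 brokers with --replication-factor 3: no problems reported.
console.log(checkTopicSettings({ brokers: 3, partitions: 2, replicationFactor: 3 }));
// A single localhost broker with --replication-factor 2: one problem reported.
console.log(checkTopicSettings({ brokers: 1, partitions: 2, replicationFactor: 2 }));
```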
1.2- Check topic information
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic topic1 \
--describe
Result:
Topic: topic1 TopicId: Mf6o9PJnQgmCQ9_URwuCtw PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1
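The first line summarizes the topic; the following lines form a table with one row per partition:
- Partition: partition ID (integer starting at 0)
- Leader: ID of the broker that currently leads this partition (integer)
- Replicas: list of broker IDs that hold a copy of this partition
- Isr: list of broker IDs whose copy is currently in sync with the leader (in-sync replicas)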
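If you want to check this table from a script, the per-partition lines are regular enough to parse. The sketch below is one way to do it and to flag partitions whose Isr list is shorter than their Replicas list. The function names parseDescribe and underReplicated are hypothetical, and the last sample line is altered (Isr: 1) purely to show a partition that is out of sync.

```javascript
// Turns "0,1" into [0, 1]; an empty string gives [].
const toIds = (s) => s.split(',').filter(Boolean).map(Number);

// Parses the per-partition lines of `kafka-topics.sh --describe` output.
// The summary line and anything unrecognized are skipped.
function parseDescribe(output) {
  const pattern = /Partition:\s*(\d+)\s+Leader:\s*(\S+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)/;
  const partitions = [];
  for (const line of output.split('\n')) {
    const m = line.match(pattern);
    if (!m) continue;
    partitions.push({
      partition: Number(m[1]),
      leader: m[2], // kept as text, since it may not be a number when there is no leader
      replicas: toIds(m[3]),
      isr: toIds(m[4]),
    });
  }
  return partitions;
}

// A partition is under-replicated when some replica is missing from the Isr list.
function underReplicated(partitions) {
  return partitions.filter((p) => p.isr.length < p.replicas.length);
}

// Sample input: the last line is a made-up variation with broker 0 out of sync.
const sample = [
  'Topic: topic1 TopicId: Mf6o9PJnQgmCQ9_URwuCtw PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824',
  'Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1',
  'Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1',
].join('\n');

console.log(underReplicated(parseDescribe(sample)));
```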
1.3- Delete topic
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
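--topic topic1 \
--delete
If the delete fails, as a last resort you can stop Kafka and ZooKeeper and delete two directories: log.dirs (set in config/server.properties) and dataDir (set in config/zookeeper.properties). Warning: this removes all data for every topic, not only this one.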
1.4- Change the number of partitions of a topic
Increase to 5 partitions (the partition count can only be increased, never decreased):
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--alter --topic myTopicName \
--partitions 5
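Then, if you want your producer to send messages to all 5 partitions in turn (the snippet assumes a KafkaJS-style producer; producer, myTopicName, key and message come from your app):
// Define once:
const myPartitions = [0, 1, 2, 3, 4]
let pointer = 0
// For each message: move to the next partition, wrapping back to the first one
pointer = (pointer + 1) % myPartitions.length
await producer.send({
  topic: myTopicName,
  messages: [
    // Use the partition ID from the list, not the raw index
    { key: key, value: JSON.stringify(message), headers: { time: '' + Date.now() }, partition: myPartitions[pointer] }
  ],
})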
All consumers that share one group ID rebalance automatically to cover the new partitions.
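The pointer logic above can also be wrapped in a small helper, so the counter lives in one place and cannot be forgotten between sends. This is only a sketch: roundRobin is a hypothetical name, not something a Kafka client provides.

```javascript
// Returns a function that yields partition IDs in turn: 0, 1, ..., n-1, 0, 1, ...
function roundRobin(partitionCount) {
  let next = 0;
  return () => {
    const current = next;
    next = (next + 1) % partitionCount; // wrap around after the last partition
    return current;
  };
}

// One picker per topic; call it once for every message you send.
const pickPartition = roundRobin(5);
const firstSeven = Array.from({ length: 7 }, () => pickPartition());
console.log(firstSeven); // [ 0, 1, 2, 3, 4, 0, 1 ]
```

In the producer call you would then write partition: pickPartition() instead of keeping the pointer yourself.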
2- Write the events (send messages into the topic)
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic1
Then type your messages: each line you enter is sent as one message. Press Ctrl+C to exit.
3- Read the events (read messages from the topic)
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic1
More options (append the ones you need to the command):
- from-beginning: read all messages in the topic, starting from the oldest one
- group: read as a member of the named group, resuming from where the group left off (only the messages the group has not read yet). Once the group has committed offsets, --from-beginning has no effect.
- max-messages: the number of messages to read before exiting
- partition: read only from the given partition. This works on a plain topic read only; it cannot be combined with --group.
--from-beginning
--group mygroup1
--max-messages 10
--partition 0
4- Consumer group
4.1- Create a consumer group
A group is created automatically the first time a consumer reads with --group (see the read command in section 3).
4.2- List all groups
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
4.3- Check consumer-group status:
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group myGroupName
Result:
GROUP        TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST         CLIENT-ID
myGroupName  topicName  0          25889           25889           0    consumerId   /10.5.11.18  clientId
myGroupName  topicName  1          -               0               -    consumerId   /10.5.11.18  clientId
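In this table, LAG is LOG-END-OFFSET minus CURRENT-OFFSET, and a "-" means the group has no committed offset for that partition yet. If you want to watch lag from a script, a sketch like the one below can parse the rows. parseGroupDescribe and totalLag are hypothetical helper names, and the row for partition 2 is made up to show a non-zero lag.

```javascript
// Parses the rows printed by `kafka-consumer-groups.sh --describe`
// and recomputes the lag of every partition.
function parseGroupDescribe(output) {
  const rows = [];
  for (const line of output.split('\n')) {
    const cols = line.trim().split(/\s+/);
    if (cols.length < 6 || cols[0] === 'GROUP') continue; // blank line or header
    const [group, topic, partition, current, logEnd] = cols;
    const currentOffset = current === '-' ? null : Number(current);
    const logEndOffset = logEnd === '-' ? null : Number(logEnd);
    rows.push({
      group,
      topic,
      partition: Number(partition),
      currentOffset,
      logEndOffset,
      // Lag is unknown (null) when either offset is missing.
      lag: currentOffset === null || logEndOffset === null ? null : logEndOffset - currentOffset,
    });
  }
  return rows;
}

// Sums the lag over all partitions that have a known lag.
function totalLag(rows) {
  return rows.reduce((sum, r) => sum + (r.lag === null ? 0 : r.lag), 0);
}

// The first two data rows come from the output above; the last one is hypothetical.
const groupOutput = [
  'GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID',
  'myGroupName topicName 0 25889 25889 0 consumerId /10.5.11.18 clientId',
  'myGroupName topicName 1 - 0 - consumerId /10.5.11.18 clientId',
  'myGroupName topicName 2 100 250 150 consumerId /10.5.11.18 clientId',
].join('\n');

const groupRows = parseGroupDescribe(groupOutput);
console.log(groupRows.map((r) => r.lag)); // [ 0, null, 150 ]
console.log(totalLag(groupRows)); // 150
```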
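4.4- Change the offset:
Move the current offset from 4.3 back by 999 messages (rewind), so the group reads them again.
First, stop every consumer in the group myGroupName (your apps), because offsets can only be reset while the group is inactive. Then run the command below. Replace --execute with --dry-run to preview the new offsets without applying them.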
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--reset-offsets --shift-by -999 \
--topic myTopic \
--group myGroupName \
--execute
Or reset to a date and time (ISO 8601 format, here in UTC+0):
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--reset-offsets --to-datetime "2020-02-26T21:00:00.000+0000" \
--topic myTopic \
--group myGroupName \
--execute
Some other options:
- --to-earliest: reset to the oldest available offset
- --to-latest: reset to the newest offset
- If the topic has many partitions, you can select specific ones: --topic myTopic:0,1,2
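Two small calculations help when preparing a reset: what offset a shift will produce, and how to format a timestamp for --to-datetime. The sketch below assumes that a shifted offset which falls outside the partition's range is clamped to the earliest or latest offset (as far as I know this is what the tool does, printing a warning). shiftOffset and toResetDatetime are hypothetical helper names.

```javascript
// Estimates the offset that --shift-by produces for one partition.
// Assumption: results outside [earliest, logEnd] are clamped to that range.
function shiftOffset(current, shiftBy, earliest, logEnd) {
  const target = current + shiftBy;
  return Math.min(Math.max(target, earliest), logEnd);
}

// Formats a Date in the UTC+0 form used by the --to-datetime example above.
function toResetDatetime(date) {
  // toISOString() always returns UTC with a trailing "Z".
  return date.toISOString().replace('Z', '+0000');
}

console.log(shiftOffset(25889, -999, 0, 25889)); // 24890
console.log(shiftOffset(500, -999, 0, 25889)); // 0 (clamped to the earliest offset)
console.log(toResetDatetime(new Date(Date.UTC(2020, 1, 26, 21, 0, 0)))); // 2020-02-26T21:00:00.000+0000
```

Running the real command with --dry-run first is still the safest way to confirm the offsets before you apply them.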
<to be continued…>