1. Cannot connect to a remote Kafka
In the broker's server.properties, change
listeners=PLAINTEXT://localhost:9092
to
listeners=PLAINTEXT://172.20.191.83:9092
(On multi-homed or containerized hosts, advertised.listeners may also need to point at the externally reachable address.)
Then add the line 172.20.191.83 zk1 to /etc/hosts on both the client and the server; when connecting, the broker address can then simply be given as zk1.
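A quick way to verify the fix from the client machine is to list topics through the AdminClient. This is a minimal sketch, not part of the original note; it assumes the zk1 hostname mapping above and the default port 9092:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class ConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // "zk1" resolves to 172.20.191.83 via the /etc/hosts entry added above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "zk1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // listing topics fails quickly if the broker is not reachable
            System.out.println(admin.listTopics().names().get());
        }
    }
}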
2. Kafka log segments (.log files) and the log index mechanism (offset index, timestamp index)
Under the Kafka data directory, go into a topic partition directory and you will see several files, as shown below. The file names show that the .log, .index, and .timeindex files correspond one to one:
root@Win11:/opt/kafka/logs/quickstart-events-1# ll
total 60
drwxr-xr-x 2 root root 4096 May 15 10:34 ./
drwxr-xr-x 69 root root 4096 May 15 11:03 ../
-rw-r--r-- 1 root root 10485760 May 15 11:00 00000000000000000000.index
-rw-r--r-- 1 root root 38112 May 15 11:00 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 May 15 11:00 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 May 15 10:34 leader-epoch-checkpoint
File descriptions
File type | Purpose |
---|---|
.index | offset index file: maps message offsets to physical positions in the .log file |
.timeindex | timestamp index file: maps timestamps to relative offsets |
.log | the log file itself (where messages are stored) |
.snapshot | snapshot file of producer state, generated for idempotent or transactional producers |
leader-epoch-checkpoint | records the offset at which each leader epoch started writing; updated periodically |
Configuration parameters
# The maximum size of a log segment file (1073741824 = 1 GiB). When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
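The broker-level log.segment.bytes can also be overridden per topic as segment.bytes. Below is a minimal sketch for reading the effective value of an existing topic through the AdminClient; the broker address and topic name are reused from the examples in this note and this is an illustration, not part of the original setup:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class ShowSegmentConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.20.191.83:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "quickstart-events");
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            // segment.bytes is the per-topic counterpart of the broker-level log.segment.bytes
            System.out.println(config.get("segment.bytes"));
        }
    }
}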
Inspecting a log file
Dump the batch-level metadata:
kafka-dump-log --files 00000000000001778276.log
Output:
baseOffset: 391 lastOffset: 391 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 37536 CreateTime: 1652583638947 size: 96 magic: 2 compresscodec: NONE crc: 4165909692 isvalid: true
baseOffset: 392 lastOffset: 392 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 37632 CreateTime: 1652583638949 size: 96 magic: 2 compresscodec: NONE crc: 2071119199 isvalid: true
Dump the actual record data as well:
kafka-dump-log --files 00000000000001778276.log --print-data-log
Output:
baseOffset: 396 lastOffset: 396 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 38016 CreateTime: 1652583638956 size: 96 magic: 2 compresscodec: NONE crc: 3080963842 isvalid: true
| offset: 396 isValid: true crc: null keySize: 7 valueSize: 21 CreateTime: 1652583638956 baseOffset: 396 lastOffset: 396 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 96 magic: 2 compressType: NONE position: 38016 sequence: -1 headerKeys: [] key: Message payload: 这是第199条数据
Inspecting the offset index file
Command:
kafka-dump-log --files 00000000000001778276.index
Output:
Dumping 00000000000001778276.index
offset: 1778431 position: 44169
offset: 1778446 position: 79128
offset: 1778511 position: 86925
offset: 1778529 position: 120070
offset: 1778592 position: 129379
offset: 1778608 position: 161537
offset: 1778672 position: 169791
offset: 1778694 position: 202354
offset: 1778760 position: 213640
Kafka appends records strictly in order and offsets grow monotonically, so binary search is a natural fit for locating a record by a given offset (this is also how the lookup is usually described). The lookup proceeds roughly as follows (a small sketch of the index lookup follows the list):
- Locate the .log and .index segment files that contain the target offset (each segment's file name is its base offset)
- Binary-search the .index file for the largest indexed offset that is not greater than the target offset
- Jump to the byte position recorded with that index entry, which is the start of a record batch in the .log file
- Scan forward within the batch from that position to the target record
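The second step is essentially a floor lookup over sorted (offset, position) pairs. The following is a toy model of that lookup, not Kafka source code; the sample entries are copied from the dump output above:

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class OffsetIndexLookupSketch {
    public static void main(String[] args) {
        // the .index file is a sorted sequence of (offset, byte position) pairs
        NavigableMap<Long, Integer> index = new TreeMap<>();
        index.put(1778431L, 44169);
        index.put(1778446L, 79128);
        index.put(1778511L, 86925);
        index.put(1778529L, 120070);

        long target = 1778500L;
        // floorEntry = largest indexed offset <= target; Kafka does this with binary search over the index file
        Map.Entry<Long, Integer> floor = index.floorEntry(target);
        System.out.println("start at position " + floor.getValue()
                + " (indexed offset " + floor.getKey() + "), then scan forward to offset " + target);
    }
}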
Inspecting the timestamp index file
Command:
kafka-dump-log --files 00000000000001778276.timeindex
Output:
Dumping 00000000000001778276.timeindex
timestamp: 1645765272680 offset: 1778427
timestamp: 1645765272681 offset: 1778442
timestamp: 1645765275680 offset: 1778507
timestamp: 1645765275681 offset: 1778521
timestamp: 1645765278680 offset: 1778583
timestamp: 1645765278681 offset: 1778597
timestamp: 1645765281680 offset: 1778667
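On the client side, this index is what backs KafkaConsumer.offsetsForTimes(): given a timestamp per partition, the broker returns the earliest offset whose timestamp is greater than or equal to it, which can then be passed to seek(). A minimal sketch, reusing the broker address and topic from the examples in this note:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class SeekByTimestamp {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.20.191.83:9092");
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "20220514");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
            TopicPartition tp = new TopicPartition("quickstart-events", 1);
            consumer.assign(Collections.singletonList(tp));

            long oneHourAgo = System.currentTimeMillis() - 3_600_000L;
            // the broker resolves the timestamp via the .timeindex of the relevant segment
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo));
            OffsetAndTimestamp ot = offsets.get(tp);
            if (ot != null) {
                // first offset whose timestamp >= oneHourAgo
                consumer.seek(tp, ot.offset());
            }
        }
    }
}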
3. How to consume from a specified offset
public static void main(String[] args) {
    Properties p = new Properties();
    // connect to the broker(s); serverList is the broker address, e.g. 172.20.191.83:9092
    // p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kserver1:19092,kserver2:19092,kserver3:19092");
    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
    p.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, ConsumerConfig.DEFAULT_ALLOW_AUTO_CREATE_TOPICS);
    // deserializers
    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // maximum number of records returned by a single poll()
    p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    // consumer group id
    p.put(ConsumerConfig.GROUP_ID_CONFIG, "20220514");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
    ArrayList<String> topics = new ArrayList<>();
    topics.add("quickstart-events");
    kafkaConsumer.subscribe(topics);
    // start consuming
    Set<TopicPartition> assignment = new HashSet<>();
    while (assignment.isEmpty()) {
        kafkaConsumer.poll(Duration.ofSeconds(1));
        // fetch the partitions assigned to this consumer
        assignment = kafkaConsumer.assignment();
    }
    // seek to the desired offset once the assignment is known
    for (TopicPartition topicPartition : assignment) {
        // partition 1: start consuming from offset 300
        if (topicPartition.partition() == 1) {
            kafkaConsumer.seek(topicPartition, 300);
        }
    }
    while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord);
        }
    }
}
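If the goal is only to read a known partition from a known offset and consumer-group rebalancing is not needed, assign() avoids the poll-until-assigned loop above. A minimal fragment, not part of the original code; it reuses the Properties p built above and needs java.util.Collections in addition to the imports already assumed:

// alternative to subscribe(): take ownership of the partition directly
TopicPartition tp = new TopicPartition("quickstart-events", 1);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
consumer.assign(Collections.singletonList(tp)); // no group rebalance, no wait-for-assignment loop
consumer.seek(tp, 300);                         // start reading partition 1 at offset 300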
4. Custom message objects
User
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private static final long serialVersionUID = 1L;
    private String username;
    private String sex;
}
UserDecoder
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.example.pojo.User;
import org.example.util.SerializeUtil;

import java.util.Map;

@Slf4j
public class UserDecoder implements Deserializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        return (User) SerializeUtil.deserialize(data, User.class);
    }

    @Override
    public User deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}
UserEncoder
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.example.pojo.User;
import org.example.util.SerializeUtil;

import java.util.Map;

public class UserEncoder implements Serializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, User data) {
        return SerializeUtil.serialize(data);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, User data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}
SerializeUtil
import lombok.extern.slf4j.Slf4j;

import java.io.*;

@Slf4j
public class SerializeUtil {

    // serialize an object to bytes with standard Java serialization
    public static byte[] serialize(Object o) {
        ObjectOutputStream oos;
        ByteArrayOutputStream baos;
        try {
            baos = new ByteArrayOutputStream();
            oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            return baos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    // deserialize bytes back into an object of the given class
    public static <T> Object deserialize(byte[] bytes, Class<T> className) {
        ByteArrayInputStream bais;
        T tmpObj;
        try {
            bais = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bais);
            tmpObj = (T) ois.readObject();
            return tmpObj;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }
}
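A quick round-trip check for SerializeUtil (a test sketch added here, not part of the original code; it assumes the User class above is on the classpath):

import org.example.pojo.User;
import org.example.util.SerializeUtil;

public class SerializeUtilTest {
    public static void main(String[] args) {
        User in = new User("alice", "female");
        byte[] bytes = SerializeUtil.serialize(in);
        User out = (User) SerializeUtil.deserialize(bytes, User.class);
        // Lombok's @Data generates equals(), so this prints true if both fields survive the round trip
        System.out.println(in.equals(out));
    }
}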
Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.example.cfg.UserEncoder;
import org.example.pojo.User;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ProducerTest {

    final static String topic = "quickstart-events";
    final static String serverList = "172.20.191.83:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
        p.put(ProducerConfig.ACKS_CONFIG, "all");
        p.put(ProducerConfig.RETRIES_CONFIG, "3");
        p.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // the custom serializer turns the User value into bytes
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserEncoder.class.getName());
        KafkaProducer<String, User> producer = new KafkaProducer<>(p);
        for (int i = 1; i < 300; i++) {
            Future<RecordMetadata> message =
                    producer.send(
                            new ProducerRecord<String, User>(topic, "User" + i, new User("user" + i, "sex" + i)),
                            (metadata, exception) ->
                                    System.out.println(
                                            "send completed: offset: " + metadata.offset() + " partition: " + metadata.partition()));
            message.get();
        }
        producer.close();
    }
}
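Note that calling message.get() after every send() blocks until the broker acknowledges that record, so the loop is effectively synchronous; if throughput matters more than per-record confirmation, drop the get() (the callback still reports completion) and rely on producer.close() to flush anything still buffered.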
Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.cfg.UserDecoder;
import org.example.pojo.User;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class ConsumerTest {

    final static String serverList = "172.20.191.83:9092";
    // final static String serverList = "zk1:9092";

    public static void main(String[] args) {
        Properties p = new Properties();
        // connect to the broker(s)
        // p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kserver1:19092,kserver2:19092,kserver3:19092");
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
        p.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, ConsumerConfig.DEFAULT_ALLOW_AUTO_CREATE_TOPICS);
        // deserializers: the custom UserDecoder turns the value bytes back into a User
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDecoder.class.getName());
        // maximum number of records returned by a single poll()
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // consumer group id
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "20220514");
        KafkaConsumer<String, User> kafkaConsumer = new KafkaConsumer<>(p);
        ArrayList<String> topics = new ArrayList<>();
        topics.add("quickstart-events");
        kafkaConsumer.subscribe(topics);
        // start consuming
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.isEmpty()) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // fetch the partitions assigned to this consumer
            assignment = kafkaConsumer.assignment();
        }
        // rewind every assigned partition to offset 0
        for (TopicPartition topicPartition : assignment) {
            // e.g. restrict to partition 1 and start at offset 300 instead:
            // if (topicPartition.partition() == 1) {
            kafkaConsumer.seek(topicPartition, 0);
            // }
        }
        while (true) {
            ConsumerRecords<String, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, User> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value().toString());
            }
        }
    }
}
5. Deleting a topic
./bin/kafka-topics --delete --zookeeper 【zookeeper server】 --topic 【topic name】
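On newer Kafka versions the --zookeeper option is deprecated (since 2.2) and removed (in 3.0); the same operation goes through a broker instead:
./bin/kafka-topics --delete --bootstrap-server 【broker address】 --topic 【topic name】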