1. Cannot connect to a remote Kafka broker

In server.properties, change

listeners=PLAINTEXT://localhost:9092

to

listeners=PLAINTEXT://172.20.191.83:9092

Add the entry 172.20.191.83 zk1 to /etc/hosts on both the client and the server.

Clients can then connect using the hostname zk1 instead of the IP address.
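
To verify that the broker is now reachable under the new address, a quick check from the client machine (assuming the Kafka CLI tools are on the PATH, as with the other commands in these notes):

kafka-topics --list --bootstrap-server zk1:9092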

2. Kafka log segments (.log files) and the log index mechanism (offset index, timestamp index)

Inside the Kafka data directory, each topic partition has its own directory. Listing it shows several files; as the names suggest, the .log, .index and .timeindex files come in matching sets, and the file name is the base offset of the first message stored in that segment:

root@Win11:/opt/kafka/logs/quickstart-events-1# ll
total 60
drwxr-xr-x  2 root root     4096 May 15 10:34 ./
drwxr-xr-x 69 root root     4096 May 15 11:03 ../
-rw-r--r--  1 root root 10485760 May 15 11:00 00000000000000000000.index
-rw-r--r--  1 root root    38112 May 15 11:00 00000000000000000000.log
-rw-r--r--  1 root root 10485756 May 15 11:00 00000000000000000000.timeindex
-rw-r--r--  1 root root        8 May 15 10:34 leader-epoch-checkpoint

File descriptions

.index: offset index file, mapping message offsets to physical positions in the .log file
.timeindex: timestamp index file, mapping timestamps to relative offsets
.log: the log file itself, where the messages are stored
.snapshot: snapshot file generated for idempotent or transactional producers
leader-epoch-checkpoint: records the offset at which each leader epoch started writing; updated periodically

Configuration parameters

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 1073741824 bytes = 1 GB
log.segment.bytes=1073741824

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
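
Two related settings control how densely the index files are populated and when a segment is rolled even if it has not reached log.segment.bytes (the values below are the defaults in recent Kafka releases; check your broker version's documentation):

# Add an entry to the .index/.timeindex files roughly every 4 KB of appended log data
log.index.interval.bytes=4096

# Roll a new segment after this many hours regardless of size
log.roll.hours=168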

Viewing the log file

Dump the record-batch metadata:

kafka-dump-log --files 00000000000001778276.log

Output:

baseOffset: 391 lastOffset: 391 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 37536 CreateTime: 1652583638947 size: 96 magic: 2 compresscodec: NONE crc: 4165909692 isvalid: true
baseOffset: 392 lastOffset: 392 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 37632 CreateTime: 1652583638949 size: 96 magic: 2 compresscodec: NONE crc: 2071119199 isvalid: true

Dump the records including their payloads:

kafka-dump-log --files 00000000000001778276.log --print-data-log

Output:

baseOffset: 396 lastOffset: 396 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 38016 CreateTime: 1652583638956 size: 96 magic: 2 compresscodec: NONE crc: 3080963842 isvalid: true
| offset: 396 isValid: true crc: null keySize: 7 valueSize: 21 CreateTime: 1652583638956 baseOffset: 396 lastOffset: 396 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 96 magic: 2 compressType: NONE position: 38016 sequence: -1 headerKeys: [] key: Message payload: 这是第199条数据

Viewing the offset index file

Command:

kafka-dump-log --files 00000000000001778276.index

Output:

Dumping 00000000000001778276.index
offset: 1778431 position: 44169
offset: 1778446 position: 79128
offset: 1778511 position: 86925
offset: 1778529 position: 120070
offset: 1778592 position: 129379
offset: 1778608 position: 161537
offset: 1778672 position: 169791
offset: 1778694 position: 202354
offset: 1778760 position: 213640

Kafka appends records in order, so offsets grow monotonically, which makes binary search a natural fit for locating a record at a given offset (this is the lookup commonly described for Kafka's sparse index; a sketch of the core step follows this list):

  1. Locate the segment (the .log/.index pair) whose base offset covers the target offset.
  2. In that segment's .index file, binary-search for the largest indexed offset that is not greater than the target offset.
  3. Use the position stored with that index entry to jump to the corresponding record batch in the .log file.
  4. Scan forward within the batch (and any following batches) until the record with the target offset is found.
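
The core of step 2 is an ordinary binary search for the largest index entry not greater than the target offset. A minimal sketch in Java, assuming the (offset, position) entries of the .index file have already been parsed into two arrays (the on-disk parsing and the broker's actual implementation are omitted; this only illustrates the lookup):

    //offsets[] is sorted ascending; positions[i] is the byte position in the .log
    //file of the record batch that starts at offsets[i]
    static long findPosition(long[] offsets, long[] positions, long targetOffset) {
        int lo = 0, hi = offsets.length - 1, best = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsets[mid] <= targetOffset) {
                best = mid;      //candidate entry, keep looking further right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        //-1 means the target offset is below every indexed entry,
        //so scanning has to start from the beginning of the segment
        return best >= 0 ? positions[best] : -1;
    }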

Viewing the timestamp index file

Command:

kafka-dump-log --files 00000000000001778276.timeindex

Output:

Dumping 00000000000001778276.timeindex
timestamp: 1645765272680 offset: 1778427
timestamp: 1645765272681 offset: 1778442
timestamp: 1645765275680 offset: 1778507
timestamp: 1645765275681 offset: 1778521
timestamp: 1645765278680 offset: 1778583
timestamp: 1645765278681 offset: 1778597
timestamp: 1645765281680 offset: 1778667

3. Consuming from a specified offset

seek() only works on partitions that have actually been assigned to the consumer, so the snippet below polls until the assignment is non-empty before seeking. Only main() is shown; imports and the class wrapper are the same as in the full ConsumerTest in section 4.

    public static void main(String[] args) {
        Properties p = new Properties();

        //connect to the Kafka cluster
//        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kserver1:19092,kserver2:19092,kserver3:19092");
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
        p.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, ConsumerConfig.DEFAULT_ALLOW_AUTO_CREATE_TOPICS);
        //key/value deserializers
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //max records returned per poll
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        //consumer group id
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "20220514");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);

        ArrayList<String> topics = new ArrayList<>();
        topics.add("quickstart-events");
        kafkaConsumer.subscribe(topics);


        //start consume
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            //fetch this consumer's partition assignment
            assignment = kafkaConsumer.assignment();
        }

        //seek to a specific offset once the assignment is known
        for (TopicPartition topicPartition : assignment) {
            //start partition 1 from offset 300
            if (topicPartition.partition() == 1) {
                kafkaConsumer.seek(topicPartition, 300);
            }
        }

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
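
If the starting point is a timestamp rather than an offset, the consumer can resolve it to per-partition offsets with offsetsForTimes (which is what the .timeindex files from section 2 serve). A hedged sketch that reuses kafkaConsumer and assignment from the code above and additionally needs imports for java.util.Map, java.util.HashMap and org.apache.kafka.clients.consumer.OffsetAndTimestamp:

        //ask the broker for the earliest offset whose timestamp is >= one hour ago
        long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : assignment) {
            query.put(tp, oneHourAgo);
        }
        Map<TopicPartition, OffsetAndTimestamp> result = kafkaConsumer.offsetsForTimes(query);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : result.entrySet()) {
            //entries are null for partitions that have no message at or after the timestamp
            if (e.getValue() != null) {
                kafkaConsumer.seek(e.getKey(), e.getValue().offset());
            }
        }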

4. Custom message objects

User

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    private String username;
    private String sex;
}

UserDecoder


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.example.pojo.User;
import org.example.util.SerializeUtil;

import java.util.Map;
@Slf4j
public class UserDecoder implements Deserializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        return (User) SerializeUtil.deserialize(data, User.class);
    }

    @Override
    public User deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}

UserEncoder


import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.example.pojo.User;
import org.example.util.SerializeUtil;

import java.util.Map;

public class UserEncoder implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, User data) {
        return SerializeUtil.serialize(data);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, User data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

SerializeUtil


import lombok.extern.slf4j.Slf4j;

import java.io.*;

@Slf4j
public class SerializeUtil {
    public static byte[] serialize(Object o) {
        ObjectOutputStream oos;
        ByteArrayOutputStream baos;

        try {
            baos = new ByteArrayOutputStream();
            oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            byte[] bytes = baos.toByteArray();
            return bytes;
        } catch (IOException e) {
            log.error("serialize failed", e);
        }
        return null;
    }

    public static <T> T deserialize(byte[] bytes, Class<T> className) {
        try {
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bais);
            //cast through the Class token so the caller gets a typed result
            return className.cast(ois.readObject());
        } catch (IOException | ClassNotFoundException e) {
            log.error("deserialize failed", e);
        }
        return null;
    }
}
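
A quick way to sanity-check the pair outside Kafka is a plain round trip. This small test class is not part of the original project; it only exercises the User, UserEncoder and UserDecoder classes defined above:

import org.example.cfg.UserDecoder;
import org.example.cfg.UserEncoder;
import org.example.pojo.User;

public class SerializationRoundTripTest {

    public static void main(String[] args) {
        User original = new User("user1", "sex1");

        //serializers can be invoked directly, without a running broker
        byte[] bytes = new UserEncoder().serialize("quickstart-events", original);
        User restored = new UserDecoder().deserialize("quickstart-events", bytes);

        //lombok's @Data generates equals(), so a value comparison works here
        System.out.println(original.equals(restored)); //expected: true
    }
}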

Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.example.cfg.UserEncoder;
import org.example.pojo.User;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ProducerTest {

    final static String topic = "quickstart-events";
    final static String serverList = "172.20.191.83:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
        p.put(ProducerConfig.ACKS_CONFIG, "all");
        p.put(ProducerConfig.RETRIES_CONFIG, "3");
        p.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserEncoder.class.getName());

        KafkaProducer<String, User> producer = new KafkaProducer<>(p);

        for (int i = 1; i < 300; i++) {
            Future<RecordMetadata> message =
                    producer.send(
                            new ProducerRecord<String, User>(topic, "User" + i, new User("user" + i, "sex" + i)),
                            (metadata, exception) -> {
                                //metadata is null when the send failed, so check the exception first
                                if (exception != null) {
                                    exception.printStackTrace();
                                } else {
                                    System.out.println("sent: offset: " + metadata.offset() + " partition: " + metadata.partition());
                                }
                            });
            message.get();
        }

        //flush remaining records and release client resources
        producer.close();
    }
}

Consumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.cfg.UserDecoder;
import org.example.pojo.User;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class ConsumerTest {

    final static String serverList = "172.20.191.83:9092";
//    final static String serverList = "zk1:9092";


    public static void main(String[] args) {
        Properties p = new Properties();

        //connect to the Kafka cluster
//        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kserver1:19092,kserver2:19092,kserver3:19092");
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
        p.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, ConsumerConfig.DEFAULT_ALLOW_AUTO_CREATE_TOPICS);
        //key/value deserializers
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDecoder.class.getName());
        //max records returned per poll
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        //consumer group id
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "20220514");

        KafkaConsumer<String, User> kafkaConsumer = new KafkaConsumer<>(p);

        ArrayList<String> topics = new ArrayList<>();
        topics.add("quickstart-events");
        kafkaConsumer.subscribe(topics);


        //start consume
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            //fetch this consumer's partition assignment
            assignment = kafkaConsumer.assignment();
        }

        //rewind every assigned partition to offset 0
        for (TopicPartition topicPartition : assignment) {
            //to restrict the seek to partition 1, re-enable the check below
//            if (topicPartition.partition() == 1) {
            kafkaConsumer.seek(topicPartition, 0);
//            }
        }

        while (true) {
            ConsumerRecords<String, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, User> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value().toString());
            }
        }
    }
}

5. Deleting a topic

./bin/kafka-topics --delete --zookeeper 【zookeeper server】  --topic 【topic name】
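
On newer Kafka versions the --zookeeper option of kafka-topics is deprecated (and removed in 3.x); the equivalent command talks to a broker directly:

./bin/kafka-topics --delete --bootstrap-server 【broker address】 --topic 【topic name】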