Kafka Producers and Consumers

Producer

Send Flow

(send-flow diagram not rendered in this extract)

Core Configuration

| Config | Description | Default |
| --- | --- | --- |
| `bootstrap.servers` | Broker address list | (none) |
| `key.serializer` | Key serializer | (none) |
| `value.serializer` | Value serializer | (none) |
| `acks` | Acknowledgment level | `1` (`all` since Kafka 3.0) |
| `retries` | Retry count | `2147483647` |
| `batch.size` | Batch size in bytes | `16384` |
| `linger.ms` | Time to wait for more records before sending a batch | `0` |
| `buffer.memory` | Total producer buffer size in bytes | `33554432` |

Acknowledgment (acks) Modes

| acks | Meaning | Reliability | Throughput |
| --- | --- | --- | --- |
| `0` | Do not wait for any acknowledgment | Lowest | Highest |
| `1` | Wait for the leader replica only | Medium | Medium |
| `-1` / `all` | Wait for all in-sync replicas (ISR) | Highest | Lowest |

Producer Example

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // idempotence: no duplicates on retry
        
        // Throughput settings
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            
            // 1. Fire-and-forget: asynchronous, result ignored
            producer.send(new ProducerRecord<>("topic", "key", "value"));
            
            // 2. Asynchronous send with a callback
            producer.send(new ProducerRecord<>("topic", "key", "value"), 
                (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("partition=%d, offset=%d%n",
                            metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            
            // 3. Synchronous send: get() blocks until the broker acknowledges
            RecordMetadata metadata = producer.send(
                new ProducerRecord<>("topic", "key", "value")).get();
            System.out.println("offset: " + metadata.offset());
        }
    }
}

Custom Partitioner

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class CustomPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        if (key == null) {
            // No key: spread records over a random partition
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        
        // Keyed record: hash the key; mask the sign bit instead of using
        // Math.abs(), which stays negative for Integer.MIN_VALUE
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
    
    @Override
    public void close() {}
    
    @Override
    public void configure(Map<String, ?> configs) {}
}

// Register the custom partitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
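One subtlety of hash-based partition selection: `Math.abs(Integer.MIN_VALUE)` is still negative (two's-complement overflow), so the common `Math.abs(key.hashCode()) % n` pattern can return a negative partition index. The usual fix is to mask off the sign bit. A minimal, dependency-free demonstration:

```java
public class SignBitDemo {
    // Map an arbitrary hash code to a valid partition index in [0, n)
    static int toPartition(int hash, int numPartitions) {
        // Masking the sign bit always yields a non-negative value,
        // unlike Math.abs(), which overflows for Integer.MIN_VALUE
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE));        // still -2147483648
        System.out.println(toPartition(Integer.MIN_VALUE, 4));  // 0, a valid index
        System.out.println(toPartition("order-42".hashCode(), 4));
    }
}
```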

Custom Serializer

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    // configure() and close() have default implementations,
    // so only serialize() needs to be overridden
    @Override
    public byte[] serialize(String topic, User data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing User", e);
        }
    }
}
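The consumer side needs the mirror-image `Deserializer<User>`. Stripped of the Kafka and Jackson types, a serializer/deserializer pair reduces to two `byte[]` conversions that must round-trip. A dependency-free sketch (the `User` record and the "name:age" encoding here are illustrative, not from the original):

```java
import java.nio.charset.StandardCharsets;

public class UserCodecDemo {
    // Hypothetical payload type, for illustration only
    record User(String name, int age) {}

    // What a Serializer<User> must do: object -> byte[]
    // (a simple delimited encoding stands in for JSON here)
    static byte[] serialize(User u) {
        return (u.name() + ":" + u.age()).getBytes(StandardCharsets.UTF_8);
    }

    // What the matching Deserializer<User> must do: byte[] -> object
    static User deserialize(byte[] data) {
        String[] parts = new String(data, StandardCharsets.UTF_8).split(":", 2);
        return new User(parts[0], Integer.parseInt(parts[1]));
    }

    public static void main(String[] args) {
        User original = new User("alice", 30);
        User roundTripped = deserialize(serialize(original));
        System.out.println(roundTripped);  // User[name=alice, age=30]
    }
}
```

Whatever the wire format, the invariant is the same: `deserialize(serialize(u))` must equal `u`.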

Consumer

Consumption Flow

(consume-flow diagram not rendered in this extract)

Core Configuration

| Config | Description | Default |
| --- | --- | --- |
| `bootstrap.servers` | Broker address list | (none) |
| `group.id` | Consumer group ID | (none) |
| `key.deserializer` | Key deserializer | (none) |
| `value.deserializer` | Value deserializer | (none) |
| `enable.auto.commit` | Commit offsets automatically | `true` |
| `auto.commit.interval.ms` | Auto-commit interval | `5000` |
| `auto.offset.reset` | Behavior when no committed offset exists | `latest` |
| `max.poll.records` | Maximum records returned per poll | `500` |

auto.offset.reset Options

| Value | Behavior |
| --- | --- |
| `earliest` | Start from the earliest available message |
| `latest` | Start from the newest message |
| `none` | Throw an exception if no committed offset exists |

Consumer Example

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        // Consumption settings
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // commit manually
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                    
                    processRecord(record);
                }
                
                // Commit offsets manually, after the whole batch is processed
                consumer.commitSync();
            }
        }
    }
    
    private static void processRecord(ConsumerRecord<String, String> record) {
        // application-specific processing goes here
    }
}

Offset Commits

Automatic Commit

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

Drawback: the commit runs on a timer, independent of processing, so a crash or rebalance can replay messages (commit lagging behind processing) or lose them (commit running ahead of processing).

Manual Synchronous Commit

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    // Synchronous commit: blocks (and retries) until the broker confirms
    consumer.commitSync();
}

Manual Asynchronous Commit

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    // Asynchronous commit: non-blocking; failures surface in the callback
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            log.error("Commit failed", exception);
        }
    });
}
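`commitAsync` deliberately does not retry, because a delayed retry could overwrite a newer committed offset. A common pattern is therefore asynchronous commits on the hot path plus one final synchronous commit on shutdown. A sketch (the `running` flag and `processRecord` are assumed to exist in the surrounding application):

```java
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitAsync();  // fast path: no blocking, no retry
    }
} finally {
    try {
        consumer.commitSync();   // final blocking commit, with retries
    } finally {
        consumer.close();
    }
}
```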

Committing per Partition

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        
        for (ConsumerRecord<String, String> record : partitionRecords) {
            processRecord(record);
        }
        
        // Commit per partition; the committed value is the offset of the
        // *next* record to consume, hence the +1
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(
            partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}

Consumer Rebalancing

A rebalance is triggered whenever the membership of a consumer group changes (a consumer joins, leaves, or is deemed dead), redistributing partitions among the remaining consumers.

Rebalance Listener

consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before partitions are taken away from this consumer
        System.out.println("Partitions revoked: " + partitions);
        // Commit current offsets so the next owner does not reprocess them
        consumer.commitSync();
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after partitions have been assigned
        System.out.println("Partitions assigned: " + partitions);
    }
});

Consuming from Specific Partitions

// Assign partitions manually (bypasses group management, so no rebalancing)
TopicPartition partition0 = new TopicPartition("test-topic", 0);
TopicPartition partition1 = new TopicPartition("test-topic", 1);
consumer.assign(Arrays.asList(partition0, partition1));

// Start consuming from a chosen position
consumer.seek(partition0, 100);  // start at offset 100
consumer.seekToBeginning(Collections.singletonList(partition1));  // from the beginning
consumer.seekToEnd(Collections.singletonList(partition1));  // from the latest offset
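`seek()` can also be combined with `offsetsForTimes()` to rewind to a point in time rather than a known offset. A sketch, reusing `partition0` from above (the one-hour window is an illustrative choice):

```java
// Find the earliest offset at or after a given timestamp, then seek to it
long oneHourAgo = System.currentTimeMillis() - Duration.ofHours(1).toMillis();
Map<TopicPartition, Long> query = Collections.singletonMap(partition0, oneHourAgo);

Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
OffsetAndTimestamp found = result.get(partition0);
if (found != null) {  // null when no message exists at or after that timestamp
    consumer.seek(partition0, found.offset());
}
```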

Consumer Groups

Partition Assignment Strategies

| Strategy | Description |
| --- | --- |
| `RangeAssignor` | Assigns contiguous partition ranges; the default |
| `RoundRobinAssignor` | Distributes partitions one by one across consumers |
| `StickyAssignor` | Keeps existing assignments where possible, softening rebalances |
| `CooperativeStickyAssignor` | Sticky assignment with incremental (cooperative) rebalancing |

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    StickyAssignor.class.getName());

Consumer Group States

(state-transition diagram not rendered in this extract)

Summary

  • The producer's reliability level is controlled by acks
  • The producer supports fire-and-forget, asynchronous (callback), and synchronous sends
  • Consumers parallelize consumption through consumer groups
  • Manual offset commits are more reliable than auto-commit
  • Commit offsets carefully around rebalances
  • The partition assignment strategy can reduce rebalance disruption