2025年4月9日 星期三 乙巳(蛇)年 正月初十 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Java

Java整合Kafka实现生产及消费

时间:09-07来源:作者:点击数:26

前提条件

  • 搭建Kafka环境,参考Kafka集群环境搭建及使用
  • Java环境:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA项目环境
  • 创建maven项目。
  • pom.xml文件中引入kafka依赖。

代码语言:xml

  • <dependencies>
  • <dependency>
  • <groupId>org.apache.kafka</groupId>
  • <artifactId>kafka\_2.11</artifactId>
  • <version>2.1.0</version>
  • </dependency>
  • </dependencies>

创建Topic创建topic命名为testtopic并指定2个分区。

代码语言:bash

  • ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2

生产消息

代码语言:java

  • public class Producer {
  • public static void main(String[] args) throws ExecutionException, InterruptedException {
  • // 生产参数配置
  • Properties properties = new Properties();
  • properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  • properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  • properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  • KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
  • int i=0;
  • while (true) {
  • //生产消息
  • Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, String>("testtopic", "key"+i, "value"+i));
  • //获取生产的数据信息
  • RecordMetadata recordMetadata = future.get();
  • System.out.println("time:"+recordMetadata.timestamp()+" key:"+i+" value:"+i+" partition:"+recordMetadata.partition()+" offset:"+recordMetadata.offset());
  • Thread.sleep(1000);
  • i+=1;
  • }
  • }
  • }

生产者参数配置

代码语言:java

  • // ACK机制,默认为1 (0,1,-1)
  • properties.setProperty(ProducerConfig.ACKS_CONFIG, "");
  • // Socket发送消息缓冲区大小,默认为128K,设置为-1代表操作系统的默认值
  • properties.setProperty(ProducerConfig.SEND_BUFFER_CONFIG, "");
  • // Socket接收消息缓冲区大小,默认为32K,设置为-1代表操作系统的默认值
  • properties.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG, "");
  • // 生产者客户端发送消息的最大值,默认1M
  • properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "");
  • // 发送消息异常时重试次数,默认为0
  • properties.setProperty(ProducerConfig.RETRIES_CONFIG, "");
  • // 重试间隔时间,默认100
  • properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "");
  • // 生产消息自定义分区策略类
  • properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "");
  • // 开启幂等 ,默认true
  • properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "");

更多配置信息查看ProducerConfig类

生产自定义分区策略

  1. 创建分区策略类,实现org.apache.kafka.clients.producer.Partitioner接口,编写具体策略。

代码语言:java

  • public class PartitionPolicy implements Partitioner {
  • private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
  • @Override
  • public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  • List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  • int numPartitions = partitions.size();
  • if (keyBytes == null) {
  • int nextValue = this.nextValue(topic);
  • List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
  • if (availablePartitions.size() > 0) {
  • int part = Utils.toPositive(nextValue) % availablePartitions.size();
  • return ((PartitionInfo)availablePartitions.get(part)).partition();
  • } else {
  • return Utils.toPositive(nextValue) % numPartitions;
  • }
  • } else {
  • return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  • }
  • }
  • private int nextValue(String topic) {
  • AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
  • if (null == counter) {
  • counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
  • AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
  • if (currentCounter != null) {
  • counter = currentCounter;
  • }
  • }
  • return counter.getAndIncrement();
  • }
  • @Override
  • public void close() {
  • }
  • @Override
  • public void configure(Map<String, ?> map) {
  • }
  • }
  1. 参数配置。

代码语言:java

  • properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionPolicy.class.getName());

生产到指定分区

ProducerRecord有指定分区的构造方法,设置分区号

public ProducerRecord(String topic, Integer partition, K key, V value)

代码语言:java

  • Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, String>("testtopic", 1, "key"+i, "value"+i));

消费消息

代码语言:java

  • public class Consumer {
  • public static void main(String[] args) throws InterruptedException {
  • Properties properties = new Properties();
  • properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  • //约定的编解码
  • properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  • properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  • properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
  • //默认为自动提交
  • properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  • //当设置为自动提交时,默认5秒自动提交
  • //properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
  • //
  • //properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "5000");
  • KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
  • //订阅topic
  • kafkaConsumer.subscribe(Arrays.asList("testtopic"));
  • Set<TopicPartition> assignment = kafkaConsumer.assignment();
  • ConsumerRecords<String, String> records = null;
  • while (assignment.size() == 0) {
  • records = kafkaConsumer.poll(Duration.ofMillis(100));
  • assignment = kafkaConsumer.assignment();
  • }
  • /*//1.根据时间戳获取 offset,设置 offset
  • Map<TopicPartition, Long> offsetsForTimes=new HashMap<>();
  • for (TopicPartition topicPartition : assignment) {
  • offsetsForTimes.put(topicPartition,1669972273941L);
  • }
  • Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(offsetsForTimes);
  • offsetAndTimestampMap.forEach((tp,offsettime)->{
  • kafkaConsumer.seek(tp,offsettime.offset());
  • });*/
  • /*//2.指定从头开始消费
  • kafkaConsumer.seekToBeginning(assignment);*/
  • /*//3.指定从某offset开始消费
  • kafkaConsumer.seek(tp,0);*/
  • while (true) {
  • if (records.isEmpty()) {
  • Thread.sleep(3000);
  • } else {
  • System.out.printf("records count:" + records.count());
  • Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
  • while (iterator.hasNext()) {
  • ConsumerRecord<String, String> record = iterator.next();
  • System.out.println(" time:" + record.timestamp() + " key:" + record.key() + " value:" + record.value() + " partition:" + record.partition() + " offset:" + record.offset());
  • }
  • kafkaConsumer.commitSync();
  • }
  • records = kafkaConsumer.poll(Duration.ofMillis(0));
  • }
  • }
  • }

消费参数配置

代码语言:java

  • // 消费者必须指定一个消费组
  • properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
  • // 消费者每次最多POLL的数量
  • properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
  • // 消费者POLL的时间间隔
  • properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_DOC, "");
  • // 设置是否自动提交,默认为true
  • properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");
  • // 如果是自动提交,默认5s后提交,会发生丢失消息和重复消费情况
  • properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");
  • // 当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。

更多配置信息查看ConsumerConfig类

offset设置方式

如代码所示,设置offset的几种方式:

  • 指定 offset,需要自己维护 offset,方便重试。
  • 指定从头开始消费。
  • 指定 offset 为最近可用的 offset (默认)。
  • 根据时间戳获取 offset,设置 offset。

代码仓库

https://gitee.com/codeWBG/learn_kafka

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐