在上节《kafka在windows上的安装、运行》中我们已在Windows上kafka环境搭建起来了,下面我们继续看我们用java如何调kafka发送和接收消息。
一.项目工程结构
二.详细代码
KafkaUtils.java
package com.bijian.study; import java.util.Arrays; import java.util.Date; import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; public class KafkaUtils { private static Producer<String, String> producer; private static Consumer<String, String> consumer; private KafkaUtils() { } /** * 生产者,注意kafka生产者不能够从代码上生成主题,只有在服务器上用命令生成 */ static { Properties props = new Properties(); props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);// 服务器ip:端口号,集群用逗号分隔 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(props); } /** * 消费者 */ static { Properties props = new Properties(); props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);// 服务器ip:端口号,集群用逗号分隔 props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(Constants.TOPIC_NAME)); } /** * 发送对象消息 至kafka上,调用json转化为json字符串,应为kafka存储的是String */ public static void sendMsgToKafka(IpranAlarm msg) { producer.send(new ProducerRecord<String, String>(Constants.TOPIC_NAME, String.valueOf(new Date().getTime()), JSON.toJSONString(msg))); System.out.println("向kafka发送的消息是:" + msg.toString()); } /** * 从kafka上接收对象消息,将json字符串转化为对象,便于获取消息的时候可以使用get方法获取。 */ public static void getMsgFromKafka() { while (true) { ConsumerRecords<String, String> records = KafkaUtils.getKafkaConsumer().poll(100); if (records.count() > 0) { for (ConsumerRecord<String, String> record : records) { JSONObject jsonAlarmMsg = JSON.parseObject(record.value()); IpranAlarm alarmMsg = JSONObject.toJavaObject(jsonAlarmMsg, IpranAlarm.class); System.out.println("从kafka接收到的消息是:" + alarmMsg.toString()); } } } } public static Consumer<String, String> getKafkaConsumer() { return consumer; } public static void closeKafkaProducer() { producer.close(); } public static void closeKafkaConsumer() { consumer.close(); } }
Constants.java
package com.bijian.study; public class Constants { public static String BOOTSTRAP_SERVERS = "localhost:9092"; public static String TOPIC_NAME = "haha"; }
IpranAlarm.java
package com.bijian.study; public class IpranAlarm { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "IpranAlarm [name=" + name + ", age=" + age + "]"; } }
测试类Main.java
package com.bijian.study; public class Main { public static void main(String[] args) { IpranAlarm ipranAlarm = new IpranAlarm(); ipranAlarm.setAge(18); ipranAlarm.setName("bijian"); KafkaUtils.sendMsgToKafka(ipranAlarm); ipranAlarm.setAge(15); ipranAlarm.setName("zhangshan"); KafkaUtils.sendMsgToKafka(ipranAlarm); KafkaUtils.getMsgFromKafka(); } }
运行结果如下:
向kafka发送的消息是:IpranAlarm [name=bijian, age=18] 向kafka发送的消息是:IpranAlarm [name=zhangshan, age=15] 从kafka接收到的消息是:IpranAlarm [name=bijian, age=18] 从kafka接收到的消息是:IpranAlarm [name=zhangshan, age=15]
说明:项目工程依赖的jar包在kafka的libs目录下可以找到(这里的fastjson.jar包除外),在这里是E:\study\bigData\kafka_2.12-2.0.0\libs目录。
参考文章:https://blog.csdn.net/u013144287/article/details/76277295
转载请注明:SuperIT » Java kafka消息的发送与接收