微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

Java kafka消息的发送与接收

kafka aide_941 12℃

  在上节《kafka在windows上的安装、运行》中我们已在Windows上kafka环境搭建起来了,下面我们继续看我们用java如何调kafka发送和接收消息。

一.项目工程结构

 

二.详细代码

KafkaUtils.java

复制代码
package com.bijian.study;

import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class KafkaUtils {

    private static Producer<String, String> producer;
    private static Consumer<String, String> consumer;

    private KafkaUtils() {

    }

    /**
     * 生产者,注意kafka生产者不能够从代码上生成主题,只有在服务器上用命令生成
     */
    static {
        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);// 服务器ip:端口号,集群用逗号分隔
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    /**
     * 消费者
     */
    static {
        Properties props = new Properties();
        props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);// 服务器ip:端口号,集群用逗号分隔
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(Constants.TOPIC_NAME));
    }

    /**
     * 发送对象消息 至kafka上,调用json转化为json字符串,应为kafka存储的是String
     */
    public static void sendMsgToKafka(IpranAlarm msg) {
        producer.send(new ProducerRecord<String, String>(Constants.TOPIC_NAME, String.valueOf(new Date().getTime()), JSON.toJSONString(msg)));
        System.out.println("向kafka发送的消息是:" + msg.toString());
    }

    /**
     * 从kafka上接收对象消息,将json字符串转化为对象,便于获取消息的时候可以使用get方法获取。
     */
    public static void getMsgFromKafka() {
        while (true) {
            ConsumerRecords<String, String> records = KafkaUtils.getKafkaConsumer().poll(100);
            if (records.count() > 0) {
                for (ConsumerRecord<String, String> record : records) {
                    JSONObject jsonAlarmMsg = JSON.parseObject(record.value());
                    IpranAlarm alarmMsg = JSONObject.toJavaObject(jsonAlarmMsg, IpranAlarm.class);
                    System.out.println("从kafka接收到的消息是:" + alarmMsg.toString());
                }
            }
        }
    }

    public static Consumer<String, String> getKafkaConsumer() {
        return consumer;
    }

    public static void closeKafkaProducer() {
        producer.close();
    }

    public static void closeKafkaConsumer() {
        consumer.close();
    }
}
复制代码

Constants.java

复制代码
package com.bijian.study;

public class Constants {

    public static String BOOTSTRAP_SERVERS = "localhost:9092";
    public static String TOPIC_NAME = "haha";
}
复制代码

IpranAlarm.java

复制代码
package com.bijian.study;

public class IpranAlarm {

    private String name;
    private int age;

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "IpranAlarm [name=" + name + ", age=" + age + "]";
    }
}
复制代码

测试类Main.java

package com.bijian.study;

public class Main {

    public static void main(String[] args) {

        IpranAlarm ipranAlarm = new IpranAlarm();
        ipranAlarm.setAge(18);
        ipranAlarm.setName("bijian");
        KafkaUtils.sendMsgToKafka(ipranAlarm);
        ipranAlarm.setAge(15);
        ipranAlarm.setName("zhangshan");
        KafkaUtils.sendMsgToKafka(ipranAlarm);

        KafkaUtils.getMsgFromKafka();
    }
}

运行结果如下:

向kafka发送的消息是:IpranAlarm [name=bijian, age=18]
向kafka发送的消息是:IpranAlarm [name=zhangshan, age=15]
从kafka接收到的消息是:IpranAlarm [name=bijian, age=18]
从kafka接收到的消息是:IpranAlarm [name=zhangshan, age=15]

 

说明:项目工程依赖的jar包在kafka的libs目录下可以找到(这里的fastjson.jar包除外),在这里是E:\study\bigData\kafka_2.12-2.0.0\libs目录。

 

参考文章:https://blog.csdn.net/u013144287/article/details/76277295

转载请注明:SuperIT » Java kafka消息的发送与接收

喜欢 (2)or分享 (0)