微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

rsyslog 接入权限认证的Kafka集群

kafka aide_941 7℃
        

说明:

kafka权限接入包含两种方式,了解具体接入可参考

本案例使用kafka的第二种使用方式,案例如下:

Icon

SparkConf sparkConf = new SparkConf()
    .setAppName("JavaSparkStreamingSQLKafkaExample");

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
    Durations.seconds(30));

//传入参数
String username = "username";
String password = "password";
String brokers = args[0];
String topic = args[1];
String groupId = args[2];
boolean enableSecurity=true;

// 设置要读取的 Topic 列表
Set<String> topicsSet = new HashSet<>();
topicsSet.add(topic);
//kafka 参数设置
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", groupId);
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("fetch.min.bytes", "4096");
kafkaParams.put("enable.auto.commit", "false");

// 设置 kafka 安全验证部分
if (enableSecurity) {
  kafkaParams.put("sasl.mechanism", "PLAIN");
  kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
  kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
}

JavaInputDStream<ConsumerRecord<Object, Object>> directStream = KafkaUtils
    .createDirectStream(ssc, LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet,kafkaParams));
//业务操作
JavaDStream<String> lines = directStream.map(new Function<ConsumerRecord<Object, Object>,String>() {
 @Override
 public String call(ConsumerRecord<Object, Object> record) {
 // 获取 Kafka 数据内容
 return record.value().toString();
 }
});

//开始计算

ssc.start();
ssc.awaitTermination();

注意:

1、推荐依赖的jar包

kafka依赖的jar包 版本请使用  0.10.2.1 低版本不支持权限认证

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>

 

2、申请kafka权限时请说明使用spark消费。

原因:spark 在消费kafka 数据时使用的 Group id 与程序中设置的Group ID不一致,会加前缀:spark-executor-

举例:申请的Group id为 consumer-group, 但是spark excutor中会使用 spark-executor-consumer-group

因此需要同时申请:consumer-group 和  spark-executor-consumer-group

转载请注明:SuperIT » rsyslog 接入权限认证的Kafka集群

转载请注明:SuperIT » rsyslog 接入权限认证的Kafka集群

喜欢 (0)or分享 (0)