import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.tools.OffsetManager
/**
 * Small command-line example: builds a SASL/PLAIN-secured consumer configuration
 * and asks an `OffsetManager` to reset the offsets for a topic.
 *
 * NOTE(review): `OffsetManager` is not defined in this file; the exact effect of
 * `resetOffsetToLatest()` is inferred from its name only -- confirm against its source.
 */
object Test {

  /**
   * Builds the base consumer configuration.
   *
   * @param brokers value stored under `bootstrap.servers`
   * @param groupId value stored under `group.id`
   * @return a new mutable Java map holding the consumer settings
   */
  private def buildDefaultConfiguration(brokers: String, groupId: String): ju.HashMap[String, Object] = {
    val byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"

    val props = new ju.HashMap[String, Object]
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "monitor-test-2")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer)
    props
  }

  /**
   * Wraps `value` in double quotes, backslash-escaping any embedded `\` or `"`
   * so the value cannot terminate the quoted JAAS option early.
   * For values without those characters the result is simply `"value"`.
   */
  private def jaasQuoted(value: String): String =
    "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  /**
   * Adds SASL_PLAINTEXT / PLAIN authentication settings to `props`.
   *
   * The map is modified in place and the same instance is returned so the call
   * can be chained.
   *
   * @param userName user name placed in the JAAS configuration
   * @param password password placed in the JAAS configuration
   * @param props    configuration map to extend
   * @return `props`, after the security entries have been added
   */
  private def addSecure(userName: String, password: String, props: ju.HashMap[String, Object]): ju.HashMap[String, Object] = {
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "PLAIN")
    props.put(
      "sasl.jaas.config",
      "org.apache.kafka.common.security.plain.PlainLoginModule required username=" +
        jaasQuoted(userName) + " password=" + jaasQuoted(password) + ";"
    )
    props
  }

  /**
   * Entry point. The literals below are placeholders that must be replaced with
   * real values (broker list, topic, credentials, consumer group) before running.
   */
  def main(args: Array[String]): Unit = {
    val brokers = "kafka 集群列表"
    val topic = "设置的Topic"
    val username = "xxx"
    val password = "xxxxx"
    val groupId = "需要设置的group"

    val kafkaParam = addSecure(username, password, buildDefaultConfiguration(brokers, groupId))
    val manager = OffsetManager(topic, kafkaParam)
    manager.resetOffsetToLatest()
  }
}
// 转载请注明:SuperIT » kafka 设置消费group为最新offset