Storm: reading data from a Kafka message queue
Business scenario:
Storm and Kafka are a classic pairing. Storm does the stream computation, and it handles a steady, continuous inflow of data very well. Real-world traffic, however, is rarely steady: data arrives in bursts, heavy one moment and light the next. Batch processing is a poor fit for that, and pointing such a bursty stream straight at Storm for real-time computation can crash servers when data piles up. Putting Kafka in front as a message queue is the natural answer: Kafka absorbs the uneven inflow and hands Storm an even stream of messages, so the two combine into a stable stream-processing pipeline.
Enough talk; let's throw together a small demo:
Topology:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class KafkaTopology {

    private String topic;
    private String zkRoot;
    private String spoutId;

    public KafkaTopology() {}

    public KafkaTopology(String topic, String zkRoot, String spoutId) {
        this.topic = topic;
        this.zkRoot = zkRoot;
        this.spoutId = spoutId;
    }

    public void run() {
        // ZooKeeper addresses the KafkaSpout uses to discover the Kafka brokers
        BrokerHosts brokerHosts = new ZkHosts("storm1:2181,storm2:2181,storm3:2181");

        // Configure the spout; zkRoot/spoutId is where the spout keeps its offsets in ZooKeeper
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        // where to start reading: true makes the spout ignore offsets saved in
        // ZooKeeper and start from the beginning of the topic
        spoutConfig.forceFromStart = true;
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        // Set up a spout that reads data from the Kafka message queue and emits it
        // to the next bolt. This is not a custom spout but the KafkaSpout that
        // already ships with Storm.
        builder.setSpout(spoutId, new KafkaSpout(spoutConfig));
        builder.setBolt("wordSplit", new WordSplit()).shuffleGrouping(spoutId);
        builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("wordSplit", new Fields("word"));

        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setNumAckers(0); // disable acking; this demo does not need delivery guarantees
        conf.setDebug(false);

        // LocalCluster submits the topology to the local simulator, which is
        // convenient for development and debugging
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("WordCount", conf, builder.createTopology());

        // To submit the topology to a real Storm cluster instead:
        // StormSubmitter.submitTopology("wordCount", conf, builder.createTopology());
    }

    public static void main(String[] args) {
        new KafkaTopology("test", "/kafka-storm", "KafkaSpout").run();
    }
}
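
The spout config above wires in a MessageScheme, which the original post never shows. Below is a minimal sketch of what it presumably looks like, assuming it simply decodes each Kafka message payload as a UTF-8 string into a single field; the class body and the "msg" field name are guesses, not from the source:

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical reconstruction: turns the raw Kafka payload into one string field.
public class MessageScheme implements Scheme {

    private static final long serialVersionUID = 1L;

    public List<Object> deserialize(byte[] ser) {
        try {
            // decode the message bytes as UTF-8 and emit them as a single value
            return new Values(new String(ser, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }
}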
WordSplit:

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordSplit extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String line = tuple.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            // emit only strings that are non-null, non-empty, and not pure whitespace
            if (StringUtils.isNotBlank(word)) {
                word = word.toUpperCase();
                collector.emit(new Values(word));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
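
A note on the wiring: the topology sends split words downstream with fieldsGrouping on the "word" field, so every occurrence of a given word always reaches the same WriterBolt task (shown next) and therefore the same output file. A toy sketch of the idea, assuming hash-modulo routing across the topology's 4 writer tasks; this mirrors the concept only, not Storm's actual partitioning code:

import java.util.Arrays;

// Toy illustration of fields grouping: tuples are routed by the hash of the
// grouping field, so equal values always land on the same downstream task.
public class FieldsGroupingDemo {

    static int chooseTask(String word, int numTasks) {
        // same word -> same hash -> same task index
        return Math.abs(word.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // matches the four WriterBolt executors in the topology
        for (String w : Arrays.asList("STORM", "KAFKA", "STORM", "HELLO", "KAFKA")) {
            System.out.println(w + " -> writer task " + chooseTask(w, numTasks));
        }
    }
}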
WriterBolt:

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

// Writes the received data to a file
public class WriterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;
    private FileWriter writer = null;

    public void prepare(Map conf, TopologyContext context) {
        try {
            // one output file per bolt task, named with a random UUID
            writer = new FileWriter("e:/" + "wordcount-" + UUID.randomUUID().toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String line = tuple.getString(0);
        try {
            writer.write(line);
            writer.write("\n");
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: emits nothing, so no output fields are declared
    }
}
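
The original post includes no producer for the "test" topic, so to exercise the demo end to end you need something publishing lines into Kafka. Here is a minimal sketch using the org.apache.kafka.clients.producer API; note this is a newer client than the storm-kafka era of the demo, and the broker address and sample lines are assumptions:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper, not from the original post: pushes a few lines into the
// "test" topic so the KafkaSpout has something to read.
public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm1:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (String line : new String[]{"hello storm", "hello kafka", "storm and kafka"}) {
            producer.send(new ProducerRecord<String, String>("test", line));
        }
        producer.close();
    }
}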
This demo only covers local standalone mode; tuning the various parameters for a production deployment is beyond the scope of this post ~~~