Storm WordCount: At-Most-Once Model
In this section, we show the code for Storm WordCount in the at-most-once model. It is the WordCount example that ships with Apache Storm (storm-starter). The spout emits tuples without message ids, so Storm does not track them and a lost tuple is never replayed.
RandomSentenceSpout
The Spout is as follows:
package storm.starter.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
public class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Random _rand;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = new Random();
}
@Override
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
String sentence = sentences[_rand.nextInt(sentences.length)];
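// no message id is attached to the emit below, so Storm does not track this tuple (at-most-once)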
_collector.emit(new Values(sentence));
}
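// because no message ids are attached, Storm never invokes the ack() and fail() methods below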
@Override
public void ack(Object id) {
}
@Override
public void fail(Object id) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
WordCountTopology
The Bolts and the WordCountTopology class are as follows:
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;
import java.util.HashMap;
import java.util.Map;
/**
* This topology demonstrates Storm's stream groupings and multilang capabilities.
*/
public class WordCountTopology {
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
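// shuffleGrouping spreads sentences randomly across the split tasks;
// fieldsGrouping on "word" routes every occurrence of a word to the same count task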
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
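// the spout attaches no message ids, so the ackers track nothing;
// ack tracking could also be disabled explicitly with conf.setNumAckers(0)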
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
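Both topologies delegate sentence splitting to a Python script, splitsentence.py, via Storm's multilang protocol. The script itself is not part of the Java listing; the version that ships with storm-starter looks roughly like the following sketch, where storm.BasicBolt anchors each emitted word to the input tuple and acks the tuple automatically:
import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        # tup.values[0] is the sentence; emit one tuple per word
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

SplitSentenceBolt().run()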
Storm WordCount: At-Least-Once Model
In this section, we show the code for Storm WordCount in the at-least-once model. Note that the listing below uses the org.apache.storm package namespace introduced in Storm 1.0; on Storm 0.9.x the same classes live under the backtype.storm namespace.
Extending BaseRichSpout: MySpout
This Spout reads the input file from the local disk. We implement the ack() and fail() functions in the Spout so that failed tuples can be replayed.
package org.apache.storm.starter.spout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
import java.io.File;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
public class MySpout extends BaseRichSpout {
SpoutOutputCollector _collector;
FileReader fileReader;
boolean completed = false;
TopologyContext context;
String sentence;
BufferedReader reader;
// map from msgId to the emitted sentence, kept so failed tuples can be resent
Map<String, String> messages;
public long start = System.currentTimeMillis();
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.context = context;
messages = new HashMap<String, String>();
try{
this.fileReader = new FileReader("/home/liuyang/dataset.txt");
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
}
reader = new BufferedReader(fileReader);
}
@Override
public void nextTuple(){
BufferedWriter timeRecord;
if (completed) {
return;
}
try{
// emit one line per call
if ((sentence = reader.readLine()) != null){
if (!sentence.trim().isEmpty()){
System.out.println("readTheFile "+sentence);
String msgId = sentence.hashCode()+"_"+System.currentTimeMillis();
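// attaching a message id makes Storm track the tuple tree; ack()/fail() receive this id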
_collector.emit(new Values(sentence),msgId);
messages.put(msgId,sentence);
}
}
else {
completed =true;
System.out.println("ComptetedTrue");
long end = System.currentTimeMillis();
long difference = end - start;
try {
timeRecord = new BufferedWriter(new FileWriter("/home/liuyang/time.txt", true));
timeRecord.write(Long.toString(difference));
timeRecord.newLine();
timeRecord.close();
} catch (IOException e) {
// writing the timing record is best-effort; just log failures
e.printStackTrace();
}
}
} catch (Exception e) {
throw new RuntimeException("Error reading tuple", e);
}
}
// ack(): the tuple completed the whole topology; drop it from the pending map
@Override
public void ack(Object id) {
String msgId=(String) id;
messages.remove(msgId);
}
// fail(): the tuple failed or timed out; look it up by id and resend it
@Override
public void fail(Object id) {
String msgId = (String) id;
if (!messages.containsKey(msgId)){
throw new RuntimeException("Error, transaction id not found ["+msgId+"]");
}
String reSentence = messages.get(msgId);
_collector.emit(new Values(reSentence),msgId);
System.out.println("failInSpout "+reSentence);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
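Note that nextTuple(), ack(), and fail() are all invoked on the same spout executor thread, so the messages map needs no synchronization.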
MyWordCountTopology
We manually anchor each emitted tuple and call the ack() function in the second Bolt, so it extends BaseRichBolt rather than BaseBasicBolt. The Bolts and MyWordCountTopology are as follows:
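The Python bolt gets its anchoring and acking from the multilang protocol. For comparison, a pure-Java split bolt with explicit anchoring and acking might look like this sketch (our illustration, not part of the original topology; the class name SplitSentenceJava is ours):

public static class SplitSentenceJava extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // anchor each word to the input sentence so a lost word fails the whole sentence
        for (String word : tuple.getString(0).split(" ")) {
            _collector.emit(tuple, new Values(word));
        }
        // acknowledge the input tuple once all words have been emitted
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}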
package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.starter.spout.MySpout;
import java.util.HashMap;
import java.util.Map;
import java.io.File;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
public class MyWordCountTopology {
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class WordCount extends BaseRichBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
BufferedWriter output;
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
try {
output = new BufferedWriter(new FileWriter("/home/liuyang/result.txt", true));
output.write(word + "\t" + count.toString());
output.newLine();
output.close();
} catch (IOException e) {
// appending to the result file is best-effort; just log failures
e.printStackTrace();
}
// anchor the output to the input tuple so it joins the tuple tree
_collector.emit(tuple, new Values(word, count));
// manually acknowledge the input tuple so the spout's ack() eventually fires
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new MySpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
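// tuples not fully acked within Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (default 30s)
// are failed back to the spout, which resends them in fail()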
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
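To run either topology on a real cluster, package the classes into a jar and submit it with the storm client, for example (jar and package names assumed to match your build): storm jar storm-starter.jar org.apache.storm.starter.MyWordCountTopology word-count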