~ Tutorial 5 ~

Feed


Storm WordCount at Most Once Model

In this section, we will show the code for Storm WordCount in at-most-once Model. It is the example in Apache Storm.

RandomSentenceSpout

The Spout is as follows:

package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;


  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    String sentence = sentences[_rand.nextInt(sentences.length)];
    _collector.emit(new Values(sentence));
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}

WordCountTopology

The Bolt and WordCountTopology is as follows:

hduser@master:~$ ./bin/hadoop jar ./share/hadoop/tools/lib/package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;

import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {
  public static class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
      super("python", "splitsentence.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

  public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);


    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

      Thread.sleep(10000);

      cluster.shutdown();
    }
  }
}

Storm WordCount at Least Once Model

In this section, we will show the code for Storm WordCount in at-least-once Model. It only can be run in the Storm version 0.9.x.

Extend BaseRichSpout to MySpout

The Spout is aimed to read the file from the local disk. We implement the ack() and fail() function in the Spout.

package org.apache.storm.starter.spout;

import org.apache.storm.spout.SpoutOutputCollector; 
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.base.BaseRichSpout; 
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values; 
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
import java.io.File;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.FileNotFoundException; import java.io.FileReader;
import java.io.FileNotFoundException; import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;

public class MySpout extends BaseRichSpout {
  SpoutOutputCollector _collector; //Random _rand;
  FileReader fileReader;
  boolean completed = false; 
  TopologyContext context;
  String sentence; 
  BufferedReader reader;

  // msgId and sentence Map<String,String> messages;
  public long start= System.currentTimeMillis();

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector; 
    this.context = context;
    messages = new HashMap<String, String>();
    try{
      this.fileReader = new FileReader("/home/liuyang/dataset.txt");
    } catch (FileNotFoundException e) {
      throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
    }
    reader = new BufferedReader(fileReader); 
  }

  @Override
  public void nextTuple(){
    BufferedWriter timeRecord; 

    if (completed) {
      return; 
    }

    try{
      //emit a line opon call
      if ((sentence = reader.readLine()) != null){
        if (!sentence.trim().isEmpty()){
          System.out.println("readTheFile "+sentence);
          String msgId = sentence.hashCode()+"_"+System.currentTimeMillis(); 
          _collector.emit(new Values(sentence),msgId); 
          messages.put(msgId,sentence);
        } 
      }
      else {
        completed =true; 
        System.out.println("ComptetedTrue");
        long end = System.currentTimeMillis(); 
        long difference = end - start;
        try {
          timeRecord = new BufferedWriter(new FileWriter("/home/liuyang/time.txt", true));
          timeRecord.write(Long.toString(difference)); 
          timeRecord.newLine();
          timeRecord.close();
        } catch (IOException e) {
      // TODO Auto-generated catch block 
          e.printStackTrace();
          }
        } 
    }catch(Exception e){
      throw new RuntimeException("Error reading tuple",e); 
    }finally{

    } 
  }

  //Ack is not implemented since this is a basic example @Override
  public void ack(Object id) {
    String msgId=(String) id;
    messages.remove(msgId); 
    }

  //You need to specifically resend the message @Override
  public void fail(Object id) {
    String msgId = (String) id;
    if (!messages.containsKey(msgId)){
      throw new RuntimeException("Error, transaction id not found ["+msgId+"]");
    }
    String reSentence = messages.get(msgId); 
    _collector.emit(new Values(reSentence),msgId); 
    System.out.println("failInSpout "+reSentence);
  }


  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence")); 
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    return null; 
  }
} 

WordCountTopology

We manually call the ack() function in the second Bolt. The Bolt and WordCountTopology is as follows:

package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector; 
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.TopologyBuilder; 
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.starter.spout.MySpout;
import java.util.HashMap; 
import java.util.Map;
import java.io.File;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.FileNotFoundException; 
import java.io.FileReader;
import java.io.FileNotFoundException; 
import java.io.FileWriter;
import java.io.IOException;


public class MyWordCountTopology {
  public static class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() { s
      uper("python", "splitsentence.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word")); 
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null; 
    }
  }

  public static class WordCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); BufferedWriter output;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0); 
      Integer count = counts.get(word); 
      if (count == null)
          count = 0; 
        count++;
      counts.put(word, count);

      try {
        output = new BufferedWriter(new FileWriter("/home/liuyang/result.txt", true)); 
        output.write(word.toString()+" t"+count.toString()); 
        output.newLine();
        output.close();
                    // }
      } catch (IOException e) {
      // TODO Auto-generated catch block 
        e.printStackTrace();
      }
      collector.emit(new Values(word, count));
      collector.ack(tuple);
        }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count")); 
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder(); 
    builder.setSpout("spout", new MySpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); 
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
    Config conf = new Config(); 
    conf.setDebug(true);
    if (args != null && args.length > 0) { 
      conf.setNumWorkers(3);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); 
    }
    else { conf.setMaxTaskParallelism(3);
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown(); 
    }
  } 
}