Apache Storm is one of the dinosaurs among big data technologies, especially in the stream processing area. This article introduces Apache Storm by walking through a small project. Many tutorials about streaming technologies use the Twitter API to demonstrate their framework; in this tutorial we will use the publicly available Wikimedia data stream, which continuously sends data whenever an article in one of Wikimedia’s wikis is changed.

Apache Storm Basics

The most important concepts in Storm are spouts, bolts and the topology:

  • Spouts: Responsible for getting data from outside into the Storm cluster
  • Bolts: Responsible for processing the data and potentially sending it on to the next bolt
  • Topology: Defines how spouts and bolts interact and determines the parallelism

Another important concept is groupings. Groupings are defined in the topology and tell Storm which data tuples must be distributed to which of a bolt’s tasks. The most important grouping types are:

  • Shuffle Grouping: Tuples are distributed randomly across the bolt’s tasks
  • Fields Grouping: Tuples having the same value in a field defined in the topology are sent to the same task
  • All Grouping: All data tuples go to all tasks. Typically used to send configuration data
  • Global Grouping: All tuples go to exactly one task

There are some other groupings, which can be found in the Storm documentation.
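To give a first impression of how groupings are declared, here is a minimal sketch against the TopologyBuilder API. The spout and bolt classes here are placeholders, not part of this tutorial’s project:

[code language="java"]TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("someSpout", new SomeSpout(), 1);

// Shuffle grouping: tuples are distributed randomly across the bolt's tasks
builder.setBolt("boltA", new BoltA(), 2).shuffleGrouping("someSpout");

// Fields grouping: tuples with the same value in the field "server" go to the same task
builder.setBolt("boltB", new BoltB(), 2).fieldsGrouping("boltA", new Fields("server"));

// All grouping: every tuple is replicated to every task of the bolt
builder.setBolt("boltC", new BoltC(), 2).allGrouping("boltA");

// Global grouping: all tuples go to exactly one task
builder.setBolt("boltD", new BoltD(), 2).globalGrouping("boltA");[/code]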

Our Task

That’s enough technical stuff! Let’s get into the problem! Which data is sent to us?

Every time there is a change in a wiki, the stream sends us JSON data in real time with the following structure:

(The original post showed an example JSON change event here.)
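Reduced to the fields we will actually read later in the code, such an event looks roughly like this (a reconstruction for illustration, not the complete payload):

[code language="javascript"]{
  "comment": "fixed a typo",
  "wiki": "dewiki",
  "server_url": "https://de.wikipedia.org",
  "title": "Apache Storm",
  "user": "SomeUser",
  "bot": false
}[/code]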

So we have a comment, we know in which wiki the change was made, and we know whether the change was made by a bot. Furthermore there is some other information we don’t care about here.

Let’s define our goals!

We want to know which wikis are the most active ones, and we don’t care about changes made by bots. To state the objective as clearly as a school exercise:

Write a Storm application in Java which regularly writes a new .txt file containing the names of the wikis and the number of changes that happened there. Ignore all bot changes! The text file shall have this format:

[code]https://ms.wikipedia.org 1
https://fr.wikipedia.org 49
https://nl.wikipedia.org 8
https://sv.wikipedia.org 3
https://bs.wikinews.org 1
https://te.wikipedia.org 1
https://zh.wikipedia.org 29[/code]

In each line we want to have the name of the wiki and the number of changes.

The Concept

We learned about the topology, spouts and bolts before.

First we need a spout which receives data from the wiki stream and brings it into our topology. Let’s call it WikiSpout.

The WikiSpout sends the data tuples to a bolt called RemoveBotsBolt, which removes all tuples from the data stream that have the attribute bot:true.

In the last step the data must be sent to the ServerCounterBolt, which finally tracks the number of changes per wiki. So our topology looks like this:

(Diagram: WikiSpout → RemoveBotsBolt → ServerCounterBolt)

Let’s start!

The POM

First of all we create a project using this pom.xml:

[code language="xml"]<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>de.opitz.bigdata</groupId>
  <artifactId>wikireader</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.0.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-io</artifactId>
      <version>1.3.2</version>
    </dependency>
  </dependencies>
</project>[/code]

The Main

Let’s start with our main class:

[code language="java"]import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class Main {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(2); // We want to use 2 worker processes
        LocalCluster cluster = new LocalCluster(); // A LocalCluster is perfect for development purposes
        TopologyBuilder topologyBuilder = new Topology().build();
        cluster.submitTopology("WikiTopology", conf, topologyBuilder.createTopology());
    }
}[/code]

This class seems to be more or less self-explanatory: We get a configuration, define the number of worker processes we want to use, tell Storm that we are just using a LocalCluster for testing and finally submit our topology.

The Topology

Now it gets interesting: we define our topology! So let’s look at the class Topology:

[code language="java"]public class Topology {

    public TopologyBuilder build() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("WikiSpout", new WikiSpout(), 1);
        topologyBuilder.setBolt("RemoveBotsBolt", new RemoveBotsBolt(), 3).shuffleGrouping("WikiSpout");
        topologyBuilder.setBolt("ServerCounterBolt", new ServerCounterBolt(), 1).globalGrouping("RemoveBotsBolt");
        return topologyBuilder;
    }
}[/code]
First we tell Storm that we want a spout called "WikiSpout", whose implementation is in WikiSpout.java, and that we only want one task executing this spout. That should be enough, since the spout just receives data and sends it on to the bolt.

In the next line we define our first bolt, the RemoveBotsBolt. We define a parallelism of 3 because we assume the complexity of this step needs 3 tasks. (Of course that’s not realistic and just for demonstration purposes.)

Finally we define a global grouping from the RemoveBotsBolt to the ServerCounterBolt. Why a global grouping? If we used a shuffle grouping and defined a parallelism > 1, the data tuples would sometimes go to task 1 and sometimes to task 2 and so on. We could not seriously count our data tuples, since each task would have its "own" counter, and we also have no database to synchronize the counters in this example.

Additional Question:

What would you do if there were so much data that one task of the ServerCounterBolt could not cope with it? We would have to increase the parallelism, and we would have to think about using a fields grouping in order to still get correct results, as sketched below. Try it!
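A minimal sketch of what that could look like (note that each task would then write its own files, so the per-task counts would have to be merged afterwards):

[code language="java"]// Tuples with the same server name always reach the same task,
// so every per-task counter stays correct for "its" servers.
topologyBuilder.setBolt("ServerCounterBolt", new ServerCounterBolt(), 3)
        .fieldsGrouping("RemoveBotsBolt", new Fields(Util.SERVER));[/code]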

Some Helper Classes

Before we continue we create two small classes which will help us later:

[code language="java"]public class Util {

    public static final String SERVER = "server";
    public static final String USER = "user";
    public static final String TITLE = "title";
    public static final String BOT = "bot";
}[/code]
and

[code language="java"]public class Change {

    private String serverName;
    private String title;
    private String user;
    private boolean bot;

    // plus getters & setters for all four fields!
}[/code]

Our Wiki-Spout

First we need a library that is able to read the wiki stream. We decided to use the socket.io-java-client library from GitHub, compiled it and added it to the build path.

Now let’s create a class called WikiSpout extending BaseRichSpout.

Before writing any methods we start by defining the following variables:

[code language="java"]
private SpoutOutputCollector collector;
private List<Change> changes;
final Lock lock = new ReentrantLock();
[/code]

First of all we need to override the method nextTuple(). This method is invoked continuously by Storm (imagine a while loop invoking this method over and over). The goal of this method is to send all data we have in our list "changes" to the next bolt:

[code language="java"]lock.lock();
try {
    for (Change change : changes) {
        collector.emit(new Values(change.getServerName(), change.getUser(), change.getTitle(), change.isBot()));
    }
    changes = new ArrayList<>();
} finally {
    lock.unlock();
}[/code]
We need a lock here, since the list "changes" may be accessed by the socket callback while nextTuple() is iterating over it.
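As a side note: a thread-safe queue would make the explicit lock unnecessary. Here is a minimal sketch of that alternative, using java.util.concurrent (this is not what the rest of this tutorial does):

[code language="java"]// Alternative (hypothetical): replace the List + Lock with a concurrent queue
private final BlockingQueue<Change> changes = new LinkedBlockingQueue<>();

public void nextTuple() {
    List<Change> batch = new ArrayList<>();
    changes.drainTo(batch); // atomically moves all queued changes into the batch
    for (Change change : batch) {
        collector.emit(new Values(change.getServerName(), change.getUser(), change.getTitle(), change.isBot()));
    }
}[/code]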

This is the method open(Map arg0, TopologyContext context, SpoutOutputCollector collector):

It is invoked only once, when the task is started. First we initialize some variables:

[code language="java"]this.collector = collector;
SocketIO socket = null;
changes = new ArrayList<>();
[/code]

Then we try to connect to the wiki stream:

[code language="java"]
try {
    socket = new SocketIO("http://stream.wikimedia.org/rc");
} catch (MalformedURLException e1) {
    throw new RuntimeException(e1);
}[/code]
Afterwards we start listening on this socket:

[code language="java"]socket.connect(new IOCallback() {

    public void onMessage(JSONObject json, IOAcknowledge ack) {
    }

    public void onMessage(String data, IOAcknowledge ack) {
    }

    public void onError(SocketIOException socketIOException) {
        System.out.println("An error occurred");
        socketIOException.printStackTrace();
    }

    public void onDisconnect() {
        System.out.println("Connection terminated.");
    }

    public void onConnect() {
        System.out.println("Connection established");
    }

    public void on(String event, IOAcknowledge ack, Object... args) {
        System.out.println("Server triggered event '" + event + "'");
        if (args[0] instanceof JSONObject) {
            JSONObject changeJson = (JSONObject) args[0];
            try {
                Change change = new Change();
                change.setUser(changeJson.get("user").toString());
                change.setTitle(changeJson.get("title").toString());
                change.setServerName(changeJson.get("server_url").toString());
                change.setBot(Boolean.valueOf(changeJson.get("bot").toString()));
                lock.lock();
                try {
                    changes.add(change);
                } finally {
                    lock.unlock();
                }
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
    }
});[/code]
The most interesting thing here is the on(...) method. It is invoked whenever a change is made in a wiki. We then get a JSON object like the example described above. We extract the data we want from the JSON, create a Change object and add it to the list.

Finally we tell the socket to subscribe to every event that is sent by the stream:

[code language="java"]socket.emit("subscribe", "*");[/code]
The last thing we have to do is tell Storm the names of the fields we send to the next bolts:

[code language="java"]public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(Util.SERVER, Util.USER, Util.TITLE, Util.BOT));
}[/code]
So what did we do? The method on() is invoked whenever a change happens in a wiki and adds a Change object to a list.

The method nextTuple() is continuously invoked by Storm. If the list mentioned above contains elements, they get emitted, which means they are sent to the next bolt(s).

Remove Bots Bolt

The task of the RemoveBotsBolt is to eliminate all tuples that have the value true for the attribute bot.

So we create a new class, RemoveBotsBolt.java, extending BaseRichBolt.

Before writing a method we define the collector:

[code language="java"]private OutputCollector collector;
[/code]

In the prepare method, we set it:

[code language="java"]
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}[/code]
The execute method should look like the following:

[code language="java"]public void execute(Tuple input) {
    Boolean bot = input.getBooleanByField(Util.BOT);
    String server = input.getStringByField(Util.SERVER);
    if (!bot) {
        collector.emit(new Values(server));
    }
}[/code]
The method execute(…) is invoked by Storm when a new data tuple comes in. We extract the data from this tuple, and if we get a bot, we do nothing; otherwise we emit the name of the server.

This time we know the next bolt is only interested in the name of the server (for example 'de.wikipedia.org'). So we tell Storm the name of our single output field:

[code language="java"]public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(Util.SERVER));
}[/code]

Finally: The Server Counter Bolt

The last class we are writing is called ServerCounterBolt, extending BaseRichBolt.

This time the methods prepare and declareOutputFields can be left empty, as shown in the sketch below.
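For completeness, a minimal sketch of those empty overrides (BaseRichBolt still forces us to implement them):

[code language="java"]public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    // Nothing to prepare: this bolt keeps its state in instance variables
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // No output fields: this is the last bolt in the topology and emits nothing
}[/code]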

But we need two variables at the beginning of the class:

[code language="java"]
private HashMap<String, Integer> counts = new HashMap<>();
private int totalCount = 0;[/code]

  • counts: contains the name of each server and the number of occurrences
  • totalCount: contains the total number of tuples that came in

So let’s write a helper method that writes a file having the current date and time in its name:

[code language="java"]
private void writeFile() {
    DateFormat df = new SimpleDateFormat("MMdd-HH-mm-ss");
    String dateTime = df.format(new Date());
    File file = new File(dateTime + ".txt");
    List<String> lines = new ArrayList<>();
    for (String key : counts.keySet()) {
        lines.add(key + " " + counts.getOrDefault(key, 0));
    }

    try {
        FileUtils.writeLines(file, lines);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}[/code]
This method iterates through our "counts" map and writes its contents to a file.

Finally we have to override the execute method:

[code language="java"]
public void execute(Tuple input) {
    totalCount++;
    String server = input.getStringByField(Util.SERVER);
    Integer count = counts.getOrDefault(server, 0);
    count++;
    counts.put(server, count);
    if (totalCount % 200 == 0) { // write a new file after every 200 tuples
        writeFile();
    }
}[/code]
That’s it!

Now we can start the main class; after a few seconds we get the first file with results.
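If you want to start it from the command line instead of an IDE, something along these lines should work with the exec-maven-plugin; note that the package of the Main class is an assumption here (adjust it to yours), and the classpath scope matters because storm-core is declared as provided:

[code language="bash"]mvn compile exec:java -Dexec.mainClass="de.opitz.bigdata.Main" -Dexec.classpathScope=compile[/code]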

This article only describes the absolute basics of Storm. Many things, for example acknowledgements, are not dealt with here. So have fun diving deeper into Storm’s technology!

I hope you got a first idea of how Storm works.
