`
wbj0110
  • 浏览: 1546128 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

storm教程

阅读更多

基础


Storm是一个分布式,可靠的,容错好的,用于处理数据流的系统。整个处理工作有不同的可靠组件来完成,每个组件负责一个简单的具体任务。Storm集群的输入数据流由一个成为spout的组件处理。spout再将数据传给bolt组件,其中会对数据做某种变换。bolt或者持久化数据到某个存储介质中,或者将数据发送到其他的bolt。你可以将Storm集群想像成一个由bolt组成的链条,每个bolt都对spout产生的数据进行某种变换。

这里给出一个例子用于说明这个概念。昨晚,当播音员开始谈论政治家及他们对不同事情的看法时,我正在看新闻。他们重复着不同的名字,我就知道是否每个名字都被提到了相同的次数,或者在提到的名字中是否有偏见存在。

将播音员说话时的字幕想像成一个输入数据流。你就拥有一个从文件(socket, HTTP, 或者其他方式)读取输入的’spout’。每当文本行到达,spout将它们丢给一个将文本行切割成单词的bolt。单词流接着传给另一bolt,这里会将每一个单词与预先设置好的政治家的名字进行比较。当名字匹配时,第二个bolt就会将数据库的那个名字相应的计数器的值加一。无论何时你想看这些结果,只需要查询数据库,该数据库是随着数据的到来而实时更新的。所有的组件(spoutbolt)和它们之间的联系方式成为Topology(见图 1-1)。

fig_1_1

现在考虑定义每个boltspout在整个集群的并行度,这样一来就可以扩展topology。很神奇,对不对?虽然这是一个简单的例子,但是对于Storm的牛叉可见一斑。

Storm主要用于哪些场景?

  • 流处理正如在前面的例子中提到的,有别于其他的处理系统,Storm不需要中间队列
  • 连续计算连续发送数据到客户端,这样就能实时更新和展示数据,例如网站统计
  • DRP调用方便并行化CPU密集计算

Storm的组件

Storm集群中,节点都组织到一个不断运行主节点。

Storm集群中有两种节点:master节点和worker节点。Master运行一个成为Nimbus的进程,用于在集群中发布代码,分配任务到到每个工作节点,监视任务的失败。工作节点运行一个称为Supervisor的进程,执行拓扑的一部分。Storm中的拓扑运行在不同机器上的多个工作节点上。

由于Storm将集群状态保存在ZooKeeper或者本地磁盘上,守护进程是无状态的,可以挂掉或者重启,而不会影响系统的正确性(见图 1-2

fig_1_2

底层,Storm使用了zeromq,一个高级的,可嵌入的网络库,它提供了一些使得Storm成为可能的特性。下面是一些zeromq的特性

  • 作为并行框架的socket库
  • 在集群产品和超级计算中比TCP快
  • 进程间携带消息,IPC,TCP和多播
  • 用于可扩展的多核消息传输应用的异步I/O
  • 以fanout,pubsub,pipeline,reque-replay的方式完成N对N的链接

Storm的属性

除了上面所有的概念和考虑,有一些很nice的属性使得Storm变得很特殊。

易于编程

如果你曾经做个实时处理相关的,你就知道它是多么的操蛋。使用Storm,你会发现复杂性指数级下降

支持多种编程语言

虽然可以很容易的使用基于JVM的语言进行开发,当时Storm支持任何语言,只要实现一个小的中间库

容错性

Storm集群会知道工作进程挂掉,有必要的时候会重新分配任务。

扩展性

对于扩展所要做的就是增加机器到集群中。当新增的机器可用时,Storm就会分配任务给它们。

可靠性

所有的消息都保证被处理至少一遍。如果发生错误,消息有可能处理多遍,但是可以保证不会丢失任何消息。

速度是Storm设计中的考虑的一个关键因素。

事务

提供了精确的一次消息语义用于计算。

开始


本章,我们会创建一个storm工程和第一个strom topology。

接下来我们假设你安装的JRE的版本最低为1.6。我们推荐使用Oracle提供的JRE,可以从这里下载。

工作模式

在开始之前,有必要明白strom的工作模式。storm有两种工作方式。

本地模式

在 本地模式中,storm拓扑本地机器上的单个JVM中。该模式用于开发,测试和调试,因为这是查看所有拓扑组件工作的最简单的方式。在这个模式中,可以通 过调整参数来查看拓扑在不同的strom配置环境中是如何运行的。为了在本地模式中运行拓扑,需要下载storm的开发依赖,即开发和测试拓扑所需要的所 有东东。不久在我们创建第一个storm工程的时候,就会知道如何来做。

在本地模式中运行拓扑类似于在storm集群中运行。除了需要保证所有的组件是线程安全的,因为当拓扑部署到远程模式中,可能运行在不同的没有直接通信或者共享内存的物理主机上的不同JVM上。

本章中的所有例子中,我们都运行在本地模式中。

远程模式

远程模式中,拓扑被提交到storm集群中,给集群由运行在不同机器上的许多进程组成。远程模式不显示调试信息,这就是为什么它成为生产环境。然而,在单台开发机上是可以建立一个storm集群的,而且在部署到生产环境前这样做是一个很好的做法,这样可以保证在生产环境中运行拓扑不会出问题。

第六章我们会介绍更多关于远程模式,并在附录B中会展示如何建立集群。

HELLO WORLD STORM

在 这个工程中,我们将会创建一个简单的拓扑用于数单词。可以将其视为storm版的hello world。然而,这是一个灰常niubility的拓扑,因为可以扩展到任意大小,并且稍作修改后甚至可以用来建立一个统计系统。例如,我们可以修改这 个工程来寻找Twitter上主题的变化趋势。

创建一个这样的拓扑,需要一个用来读取单词的spout,第一个bolt用来归一化单词,第二个bolt用来数单词,具体如图2-1所示。

fig_2_1

可以从https://github.com/storm-book/examples-ch02-getting_started/zipball/master将源代码以ZIP文件的形式下载下来。

如果你使用git(一个分布式的版本控制和源码管理工具),你可以在打算保存源码的目录下运行命令git clone git@github.com:storm-book/examples-ch02-getting_started.git

检查JAVA的安装

建立环境的第一步就是检查当前运行的java的版本。打开终端并运行命令java -version。我们可以看到类似下面的内容:

java -version
java version "1.6.0_26"
Java(TM) SE Runtime Environment (build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)

如果不是,请检查java的安装。(具体可以见< http://www.java.com/download/>)

创建工程

在建立工程前,先建立一个存放工程的文件夹(就像建立一个java应用那样)。这个文件夹用来存放工程的源码。

接下来,我们需要下载storm的依赖:一系列的需要放到classpath的jar包。可以用下面的两种方式之一完成:

  • 下载依赖,解压,添加到classpath
  • 使用apache maven

在定义工程结构之前,我们需要建立一个pom.xml(project object model)文件,该文件描述了依赖,打包,源码等信息。我们将使用nathanmarz (https://github.com/nathanmarz/)提供的依赖和maven仓库。依赖可以从这里找到https://github.com/nathanmarz/storm/wiki/Maven

使用依赖,我们需要在pom.xml文件中指明运行拓扑所需的必要组件。

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.book</groupId>
  <artifactId>Getting-Started</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
   <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <compilerVersion>1.6</compilerVersion>
        </configuration>
      </plugin>
         </plugins>
  </build>
  <repositories>
        <!-- Repository where we can found the storm dependencies  -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
  </repositories>
  <dependencies>
        <!-- Storm Dependency -->
        <dependency>
          <groupId>storm</groupId>
          <artifactId>storm</artifactId>
          <version>0.6.0</version>
       </dependency>
  </dependencies>
</project>

开始的几行指定工程名和版本。然后增加了一个编译插件,用于高速Maven源码需要使用Java 1.6编译。接着定义了仓库(对于同一个工程Maven支持多个仓库)。clojars就是storm依赖所在的仓库。Maven会自动下载在本地模式中运行storm所需的子依赖。

应用将会有下面的结构,典型的Maven java工程:

our-application-folder/
    ├── pom.xml
    └── src
        └── main
            └── java
            |   ├── spouts
            |   └── bolts
            └── resources

java目录下的文件夹中是我们的源码,需要处理的单词文件在resou文件夹中。

创建第一个拓扑

要建立我们的第一个拓扑,我们需要实现运行数单词需要的类。有可能本例的部分在目前看来不太清楚,我们在后续的章节会进一步解释。

Spout

WordReader spout实现了接口IRichSpout。更多细节在第四章给出。WordReader负责读取文件,并提供每一个行给bolt。

例2-1给出了该类的完整代码(我们会在后续的例子中分析这段代码的每一部分)。

例2-1 src/main/java/spouts/WordReader.java

package spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;

import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader implements IRichSpout {
        private SpoutOutputCollector collector;
        private FileReader fileReader;
        private boolean completed = false;
        private TopologyContext context;
        public boolean isDistributed() {return false;}
        public void ack(Object msgId) {
                System.out.println("OK:"+msgId);
        }
        public void close() {}
        public void fail(Object msgId) {
                System.out.println("FAIL:"+msgId);
        }
        /**
         * The only thing that the methods will do It is emit each
         * file line
         */
        public void nextTuple() {
                /**
                 * The nextuple it is called forever, so if we have been readed the file
                 * we will wait and then return
                 */
                if(completed){
                        try {
                                Thread.sleep(1000);
                        } catch (InterruptedException e) {
                                //Do nothing
                        }
                        return;
                }
                String str;
                //Open the reader
                BufferedReader reader = new BufferedReader(fileReader);
                try{
                        //Read all lines
                        while((str = reader.readLine()) != null){
                                /**
                                 * By each line emmit a new value with the line as a their
                                 */
                                this.collector.emit(new Values(str),str);
                        }
                }catch(Exception e){
                        throw new RuntimeException("Error reading tuple",e);
                }finally{
                        completed = true;
            }
        }
        /**
         * We will create the file and get the collector object
         */
        public void open(Map conf, TopologyContext context,
                        SpoutOutputCollector collector) {
                try {
                        this.context = context;
                        this.fileReader = new FileReader(conf.get("wordsFile").toString());
                } catch (FileNotFoundException e) {
                        throw new RuntimeException("Error reading file 
["+conf.get("wordFile")+"]");
                }
                this.collector = collector;
        }
        /**
         * Declare the output field "word"
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line"));
        }
}

在任何一个spout中首先被调用的方法是public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。接收的参数分别是TopologyContext,包含了所有拓扑相关的数据;conf对象,在定义拓扑的时候指定;SpoutOutputCollector发射需要处理的数据到bolts。下面的代码块就是open方法的实现

 public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    try {
        this.context = context;
        this.fileReader = new FileReader(conf.get("wordsFile").toString());
    } catch (FileNotFoundException e) {
        throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
    }
    this.collector = collector;
}

在该方法中,我们创建了一个reader,用来读取文件。接下来需要实现方法public void nextTuple(),在该方法中我们会发射需要bolts处理的数据。在我们的例子中,该方法会读取文件,每读一行发射一个值。

public void nextTuple() {
    if(completed){
        try {
                Thread.sleep(1);
        } catch (InterruptedException e) {
         //Do nothing
        }
        return;
    }
    String str;
    BufferedReader reader = new BufferedReader(fileReader);
    try{
        while((str = reader.readLine()) != null){
                this.collector.emit(new Values(str));
        }
    }catch(Exception e){
        throw new RuntimeException("Error reading tuple",e);
    }finally{
        completed = true;
    }
}

方法nextTuple()跟方法ack()fail()会在同一个循环中周期性的被调用。在没有任务可做的时候需要放弃对线程的控制权,这样其他的方法才有机会被调用。所以nextTuple的第一行检查处理是否完成。如果完成,它就会沉睡至少1ms来降低处理的负载。如果有任务需要处理,文件的每一行都会被读到一个value中,然后发射出去。

Bolts

现在,我们已经拥有了一个读取文件,并且每一行发射一个tuple的spout。还要设计两个bolts来处理刚才发射的tuple(见图2-1)。这些bolts实现了接口backtype.storm.topology.IRichBolt

bolt中最重要的方法是void execute(Tuple input),每接收到一个tuple就会调用该方法。对于每个接收的tuple,bolt可以发射出多个后续的tuple。

一个bolt或者spout可以发射出足够多的所需的tuple。当方法nextTuple或者execute被调用时,它们可能会发射出0,1或者许多个tuple。我们会在第五章做更多说明。

第一个bolt,wordNormalizer,负责接收每一行,并归一化。它将每一行文本切割成单词,转成小写并去除两边的空格。首先,需要声明bolt的输出参数:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}

这里指明bolt会发射出一个名为word的域。

接下来,我们实现方法public void execute(Tuple tuple),用出来输入的tuples。

public void execute(Tuple input) {
    String sentence = input.getString(0);
    String[] words = sentence.split(" ");
    for(String word : words){
        word = word.trim();
        if(!word.isEmpty()){
            word = word.toLowerCase();
            //Emit the word
            collector.emit(new Values(word));
        }
    }
    // Acknowledge the tuple
    collector.ack(input);
}

第一行从tuple中读取值。值可以根据位置或者名字来获取。值在处理结束后,使用collector对象发射出去。在每一个tuple都被处理后,collector对象的ack()方法会被调用一表明整个处理过程成功结束。如果tuple没有被处理,那么应该调用fail()方法。

例2-2 给出该类的完整代码

例2-2 src/main/java/bolts/WordNormalizer.java

package bolts;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt {
    private OutputCollector collector;
    public void cleanup() {}
    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
            //Emit the word
                List a = new ArrayList();
                a.add(input);
                collector.emit(a,new Values(word));
            }
        }
    // Acknowledge the tuple
        collector.ack(input);
    }
    public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
        this.collector = collector;
    }
    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

该类中,我们给出了一个在单次execute方法调用中发射多个tuple的例子。如果该方法接收了一行句子This is the Storm book,那么在单次execute方法调用中,它将会发射5个新tuple。

第二个bolt,WordCounter,负责对单词计数。当拓扑运行结束后(当cleanup()方法被调用),会给出每个单词的计数值。

这是一个不发射任何东西的bolt的示例。在这里,数据被添加到一个map中,但是在实际中bolt可以把数据存到数据库。

package bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class WordCounter implements IRichBolt {
        Integer id;
        String name;
        Map<String, Integer> counters;
        private OutputCollector collector;
        /**
         * At the end of the spout (when the cluster is shutdown
         * We will show the word counters
         */
        @Override
        public void cleanup() {
                System.out.println("-- Word Counter ["+name+"-"+id+"] --");
                for(Map.Entry<String, Integer> entry : counters.entrySet()){
                        System.out.println(entry.getKey()+": "+entry.getValue());
                }
        }
        /**
         * On each word We will count
         */
        @Override
        public void execute(Tuple input) {
                String str = input.getString(0);
                /**
                 * If the word dosn't exist in the map we will create
                 * this, if not We will add 1
                 */
                if(!counters.containsKey(str)){
                        counters.put(str, 1);
                }else{
                        Integer c = counters.get(str) + 1;
                        counters.put(str, c);
                }
                //Set the tuple as Acknowledge
                collector.ack(input);
        }
        /**
         * On create
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
               this.counters = new HashMap<String, Integer>();
               this.collector = collector;
               this.name = context.getThisComponentId();
               this.id = context.getThisTaskId();
       }
       @Override
       public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

execute方法使用一个map来收集单词并计数。当拓扑终止的时候,调用cleanup()方法并打印出计数器map。(这里只是一个示例,实际中应该使用cleanup方法在拓扑终止的时候来关闭连接以及其他的资源。)

main方法

在main方法中,需要创建一个LocalCluster对象,这可以使得我们在本地测试拓扑。结合Config对象,LocalCluster允许设置不同的集群配置。例如,如果使用了一个全局变量或者类变量,那么在不同数目的工作节点中测试拓扑就发现错误。(在第三章会详细说明)

拓扑所有的节点都应该独立的运行,进程之间没有共享数据(例如,没有全局变量或者类变量),因为当一个拓扑被部署到集群后,这些进程有可能运行在不同的物理主机上。

使用TopologyBuilder来创建拓扑,它指明了节点是如何安排的以及数据是如何交换的。

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

spout和bolts使用shuffleGroupings连接。这种分组类型指明消息从源节点以随机的方式发送到目标节点。

接下来,创建一个包含拓扑配置的Config对象,在拓扑运行的时候这些配置会和集群的配置合并,并通过prepare方法发送到所有的节点。

Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);

设置属性wordsFile为spout读取的文件名,因为在开发环境中,所以设置debug属性为true。当debug为true的时候,Storm会打印出节点间交换的所有信息,以及其他一些有用的调试信息,这些有助于理解拓扑是如何运行的。

正如之前所提到的,我们会使用LocalCluster来运行拓扑。在生产环境中,拓扑会一直运行下去,但是在这个例子中,我们只会让它运行几秒钟,这样我们很快就会看到结果了。

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();

使用createTopologysubmitTopology来创建并运行拓扑,沉睡2秒钟(拓扑运行在不同的线程中),然后通过关闭集群来停止拓扑。

完整的代码见例2-3

例2-3 src/main/java/TopologyMain.java

import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;
public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {
        //Topology definition
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("word-reader",new WordReader());
                builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
                builder.setBolt("word-counter", new WordCounter(),2)
                .fieldsGrouping("word-normalizer", new Fields("word"));
        //Configuration
                Config conf = new Config();
                conf.put("wordsFile", args[0]);
                conf.setDebug(false);
        //Topology run
                conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("Getting-Started-Toplogie", conf, 
                    builder.createTopology());
                Thread.sleep(1000);
                cluster.shutdown();
        }
}

运行结果

已经准备运行我们的第一个拓扑了!如果文件src/main/resources/words.txt中每行一个单词,那么用下面的命令运行拓扑:

mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/words.txt"

例如,如果words.txt的内容如下:

Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great

在日志中,我们会看到下面的输出:

is: 2
application: 1
but: 1
great: 1
test: 1
simple: 1
Storm: 3
really: 1
are: 1
great: 1
an: 1
powerful: 1
very: 1

在这个例子中,每个节点只用了一个实例。但是如果日志文件灰常大,会怎么样呢?可以简单调整节点数来并行化处理。在这个例子中,可以创建两个WordCounter实例。

builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer");

再次运行程序,可以看到:

-- Word Counter [word-counter-2] --
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
-- Word Counter [word-counter-3] --
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1

Niubility!改变并行数是如此简单(当然了,在实际中,每个实例都会运行在各自的机器上。)但是还是有个问题:单词isgreat在每个WordCounter实例中被计算了一次。为什么?当使用shuffleGrouping时,就等于告诉storm以一种随机的方式发射消息到bolt的实例。在这个例子中,理想的做法是同一个单词发送到同一个wordCounter。要这么做,可以将shuffleGrouping("word-normalizer")改成fieldsGrouping("word-normalizer",new Fields("word"))。试着改一下,再次运行并确认结果。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics