`
wbj0110
  • 浏览: 1557862 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

storm任务示例

阅读更多

 LogProcess.java

package mytest;

 

import java.io.BufferedReader;

import java.io.BufferedWriter;

import java.io.File;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.FileReader;

import java.io.FileWriter;

import java.io.IOException;

import java.io.OutputStreamWriter;

import java.io.PrintWriter;

import java.util.Map;

 

import mytest.ThroughputTest.GenSpout;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class LogProcess {

         public static class FileSpout extends BaseRichSpout {

 

                   /**

                    */

                   private static final long serialVersionUID = 1L;

                   private SpoutOutputCollector _collector;

                   private BufferedReader br;

                   private String dataFile;

                  

                   //定义spout文件

                   FileSpout(String dataFile){

                            this.dataFile = dataFile;

                   }

 

                   //定义如何读取spout文件

                   @Override

                   public void open(Map conf, TopologyContext context,

                                     SpoutOutputCollector collector) {

                            // TODO Auto-generated method stub

                            _collector = collector;

                            File csv = new File(dataFile); // log file

                            try {

                                     br = new BufferedReader(new FileReader(csv));

                            } catch (FileNotFoundException e) {

                                     // TODO Auto-generated catch block

                                     e.printStackTrace();

                            }

                   }

 

                   //获取下一个tuple的方法

                   @Override

                   public void nextTuple() {

                            // TODO Auto-generated method stub

                            try {

                                    

                                     String line = null;

                                     while ((line = br.readLine()) != null) {

                                               _collector.emit(new Values(line));

                                     }

                            } catch (FileNotFoundException e) {

                                     // TODO Auto-generated catch block

                                     e.printStackTrace();

                            } catch (IOException e) {

                                     // TODO Auto-generated catch block

                                     e.printStackTrace();

                            }

                   }

 

 

                   @Override

                   public void declareOutputFields(OutputFieldsDeclarer declarer) {

                            // TODO Auto-generated method stub

                            declarer.declare(new Fields("line"));

                   }

                  

         }

        

 

         public static class Process extends BaseRichBolt{

 

                   private String _seperator;

                   private String _outFile;

                   PrintWriter pw;

                   private OutputCollector _collector;

                   private BufferedWriter bw;

                  

                   public Process(String seperator,String outFile) {

                            this._seperator = seperator;

                            this._outFile   = outFile;

                           

                   }

                  

                   //把输出结果保存到外部文件里面。

                   @Override

                   public void prepare(Map stormConf, TopologyContext context,

                                     OutputCollector collector) {

                            // TODO Auto-generated method stub

                            this._collector = collector;

                            File out = new File(_outFile);

                            try {

//                                  br = new BufferedWriter(new FileWriter(out));

                                     bw = new BufferedWriter(new OutputStreamWriter( 

                             new FileOutputStream(out, true))); 

                            } catch (IOException e1) {

                                     // TODO Auto-generated catch block

                                     e1.printStackTrace();

                            }                

                   }

                  

                   //blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。

                   @Override

                   public void execute(Tuple input) {

                            // TODO Auto-generated method stub

                            String line = input.getString(0);

//                         System.out.println(line);

                            String[] str = line.split(_seperator);

                            System.out.println(str[2]);

                            try {

                                     bw.write(str[2]+",bkeep"+"\n");

                                     bw.flush();

                            } catch (IOException e) {

                                     // TODO Auto-generated catch block

                                     e.printStackTrace();

                            }

                           

                            _collector.emit(new Values(line));

                   }

 

                   @Override

                   public void declareOutputFields(OutputFieldsDeclarer declarer) {

                            // TODO Auto-generated method stub

                            declarer.declare(new Fields("line"));

                   }

                  

         }

        

         public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{

                   String dataFile = argv[0]; //输入文件

                   String seperator = argv[1];      //分隔符

                   String outFile   = argv[2]; //输出文件

                   boolean distribute = Boolean.valueOf(argv[3]);       //本地模式还是集群模式

                   TopologyBuilder builder = new TopologyBuilder();  //build一个topology

        builder.setSpout("spout", new FileSpout(dataFile), 1);   //指定spout

        builder.setBolt("bolt", new Process(seperator,outFile),1).shuffleGrouping("spout");  //指定bolt,包括boltprocessgrouping

        Config conf = new Config();

        if(distribute){

            StormSubmitter.submitTopology("LogProcess", conf, builder.createTopology());

        }else{

                 LocalCluster cluster = new LocalCluster();

                 cluster.submitTopology("LogProcess", conf, builder.createTopology());

        }

         }       

}

 

 

运行

[admin@vkvm161064 guandao]$ pwd

/home/admin/guandao

[admin@vkvm161064 guandao]$ ls

out.txt  storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar  tmp.txt

 

输入文件:

[admin@vkvm161064 guandao]$ cat tmp.txt

a,b,c,d

1,2,3,4

A,B,C,D

xx,ff,ff,ss

xx,ff,alibaba,ss

xx,ff,taobao,ss

xx,xx,xx,xx

xx,xx,ll,xx

xx,xx,hero,xx

 

输出文件:

[admin@vkvm161064 guandao]$ cat out.txt

c,bkeep

3,bkeep

C,bkeep

ff,bkeep

alibaba,bkeep

taobao,bkeep

xx,bkeep

ll,bkeep

hero,bkeep

 

提交topology

[admin@vkvm161064 guandao]$ storm jar ./storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar mytest.LogProcess /home/admin/guandao/tmp.txt , /home/admin/guandao/out.txt false

 

语法:storm jar <自己开发的topology jar包> <main类全名(如 mytest.LogProcess)> <inputfile> <分隔符> <outputfile> <true/false>(true 代表提交到集群运行,false 代表本地模式运行)

分享到:
评论

相关推荐

    mina2.x开发示例

    参见博客: http://blog.csdn.net/joe_storm/article/details/11041763

    bdelab_erhe1011

    在各种练习中,将通过实际示例演示大数据工程讲座中的概念。 各个任务位于子目录中: (事实/图形模型,Hadoop HDFS,桶) (使用Hadoop MapReduce进行批处理) (使用JCascalog的复杂批处理层管道) (使用...

    ist的matlab代码-bdelab:大数据工程实验室

    在各种练习中,将通过实际示例演示大数据工程讲座中的概念。 各个任务位于子目录中: (事实/图形模型,Hadoop HDFS,桶) (使用Hadoop MapReduce进行批处理) (使用JCascalog的复杂批处理层管道) (使用Apache ...

    Python.Unix和Linux系统管理指南

    介绍了Python语言如何为管理uNIx和Linux服务器提供各种更加有效的任务处理方式。书中各章都提出了具体的管理问题,如并发或数据备份,然后通过Python示例提供了解决方案。通过《Python UNIX和Linux系统管理指南》,...

    PYTHON UNIX和LINUX系统管理指南

    《Python UNIX和Linux系统管理指南》介绍了Python语言如何为管理uNIx和Linux服务器提供各种更加有效的任务处理方式。书中各章都提出了具体的管理问题,如并发或数据备份,然后通过Python示例提供了解决方案。通过...

    java开源包1

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包11

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包2

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包3

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包6

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包5

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包10

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包4

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包8

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包7

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    java开源包9

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    JAVA上百实例源码以及开源项目源代码

     各种EJB之间的调用源码示例,用远程接口的引用访问EJB、函数将被FirstEJB调用,同时它将调用secondEJB 基于JAVA的UDP服务器模型源代码 2个目标文件 摘要:Java源码,网络相关,UDP  基于JAVA的UDP服务器模型源代码...

    java开源包101

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

    Java资源包01

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...

Global site tag (gtag.js) - Google Analytics