LogProcess.java
package mytest;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Map;
import mytest.ThroughputTest.GenSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LogProcess {
public static class FileSpout extends BaseRichSpout {
/**
*/
private static final long serialVersionUID = 1L;
private SpoutOutputCollector _collector;
private BufferedReader br;
private String dataFile;
//定义spout文件
FileSpout(String dataFile){
this.dataFile = dataFile;
}
//定义如何读取spout文件
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// TODO Auto-generated method stub
_collector = collector;
File csv = new File(dataFile); // log file
try {
br = new BufferedReader(new FileReader(csv));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//获取下一个tuple的方法
@Override
public void nextTuple() {
// TODO Auto-generated method stub
try {
String line = null;
while ((line = br.readLine()) != null) {
_collector.emit(new Values(line));
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("line"));
}
}
public static class Process extends BaseRichBolt{
private String _seperator;
private String _outFile;
PrintWriter pw;
private OutputCollector _collector;
private BufferedWriter bw;
public Process(String seperator,String outFile) {
this._seperator = seperator;
this._outFile = outFile;
}
//把输出结果保存到外部文件里面。
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
// TODO Auto-generated method stub
this._collector = collector;
File out = new File(_outFile);
try {
// br = new BufferedWriter(new FileWriter(out));
bw = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(out, true)));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
//blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。
@Override
public void execute(Tuple input) {
// TODO Auto-generated method stub
String line = input.getString(0);
// System.out.println(line);
String[] str = line.split(_seperator);
System.out.println(str[2]);
try {
bw.write(str[2]+",bkeep"+"\n");
bw.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
_collector.emit(new Values(line));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("line"));
}
}
public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{
String dataFile = argv[0]; //输入文件
String seperator = argv[1]; //分隔符
String outFile = argv[2]; //输出文件
boolean distribute = Boolean.valueOf(argv[3]); //本地模式还是集群模式
TopologyBuilder builder = new TopologyBuilder(); //build一个topology
builder.setSpout("spout", new FileSpout(dataFile), 1); //指定spout
builder.setBolt("bolt", new Process(seperator,outFile),1).shuffleGrouping("spout"); //指定bolt,包括bolt、process和grouping
Config conf = new Config();
if(distribute){
StormSubmitter.submitTopology("LogProcess", conf, builder.createTopology());
}else{
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogProcess", conf, builder.createTopology());
}
}
}
运行
[admin@vkvm161064 guandao]$ pwd
/home/admin/guandao
[admin@vkvm161064 guandao]$ ls
out.txt storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar tmp.txt
输入文件:
[admin@vkvm161064 guandao]$ cat tmp.txt
a,b,c,d
1,2,3,4
A,B,C,D
xx,ff,ff,ss
xx,ff,alibaba,ss
xx,ff,taobao,ss
xx,xx,xx,xx
xx,xx,ll,xx
xx,xx,hero,xx
输出文件:
[admin@vkvm161064 guandao]$ cat out.txt
c,bkeep
3,bkeep
C,bkeep
ff,bkeep
alibaba,bkeep
taobao,bkeep
xx,bkeep
ll,bkeep
hero,bkeep
提交topology
[admin@vkvm161064 guandao]$ storm jar ./storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jarSHOT-jar-with-dependencies.jar mytest.LogProcess /home/admin/guandao/tmp.txt , /home/admin/guandao/out.txt fase
语法:storm jar 自己开发的topology topology_name inputfile 分隔符 outputfile true/false(true代表集群运行)
相关推荐
参见博客: http://blog.csdn.net/joe_storm/article/details/11041763
在各种练习中,将通过实际示例演示大数据工程讲座中的概念。 各个任务位于子目录中: (事实/图形模型,Hadoop HDFS,桶) (使用Hadoop MapReduce进行批处理) (使用JCascalog的复杂批处理层管道) (使用...
在各种练习中,将通过实际示例演示大数据工程讲座中的概念。 各个任务位于子目录中: (事实/图形模型,Hadoop HDFS,桶) (使用Hadoop MapReduce进行批处理) (使用JCascalog的复杂批处理层管道) (使用Apache ...
介绍了Python语言如何为管理uNIx和Linux服务器提供各种更加有效的任务处理方式。书中各章都提出了具体的管理问题,如并发或数据备份,然后通过Python示例提供了解决方案。通过《Python UNIX和Linux系统管理指南》,...
《Python UNIX和Linux系统管理指南》介绍了Python语言如何为管理uNIx和Linux服务器提供各种更加有效的任务处理方式。书中各章都提出了具体的管理问题,如并发或数据备份,然后通过Python示例提供了解决方案。通过...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
各种EJB之间的调用源码示例,用远程接口的引用访问EJB、函数将被FirstEJB调用,同时它将调用secondEJB 基于JAVA的UDP服务器模型源代码 2个目标文件 摘要:Java源码,网络相关,UDP 基于JAVA的UDP服务器模型源代码...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java...