前言:
昨天有朋友聊天说,我写的前面三篇太简单了,没有太多深入的东西。好吧,这说明我的目的达到了。我写这个系列的原因就是为了面向应用,进一步细化为两点:
1. 以例子说话,由简入深,一步步了解如何在Storm上开发应用,不会读起来吃力;
2. 对于一些原理性的东西,不去过于深究,只要记住Storm是这样实现的,开发的时候加以利用或规避。
在明白了这些基础的东西以后,如果对于原理性的东西Storm是如何实现的感兴趣,可以再去看源代码也不迟。毕竟这部分对开发应用的帮助并不直接。我认为,不必每个用Storm的人都必须了解Storm底层是如何实现的,当然,我会尝试在适当的位置插入相关原理性解释的链接,有兴趣可以直接去看看。就此原因,我把标题改成“Storm应用系列”。
注:转帖请注明,原帖地址:
http://blog.csdn.net/xeseo/article/details/17750379
Component
Spout
在前面基本例子中,我们实现了一个RandomSpout,来看看其类图
- Spout的最顶层抽象是ISpout接口。
open方法是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。
close方法在该spout关闭前执行,但是并不能得到保证其一定被执行。spout是作为task运行在worker内,在cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。
activate和deactivate :一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。
nextTuple 用来发射数据。
ack(Object)
传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。
fail(Object)
同ack,只不过是tuple处理失败时执行。
我们的RandomSpout 由于继承了BaseRichSpout,所以不用实现close、activate、deactivate、ack、fail和getComponentConfiguration方法,只关心最基本核心的部分。
结论:
通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。
Bolt
- IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker上去。worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文
- execute接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果
- cleanup 同ISpout的close方法,在关闭前调用。同样不保证其一定执行。
- public class ExclaimRichBolt extends BaseRichBolt {
- private OutputCollector collector;
- @Override
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
- }
- @Override
- public void execute(Tuple tuple) {
- this.collector.emit(tuple, new Values(tuple.getString(0)+"!"));
- this.collector.ack(tuple);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("after_excl"));
- }
- }
- //builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
- builder.setBolt("exclaim", new ExclaimRichBolt(), 2).shuffleGrouping("spout");
http://blog.csdn.net/xeseo/article/details/17750379
相关推荐
Storm中spout和bolt之间发送和接收数据的java源代码实例
写第一个Storm应用--数单词数量(一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数) 一、Storm运行模式: 1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地...
The spout passes the data to a component called a bolt, which transforms it in some way. A bolt either persists the data in some sort of storage, or passes it to some other bolt. You can imagine a ...
The spout passes the data to a component called a bolt, which transforms it in some way. A bolt either persists the data in some sort of storage, or passes it to some other bolt. You can imagine a ...
Storm is a distributed, reliable, fault-tolerant system for ... You can imagine a Storm cluster as a chain of bolt components that each make some kind of transformation on the data exposed by the spout.
Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream ...
代码参考传智播客课程编写,演示了如何使用storm的spout,bolt,Topology
DDS是用于以数据为中心的连接的中间件协议和API标准,并且是唯一能够满足物联网(IoT)的高级要求的标准。 DDS提供了业务和关键任务IoT应用程序所需的低延迟数据连接,极高的可靠性和可扩展性。 有关更多信息,请...
快速且可扩展伸缩容错确保消息能够被处理易于设置和操作开源的分布式实时计算系统-最初由NathanMarz开发-使用Java和Clojure编写Storm和Hadoop主要区别是实时和批处理的区别:Storm概念组成:Spout和Bolt组成Topology...
storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级...
这是一个示例存储库,展示了如何设置 Java 项目以使用KafkaSpout ,它现在是 Storm 0.9.3 版的一部分。 在撰写本文时在线的其他示例要么是错误的(过时的),要么展示了如何在本地模式下使用 KafkaSpout 但不是针对...
喷口处理Windows上的应用程序之间共享Spout纹理的处理库
spout与bolt设置多重grouping,
Storm Kafka Integration架包,包含storm.kafka.KafkaSpout、import、storm.kafka.SpoutConfig、import storm.kafka.StringScheme、import storm.kafka.ZkHosts等
第一个是处理消息重放的Spout 部分,第二个是管理主要处理中间状态的Bolt 部分。 ####脱粒机####卡夫卡用作喷口的数据源。 这使得重播消息变得容易和方便。 并且使用 kafka 不需要(拓扑的)喷口自己跟踪消息。 ...
02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API 简单开发测试 08.storm-kafka...
并细分为20个章节,其中“基础知识”6章、“安装与部署”4章、“研发与维护”4章、“进阶知识”5章、“企业应用”1章,分别介绍了Storm的基本原理、Topology组件、Spout组件、Bolt组件、ZooKeeper集群、Storm的安装...