`
农村外出务工男JAVA
  • 浏览: 104812 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

storm 如何编写可靠的spout和bolt

阅读更多

一、前言

   对于不使用trident的人来说,使用基本的storm spout,bolt操作,需要理解storm的ack机制,保证消息的完整性,Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。

   怎样才认为消息被完全处理?每个从 Spout发出的 Tuple可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。

   这里我主要给不使用trident实现业务的同事讲如何实现可靠的spout,bolt。

二、实现可靠的spout

   让我们先来看下ISpout接口的几个方法

public class ISpout接口测试 implements ISpout {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
       /**
        * 1、在任务集群的工作进程内被初始化,提供spout执行所需要的环境
        * 2、conf参数是这个spout的strom配置,提供给拓扑与这台主机上的集群配置一起合并
        * 3、context主要用来获取这个任务在拓扑中的位置信息,包括该任务的id,该任务的组件id,输入和输出消息等
        * 4、collector是收集器,用于从spout发送元祖,收集器是线程安全的,应该作为这个spout对象的实例变量进行保存。
        *
        */
	}

	@Override
	public void close() { 
		 /**
		  * 1、当ISpout关闭时被调用,不能保证close一定被调用,因为在集群中可以使用kill -9 直接杀死工作进程/本地模式除外
		  */
	}

	@Override
	public void activate() {
        /**
         * 当spout从失效模式中激活的时候被调用
         */
	}

	@Override
	public void deactivate() {
      /**
       * 当spout已经失效的时候被调用,在失效期间,nextTuple()方法不会被调用
       */
	}

	@Override
	public void nextTuple() {

		/**
		 * 1、非阻塞,如果没有元祖可以发送,可休眠,不浪费CPU
		 * 2、发送元祖到输出收集器SpoutOutputCollector
		 */
	}

	@Override
	public void ack(Object msgId) {

	  /**
	   * 1、storm断定该spout发送的标识符msgId的元祖已经被成功处理时调用
	   * 2、ack()方法调用后将消息移除队列(之前的消息是挂起的)
	   */
	}

	@Override
	public void fail(Object msgId) {
		/**
		   * 1、storm断定该spout发送的标识符msgId的元祖没有被成功处理时调用
		   * 2、fail()方法调用后将消息放入队列(之前的消息是挂起的)
		   */
	}

}

      那么我们如何实现可靠的spout呢?

   1. 在 nextTuple 函数中调用 emit 函数时需要带一个msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
   2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护,这点比较坑)

   例子:

 

public class 可靠的spout implements ISpout{
	
    private SpoutOutputCollector collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector  = collector;
	}

	@Override
	public void close() {
		
	}

	@Override
	public void activate() {
		
	}

	@Override
	public void deactivate() {
		
	}

	@Override
	public void nextTuple() {
		String curMsg = "发送消息";
		String msgId = "发送消息";
		//这里我假设MsgId和发送的消息一样,便于维护msgId和消息之间的对应关系
		collector.emit(new Values(curMsg),msgId);
	}

	@Override
	public void ack(Object msgId) {
		
	}

	@Override
	public void fail(Object msgId) {
		String tmp = (String)msgId;   //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息
		//消息进行重发
		collector.emit(new Values(tmp), msgId);
	}

 三、实现可靠的bolt

      同样,先看看IBolt接口提供的几个方法

public class IBolt接口测试  implements IBolt{

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		/**
		 * 1、提供bolt运行的一些环境
		 */
	}

	@Override
	public void execute(Tuple input) {
		/**
		 * 1、一次处理一个输入的元祖,元祖对象包括来自哪个组件/流/任务的元数据
		 * 2、IBolt没有立即处理元祖,而是完整的捕获一个元祖并在以后进行处理
		 * 3、如果实现basicBolt则不用手动ack()
		 */
	}

	@Override
	public void cleanup() {
		/**
		 * 1、当一个bolt即将关闭时调用,不能保证一定被调用,集群的kill -9 不行
		 * 
		 */
	}
	
	/**
	 * bolt的生命周期:在客户端主机上创建Ibolt对象,bolt被序列化到拓扑,并提及到nimbus,然后nimbus
	 * 启动工作进程(worker)进行反序列化,调用其prepare()方法开始处理
	 */
}

    那我们如何实现可靠的bolt呢,主要有2种方式

   3.1 继承 BaseBasicBolt


public final class 第一种可靠的bolt extends BaseBasicBolt {
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String sentence = tuple.getString(0);
		for (String word : sentence.split("\\s+")) {
			//storm自动ack和fail
			collector.emit(new Values(word));
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

  对于继承BaseBasicBolt的Bolt来说,storm内部已经替我们自动ack和fail了,不需我们手动ack,然而这个抽象类不太使用,使用场景单一。

  3.2 继承 BaseRichBolt

package com.storm.bolt.可靠性;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class 可靠的bolt extends BaseRichBolt  {

	private static final long serialVersionUID = 1L;
	
	OutputCollector _collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this._collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		 String sentence = tuple.getString(0);
	     for(String word: sentence.split("\\s+")) {
	    	// 建立 anchor 树
	         _collector.emit(tuple, new Values(word));  
	     }
	     //手动ack
	    _collector.ack(tuple);  
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}

  需要我们自己手动ack,但其适应场景更广泛。

 四、ack原理

    对于每个spout tuple保存一个ack_val值,初始值为0,然后每发射一个tuple或者ack 一个tuple, tuple的 id都要和这个校验值(ack_val)异或,并更新ack_val,如果每个发射出去的tuple都ack了,那么最后ack_val一定是0。
 五、ack流程
    1、spout发射消息生成一个messageId对象{属性Map<RootId,消息ID>}放入pendingMap中,在未超时时间内保留
    2、spout发出消息后给acker bolt(ack其实也是一个特殊的bolt)发射tuple消息 {tuple-id,ack_val,task_id}:
       tupe_id:实际上就是rootId
       ack_val:刚开始为初始值0
       task_id:为spout的id,这样acker才知道是哪个spout发射过来的,如果有多个acker,那么根据task_id哈希取模也能找到对应的acker,保证同一个spout发射出来的消息被同一个acker处理.然后acker bolt从自己的pending对象中新增一条记录{tuple_id,{task_id,ack_val}}
    3、bolt接收到消息后(该bolt可能是第一个也可能是最后一个,原理都一样),发射消息给下一个task的过程中也会构建一个MessageId对象,messageId中会进行消息ID(本身消息id)和接收到的消息ID(上一个bolt或者spout传过来)进行异或得到ack_val发给acker.
    4、acker收的后根据tuple_id从penging中取出旧的ack_val然后进行异或。
    5、继续bolt处理...........
    6、如果最终异或结果为0,调用spout的ack方法,如果失败,调用fail方法。
 5.1 例子:
   1:spout产生一个tuple,初始值0100,同时发送给ack和Bolt1   {acker 值 0100}
    2:bolt1接收spout发送过来的0100消息,经过处理后产生了新消息0010,那么bolt1就讲0100^0010发送给acker  {
             acker值 0100^0010 = 0110
                    0110^0100=0010
           }

   3:bolt2接收bolt1发送过来的消息,没有产生任何消息(直接持久化了),那么Bolt2将bolt1的消息 0010发送给acker  {       
              acker值 0100^0010 = 0110
                    0110^0100=0010
                    0010^0010=0000
                }
   4:acker进行整个流程的异或操作 {acker求最终的异或值}

分享到:
评论

相关推荐

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    vortex-storm:一个Apache Storm Spout和Bolt,用于使用和生成Vortex数据

    DDS提供了业务和关键任务IoT应用程序所需的低延迟数据连接,极高的可靠性和可扩展性。 有关更多信息,请访问 。 支持 这是概念/原型/ alpha的证明,因此没有任何正式支持就被提供。 如果您遇到问题或疑问,我们将...

    storm demo

    代码参考传智播客课程编写,演示了如何使用storm的spout,bolt,Topology

    storm基础框架分析

    前期收到的问题:1、在Topology中我们可以指定spout、bolt的并行度,在提交Topology时Storm如何将spout、bolt自动发布到每个服务器并且控制服务的CPU、磁盘等资源的?2、Storm处理消息时会根据Topology生成一棵消息...

    漫谈大数据第四期-storm

    Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream ...

    Storm如何保证可靠的消息处理

    本文来自于博客园,本文介绍了Storm如何保证可靠性以及作为Storm使用者,我们需要怎么做,才能充分利用Storm的可靠性。Storm可以保证从Spout发出的每个消息都能被完全处理。Storm的可靠性机制是完全分布式的...

    storm多重grouping的一个例子

    spout与bolt设置多重grouping,

    storm-redis:Storm Bolt 状态管理

    第一个是处理消息重放的Spout 部分,第二个是管理主要处理中间状态的Bolt 部分。 ####脱粒机####卡夫卡用作喷口的数据源。 这使得重播消息变得容易和方便。 并且使用 kafka 不需要(拓扑的)喷口自己跟踪消息。 ...

    Storm大数据实时处理

    快速且可扩展伸缩容错确保消息能够被处理易于设置和操作开源的分布式实时计算系统-最初由NathanMarz开发-使用Java和Clojure编写Storm和Hadoop主要区别是实时和批处理的区别:Storm概念组成:Spout和Bolt组成Topology...

    Storm Real-time Processing Cookbook实例代码

    The spout passes the data to a component called a bolt, which transforms it in some way. A bolt either persists the data in some sort of storage, or passes it to some other bolt. You can imagine a ...

    Getting Started with Storm

    Storm is a distributed, reliable, fault-tolerant system for ... You can imagine a Storm cluster as a chain of bolt components that each make some kind of transformation on the data exposed by the spout.

    Storm Real-time Processing Cookbook

    The spout passes the data to a component called a bolt, which transforms it in some way. A bolt either persists the data in some sort of storage, or passes it to some other bolt. You can imagine a ...

    第一个Storm应用

    Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。 Bolt中最重要的是execute方法,每当一个tuple传过来时它便...

    storm记录级容错.docx

    storm允许用户在spout中发射一个新的源...在图4-1中,在spout由message 1绑定的tuple1和tuple2经过了bolt1和bolt2的处理生成两个新的tuple,并最终都流向了bolt3。当这个过程完成处理完时,称message 1被完全处理了

    storm-spring-autowire:使storm支持spring的注入功能

    ), 创建spout/bolt实例(spout/bolt在storm中统称为component)并进行序列化. 2.将序列化的component发送给所有的任务所在的机器 3.在每一个任务上反序列化component. 4.在开始执行任务之前, 先执行component的初始化...

    Spout/Blot编程实例实例详解

    这些Spout和Bolt构成下图所示的一个多节点的有向图。(数据从左向右流动)。整个图是一个Topology,其中有2个Spout和4个Bolot组成。Spout是Storm的Topoloy的入口,数据流都是从Spout进入topology来进行处理。他负责...

Global site tag (gtag.js) - Google Analytics