taoCMS是基于php+sqlite/mysql的国内最小(100Kb左右)的功能完善、开源免费的CMS管理系统

Storm-RabbitMQ的使用(一)

2017-04-27

RabbitMQ是一款广为使用的企业级消息队列系统,采用erlang语言开发,支持多种语言的客户端访问,这也是我选择rabbitmq作为我接触的第一个MQ的原因。

在去年的项目,我使用Rabbit用来缓存生产者产生的消息,另一端采用java客户端来获取消息然后进行保存、过滤等处理,每天的消息量峰值在1000条/s,共3条队列,以消费者和生产者加起来只有不到40个,单机4核4G的Linux平台下,未做HA的情况下稳定运行了一年多,最多时曾缓存了近两亿条数据(由于消费者机器断电导致)。

随着业务不断复杂,采用自己开发的java客户端要处理多个并行的逻辑逐渐出现瓶颈和增加了开发的难度,于是,开始采用storm实时流式计算进行后端数据的过滤、合并、保存等处理。

于是在整个业务流程上,rabbitmq的消费者端即为整个storm topology的spout,我们一方面可以自己开发spout,并且在spout中维护rabbitmq的链接,可以采用spring-rabbit进行访问。另一方面,github上流行着一个storm-rabbitmq的开源程序,本文即是采用此方法来使用该框架来集成storm-rabbitmq.

github地址:https://github.com/ppat/storm-rabbitmq

clone到本地后,使用maven进行build并install到本地repository中。需要注意的是,我们需要pom.xml文件中以下项目:

1.rabbitmq版本

2.storm版本,注意现在的storm为apache下的框架最终依赖列表如下:

<dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.6</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
    </dependencies>

进入根目录,执行mvn package install,这样就生成了storm-rabbitmq-0.6.2-SNAPSHOT.jar包到本地repository中,我们可以在其他项目通过如下依赖引用:

<dependency>
    	<groupId>io.latent</groupId>
    	<artifactId>storm-rabbitmq</artifactId>
    	<version>0.6.2-SNAPSHOT</version>
    	<scope>compile</scope>
    </dependency>

开发步骤:

引入storm-rabbitmq-x.jar之后,我们就可以进行开发了。

1.我们首先需要继承backtype.storm.spout.Scheme来反序列化RabbitMQ的消息。如下:

/**
 * 自定义MQ消息的Schema
 * @author adam
 *
 */
public class MyCustomMessageScheme implements backtype.storm.spout.Scheme {

	
	/**
	 * 把MQ中读取的消息反序列化
	 */
	public List<Object> deserialize(byte[] bytes) {
		List objs = new ArrayList();
		
		//直接反序列化为string
		String str = new String(bytes);
		
		//依次返回UUID,String,Number
		objs.add(UUID.randomUUID().toString());
		objs.add(str);
		String numStr = Math.round(Math.random()*8999+1000)+""; 
		objs.add(numStr);
		
		return objs;
	}

	/**
	 * 定义spout输出的Fileds
	 */
	public Fields getOutputFields() {
		//依次返回UUID,String,Number,需要与上述返回的List列表一一对应
		return new Fields("id", "str", "num");
	}

}

2.在Topology的Builder之前,我们就可以创建该Spout使用上面我们定义的schema,如下:

Scheme scheme = new MyCustomMessageScheme();
		RabbitMQSpout spout = new RabbitMQSpout(scheme);

3.创建rabbitmq的连接配置,实际开发过程中,连接参数可以放在resources目录创建一个properties目录,然后动态配置

ConnectionConfig connectionConfig = new ConnectionConfig("localhost", 5672, "guest", "guest", ConnectionFactory.DEFAULT_VHOST, 10); // host, port, username, password, virtualHost, heartBeat 
ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
                                                        .queue("your.rabbitmq.queue")
                                                        .prefetch(200)
                                                        .requeueOnFail()
                                                        .build();

这里值得注意的是requeueOnFail()如果打开,如果tuple后期处理失败或未发送ack消息,消息将会重新返回rabbitmq进行排队处理。如果关闭该选项,失败的消息将会从消息队列中移除并且发送RabbitMQ默认的 dead letter exchange (需要在RabbitMQ中配置).

4.添加spout到TopologyBuilder中,设置MaxSpoutPending的值与RabbitMQ预取(prefetch)的值相同,注意MaxSpoutPending应该总是小于prefetch值。如下:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("my-spout", spout)
       .addConfigurations(spoutConfig.asMap())
       .setMaxSpoutPending(200);

这样就相当于spout这个水龙头就有了数据来源,后期我们可以创建bolt用来保存该值到hbase,mysql,redis等存储中,同时我们可以创建bolt用来进行过滤等操作。

后面我会将rabbitmq作为输入源,然后使用hbase进行保存,采用storm-hbase开源程序。

类别:未分组 | 阅读:372774 | 评论:0 | 标签:

想收藏或者和大家分享这篇好文章→

“Storm-RabbitMQ的使用(一)”共有0条留言

发表评论

姓名:

邮箱:

网址:

验证码:

公告

taoCMS发布taoCMS 3.0.2(最后更新21年03月15日),请大家速速升级,欢迎大家试用和提出您宝贵的意见建议。

捐助与联系

☟请使用新浪微博联系我☟

☟在github上follow我☟

标签云