RabbitMQ (五)主题(Topic) -摘自网络

虽然使用direct类型改良了我们的系统,但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。 在我们的日志系统中,我们有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅。这个概念类似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…) 这样可能给我们更多的灵活性:我们可能只想订阅来自’cron’的致命错误日志,而不是来自’kern’的。 为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。

1、 主题转发(Topic Exchange)

发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。 绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。 *可以匹配一个标识符。 #可以匹配0个或多个标识符。

2、 图解:

我们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:<speed>.<color>.<species>。 我们创建3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。 可以简单的认为: Q1对所有的橙色动物感兴趣。 Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。 一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,但是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,所以会被丢弃。 如果我们违法我们的约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。 另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。 注:主题类型的转发器非常强大,可以实现其他类型的转发器。 当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。 当绑定键中不包含任何#与*时,类似direct类型转发器。

3、 完整的例子

发送端EmitLogTopic.java:

[java] view plaincopyprint?

  1. package com.zhy.rabbit._05_topic_exchange;
  2. import java.util.UUID;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. public class EmitLogTopic
  7. {
  8. private static final String EXCHANGE_NAME = "topic_logs";
  9. public static void main(String[] argv) throws Exception
  10. {
  11. // 创建连接和频道
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("localhost");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  17. String[] routing_keys = new String[] { "kernal.info", "cron.warning",
  18. "auth.info", "kernel.critical" };
  19. for (String routing_key : routing_keys)
  20. {
  21. String msg = UUID.randomUUID().toString();
  22. channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
  23. .getBytes());
  24. System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
  25. }
  26. channel.close();
  27. connection.close();
  28. }
  29. }
package com.zhy.rabbit._05_topic_exchange;

import java.util.UUID;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "topic");

		String[] routing_keys = new String[] { "kernal.info", "cron.warning",
				"auth.info", "kernel.critical" };
		for (String routing_key : routing_keys)
		{
			String msg = UUID.randomUUID().toString();
			channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
					.getBytes());
			System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
		}

		channel.close();
		connection.close();
	}
}

我们发送了4条消息,分别设置了不同的选择键。

接收端1,ReceiveLogsTopicForKernel.java

[java] view plaincopyprint?

  1. package com.zhy.rabbit._05_topic_exchange;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class ReceiveLogsTopicForKernel
  7. {
  8. private static final String EXCHANGE_NAME = "topic_logs";
  9. public static void main(String[] argv) throws Exception
  10. {
  11. // 创建连接和频道
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("localhost");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 声明转发器
  17. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  18. // 随机生成一个队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. //接收所有与kernel相关的消息
  21. channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
  22. System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
  23. QueueingConsumer consumer = new QueueingConsumer(channel);
  24. channel.basicConsume(queueName, true, consumer);
  25. while (true)
  26. {
  27. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  28. String message = new String(delivery.getBody());
  29. String routingKey = delivery.getEnvelope().getRoutingKey();
  30. System.out.println(" [x] Received routingKey = " + routingKey
  31. + ",msg = " + message + ".");
  32. }
  33. }
  34. }
package com.zhy.rabbit._05_topic_exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopicForKernel
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明转发器
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		// 随机生成一个队列
		String queueName = channel.queueDeclare().getQueue();

		//接收所有与kernel相关的消息
		channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");

		System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);

		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			String routingKey = delivery.getEnvelope().getRoutingKey();

			System.out.println(" [x] Received routingKey = " + routingKey
					+ ",msg = " + message + ".");
		}
	}
}

直接收和Kernel相关的日志消息。

接收端2,ReceiveLogsTopicForCritical.java

[java] view plaincopyprint?

  1. package com.zhy.rabbit._05_topic_exchange;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class ReceiveLogsTopicForCritical
  7. {
  8. private static final String EXCHANGE_NAME = "topic_logs";
  9. public static void main(String[] argv) throws Exception
  10. {
  11. // 创建连接和频道
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("localhost");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 声明转发器
  17. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  18. // 随机生成一个队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 接收所有与kernel相关的消息
  21. channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
  22. System.out
  23. .println(" [*] Waiting for critical messages. To exit press CTRL+C");
  24. QueueingConsumer consumer = new QueueingConsumer(channel);
  25. channel.basicConsume(queueName, true, consumer);
  26. while (true)
  27. {
  28. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  29. String message = new String(delivery.getBody());
  30. String routingKey = delivery.getEnvelope().getRoutingKey();
  31. System.out.println(" [x] Received routingKey = " + routingKey
  32. + ",msg = " + message + ".");
  33. }
  34. }
  35. }
package com.zhy.rabbit._05_topic_exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopicForCritical
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明转发器
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		// 随机生成一个队列
		String queueName = channel.queueDeclare().getQueue();

		// 接收所有与kernel相关的消息
		channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");

		System.out
				.println(" [*] Waiting for critical messages. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);

		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			String routingKey = delivery.getEnvelope().getRoutingKey();

			System.out.println(" [x] Received routingKey = " + routingKey
					+ ",msg = " + message + ".");
		}
	}
}

只接收致命错误的日志消息。

运行结果:

[x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.  [x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.  [x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.  [x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

[*] Waiting for messages about kernel. To exit press CTRL+C  [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

[*] Waiting for critical messages. To exit press CTRL+C  [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

可以看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。

时间: 04-06

RabbitMQ (五)主题(Topic) -摘自网络的相关文章

Android:控件WebView显示网页 -摘自网络

WebView可以使得网页轻松的内嵌到app里,还可以直接跟js相互调用. webview有两个方法:setWebChromeClient 和 setWebClient setWebClient:主要处理解析,渲染网页等浏览器做的事情 setWebChromeClient:辅助WebView处理Javascript的对话框,网站图标,网站title,加载进度等 WebViewClient就是帮助WebView处理各种通知.请求事件的. 在AndroidManifest.xml设置访问网络权限:

tomcat解压版安装(摘自网络)

配置Tomcat[解压版] 选择解压版的Tomcat的理由是可以让我们使用多个Tomcat,但是配置上就会出现一些问题,需要我们手动进行更改配置.我的Tomcat版本是:apache-tomcat-6.0.16.zip 给个链接:http://tomcat.apache.org 下载完成后,解压到C:\Tomcat6(设置你的路径),检查环境变量JAVA_HOME是否存在:一般JDK安装时会自动设置,如果没有就创建,JAVA_HOME的值设为JDK的安装根路径. 环境变量位置:我的电脑 -> 右

SQL 阻塞(摘自网络)

/* 所谓的「阻塞」,是指当一个数据库会话中的事务,正在锁定其他会话事务想要读取或修改的资源, 造成这些会话发出的请求进入等待的状态.SQL Server 默认会让被阻塞的请求无限期地一直等待, 直到原来的事务释放相关的锁,或直到它超时 (根据 SET LOCK_TIMEOUT ).服务器关闭. 进程被杀死.一般的系统中,偶尔有短时间的阻塞是正常且合理的:但若设计不良的程序,就可能导致长时间的阻塞, 这样就不必要地锁定了资源,而且阻塞了其他会话欲读取或更新的需求.遇到这种情况,可能就需要手工排除

Android之旅十五 android中的网络操作

android中的网络操作和java里面没有什么区别,java里面的很多网络操作方法都可以搬到android中去使用,主要几个点: 1.post和get请求的区别,大家可以在网上查阅有关资料进行了解,get主要以向地址中拼接字符串参数发送到服务器,长度有限制,并且请求参数暴露在地址栏中,不怎么安全:post则主要是将请求参数转换为相应的http协议请求体发送到服务器,相比get方式,参数的长度没有限制,并且参数信息不会暴露给用户: 2.我们在java web里面通过浏览器以post方式发送数据,

Windows Azure Service Bus (4) 主题(Topic) 使用VS2013开发Service Bus Topic

<Windows Azure Platform 系列文章目录> 在笔者之前的文章中Windows Azure Service Bus (1) 基础 介绍了Service Bus支持主题(Topic).如下图: 当2个客户端同时订阅了相同的主题(Topic).当向这个Topic发送消息的时候,2个客户端会同时收到该消息. 笔者模拟一个在线聊天室的场景: 1.创建一个Windows Console命令行项目,编写相应的代码 2.运行项目,要求输入聊天室名称(即订阅了相同的主题Topic) 3.当2

字符串相似度计算的方法,使用SQL以及C#实现,本文非原创摘自网络(.NET SQL技术交流群入群206656202需注明博客园)

1 CREATE function get_semblance_By_2words 2 ( 3 @word1 varchar(50), 4 @word2 varchar(50) 5 ) 6 returns nvarchar(4000) 7 as 8 begin 9 declare @re int 10 declare @maxLenth int 11 declare @i int,@l int 12 declare @tb1 table(child varchar(50)) 13 declare

C#制作ActiveX控件及部署升级(摘自网络)

使用C#开发ActiveX控件 控件开发.制作CAB包.签名.部署 ActiveX控件以前也叫做OLE控件,它是微软IE支持的一种软件组件或对象,可以将其插入到Web页面中,实现在浏览器端执行动态程序功能,以增强浏览器端的动态处理能力.通常ActiveX控件都是用C++或VB语言开发,本文介绍另一种方式,在.NET Framework平台上,使用C#语言开发ActiveX控件. 虽然本文通篇都在讲如何使用C#语言开发ActiveX控件,但我并不极力推荐使用这种技术,因为该技术存在明显的局限,即需

RabbitMQ三种Exchange模式(fanout,direct,topic)的特性 -摘自网络

RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout,direct,topic,header header模式在实际使用中较少,本文只对前三种模式进行比较. 性能排序:fanout > direct >> topic.比例大约为11:10:6 一.Direct Exchange 任何发送到Direct Exchange的消息都会被转发到RouteKey中指定

【c#】RabbitMQ学习文档(五)Topic(主题。通配符模式)

(本实例都是使用的Net的客户端,使用C#编写),说明,中文方括号[]表示名词. 在上一个教程中,我们改进了我们的日志记录系统. 没有使用只能够进行虚拟广播的[Fanout]交换机,而是使用了[Direct]类型的交换机,这样做就可以让我们有可能选择性地接收日志. 虽然使用[Direct]类型的[消息交换机]改进了我们的系统,但它仍然有限制 - 它不能基于多个标准进行路由选择. 在我们的日志记录系统中,我们可能不仅要根据严重性订阅日志,还可以基于发出日志的源进行订阅. 您可能会从syslog u