RabbitMq 消息订阅模式--原创 彭应智
Rabbit 概念
名词解释
名词解释系小米自己的理解。和书籍、网站等有所出入。仅代表本人理解的
- 生产者 是发送消息的
- 交换器 是负责转发消息的
- 队列 是存储消息的
- 消费者 是接收消息的
发送方
@Configuration
public class MqConfig {
/**
* 获取交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("testSubscribe");
}
}
/**
* 定时器
*/
@Component
@Slf4j
public class Sender {
@Resource
private AmqpTemplate amqpTemplate;
/**
* 定时发消息
* 每五秒发送一次
*/
@Scheduled(cron = "0/5 * * * * ?*")
public void sender(){
Random r = new Random();
Long num = r.nextLong();
log.debug("Sender : " + num);
System.out.println("Sender : " + num);
//往交换机发送消息
amqpTemplate.convertAndSend("testSubscribe","",num);
}
}
订阅方
@Configuration
public class MqConfig {
/**
* 新建消息队列
* @return
*/
@Bean
public Queue AMessage() {
return new Queue("testSubscribe.client02");
}
/**
* 获取交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("testSubscribe");
}
/**
* 绑定交换机
* @param AMessage
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
}
@Component
@Slf4j
public class Listener {
/**
* 监听消息队列
* @param num
*/
@RabbitListener(queues = "testSubscribe.client02")
@RabbitHandler
public void testSubscribe(Long num) {
Thread thread = Thread.currentThread();
log.debug("\n这是:{}\n线程ID:{}\n线程名称:{}\n监听队列为{}\n获得的数据为:{}","client02", thread.getId(), thread.getName(), "testSubscribe",num);
System.out.println(MessageFormat.format("\n这是:{0}\n线程ID:{1}\n线程名称:{2}\n监听队列为{3}\n获得的数据为:{4}","client02", thread.getId(), thread.getName(), "testSubscribe",num));
}
}
总结
我的方案是
发送方 发送方只需要把消息发送给交换机
所以发送方只需要定义交换机并往交换机中发送消息,至于消息最终发到哪些队列无需关心。
接收方 由接收方定义队列和配置交换机
接收定义交换机和队列,并声明交换机和队列的关系。此后便可以从队列中获取消息。
发布/订阅
- 接收方从交换机订阅
- 发送方发布到交换机