RabbitMq 消息订阅模式--原创 彭应智

Rabbit 概念

名词解释

名词解释系小米自己的理解。和书籍、网站等有所出入。仅代表本人理解的

  • 生产者 是发送消息的
  • 交换器 是负责转发消息的
  • 队列 是存储消息的
  • 消费者 是接收消息的

发送方

@Configuration
public class MqConfig {

    /**
     * 获取交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("testSubscribe");
    }


}
/**
 * 定时器
 */
@Component
@Slf4j
public class Sender {
    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * 定时发消息
     * 每五秒发送一次
     */
    @Scheduled(cron = "0/5 * * * * ?*")
    public void sender(){
        Random r = new Random();
        Long num = r.nextLong();
        log.debug("Sender : " + num);
        System.out.println("Sender : " + num);
        //往交换机发送消息
        amqpTemplate.convertAndSend("testSubscribe","",num);
    }
}

订阅方

@Configuration
public class MqConfig {

    /**
     * 新建消息队列
     * @return
     */
    @Bean
    public Queue AMessage() {
        return new Queue("testSubscribe.client02");
    }

    /**
     * 获取交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("testSubscribe");
    }

    /**
     * 绑定交换机
     * @param AMessage
     * @param fanoutExchange
     * @return
     */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }
}
@Component
@Slf4j
public class Listener {
    /**
     * 监听消息队列
     * @param num
     */
    @RabbitListener(queues = "testSubscribe.client02")
    @RabbitHandler
    public void testSubscribe(Long num) {
        Thread thread = Thread.currentThread();
        log.debug("\n这是:{}\n线程ID:{}\n线程名称:{}\n监听队列为{}\n获得的数据为:{}","client02", thread.getId(), thread.getName(), "testSubscribe",num);
        System.out.println(MessageFormat.format("\n这是:{0}\n线程ID:{1}\n线程名称:{2}\n监听队列为{3}\n获得的数据为:{4}","client02", thread.getId(), thread.getName(), "testSubscribe",num));
    }
}

总结

我的方案是

  • 发送方 发送方只需要把消息发送给交换机

    所以发送方只需要定义交换机并往交换机中发送消息,至于消息最终发到哪些队列无需关心。

  • 接收方 由接收方定义队列和配置交换机

    接收定义交换机和队列,并声明交换机和队列的关系。此后便可以从队列中获取消息。

发布/订阅

  • 接收方从交换机订阅
  • 发送方发布到交换机