跳至主要內容

SpringCloudStream

酷风大约 2 分钟

SpringCloudStream

介绍

  • 为微服务应用构建消息驱动能力的框架;
  • 引入了发布与订阅、 消费组以及分区这三个核心概念
    • 遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)

RabbitMQ Binder

注解使用

  • 注解方式,3.1起已弃用,不推荐使用,支持函数式编程模型 如 @Input @Output @StreamListener
  • 使用方法见参考

配置参考1

server:
  port: 8088
spring:
  application:
    name: stream_demo
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于binding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbitmq连接环境
            spring:
              rabbitmq:
                host: **.**.**.**    #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: ****       #rabbitmq 用户名
                password: ****       #rabbitmq 密码
                virtual-host: /       #虚拟路径
      bindings:        #服务的整合处理
        #inputs 对应消费者,outputs 对应生产者
        #Stream中的消息通信方式遵循了发布-订阅模式
        #在Stream中,处于同一个组的多个消费者是竞争关系,就可以保证消息只被一个服务消费一次,而不同组是可以重复消费的。现在默认分组就是不同的,组流水号不一样。
        #消费者宕机:如果未配置group,则消费者上线后无法消费之前的消息(消息丢失);如果配置了group,则消费上线后可以消费之前的消息(消息持久化)
        testOutput:    #生产者消息输出通道 ---> 消息输出通道 = 生产者相关的定义:Exchange & Queue
          destination: exchange-test          #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输出通道绑定到RabbitMQ的exchange-test交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit       #设置要绑定的消息服务的具体设置,默认绑定RabbitMQ
          group: testGroup                    #分组=Queue名称,如果不设置会使用默认的组流水号
        testInput:     #消费者消息输入通道 ---> 消息输入通道 = 消费者相关的定义:Exchange & Queue
          destination: exchange-test          #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输入通道绑定到RabbitMQ的exchange-test交换器。
          content-type: application/json
          default-binder: defaultRabbit
          group: testGroup

配置参考-boot模式

函数式使用

  • 参考

  • 生产者

    • Supplier
    • StreamBridge
  • 消费者

    • Consumer
    • Function