SpringCloudStream
大约 2 分钟
SpringCloudStream
介绍
- 为微服务应用构建消息驱动能力的框架;
- 引入了发布与订阅、 消费组以及分区这三个核心概念
- 遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)
RabbitMQ Binder
注解使用
- 注解方式,3.1起已弃用,不推荐使用,支持函数式编程模型 如 @Input @Output @StreamListener
- 使用方法见参考
配置参考1
server:
port: 8088
spring:
application:
name: stream_demo
cloud:
stream:
binders: #需要绑定的rabbitmq的服务信息
defaultRabbit: #定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: #配置rabbitmq连接环境
spring:
rabbitmq:
host: **.**.**.** #rabbitmq 服务器的地址
port: 5672 #rabbitmq 服务器端口
username: **** #rabbitmq 用户名
password: **** #rabbitmq 密码
virtual-host: / #虚拟路径
bindings: #服务的整合处理
#inputs 对应消费者,outputs 对应生产者
#Stream中的消息通信方式遵循了发布-订阅模式
#在Stream中,处于同一个组的多个消费者是竞争关系,就可以保证消息只被一个服务消费一次,而不同组是可以重复消费的。现在默认分组就是不同的,组流水号不一样。
#消费者宕机:如果未配置group,则消费者上线后无法消费之前的消息(消息丢失);如果配置了group,则消费上线后可以消费之前的消息(消息持久化)
testOutput: #生产者消息输出通道 ---> 消息输出通道 = 生产者相关的定义:Exchange & Queue
destination: exchange-test #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输出通道绑定到RabbitMQ的exchange-test交换器。
content-type: application/json #设置消息的类型,本次为json
default-binder: defaultRabbit #设置要绑定的消息服务的具体设置,默认绑定RabbitMQ
group: testGroup #分组=Queue名称,如果不设置会使用默认的组流水号
testInput: #消费者消息输入通道 ---> 消息输入通道 = 消费者相关的定义:Exchange & Queue
destination: exchange-test #exchange名称,交换模式默认是topic;把SpringCloud Stream的消息输入通道绑定到RabbitMQ的exchange-test交换器。
content-type: application/json
default-binder: defaultRabbit
group: testGroup
配置参考-boot模式
函数式使用
生产者
- Supplier
- StreamBridge
消费者
- Consumer
- Function