引入pom.xml
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
配置client
/** * @author huangdeyao * @date 2019/7/2 12:50 */ public interface StreamClient { /** * 同一个服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道 */ String STREAM_INPUT = "stream-input"; String STREAM_OUTPUT = "stream-output"; /** * 订阅消息 * * @return */ @Input(StreamClient.STREAM_INPUT) SubscribableChannel input(); /** * 消息发送 * * @return */ @Output(StreamClient.STREAM_OUTPUT) MessageChannel output(); }
接收端
/** * @author huangdeyao * @date 2019/7/2 13:04 */ @Component @EnableBinding(StreamClient.class) @Log4j2 public class StreamReceiver { @StreamListener(StreamClient.STREAM_INPUT) public void receive(Object message) { log.info("stream input receiver: {}", message); } }
发送端
@Autowired StreamClient streamClient; @GetMapping("/send") public String messageWithMQ(String message) { streamClient.output().send(MessageBuilder.withPayload(message).build()); return "ok"; }
遇到的错误
org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'stream_input' defined in com.dy.spring.client.StreamClient: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.dy.spring.client.StreamClient; factoryMethodName=input; initMethodName=null; destroyMethodName=null at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:68) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE] at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:55) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE] at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:92) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE] at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:410) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:389) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:82) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE] at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:44) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:364) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_191] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:363) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:327) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:232) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:275) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:95) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:705) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:531) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE] at com.dy.spring.SpringDomeApplication.main(SpringDomeApplication.java:13) [classes/:na] Disconnected from the target VM, address: '127.0.0.1:57769', transport: 'socket' Process finished with exit code 1
解决StreamClient 中不能通道名,使用不同的名字
修改public interface StreamClient { /** * 同一个服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道 */ String STREAM_INPUT = "stream-input"; String STREAM_OUTPUT = "stream-output"; ...... 略 } yml配置,互相指向成一个通道stream-output spring: cloud: stream: bindings: stream-input: destination: stream-output
多实例接收多个消息解决,yml配置group
spring: rabbitmq: port: 5672 addresses: 192.168.177.129 username: guest password: guest cloud: stream: bindings: stream-input: destination: stream-output group: my-stream
- 接收后回调信息
StreamClient.class
/**
* 同一个服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道
*/
String STREAM_INPUT_2 = "stream-input-2";
String STREAM_OUTPUT_2 = "stream-output-2";
/**
* 订阅消息
*
* @return
*/
@Input(StreamClient.STREAM_INPUT_2)
SubscribableChannel input2();
/**
* 消息发送
*
* @return
*/
@Output(StreamClient.STREAM_OUTPUT_2)
MessageChannel output2();
StreamReceiver.class
@StreamListener(StreamClient.STREAM_INPUT)
@SendTo(StreamClient.STREAM_INPUT_2)
public UserEntity process(UserEntity userEntity) {
log.info("stream input receiver: {}", userEntity.toString());
return userEntity;
}
@StreamListener(StreamClient.STREAM_INPUT_2)
public void process2(UserEntity userEntity) {
log.info("STREAM_INPUT_2 input receiver: {}", userEntity.toString());
}
yml
spring:
rabbitmq:
port: 5672
addresses: 192.168.177.129
username: guest
password: guest
cloud:
stream:
bindings:
stream-input:
destination: stream-output
group: my-stream
content-type: application/json
stream-input-2:
destination: stream-output-2
group: my-stream
content-type: application/json