引入pom.xml
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
- 配置client
/**
* @author huangdeyao
* @date 2019/7/2 12:50
*/
public interface StreamClient {
/**
* 同一个服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道
*/
String STREAM_INPUT = "stream-input";
String STREAM_OUTPUT = "stream-output";
/**
* 订阅消息
*
* @return
*/
@Input(StreamClient.STREAM_INPUT)
SubscribableChannel input();
/**
* 消息发送
*
* @return
*/
@Output(StreamClient.STREAM_OUTPUT)
MessageChannel output();
}
- 接收端
/**
* @author huangdeyao
* @date 2019/7/2 13:04
*/
@Component
@EnableBinding(StreamClient.class)
@Log4j2
public class StreamReceiver {
@StreamListener(StreamClient.STREAM_INPUT)
public void receive(Object message) {
log.info("stream input receiver: {}", message);
}
}
- 发送端
@Autowired
StreamClient streamClient;
@GetMapping("/send")
public String messageWithMQ(String message) {
streamClient.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
- 遇到的错误
org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'stream_input' defined in com.dy.spring.client.StreamClient: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.dy.spring.client.StreamClient; factoryMethodName=input; initMethodName=null; destroyMethodName=null
at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:68) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:55) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:92) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:410) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:389) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:82) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:44) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:364) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_191]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:363) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:327) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:232) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:275) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:95) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:705) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:531) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at com.dy.spring.SpringDomeApplication.main(SpringDomeApplication.java:13) [classes/:na]
Disconnected from the target VM, address: '127.0.0.1:57769', transport: 'socket'
Process finished with exit code 1
解决StreamClient 中不能通道名,使用不同的名字
修改
public interface StreamClient {
/**
* 同一个服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道
*/
String STREAM_INPUT = "stream-input";
String STREAM_OUTPUT = "stream-output";
...... 略
}
yml配置,互相指向成一个通道stream-output
spring:
cloud:
stream:
bindings:
stream-input:
destination: stream-output
- 多实例接收多个消息解决,yml配置group
spring:
rabbitmq:
port: 5672
addresses: 192.168.177.129
username: guest
password: guest
cloud:
stream:
bindings:
stream-input:
destination: stream-output
group: my-stream
- 接收后回调信息
StreamClient.class
/**
* 同一个服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道
*/
String STREAM_INPUT_2 = "stream-input-2";
String STREAM_OUTPUT_2 = "stream-output-2";
/**
* 订阅消息
*
* @return
*/
@Input(StreamClient.STREAM_INPUT_2)
SubscribableChannel input2();
/**
* 消息发送
*
* @return
*/
@Output(StreamClient.STREAM_OUTPUT_2)
MessageChannel output2();
StreamReceiver.class
@StreamListener(StreamClient.STREAM_INPUT)
@SendTo(StreamClient.STREAM_INPUT_2)
public UserEntity process(UserEntity userEntity) {
log.info("stream input receiver: {}", userEntity.toString());
return userEntity;
}
@StreamListener(StreamClient.STREAM_INPUT_2)
public void process2(UserEntity userEntity) {
log.info("STREAM_INPUT_2 input receiver: {}", userEntity.toString());
}
yml
spring:
rabbitmq:
port: 5672
addresses: 192.168.177.129
username: guest
password: guest
cloud:
stream:
bindings:
stream-input:
destination: stream-output
group: my-stream
content-type: application/json
stream-input-2:
destination: stream-output-2
group: my-stream
content-type: application/json