引入pom.xml

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
  1. 配置client

    /**
     * Declares the message channels this service binds through Spring Cloud Stream.
     *
     * @author huangdeyao
     * @date 2019/7/2 12:50
     */
    public interface StreamClient {

        /**
         * Name of the inbound channel. Channel names must be unique within one
         * service; different services may reuse the same channel name.
         */
        String STREAM_INPUT = "stream-input";

        /** Name of the outbound channel. */
        String STREAM_OUTPUT = "stream-output";

        /**
         * Inbound channel used to subscribe to messages.
         *
         * @return the channel registered under {@link #STREAM_INPUT}
         */
        @Input(STREAM_INPUT)
        SubscribableChannel input();

        /**
         * Outbound channel used to send messages.
         *
         * @return the channel registered under {@link #STREAM_OUTPUT}
         */
        @Output(STREAM_OUTPUT)
        MessageChannel output();
    }
  2. 接收端

    /**
     * Listener that logs every message arriving on the {@code stream-input} channel.
     *
     * @author huangdeyao
     * @date 2019/7/2 13:04
     */
    @Log4j2
    @EnableBinding(StreamClient.class)
    @Component
    public class StreamReceiver {

        /**
         * Logs the payload received from {@link StreamClient#STREAM_INPUT}.
         *
         * @param message the incoming payload
         */
        @StreamListener(StreamClient.STREAM_INPUT)
        public void receive(Object message) {
            log.info("stream input receiver: {}", message);
        }
    }
    
  3. 发送端

     @Autowired
     StreamClient streamClient;
    
     /**
      * Publishes the request's {@code message} parameter to the output channel.
      *
      * @param message text used as the message payload
      * @return {@code "ok"} when the channel accepted the message, {@code "fail"} otherwise
      */
     @GetMapping("/send")
     public String messageWithMQ(String message) {
         // send() reports acceptance as a boolean; surface it instead of always answering "ok".
         boolean sent = streamClient.output().send(MessageBuilder.withPayload(message).build());
         return sent ? "ok" : "fail";
     }
  4. 遇到的错误

    org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'stream_input' defined in com.dy.spring.client.StreamClient: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.dy.spring.client.StreamClient; factoryMethodName=input; initMethodName=null; destroyMethodName=null
     at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:68) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
     at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:55) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
     at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:92) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
     at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:410) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:389) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:82) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
     at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:44) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
     at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:364) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_191]
     at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:363) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:327) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:232) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:275) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:95) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:705) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:531) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
     at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
     at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
     at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
     at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
     at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
     at com.dy.spring.SpringDomeApplication.main(SpringDomeApplication.java:13) [classes/:na]
    
    Disconnected from the target VM, address: '127.0.0.1:57769', transport: 'socket'
    
    Process finished with exit code 1
    
    解决:StreamClient 中的输入通道和输出通道不能使用相同的通道名,需改用不同的名字。
    修改后:
    public interface StreamClient {
     /**
      * 同一个服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道
      */
     String STREAM_INPUT = "stream-input";
     String STREAM_OUTPUT = "stream-output";
     
    ...... 略
    }
    yml 配置:把输入通道 stream-input 的 destination 指向 stream-output,使收发两端使用同一个通道
    spring:
      cloud:
        stream:
          bindings:
            stream-input:
              destination: stream-output
  5. 多实例重复接收同一条消息的解决办法:在 yml 中配置 group

    spring:
      rabbitmq:
        port: 5672
        addresses: 192.168.177.129
        username: guest
        password: guest
      cloud:
        stream:
          bindings:
            stream-input:
              destination: stream-output
              group: my-stream
  6. 接收后回调信息
StreamClient.class
    /**
     * Name of the second inbound channel. Channel names must be unique within
     * one service; different services may reuse the same channel name.
     */
    String STREAM_INPUT_2 = "stream-input-2";

    /** Name of the second outbound channel. */
    String STREAM_OUTPUT_2 = "stream-output-2";

    /**
     * Second inbound channel used to subscribe to messages.
     *
     * @return the channel registered under {@link #STREAM_INPUT_2}
     */
    @Input(STREAM_INPUT_2)
    SubscribableChannel input2();

    /**
     * Second outbound channel used to send messages.
     *
     * @return the channel registered under {@link #STREAM_OUTPUT_2}
     */
    @Output(STREAM_OUTPUT_2)
    MessageChannel output2();
StreamReceiver.class
    /**
     * Consumes a {@link UserEntity} from the {@code stream-input} channel, logs it,
     * and returns it as the reply message.
     *
     * <p>The reply is sent to the {@code stream-output-2} output channel rather than
     * directly into the {@code stream-input-2} input channel, so it goes through the
     * declared output binding. The yml binds {@code stream-input-2} to the
     * {@code stream-output-2} destination, which is where the second listener
     * picks the reply up.
     *
     * @param userEntity the payload delivered to the listener
     * @return the same entity, forwarded as the reply payload
     */
    @StreamListener(StreamClient.STREAM_INPUT)
    @SendTo(StreamClient.STREAM_OUTPUT_2)
    public UserEntity process(UserEntity userEntity) {
        log.info("stream input receiver: {}", userEntity.toString());
        return userEntity;
    }

    /**
     * Logs each {@link UserEntity} arriving on the {@code stream-input-2} channel.
     *
     * @param userEntity the payload delivered to the listener
     */
    @StreamListener(StreamClient.STREAM_INPUT_2)
    public void process2(UserEntity userEntity) {
        final String rendered = userEntity.toString();
        log.info("STREAM_INPUT_2  input receiver: {}", rendered);
    }
yml
spring:
  rabbitmq:
    port: 5672
    addresses: 192.168.177.129
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        stream-input:
          destination: stream-output
          group: my-stream
          content-type: application/json
        stream-input-2:
          destination: stream-output-2
          group: my-stream
          content-type: application/json

项目地址

reference

Spring Cloud Stream如何消费自己生产的消息?

最后修改:2020 年 02 月 13 日
如果觉得我的文章对你有用,请随意赞赏