diff --git a/xjs-study/springcloud-project/pom.xml b/xjs-study/springcloud-project/pom.xml index 00e295fb..942da8fd 100644 --- a/xjs-study/springcloud-project/pom.xml +++ b/xjs-study/springcloud-project/pom.xml @@ -12,6 +12,7 @@ 微服务组件 eureka + stream springcloud-project diff --git a/xjs-study/springcloud-project/stream/pom.xml b/xjs-study/springcloud-project/stream/pom.xml new file mode 100644 index 00000000..8e924550 --- /dev/null +++ b/xjs-study/springcloud-project/stream/pom.xml @@ -0,0 +1,25 @@ + + + + springcloud-project + com.xjs + 1.0 + + 4.0.0 + pom + Stream消息流 + + stream-producer-8600 + stream-consumer-8601 + + + stream + + + 11 + 11 + + + diff --git a/xjs-study/springcloud-project/stream/stream-consumer-8601/pom.xml b/xjs-study/springcloud-project/stream/stream-consumer-8601/pom.xml new file mode 100644 index 00000000..5d499661 --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-consumer-8601/pom.xml @@ -0,0 +1,42 @@ + + + + stream + com.xjs + 1.0 + + 4.0.0 + Stream消费者8600 + + stream-consumer-8601 + + + 11 + 11 + + + + + org.springframework.cloud + spring-cloud-starter-netflix-eureka-client + + + + org.springframework.cloud + spring-cloud-starter-stream-rabbit + + + + org.springframework.boot + spring-boot-starter-test + + + + org.springframework.boot + spring-boot-starter-web + + + + diff --git a/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/java/com/xjs/StreamConsumerApp8601.java b/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/java/com/xjs/StreamConsumerApp8601.java new file mode 100644 index 00000000..a9441aa2 --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/java/com/xjs/StreamConsumerApp8601.java @@ -0,0 +1,18 @@ +package com.xjs; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; + +/** + * @author xiejs + * @since 2022-05-26 + */ +@SpringBootApplication +@EnableDiscoveryClient +public class StreamConsumerApp8601 { + + public static void main(String[] args) { + SpringApplication.run(StreamConsumerApp8601.class, args); + } +} diff --git a/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/java/com/xjs/service/StreamConsumerService.java b/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/java/com/xjs/service/StreamConsumerService.java new file mode 100644 index 00000000..796db832 --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/java/com/xjs/service/StreamConsumerService.java @@ -0,0 +1,20 @@ +package com.xjs.service; + +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Service; + +import java.util.function.Consumer; + +/** + * @author xiejs + * @since 2022-05-26 + */ + +@Service +public class StreamConsumerService { + + @Bean + public Consumer myChannel() { + return message -> System.out.println("消息:"+message); + } +} diff --git a/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/resources/application.yml b/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/resources/application.yml new file mode 100644 index 00000000..d3f26edb --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-consumer-8601/src/main/resources/application.yml @@ -0,0 +1,35 @@ +server: + port: 8601 + +spring: + application: + name: stream-consumer + + cloud: + stream: + binders: #绑定mq服务信息 + rabbitBinder: #名称定义,用于后面的关联 + type: rabbit + environment: #mq环境配置 + spring: + rabbitmq: + host: 192.168.23.128 + port: 5672 + username: guest + password: guest + + bindings: # 关联整合通道和binder对象 + myChannel-in-0: # output是我们定义的通道名称,此处不能乱改 + destination: exchange # 要使用的Exchange名称(消息队列主题名称) + content-type: application/json # application/json # 消息类型设置,比如json + binder: rabbitBinder # 关联MQ服务 + group: default +eureka: + client: + serviceUrl: # eureka server的路径 + defaultZone: http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka + instance: + prefer-ip-address: true #使用ip注册 + instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port} #实例名称 + + diff --git a/xjs-study/springcloud-project/stream/stream-producer-8600/pom.xml b/xjs-study/springcloud-project/stream/stream-producer-8600/pom.xml new file mode 100644 index 00000000..e3664a9d --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-producer-8600/pom.xml @@ -0,0 +1,42 @@ + + + + stream + com.xjs + 1.0 + + 4.0.0 + Stream生产者8600 + + stream-producer-8600 + + + 11 + 11 + + + + + org.springframework.cloud + spring-cloud-starter-netflix-eureka-client + + + + org.springframework.cloud + spring-cloud-starter-stream-rabbit + + + + org.springframework.boot + spring-boot-starter-test + + + + org.springframework.boot + spring-boot-starter-web + + + + diff --git a/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/StreamProducerApp8600.java b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/StreamProducerApp8600.java new file mode 100644 index 00000000..2e05b9d9 --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/StreamProducerApp8600.java @@ -0,0 +1,18 @@ +package com.xjs; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; + +/** + * @author xiejs + * @since 2022-05-26 + */ +@SpringBootApplication +@EnableDiscoveryClient +public class StreamProducerApp8600 { + + public static void main(String[] args) { + SpringApplication.run(StreamProducerApp8600.class, args); + } +} diff --git a/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/controller/MessageProducerController.java b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/controller/MessageProducerController.java new file mode 100644 index 00000000..edcf3056 --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/controller/MessageProducerController.java @@ -0,0 +1,26 @@ +package com.xjs.controller; + +import com.xjs.service.IMessageProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author xiejs + * @since 2022-05-26 + */ +@RestController +@RequestMapping("message") +public class MessageProducerController { + @Autowired + private IMessageProducer iMessageProducer; + + + @GetMapping + public String message() { + iMessageProducer.sendMessage(); + + return "xaxa"; + } +} diff --git a/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/service/IMessageProducer.java b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/service/IMessageProducer.java new file mode 100644 index 00000000..dc75f06d --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/service/IMessageProducer.java @@ -0,0 +1,14 @@ +package com.xjs.service; + +/** + * 消息生产者 + * + * @author xiejs + * @since 2022-05-26 + */ +public interface IMessageProducer { + + void sendMessage(); + + +} diff --git a/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/service/impl/MessageProducerImpl.java b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/service/impl/MessageProducerImpl.java new file mode 100644 index 00000000..b34d893b --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/java/com/xjs/service/impl/MessageProducerImpl.java @@ -0,0 +1,31 @@ +package com.xjs.service.impl; + +import com.xjs.service.IMessageProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import java.util.UUID; + +/** + * 消息生产者实现 + * + * @author xiejs + * @since 2022-05-26 + */ +@Service +public class MessageProducerImpl implements IMessageProducer { + + @Autowired + private StreamBridge streamBridge; + + + @Override + public void sendMessage() { + //向mq中发送消息(并不是直接操作mq,而是操作Spring Cloud Stream) + String message = UUID.randomUUID().toString(); + streamBridge.send("myChannel-out-0", MessageBuilder.withPayload(message).build()); + + } +} diff --git a/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/resources/application.yml b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/resources/application.yml new file mode 100644 index 00000000..818d490b --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-producer-8600/src/main/resources/application.yml @@ -0,0 +1,35 @@ +server: + port: 8600 + +spring: + application: + name: stream-producer + + cloud: + stream: + binders: #绑定mq服务信息 + rabbitBinder: #名称定义,用于后面的关联 + type: rabbit + environment: #mq环境配置 + spring: + rabbitmq: + host: 192.168.23.128 + port: 5672 + username: guest + password: guest + + bindings: # 关联整合通道和binder对象 + myChannel-out-0: # output是我们定义的通道名称,此处不能乱改 + destination: exchange # 要使用的Exchange名称(消息队列主题名称) + content-type: application/json # application/json # 消息类型设置,比如json + binder: rabbitBinder # 关联MQ服务 + group: default +eureka: + client: + serviceUrl: # eureka server的路径 + defaultZone: http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka + instance: + prefer-ip-address: true #使用ip注册 + instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port} #实例名称 + + diff --git a/xjs-study/springcloud-project/stream/stream-producer-8600/src/test/java/com/xjs/service/impl/MessageProducerImplTest.java b/xjs-study/springcloud-project/stream/stream-producer-8600/src/test/java/com/xjs/service/impl/MessageProducerImplTest.java new file mode 100644 index 00000000..2d6ccf36 --- /dev/null +++ b/xjs-study/springcloud-project/stream/stream-producer-8600/src/test/java/com/xjs/service/impl/MessageProducerImplTest.java @@ -0,0 +1,24 @@ +package com.xjs.service.impl; + +import com.xjs.StreamProducerApp8600; +import com.xjs.service.IMessageProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author xiejs + * @since 2022-05-26 + */ +@SpringBootTest(classes = StreamProducerApp8600.class) +class MessageProducerImplTest { + + @Autowired + private IMessageProducer iMessageProducer; + + @org.junit.jupiter.api.Test + void sendMessage() { + iMessageProducer.sendMessage(); + } +}