From b8f8a519373c246d7de80813ba6069d9147ac661 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Tue, 17 May 2022 08:35:47 +0800 Subject: [PATCH] Add springcloud stream rocketmq sample (#227) --- .../pom.xml | 55 ++++++++++++++++ ...rSpringCloudStreamRocketMQApplication.java | 19 ++++++ .../rocketmq/example/MessageConsume.java | 40 ++++++++++++ .../rocketmq/example/MessageProduce.java | 62 +++++++++++++++++++ .../stream/rocketmq/example/MySink.java | 22 +++++++ .../rocketmq/example/SendMessageDTO.java | 57 +++++++++++++++++ .../src/main/resources/application.properties | 33 ++++++++++ 7 files changed, 288 insertions(+) create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/pom.xml create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/Hippo4jAdapterSpringCloudStreamRocketMQApplication.java create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MessageConsume.java create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MessageProduce.java create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MySink.java create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/SendMessageDTO.java create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/resources/application.properties diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/pom.xml b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/pom.xml new file mode 100644 index 00000000..edc4a637 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/pom.xml @@ -0,0 +1,55 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-example + ${revision} + + hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example + + + 2.2.6.RELEASE + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-web + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + ${spring-cloud-starter-stream-rocketmq.version} + + + + org.springframework.boot + spring-boot-starter-json + + + + org.projectlombok + lombok + + + + cn.hippo4j + hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq + ${project.version} + + + + cn.hippo4j + hippo4j-spring-boot-starter + ${project.version} + + + diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/Hippo4jAdapterSpringCloudStreamRocketMQApplication.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/Hippo4jAdapterSpringCloudStreamRocketMQApplication.java new file mode 100644 index 00000000..90d0a4ff --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/Hippo4jAdapterSpringCloudStreamRocketMQApplication.java @@ -0,0 +1,19 @@ +package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example; + +import cn.hippo4j.core.enable.EnableDynamicThreadPool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; + +@Slf4j +@SpringBootApplication +@EnableDynamicThreadPool +@EnableBinding({Source.class, MySink.class}) +public class Hippo4jAdapterSpringCloudStreamRocketMQApplication { + + public static void main(String[] args) { + SpringApplication.run(Hippo4jAdapterSpringCloudStreamRocketMQApplication.class, args); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MessageConsume.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MessageConsume.java new file mode 100644 index 00000000..acfa4744 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MessageConsume.java @@ -0,0 +1,40 @@ +package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.handler.annotation.Headers; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * Message consume. + */ +@Slf4j +@Component +public class MessageConsume { + + @StreamListener(MySink.INPUT) + public void consumeMessage(@Payload SendMessageDTO message, @Headers Map headers) { + long startTime = System.currentTimeMillis(); + try { + // ignore + } finally { + log.info("Keys: {}, Msg id: {}, Execute time: {} ms, Message: {}", headers.get("rocketmq_KEYS"), headers.get("rocketmq_MESSAGE_ID"), System.currentTimeMillis() - startTime, JSON.toJSONString(message)); + } + log.info("Input current thread name: {}", Thread.currentThread().getName()); + } + + @StreamListener(MySink.INPUT2) + public void consumeSaveMessage(@Payload SendMessageDTO message, @Headers Map headers) { + long startTime = System.currentTimeMillis(); + try { + // ignore + } finally { + log.info("Keys: {}, Msg id: {}, Execute time: {} ms, Message: {}", headers.get("rocketmq_KEYS"), headers.get("rocketmq_MESSAGE_ID"), System.currentTimeMillis() - startTime, JSON.toJSONString(message)); + } + log.info("Input2 current thread name: {}", Thread.currentThread().getName()); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MessageProduce.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MessageProduce.java new file mode 100644 index 00000000..eef4f62c --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MessageProduce.java @@ -0,0 +1,62 @@ +package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example; + +import com.alibaba.fastjson.JSON; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageConst; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.UUID; + +/** + * Message produce. + */ +@Slf4j +@RestController +@AllArgsConstructor +public class MessageProduce { + + private final MessageChannel output; + + public static final String MESSAGE_CENTER_SEND_MESSAGE_TAG = "framework_message-center_send-message_tag"; + + public static final String MESSAGE_CENTER_SAVE_MESSAGE_TAG = "framework_message-center_save-message_tag"; + + @GetMapping("/message/send") + public String sendMessage() { + int maxSendSize = 10; + for (int i = 0; i < maxSendSize; i++) { + sendMessage(MESSAGE_CENTER_SEND_MESSAGE_TAG); + sendMessage(MESSAGE_CENTER_SAVE_MESSAGE_TAG); + } + return "success"; + } + + private void sendMessage(String tags) { + String keys = UUID.randomUUID().toString(); + SendMessageDTO payload = SendMessageDTO.builder() + .receiver("156011xxx91") + .uid(keys) + .build(); + Message message = MessageBuilder + .withPayload(JSON.toJSONString(payload)) + .setHeader(MessageConst.PROPERTY_KEYS, keys) + .setHeader(MessageConst.PROPERTY_TAGS, tags) + .build(); + long startTime = System.currentTimeMillis(); + boolean sendResult = false; + try { + sendResult = output.send(message, 2000L); + } finally { + log.info("Send status: {}, Keys: {}, Execute time: {} ms, Message: {}", + sendResult, + keys, + System.currentTimeMillis() - startTime, + JSON.toJSONString(payload)); + } + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MySink.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MySink.java new file mode 100644 index 00000000..22640300 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/MySink.java @@ -0,0 +1,22 @@ +package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.messaging.SubscribableChannel; + +/** + * My sink. + */ +public interface MySink extends Sink { + + /** + * Input channel name. + */ + String INPUT2 = "input2"; + + /** + * @return input channel. + */ + @Input(MySink.INPUT2) + SubscribableChannel input2(); +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/SendMessageDTO.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/SendMessageDTO.java new file mode 100644 index 00000000..21fef863 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rocketmq/example/SendMessageDTO.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.io.Serializable; + +/** + * Send message dto. + */ +@Data +@Accessors(chain = true) +@NoArgsConstructor +@AllArgsConstructor +public class SendMessageDTO implements Serializable { + + private String uid; + + private String receiver; + + public SendMessageDTO uid(String uid) { + this.uid = uid; + return this; + } + + public SendMessageDTO receiver(String receiver) { + this.receiver = receiver; + return this; + } + + public static SendMessageDTO builder() { + return new SendMessageDTO(); + } + + public SendMessageDTO build() { + return this; + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/resources/application.properties b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/resources/application.properties new file mode 100644 index 00000000..42e6f4dc --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example/src/main/resources/application.properties @@ -0,0 +1,33 @@ +server.port=8090 + +spring.profiles.active=dev +spring.dynamic.thread-pool.server-addr=http://localhost:6691 +spring.dynamic.thread-pool.namespace=prescription +spring.dynamic.thread-pool.item-id=dynamic-threadpool-example +spring.dynamic.thread-pool.username=admin +spring.dynamic.thread-pool.password=123456 + +# Please replace the local configuration. +spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 + +# === produce === +spring.cloud.stream.bindings.output.content-type=application/json +spring.cloud.stream.bindings.output.destination=framework_message-center_topic +spring.cloud.stream.bindings.output.group=framework_message-center_send-message_pg + +# === consume === +spring.cloud.stream.bindings.input.content-type=application/json +spring.cloud.stream.bindings.input.destination=framework_message-center_topic +spring.cloud.stream.bindings.input.group=framework_message-center_send-message_cg +spring.cloud.stream.bindings.input.consumer.max-attempts=1 +spring.cloud.stream.bindings.input.consumer.concurrency=1 +spring.cloud.stream.rocketmq.bindings.input.consumer.tags=framework_message-center_send-message_tag +spring.cloud.stream.rocketmq.bindings.input.consumer.delay-level-when-next-consume=-1 + +spring.cloud.stream.bindings.input2.content-type=application/json +spring.cloud.stream.bindings.input2.destination=framework_message-center_topic +spring.cloud.stream.bindings.input2.group=framework_message-center_save-message_cg +spring.cloud.stream.bindings.input2.consumer.max-attempts=1 +spring.cloud.stream.bindings.input2.consumer.concurrency=1 +spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=framework_message-center_save-message_tag +spring.cloud.stream.rocketmq.bindings.input2.consumer.delay-level-when-next-consume=-1