mirror of https://github.com/longtai-cn/hippo4j
Add springcloud stream rocketmq sample (#227)
parent
24bb82c491
commit
b8f8a51937
@ -0,0 +1,55 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-example</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<artifactId>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example</artifactId>
|
||||
|
||||
<properties>
|
||||
<spring-cloud-starter-stream-rocketmq.version>2.2.6.RELEASE</spring-cloud-starter-stream-rocketmq.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
||||
<version>${spring-cloud-starter-stream-rocketmq.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-json</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-spring-boot-starter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
@ -0,0 +1,19 @@
|
||||
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example;
|
||||
|
||||
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.messaging.Source;
|
||||
|
||||
@Slf4j
|
||||
@SpringBootApplication
|
||||
@EnableDynamicThreadPool
|
||||
@EnableBinding({Source.class, MySink.class})
|
||||
public class Hippo4jAdapterSpringCloudStreamRocketMQApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(Hippo4jAdapterSpringCloudStreamRocketMQApplication.class, args);
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.messaging.handler.annotation.Headers;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Message consume.
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class MessageConsume {
|
||||
|
||||
@StreamListener(MySink.INPUT)
|
||||
public void consumeMessage(@Payload SendMessageDTO message, @Headers Map headers) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
// ignore
|
||||
} finally {
|
||||
log.info("Keys: {}, Msg id: {}, Execute time: {} ms, Message: {}", headers.get("rocketmq_KEYS"), headers.get("rocketmq_MESSAGE_ID"), System.currentTimeMillis() - startTime, JSON.toJSONString(message));
|
||||
}
|
||||
log.info("Input current thread name: {}", Thread.currentThread().getName());
|
||||
}
|
||||
|
||||
@StreamListener(MySink.INPUT2)
|
||||
public void consumeSaveMessage(@Payload SendMessageDTO message, @Headers Map headers) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
// ignore
|
||||
} finally {
|
||||
log.info("Keys: {}, Msg id: {}, Execute time: {} ms, Message: {}", headers.get("rocketmq_KEYS"), headers.get("rocketmq_MESSAGE_ID"), System.currentTimeMillis() - startTime, JSON.toJSONString(message));
|
||||
}
|
||||
log.info("Input2 current thread name: {}", Thread.currentThread().getName());
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Message produce.
|
||||
*/
|
||||
@Slf4j
|
||||
@RestController
|
||||
@AllArgsConstructor
|
||||
public class MessageProduce {
|
||||
|
||||
private final MessageChannel output;
|
||||
|
||||
public static final String MESSAGE_CENTER_SEND_MESSAGE_TAG = "framework_message-center_send-message_tag";
|
||||
|
||||
public static final String MESSAGE_CENTER_SAVE_MESSAGE_TAG = "framework_message-center_save-message_tag";
|
||||
|
||||
@GetMapping("/message/send")
|
||||
public String sendMessage() {
|
||||
int maxSendSize = 10;
|
||||
for (int i = 0; i < maxSendSize; i++) {
|
||||
sendMessage(MESSAGE_CENTER_SEND_MESSAGE_TAG);
|
||||
sendMessage(MESSAGE_CENTER_SAVE_MESSAGE_TAG);
|
||||
}
|
||||
return "success";
|
||||
}
|
||||
|
||||
private void sendMessage(String tags) {
|
||||
String keys = UUID.randomUUID().toString();
|
||||
SendMessageDTO payload = SendMessageDTO.builder()
|
||||
.receiver("156011xxx91")
|
||||
.uid(keys)
|
||||
.build();
|
||||
Message<?> message = MessageBuilder
|
||||
.withPayload(JSON.toJSONString(payload))
|
||||
.setHeader(MessageConst.PROPERTY_KEYS, keys)
|
||||
.setHeader(MessageConst.PROPERTY_TAGS, tags)
|
||||
.build();
|
||||
long startTime = System.currentTimeMillis();
|
||||
boolean sendResult = false;
|
||||
try {
|
||||
sendResult = output.send(message, 2000L);
|
||||
} finally {
|
||||
log.info("Send status: {}, Keys: {}, Execute time: {} ms, Message: {}",
|
||||
sendResult,
|
||||
keys,
|
||||
System.currentTimeMillis() - startTime,
|
||||
JSON.toJSONString(payload));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example;
|
||||
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.messaging.Sink;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
/**
|
||||
* My sink.
|
||||
*/
|
||||
public interface MySink extends Sink {
|
||||
|
||||
/**
|
||||
* Input channel name.
|
||||
*/
|
||||
String INPUT2 = "input2";
|
||||
|
||||
/**
|
||||
* @return input channel.
|
||||
*/
|
||||
@Input(MySink.INPUT2)
|
||||
SubscribableChannel input2();
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq.example;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Send message dto.
|
||||
*/
|
||||
@Data
|
||||
@Accessors(chain = true)
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class SendMessageDTO implements Serializable {
|
||||
|
||||
private String uid;
|
||||
|
||||
private String receiver;
|
||||
|
||||
public SendMessageDTO uid(String uid) {
|
||||
this.uid = uid;
|
||||
return this;
|
||||
}
|
||||
|
||||
public SendMessageDTO receiver(String receiver) {
|
||||
this.receiver = receiver;
|
||||
return this;
|
||||
}
|
||||
|
||||
public static SendMessageDTO builder() {
|
||||
return new SendMessageDTO();
|
||||
}
|
||||
|
||||
public SendMessageDTO build() {
|
||||
return this;
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
server.port=8090
|
||||
|
||||
spring.profiles.active=dev
|
||||
spring.dynamic.thread-pool.server-addr=http://localhost:6691
|
||||
spring.dynamic.thread-pool.namespace=prescription
|
||||
spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
|
||||
spring.dynamic.thread-pool.username=admin
|
||||
spring.dynamic.thread-pool.password=123456
|
||||
|
||||
# Please replace the local configuration.
|
||||
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
|
||||
|
||||
# === produce ===
|
||||
spring.cloud.stream.bindings.output.content-type=application/json
|
||||
spring.cloud.stream.bindings.output.destination=framework_message-center_topic
|
||||
spring.cloud.stream.bindings.output.group=framework_message-center_send-message_pg
|
||||
|
||||
# === consume ===
|
||||
spring.cloud.stream.bindings.input.content-type=application/json
|
||||
spring.cloud.stream.bindings.input.destination=framework_message-center_topic
|
||||
spring.cloud.stream.bindings.input.group=framework_message-center_send-message_cg
|
||||
spring.cloud.stream.bindings.input.consumer.max-attempts=1
|
||||
spring.cloud.stream.bindings.input.consumer.concurrency=1
|
||||
spring.cloud.stream.rocketmq.bindings.input.consumer.tags=framework_message-center_send-message_tag
|
||||
spring.cloud.stream.rocketmq.bindings.input.consumer.delay-level-when-next-consume=-1
|
||||
|
||||
spring.cloud.stream.bindings.input2.content-type=application/json
|
||||
spring.cloud.stream.bindings.input2.destination=framework_message-center_topic
|
||||
spring.cloud.stream.bindings.input2.group=framework_message-center_save-message_cg
|
||||
spring.cloud.stream.bindings.input2.consumer.max-attempts=1
|
||||
spring.cloud.stream.bindings.input2.consumer.concurrency=1
|
||||
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=framework_message-center_save-message_tag
|
||||
spring.cloud.stream.rocketmq.bindings.input2.consumer.delay-level-when-next-consume=-1
|
Loading…
Reference in new issue