From d513def4873601d4e1e7697bdb31843da05cd172 Mon Sep 17 00:00:00 2001 From: heqijun Date: Thu, 12 Jun 2025 17:37:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BA=8A=E6=9E=B6=E4=BD=A0=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E8=80=85=EF=BC=8C=E6=8E=A5=E6=94=B6=E5=86=99=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E9=98=9F=E5=88=97=E4=B8=AD=E7=9A=84=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/RestHighLevelClientConfig.java | 41 +++++++++++++++++++ .../search/mq/SmsWriteLogListener.java | 32 +++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 beacon-search/src/main/java/com/mashibing/search/config/RestHighLevelClientConfig.java create mode 100644 beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java diff --git a/beacon-search/src/main/java/com/mashibing/search/config/RestHighLevelClientConfig.java b/beacon-search/src/main/java/com/mashibing/search/config/RestHighLevelClientConfig.java new file mode 100644 index 0000000..d3b0ee0 --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/config/RestHighLevelClientConfig.java @@ -0,0 +1,41 @@ +package com.mashibing.search.config; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author heqijun + * @ClassName: RestHighLevelClient + * @Description: TODO(这里用一句话描述这个类的作用) + * @date 2025/6/12 17:09 + */ + +@Configuration +public class RestHighLevelClientConfig { + + @Value("${elasticsearch.hostAndPorts}") + private String[] hostAndPorts; + + @Bean + public RestHighLevelClient restHighLevelClient() { + + HttpHost[] httpHost = new HttpHost[hostAndPorts.length]; + for (int i = 0; i < httpHost.length; i++) { + String[] hostAndPort = hostAndPorts[i].split(":"); + httpHost[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + } + + RestClientBuilder restClientBuilder = RestClient.builder(httpHost); + + RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder); + + return restHighLevelClient; + } + + +} diff --git a/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java new file mode 100644 index 0000000..7eaa2df --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java @@ -0,0 +1,32 @@ +package com.mashibing.search.mq; + +import com.mashibing.common.constant.RabbitMQConstant; +import com.mashibing.common.pojo.StandardSubmit; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author heqijun + * @ClassName: SmsWriteLogListener + * @Description: 搜索模块监听写日志队列 + * @date 2025/6/12 17:31 + */ + +@Slf4j +@Component +public class SmsWriteLogListener { + + @RabbitListener(queues = {RabbitMQConstant.SMS_WRITE_LOG}) + public void process(StandardSubmit submit, Channel channel, Message message) throws IOException { + + log.info("接收到存储日志的信息,submit={}", submit); + + //手动ack + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } +}