diff --git a/beacon-api/src/main/java/com/mashibing/api/filter/impl/SignCheckFilter.java b/beacon-api/src/main/java/com/mashibing/api/filter/impl/SignCheckFilter.java index 312f2ed..f7d9e0a 100644 --- a/beacon-api/src/main/java/com/mashibing/api/filter/impl/SignCheckFilter.java +++ b/beacon-api/src/main/java/com/mashibing/api/filter/impl/SignCheckFilter.java @@ -42,7 +42,6 @@ public class SignCheckFilter implements CheckFilter { private final String SIGN_ID = "id"; - @Override public void check(StandardSubmit submit) { log.info("【接口模块-校验签名】 校验ing…………"); diff --git a/beacon-cache/src/main/java/com/mashibing/cache/controller/CacheController.java b/beacon-cache/src/main/java/com/mashibing/cache/controller/CacheController.java index 4ea4e9d..4f50b31 100644 --- a/beacon-cache/src/main/java/com/mashibing/cache/controller/CacheController.java +++ b/beacon-cache/src/main/java/com/mashibing/cache/controller/CacheController.java @@ -152,5 +152,13 @@ public class CacheController { return result; } + @PostMapping(value = "/cache/keys/{pattern}") + public Set keys(@PathVariable String pattern){ + log.info("【缓存模块】 keys方法,根据pattern查询key的信息 pattern = {}" ,pattern); + Set keys = redisTemplate.keys(pattern); + log.info("【缓存模块】 keys方法,根据pattern查询key的信息 pattern = {},查询出全部的key信息 keys = {}" ,pattern,keys); + return keys; + } + } diff --git a/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstants.java b/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstants.java index 71aa11b..8448ffe 100644 --- a/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstants.java +++ b/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstants.java @@ -29,7 +29,7 @@ public interface RabbitMQConstants { String SMS_PUSH_REPORT = "sms_push_report_topic"; /** - * 策略模块推送消息到短信网关模块的队列前缀名称 + * 策略模块推送消息到短信网关模块的队列前缀名称,后面需要追加通道的id */ String SMS_GATEWAY = "sms_gateway_topic_"; diff --git a/beacon-monitor/pom.xml b/beacon-monitor/pom.xml new file mode 100644 index 0000000..2275daa --- /dev/null +++ b/beacon-monitor/pom.xml @@ -0,0 +1,61 @@ + + + + beacon-cloud + com.mashibing + 1.0-SNAPSHOT + + 4.0.0 + + beacon-monitor + + + + + + org.springframework.boot + spring-boot-starter-web + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + com.mashibing + beacon-common + 1.0-SNAPSHOT + + + + com.xuxueli + xxl-job-core + 2.3.1 + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.cloud + spring-cloud-starter-openfeign + + + + org.springframework.boot + spring-boot-starter-mail + + + + + \ No newline at end of file diff --git a/beacon-monitor/src/main/java/com/mashibing/monitor/MonitorStarterApp.java b/beacon-monitor/src/main/java/com/mashibing/monitor/MonitorStarterApp.java new file mode 100644 index 0000000..a5ecbd5 --- /dev/null +++ b/beacon-monitor/src/main/java/com/mashibing/monitor/MonitorStarterApp.java @@ -0,0 +1,21 @@ +package com.mashibing.monitor; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.cloud.openfeign.EnableFeignClients; + +/** + * @author 郑大仙丶 + * @version V1.0.0 + */ +@SpringBootApplication +@EnableDiscoveryClient +@EnableFeignClients +public class MonitorStarterApp { + + public static void main(String[] args) { + SpringApplication.run(MonitorStarterApp.class,args); + } + +} diff --git a/beacon-monitor/src/main/java/com/mashibing/monitor/client/CacheClient.java b/beacon-monitor/src/main/java/com/mashibing/monitor/client/CacheClient.java new file mode 100644 index 0000000..4ec8db8 --- /dev/null +++ b/beacon-monitor/src/main/java/com/mashibing/monitor/client/CacheClient.java @@ -0,0 +1,18 @@ +package com.mashibing.monitor.client; + +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; + +import java.util.Set; + +/** + * @author 郑大仙丶 + * @version V1.0.0 + */ +@FeignClient(value = "beacon-cache") +public interface CacheClient { + + @PostMapping(value = "/cache/keys/{pattern}") + Set keys(@PathVariable String pattern); +} diff --git a/beacon-monitor/src/main/java/com/mashibing/monitor/config/XxlJobConfig.java b/beacon-monitor/src/main/java/com/mashibing/monitor/config/XxlJobConfig.java new file mode 100644 index 0000000..6a79a70 --- /dev/null +++ b/beacon-monitor/src/main/java/com/mashibing/monitor/config/XxlJobConfig.java @@ -0,0 +1,55 @@ +package com.mashibing.monitor.config; + +import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author 郑大仙丶 + * @version V1.0.0 + */ +@Configuration +@Slf4j +public class XxlJobConfig { + + @Value("${xxl.job.admin.addresses}") + private String adminAddresses; + + @Value("${xxl.job.accessToken}") + private String accessToken; + + @Value("${xxl.job.executor.appname}") + private String appname; + + @Value("${xxl.job.executor.address}") + private String address; + + @Value("${xxl.job.executor.ip}") + private String ip; + + @Value("${xxl.job.executor.port}") + private int port; + + @Value("${xxl.job.executor.logpath}") + private String logPath; + + @Value("${xxl.job.executor.logretentiondays}") + private int logRetentionDays; + + + @Bean + public XxlJobSpringExecutor xxlJobExecutor() { + log.info(">>>>>>>>>>> xxl-job config init."); + XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); + xxlJobSpringExecutor.setAdminAddresses(adminAddresses); + xxlJobSpringExecutor.setAppname(appname); + xxlJobSpringExecutor.setIp(ip); + xxlJobSpringExecutor.setPort(port); +// xxlJobSpringExecutor.setAccessToken(accessToken); + xxlJobSpringExecutor.setLogPath(logPath); + xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); + return xxlJobSpringExecutor; + } +} diff --git a/beacon-monitor/src/main/java/com/mashibing/monitor/task/MonitorQueueMessageCountTask.java b/beacon-monitor/src/main/java/com/mashibing/monitor/task/MonitorQueueMessageCountTask.java new file mode 100644 index 0000000..350afcf --- /dev/null +++ b/beacon-monitor/src/main/java/com/mashibing/monitor/task/MonitorQueueMessageCountTask.java @@ -0,0 +1,92 @@ +package com.mashibing.monitor.task; + +import com.mashibing.common.constant.RabbitMQConstants; +import com.mashibing.monitor.client.CacheClient; +import com.mashibing.monitor.util.MailUtil; +import com.rabbitmq.client.Channel; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.Connection; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.mail.MessagingException; +import java.io.IOException; +import java.util.Set; + +/** + * 监控队列中的消息个数,如果队列消息超过10000条,直接发送短信,通知。 + * @author 郑大仙丶 + * @version V1.0.0 + */ +@Component +@Slf4j +public class MonitorQueueMessageCountTask { + + String text = "

您的队列消息队列堆积了,队名为%s,消息个数为%s"; + + // 队列消息限制 + private final long MESSAGE_COUNT_LIMIT = 10000; + + // 查询队列名称的固定pattern + private final String QUEUE_PATTERN = "channel:*"; + + // 得到需要截取channelID的索引地址 + private final int CHANNEL_ID_INDEX = QUEUE_PATTERN.indexOf("*"); + + // 注入RabbitMQ的ConnectionFactory + @Autowired + private ConnectionFactory connectionFactory; + + @Autowired + private CacheClient cacheClient; + + @Autowired + private MailUtil mailUtil; + + @XxlJob("monitorQueueMessageCountTask") + public void monitor() throws MessagingException { + //1、拿到所有的队列名称 + Set keys = cacheClient.keys(QUEUE_PATTERN); + + + + //2、需要基于channel去操作 + Connection connection = connectionFactory.createConnection(); + Channel channel = connection.createChannel(false); + listenQueueAndSendEmail(channel,RabbitMQConstants.SMS_PRE_SEND); + for (String key : keys) { + // 封装队列名称 + String queueName = RabbitMQConstants.SMS_GATEWAY + key.substring(CHANNEL_ID_INDEX); + listenQueueAndSendEmail(channel, queueName); + } + + + + + } + + private void listenQueueAndSendEmail(Channel channel, String queueName) throws MessagingException { + // 队列不存在,直接构建,如果已经存在,直接忽略 + try { + channel.queueDeclare(queueName,true,false,false,null); + } catch (IOException e) { + e.printStackTrace(); + } + //3、拿到对应队列的消息,确认消息数量,超过限制,及时通知 + + long count = 0; + try { + count = channel.messageCount(queueName); + } catch (IOException e) { + e.printStackTrace(); + } + if(count > MESSAGE_COUNT_LIMIT){ + //4、通知的方式就是发送短信。 + mailUtil.sendEmail(queueName + "队列消息堆积",String.format(text,queueName,count)); + } + } + + +} diff --git a/beacon-monitor/src/main/java/com/mashibing/monitor/task/TestTask.java b/beacon-monitor/src/main/java/com/mashibing/monitor/task/TestTask.java new file mode 100644 index 0000000..68914cc --- /dev/null +++ b/beacon-monitor/src/main/java/com/mashibing/monitor/task/TestTask.java @@ -0,0 +1,20 @@ +package com.mashibing.monitor.task; + +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author 郑大仙丶 + * @version V1.0.0 + */ +@Component +@Slf4j +public class TestTask { + + @XxlJob("test") + public void test(){ + // 编写任务逻辑 + log.info("Hello World!"); + } +} diff --git a/beacon-monitor/src/main/java/com/mashibing/monitor/util/MailUtil.java b/beacon-monitor/src/main/java/com/mashibing/monitor/util/MailUtil.java new file mode 100644 index 0000000..4367378 --- /dev/null +++ b/beacon-monitor/src/main/java/com/mashibing/monitor/util/MailUtil.java @@ -0,0 +1,48 @@ +package com.mashibing.monitor.util; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.mail.SimpleMailMessage; +import org.springframework.mail.javamail.JavaMailSender; +import org.springframework.mail.javamail.MimeMessageHelper; +import org.springframework.stereotype.Component; + +import javax.mail.MessagingException; +import javax.mail.internet.MimeMessage; + +/** + * @author 郑大仙丶 + * @version V1.0.0 + */ +@Component +@RefreshScope +public class MailUtil { + + @Value("${spring.mail.username}") + private String from; + + @Value("${spring.mail.tos}") + private String tos; + + @Autowired + private JavaMailSender javaMailSender; + + + public void sendEmail(String subject,String text) throws MessagingException { + // 构建MimeMessage对象 + MimeMessage mimeMessage = javaMailSender.createMimeMessage(); + + // 给邮件指定信息 + MimeMessageHelper helper = new MimeMessageHelper(mimeMessage); + helper.setFrom(from); + helper.setTo(tos.split(",")); + helper.setSubject(subject); + helper.setText(text); + + // 发送邮件 + javaMailSender.send(mimeMessage); + } + + +} diff --git a/beacon-monitor/src/main/resources/bootstrap.yml b/beacon-monitor/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..4451a7b --- /dev/null +++ b/beacon-monitor/src/main/resources/bootstrap.yml @@ -0,0 +1,17 @@ +# 服务名称 +spring: + application: + name: beacon-monitor + # 多环境 + profiles: + active: dev + # nacos注册中心地址 + cloud: + nacos: + discovery: + server-addr: 114.116.226.76:8848 + # nacos配置中心地址: + config: + server-addr: 114.116.226.76:8848 + file-extension: yml + # beacon-monitor-dev.yml \ No newline at end of file diff --git a/beacon-strategy/pom.xml b/beacon-strategy/pom.xml index 7fb4b6a..8dc5c57 100644 --- a/beacon-strategy/pom.xml +++ b/beacon-strategy/pom.xml @@ -56,6 +56,11 @@ hutool-dfa 5.8.12 + + com.mashibing + beacon-api + 1.0-SNAPSHOT + diff --git a/pom.xml b/pom.xml index daebbfd..08cb30e 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ beacon-search beacon-push beacon-smsgateway + beacon-monitor org.springframework.boot