1、完成监控模块搭建&注册XXL-JOB

2、完成监控模块监听队列消息个数
master
郑大仙丶 2 years ago
parent 41320029af
commit 773bcde542

@ -42,7 +42,6 @@ public class SignCheckFilter implements CheckFilter {
private final String SIGN_ID = "id";
@Override
public void check(StandardSubmit submit) {
log.info("【接口模块-校验签名】 校验ing…………");

@ -152,5 +152,13 @@ public class CacheController {
return result;
}
@PostMapping(value = "/cache/keys/{pattern}")
public Set<String> keys(@PathVariable String pattern){
log.info("【缓存模块】 keys方法根据pattern查询key的信息 pattern = {}" ,pattern);
Set<String> keys = redisTemplate.keys(pattern);
log.info("【缓存模块】 keys方法根据pattern查询key的信息 pattern = {},查询出全部的key信息 keys = {}" ,pattern,keys);
return keys;
}
}

@ -29,7 +29,7 @@ public interface RabbitMQConstants {
String SMS_PUSH_REPORT = "sms_push_report_topic";
/**
*
* id
*/
String SMS_GATEWAY = "sms_gateway_topic_";

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>beacon-cloud</artifactId>
<groupId>com.mashibing</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>beacon-monitor</artifactId>
<dependencies>
<!-- web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- nacos注册-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- nacos配置-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- 公共模块-->
<dependency>
<groupId>com.mashibing</groupId>
<artifactId>beacon-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- xxl-job的依赖-->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>
<!-- RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- openFeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- JavaMail-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,21 @@
package com.mashibing.monitor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
* @author
* @version V1.0.0
*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class MonitorStarterApp {
public static void main(String[] args) {
SpringApplication.run(MonitorStarterApp.class,args);
}
}

@ -0,0 +1,18 @@
package com.mashibing.monitor.client;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import java.util.Set;
/**
* @author
* @version V1.0.0
*/
@FeignClient(value = "beacon-cache")
public interface CacheClient {
@PostMapping(value = "/cache/keys/{pattern}")
Set<String> keys(@PathVariable String pattern);
}

@ -0,0 +1,55 @@
package com.mashibing.monitor.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author
* @version V1.0.0
*/
@Configuration
@Slf4j
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
// xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}

@ -0,0 +1,92 @@
package com.mashibing.monitor.task;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.monitor.client.CacheClient;
import com.mashibing.monitor.util.MailUtil;
import com.rabbitmq.client.Channel;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.mail.MessagingException;
import java.io.IOException;
import java.util.Set;
/**
* 10000
* @author
* @version V1.0.0
*/
@Component
@Slf4j
public class MonitorQueueMessageCountTask {
String text = "<h1>您的队列消息队列堆积了,队名为%s消息个数为%s</1>";
// 队列消息限制
private final long MESSAGE_COUNT_LIMIT = 10000;
// 查询队列名称的固定pattern
private final String QUEUE_PATTERN = "channel:*";
// 得到需要截取channelID的索引地址
private final int CHANNEL_ID_INDEX = QUEUE_PATTERN.indexOf("*");
// 注入RabbitMQ的ConnectionFactory
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private CacheClient cacheClient;
@Autowired
private MailUtil mailUtil;
@XxlJob("monitorQueueMessageCountTask")
public void monitor() throws MessagingException {
//1、拿到所有的队列名称
Set<String> keys = cacheClient.keys(QUEUE_PATTERN);
//2、需要基于channel去操作
Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(false);
listenQueueAndSendEmail(channel,RabbitMQConstants.SMS_PRE_SEND);
for (String key : keys) {
// 封装队列名称
String queueName = RabbitMQConstants.SMS_GATEWAY + key.substring(CHANNEL_ID_INDEX);
listenQueueAndSendEmail(channel, queueName);
}
}
private void listenQueueAndSendEmail(Channel channel, String queueName) throws MessagingException {
// 队列不存在,直接构建,如果已经存在,直接忽略
try {
channel.queueDeclare(queueName,true,false,false,null);
} catch (IOException e) {
e.printStackTrace();
}
//3、拿到对应队列的消息确认消息数量超过限制及时通知
long count = 0;
try {
count = channel.messageCount(queueName);
} catch (IOException e) {
e.printStackTrace();
}
if(count > MESSAGE_COUNT_LIMIT){
//4、通知的方式就是发送短信。
mailUtil.sendEmail(queueName + "队列消息堆积",String.format(text,queueName,count));
}
}
}

@ -0,0 +1,20 @@
package com.mashibing.monitor.task;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author
* @version V1.0.0
*/
@Component
@Slf4j
public class TestTask {
@XxlJob("test")
public void test(){
// 编写任务逻辑
log.info("Hello World!");
}
}

@ -0,0 +1,48 @@
package com.mashibing.monitor.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
/**
* @author
* @version V1.0.0
*/
@Component
@RefreshScope
public class MailUtil {
@Value("${spring.mail.username}")
private String from;
@Value("${spring.mail.tos}")
private String tos;
@Autowired
private JavaMailSender javaMailSender;
public void sendEmail(String subject,String text) throws MessagingException {
// 构建MimeMessage对象
MimeMessage mimeMessage = javaMailSender.createMimeMessage();
// 给邮件指定信息
MimeMessageHelper helper = new MimeMessageHelper(mimeMessage);
helper.setFrom(from);
helper.setTo(tos.split(","));
helper.setSubject(subject);
helper.setText(text);
// 发送邮件
javaMailSender.send(mimeMessage);
}
}

@ -0,0 +1,17 @@
# 服务名称
spring:
application:
name: beacon-monitor
# 多环境
profiles:
active: dev
# nacos注册中心地址
cloud:
nacos:
discovery:
server-addr: 114.116.226.76:8848
# nacos配置中心地址:
config:
server-addr: 114.116.226.76:8848
file-extension: yml
# beacon-monitor-dev.yml

@ -56,6 +56,11 @@
<artifactId>hutool-dfa</artifactId>
<version>5.8.12</version>
</dependency>
<dependency>
<groupId>com.mashibing</groupId>
<artifactId>beacon-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

@ -12,6 +12,7 @@
<module>beacon-search</module>
<module>beacon-push</module>
<module>beacon-smsgateway</module>
<module>beacon-monitor</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>

Loading…
Cancel
Save