diff --git a/README.md b/README.md index c7d34d5..1dc8323 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ austin项目**核心流程**:`austin-api`接收到发送消息请求,直接 **1**、austin使用的MySQL版本**5.7x**。如果目前使用的MySQL版本8.0,注意改变`pom.xml`所依赖的版本 -**2**、适配`application.properties`的配置信息(`srping.datasource`) +**2**、适配`application.properties`的配置信息(`spring.datasource`) **3**、执行`sql`文件夹下的`austin.sql`创建对应的表 @@ -97,9 +97,9 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co - [ ] 04、持续提高消息推送系统的影响力,让更多的业务方了解其功能,进而挖掘更多拉新和唤醒用户的玩法,提高站内的次留率和转化率 -**近期更新时间**:2022年1月25日 +**近期更新时间**:2022年2月14日 -**近期更新功能**:austin前端管理系统 +**近期更新功能**:接入xxl-job分布式定时任务框架并完成定时任务逻辑 ## 项目交流 diff --git a/austin-cron/pom.xml b/austin-cron/pom.xml index a45cc07..a784774 100644 --- a/austin-cron/pom.xml +++ b/austin-cron/pom.xml @@ -21,6 +21,16 @@ austin-support 0.0.1-SNAPSHOT + + com.java3y.austin + austin-service-api + 0.0.1-SNAPSHOT + + + com.java3y.austin + austin-service-api-impl + 0.0.1-SNAPSHOT + com.xuxueli xxl-job-core diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java b/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java index 488d5ef..e652567 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java @@ -1,9 +1,12 @@ package com.java3y.austin.cron.constants; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + /** * @author 3y * @date 2022/2/13 - * 缓冲pending 常量 + * 延迟缓冲 pending 常量信息 */ public class PendingConstant { @@ -23,9 +26,10 @@ public class PendingConstant { public static final Long TIME_THRESHOLD = 1000L; /** - * 消费线程数 + * 真正消费线程池配置的信息 */ - public static final Integer THREAD_NUM = 2; - + public static final Integer CORE_POOL_SIZE = 2; + public static final Integer MAX_POOL_SIZE = 2; + public static final BlockingQueue BLOCKING_QUEUE = new LinkedBlockingQueue<>(5); } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java index 317bd20..e0b11b3 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java @@ -27,7 +27,6 @@ public class CronTaskHandler { log.info("CronTaskHandler#execute messageTemplateId:{} cron exec!", XxlJobHelper.getJobParam()); Long messageTemplateId = Long.valueOf(XxlJobHelper.getJobParam()); taskHandler.handle(messageTemplateId); - } } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java b/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java index eb24d97..8186085 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java @@ -1,39 +1,88 @@ package com.java3y.austin.cron.pending; -import com.java3y.austin.cron.domain.CrowdInfoVo; -import com.java3y.austin.support.pending.BatchPendingThread; -import com.java3y.austin.support.pending.Pending; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.thread.ExecutorBuilder; +import cn.hutool.core.util.StrUtil; +import com.google.common.collect.Lists; +import com.java3y.austin.cron.constants.PendingConstant; +import com.java3y.austin.cron.vo.CrowdInfoVo; +import com.java3y.austin.service.api.domain.BatchSendRequest; +import com.java3y.austin.service.api.domain.MessageParam; +import com.java3y.austin.service.api.enums.BusinessCode; +import com.java3y.austin.service.api.service.SendService; +import com.java3y.austin.support.pending.AbstractLazyPending; import com.java3y.austin.support.pending.PendingParam; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; /** - * 批量处理任务信息 + * 延迟批量处理人群信息 + * 调用 batch 发送接口 进行消息推送 * * @author 3y */ -@Component @Slf4j -public class CrowdBatchTaskPending extends Pending { +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class CrowdBatchTaskPending extends AbstractLazyPending { - @Override - public void initAndStart(PendingParam pendingParam) { - threadNum = pendingParam.getThreadNum() == null ? threadNum : pendingParam.getThreadNum(); - queue = pendingParam.getQueue(); - - for (int i = 0; i < threadNum; ++i) { - BatchPendingThread batchPendingThread = new BatchPendingThread(); - batchPendingThread.setPendingParam(pendingParam); - batchPendingThread.setName("batchPendingThread-" + i); - batchPendingThread.start(); - } + @Autowired + private SendService sendService; + + public CrowdBatchTaskPending() { + PendingParam pendingParam = new PendingParam<>(); + pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD) + .setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE)) + .setTimeThreshold(PendingConstant.TIME_THRESHOLD) + .setExecutorService(ExecutorBuilder.create() + .setCorePoolSize(PendingConstant.CORE_POOL_SIZE) + .setMaxPoolSize(PendingConstant.MAX_POOL_SIZE) + .setWorkQueue(PendingConstant.BLOCKING_QUEUE) + .setHandler(new ThreadPoolExecutor.CallerRunsPolicy()) + .build()); + this.pendingParam = pendingParam; } @Override - public void doHandle(List list) { - log.info("theadName:{},doHandle:{}", Thread.currentThread().getName(), list.size()); + public void doHandle(List crowdInfoVos) { + + // 1. 如果参数相同,组装成同一个MessageParam发送 + Map, String> paramMap = MapUtil.newHashMap(); + for (CrowdInfoVo crowdInfoVo : crowdInfoVos) { + String receiver = crowdInfoVo.getReceiver(); + Map vars = crowdInfoVo.getParams(); + if (paramMap.get(vars) == null) { + paramMap.put(vars, receiver); + } else { + String newReceiver = StringUtils.join(new String[]{ + paramMap.get(vars), receiver}, StrUtil.COMMA); + paramMap.put(vars, newReceiver); + } + } + + // 2. 组装参数 + List messageParams = Lists.newArrayList(); + for (Map.Entry, String> entry : paramMap.entrySet()) { + MessageParam messageParam = MessageParam.builder().receiver(entry.getValue()) + .variables(entry.getKey()).build(); + messageParams.add(messageParam); + } + // 3. 调用批量发送接口发送消息 + BatchSendRequest batchSendRequest = BatchSendRequest.builder().code(BusinessCode.COMMON_SEND.getCode()) + .messageParamList(messageParams) + .messageTemplateId(CollUtil.getFirst(crowdInfoVos.iterator()).getMessageTemplateId()) + .build(); + sendService.batchSend(batchSendRequest); } } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java b/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java index d4a3e02..974e434 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java @@ -2,21 +2,19 @@ package com.java3y.austin.cron.service.impl; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; -import com.java3y.austin.cron.constants.PendingConstant; -import com.java3y.austin.cron.domain.CrowdInfoVo; import com.java3y.austin.cron.pending.CrowdBatchTaskPending; import com.java3y.austin.cron.service.TaskHandler; import com.java3y.austin.cron.utils.ReadFileUtils; +import com.java3y.austin.cron.vo.CrowdInfoVo; import com.java3y.austin.support.dao.MessageTemplateDao; import com.java3y.austin.support.domain.MessageTemplate; -import com.java3y.austin.support.pending.PendingParam; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.HashMap; -import java.util.concurrent.LinkedBlockingQueue; /** * @author 3y @@ -27,14 +25,14 @@ import java.util.concurrent.LinkedBlockingQueue; public class TaskHandlerImpl implements TaskHandler { @Autowired private MessageTemplateDao messageTemplateDao; - @Autowired - private CrowdBatchTaskPending crowdBatchTaskPending; + @Autowired + private ApplicationContext context; @Override @Async public void handle(Long messageTemplateId) { - log.info("start:{}", Thread.currentThread().getName()); + log.info("TaskHandler handle:{}", Thread.currentThread().getName()); MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get(); if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) { @@ -42,16 +40,8 @@ public class TaskHandlerImpl implements TaskHandler { return; } - // 初始化pending的信息 - PendingParam pendingParam = new PendingParam<>(); - pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD) - .setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE)) - .setTimeThreshold(PendingConstant.TIME_THRESHOLD) - .setThreadNum(PendingConstant.THREAD_NUM) - .setPending(crowdBatchTaskPending); - crowdBatchTaskPending.initAndStart(pendingParam); - - // 读取文件得到每一行记录给到队列做batch处理 + CrowdBatchTaskPending crowdBatchTaskPending = context.getBean(CrowdBatchTaskPending.class); + // 读取文件得到每一行记录给到队列做lazy batch处理 ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> { if (CollUtil.isEmpty(row.getFieldMap()) || StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) { @@ -59,7 +49,7 @@ public class TaskHandlerImpl implements TaskHandler { } HashMap params = ReadFileUtils.getParamFromLine(row.getFieldMap()); CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY)) - .params(params).build(); + .params(params).messageTemplateId(messageTemplateId).build(); crowdBatchTaskPending.pending(crowdInfoVo); }); diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java b/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java index 61a3706..a1ce671 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java @@ -4,7 +4,7 @@ import cn.hutool.core.io.FileUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.csv.*; import com.google.common.base.Throwables; -import com.java3y.austin.cron.domain.CrowdInfoVo; +import com.java3y.austin.cron.vo.CrowdInfoVo; import lombok.extern.slf4j.Slf4j; import java.io.FileReader; diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/domain/CrowdInfoVo.java b/austin-cron/src/main/java/com/java3y/austin/cron/vo/CrowdInfoVo.java similarity index 83% rename from austin-cron/src/main/java/com/java3y/austin/cron/domain/CrowdInfoVo.java rename to austin-cron/src/main/java/com/java3y/austin/cron/vo/CrowdInfoVo.java index ae17152..b6b8874 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/domain/CrowdInfoVo.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/vo/CrowdInfoVo.java @@ -1,4 +1,4 @@ -package com.java3y.austin.cron.domain; +package com.java3y.austin.cron.vo; import lombok.AllArgsConstructor; import lombok.Builder; @@ -21,6 +21,11 @@ import java.util.Map; @Builder public class CrowdInfoVo implements Serializable { + /** + * 消息模板Id + */ + private Long messageTemplateId; + /** * 接收者id */ diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/config/PrototypeBeanConfig.java b/austin-handler/src/main/java/com/java3y/austin/handler/config/PrototypeBeanConfig.java deleted file mode 100644 index 5f76b25..0000000 --- a/austin-handler/src/main/java/com/java3y/austin/handler/config/PrototypeBeanConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.java3y.austin.handler.config; - -import com.java3y.austin.handler.pending.Task; -import com.java3y.austin.handler.receiver.Receiver; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Scope; - -/** - * Handler模块的配置信息 - * - * @author 3y - */ -@Configuration -public class PrototypeBeanConfig { - - /** - * 定义多例的Receiver - */ - @Bean - @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) - public Receiver receiver() { - return new Receiver(); - } - - /** - * 定义多例的Task - */ - @Bean - @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) - public Task task() { - return new Task(); - } - -} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java b/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java index 3cc49ac..04b5eb9 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java @@ -10,6 +10,9 @@ import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; /** * Task 执行器 @@ -22,6 +25,8 @@ import org.springframework.beans.factory.annotation.Autowired; @Data @Accessors(chain = true) @Slf4j +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class Task implements Runnable { @Autowired diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java index b4e96dd..c68f6c5 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java @@ -12,10 +12,13 @@ import com.java3y.austin.support.utils.LogUtils; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Scope; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; import java.util.List; import java.util.Optional; @@ -25,6 +28,8 @@ import java.util.Optional; * 消费MQ的消息 */ @Slf4j +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class Receiver { private static final String LOG_BIZ_TYPE = "Receiver#consumer"; @Autowired diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java index 52c76c3..87b70a0 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java @@ -51,8 +51,8 @@ public class AfterParamCheckAction implements BusinessProcess { * @param taskInfo */ private void filterIllegalPhoneNum(List taskInfo) { - Integer idType = taskInfo.get(0).getIdType(); - Integer sendChannel = taskInfo.get(0).getSendChannel(); + Integer idType = CollUtil.getFirst(taskInfo.iterator()).getIdType(); + Integer sendChannel = CollUtil.getFirst(taskInfo.iterator()).getSendChannel(); if (IdType.PHONE.getCode().equals(idType) && ChannelType.SMS.getCode().equals(sendChannel)) { Iterator iterator = taskInfo.iterator(); @@ -66,7 +66,7 @@ public class AfterParamCheckAction implements BusinessProcess { if (CollUtil.isNotEmpty(illegalPhone)) { task.getReceiver().removeAll(illegalPhone); - log.error("{} find illegal phone!{}", task.getMessageTemplateId(), JSON.toJSONString(illegalPhone)); + log.error("messageTemplateId:{} find illegal phone!{}", task.getMessageTemplateId(), JSON.toJSONString(illegalPhone)); } if (CollUtil.isEmpty(task.getReceiver())) { iterator.remove(); diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index dae555a..4f667be 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -1,5 +1,6 @@ package com.java3y.austin.service.api.impl.action; +import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; @@ -35,7 +36,8 @@ public class SendMqAction implements BusinessProcess { } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) - , JSON.toJSONString(sendTaskModel.getTaskInfo().get(0))); + , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator()))); + } } } diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/BatchSendRequest.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/BatchSendRequest.java index a7b9885..bd10ed5 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/BatchSendRequest.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/BatchSendRequest.java @@ -1,6 +1,9 @@ package com.java3y.austin.service.api.domain; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import java.util.List; @@ -13,6 +16,9 @@ import java.util.List; */ @Data @Accessors(chain = true) +@AllArgsConstructor +@NoArgsConstructor +@Builder public class BatchSendRequest { diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/MessageParam.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/MessageParam.java index a126ba2..95aa2f8 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/MessageParam.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/MessageParam.java @@ -1,7 +1,9 @@ package com.java3y.austin.service.api.domain; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import java.util.Map; @@ -13,8 +15,9 @@ import java.util.Map; */ @Data @Accessors(chain = true) +@AllArgsConstructor +@NoArgsConstructor @Builder - public class MessageParam { /** diff --git a/austin-support/src/main/java/com/java3y/austin/support/pending/AbstractLazyPending.java b/austin-support/src/main/java/com/java3y/austin/support/pending/AbstractLazyPending.java new file mode 100644 index 0000000..6098815 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/pending/AbstractLazyPending.java @@ -0,0 +1,113 @@ +package com.java3y.austin.support.pending; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.thread.ThreadUtil; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * 延迟消费 阻塞队列-消费者和生产者实现 + * + * @author 3y + */ +@Slf4j +@Data +public abstract class AbstractLazyPending { + + /** + * 子类构造方法必须初始化该参数 + */ + protected PendingParam pendingParam; + + /** + * 批量装载任务 + */ + private List tasks = new ArrayList<>(); + + /** + * 上次执行的时间 + */ + private Long lastHandleTime = System.currentTimeMillis(); + + /** + * 单线程消费 阻塞队列的数据 + */ + @PostConstruct + public void initConsumePending() { + ThreadUtil.newSingleExecutor().execute(() -> { + while (true) { + try { + T obj = pendingParam.getQueue().poll(pendingParam.getTimeThreshold(), TimeUnit.MILLISECONDS); + if (null != obj) { + tasks.add(obj); + } + + // 处理条件:1. 数量超限 2. 时间超限 + if (CollUtil.isNotEmpty(tasks) && dataReady()) { + List taskRef = tasks; + tasks = Lists.newArrayList(); + lastHandleTime = System.currentTimeMillis(); + + // 具体执行逻辑 + pendingParam.getExecutorService().execute(() -> this.handle(taskRef)); + } + } catch (Exception e) { + log.error("Pending#initConsumePending failed:{}", Throwables.getStackTraceAsString(e)); + } + } + }); + } + + /** + * 1. 数量超限 + * 2. 时间超限 + * @return + */ + private boolean dataReady() { + return tasks.size() >= pendingParam.getNumThreshold() || + (System.currentTimeMillis() - lastHandleTime >= pendingParam.getTimeThreshold()); + } + + /** + * 将元素放入阻塞队列中 + * + * @param t + */ + public void pending(T t) { + try { + pendingParam.getQueue().put(t); + } catch (InterruptedException e) { + log.error("Pending#pending error:{}", Throwables.getStackTraceAsString(e)); + } + } + + /** + * 消费阻塞队列元素时的方法 + * + * @param t + */ + public void handle(List t) { + if (t.isEmpty()) { + return; + } + try { + doHandle(t); + } catch (Exception e) { + log.error("Pending#handle failed:{}", Throwables.getStackTraceAsString(e)); + } + } + + /** + * 处理阻塞队列的元素 真正方法 + * + * @param list + */ + public abstract void doHandle(List list); +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/pending/BatchPendingThread.java b/austin-support/src/main/java/com/java3y/austin/support/pending/BatchPendingThread.java deleted file mode 100644 index fac0842..0000000 --- a/austin-support/src/main/java/com/java3y/austin/support/pending/BatchPendingThread.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.java3y.austin.support.pending; - -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import lombok.Data; -import lombok.experimental.Accessors; -import lombok.extern.slf4j.Slf4j; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * 延迟消费的线程 实现 - * 积攒一定的数量 或 时间 才消费,达到批量消费的效果 - * - * @author 3y - */ -@Data -@Accessors(chain = true) -@Slf4j -public class BatchPendingThread extends Thread { - - private PendingParam pendingParam; - - /** - * 批量装载任务 - */ - private List tasks = new ArrayList<>(); - - /** - * 当前装载任务的大小 - */ - private Integer total = 0; - - /** - * 上次执行的时间 - */ - private Long lastHandleTime = System.currentTimeMillis(); - - - @Override - public void run() { - while (true) { - try { - T obj = pendingParam.getQueue().poll(pendingParam.getTimeThreshold(), TimeUnit.MILLISECONDS); - if (null != obj) { - tasks.add(obj); - } - - // 处理条件:1. 数量超限 2. 时间超限 - if ((tasks.size() >= pendingParam.getNumThreshold()) - || (System.currentTimeMillis() - lastHandleTime >= pendingParam.getTimeThreshold())) { - List taskRef = tasks; - tasks = Lists.newArrayList(); - lastHandleTime = System.currentTimeMillis(); - pendingParam.getPending().handle(taskRef); - } - } catch (Exception e) { - log.error("BatchPendingThread#run failed:{}", Throwables.getStackTraceAsString(e)); - } - } - } -} diff --git a/austin-support/src/main/java/com/java3y/austin/support/pending/Pending.java b/austin-support/src/main/java/com/java3y/austin/support/pending/Pending.java deleted file mode 100644 index 6bf043e..0000000 --- a/austin-support/src/main/java/com/java3y/austin/support/pending/Pending.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.java3y.austin.support.pending; - -import com.google.common.base.Throwables; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; -import java.util.concurrent.BlockingQueue; - -/** - * 阻塞队列-消费者和生产者实现 - * - * @author 3y - */ -@Slf4j -@Data -public abstract class Pending { - - /** - * 可使用的线程数 - */ - public static final int DEFAULT_THREAD_NUM = Runtime.getRuntime().availableProcessors(); - - /** - * 阻塞队列 实现类 - */ - protected BlockingQueue queue; - - /** - * 默认消费线程数 = 目前可使用的线程数 - */ - protected Integer threadNum = DEFAULT_THREAD_NUM; - - - /** - * 将元素放入阻塞队列中 - * - * @param t - */ - public void pending(T t) { - try { - queue.put(t); - } catch (InterruptedException e) { - log.error("Pending#pending error:{}", Throwables.getStackTraceAsString(e)); - } - } - - /** - * 消费阻塞队列元素时的方法 - * - * @param t - */ - public void handle(List t) { - if (t.isEmpty()) { - return; - } - try { - doHandle(t); - } catch (Exception e) { - log.error("Pending#handle failed:{}", Throwables.getStackTraceAsString(e)); - } - } - - - /** - * 初始化并启动 - * - * @param pendingParam 参数信息 - */ - public abstract void initAndStart(PendingParam pendingParam); - - - /** - * 处理阻塞队列的元素 真正方法 - * - * @param list - */ - public abstract void doHandle(List list); -} diff --git a/austin-support/src/main/java/com/java3y/austin/support/pending/PendingParam.java b/austin-support/src/main/java/com/java3y/austin/support/pending/PendingParam.java index 0e81128..c9f6905 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/pending/PendingParam.java +++ b/austin-support/src/main/java/com/java3y/austin/support/pending/PendingParam.java @@ -8,6 +8,7 @@ import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; /** @@ -37,12 +38,8 @@ public class PendingParam { private Long timeThreshold; /** - * pending具体实现对象 + * 消费线程池实例【必填】 */ - private Pending pending; + protected ExecutorService executorService; - /** - * 消费线程数【可选】 - */ - protected Integer threadNum; } diff --git a/austin-web/src/main/java/com/java3y/austin/AustinApplication.java b/austin-web/src/main/java/com/java3y/austin/AustinApplication.java index 4c2a48b..e20cf8c 100644 --- a/austin-web/src/main/java/com/java3y/austin/AustinApplication.java +++ b/austin-web/src/main/java/com/java3y/austin/AustinApplication.java @@ -11,7 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; public class AustinApplication { public static void main(String[] args) { // TODO apollo的地址. - //System.setProperty("apollo.config-service", "http://ip:7000"); + System.setProperty("apollo.config-service", "http://119.91.205.248:7000"); SpringApplication.run(AustinApplication.class, args); } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 9e42b49..e6ee17f 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -5,14 +5,14 @@ ##################### database properties ##################### # notice:mysql version 5.7x !!! todo ip port username password -spring.datasource.url=jdbc:mysql://${ip}:${port}/austin?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull -spring.datasource.username=${username} -spring.datasource.password=${password} +spring.datasource.url=jdbc:mysql://localhost:3306/austin?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull +spring.datasource.username=root +spring.datasource.password=root123_A spring.datasource.driver-class-name=com.mysql.jdbc.Driver ##################### kafka properties ##################### # todo ip port -spring.kafka.bootstrap-servers=${ip}:${port} +spring.kafka.bootstrap-servers=120.48.13.113:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer @@ -23,28 +23,28 @@ spring.kafka.consumer.enable-auto-commit=true ##################### redis properties ##################### # todo ip port password -spring.redis.host=${ip} -spring.redis.port=${port} -spring.redis.password=${password} +spring.redis.host=119.91.205.248 +spring.redis.port=6379 +spring.redis.password=austin ##################### business properties ##################### austin.business.topic.name=austinTopicV2 # TODO if need graylog ,replace ip ! -austin.business.graylog.ip=${ip} +austin.business.graylog.ip=120.48.13.113 # TODO if windows os ,replace path ! austin.business.upload.crowd.path=/Users/3y/temp ##################### xxl properties ##################### # todo ip port -xxl.job.admin.addresses=http://${ip}:${port}/xxl-job-admin +xxl.job.admin.addresses=http://localhost:6767/xxl-job-admin xxl.job.admin.username=admin xxl.job.admin.password=123456 xxl.job.executor.appname=austin xxl.job.executor.jobHandlerName=austinJob xxl.job.executor.ip= xxl.job.executor.port=6666 -xxl.job.executor.logpath= +xxl.job.executor.logpath=logs/xxl xxl.job.executor.logretentiondays=30 xxl.job.accessToken=