From 68f7653558353679aa191f6432966d7ef8133e11 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Fri, 25 Jun 2021 00:03:10 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=8A=9F=E8=83=BD=E6=8C=81=E7=BB=AD?= =?UTF-8?q?=E6=9B=B4=E6=96=B0.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/executor/ExecutorFactory.java | 1 - .../common/executor/ThreadPoolManager.java | 1 + .../common/model/PoolParameter.java | 55 +++++++++++++++++++ .../threadpool/common/toolkit/Md5Util.java | 7 --- .../server/controller/ConfigController.java | 11 ++-- .../threadpool/server/event/SlowEvent.java | 2 + .../server/notify/DefaultPublisher.java | 8 --- .../server/notify/DefaultSharePublisher.java | 2 +- .../server/notify/NotifyCenter.java | 30 +++++----- .../server/service/ConfigCacheService.java | 20 +++++++ .../server/service/ConfigChangePublisher.java | 7 ++- .../server/service/LongPollingService.java | 11 ++-- .../service/impl/ConfigServiceImpl.java | 13 +++-- .../server/toolkit/RequestUtil.java | 2 - .../server/toolkit/SimpleReadWriteLock.java | 13 ----- .../server/toolkit/SingletonRepository.java | 8 --- 16 files changed, 121 insertions(+), 70 deletions(-) diff --git a/common/src/main/java/io/dynamic/threadpool/common/executor/ExecutorFactory.java b/common/src/main/java/io/dynamic/threadpool/common/executor/ExecutorFactory.java index 96fe96e3..fd028cde 100644 --- a/common/src/main/java/io/dynamic/threadpool/common/executor/ExecutorFactory.java +++ b/common/src/main/java/io/dynamic/threadpool/common/executor/ExecutorFactory.java @@ -12,7 +12,6 @@ import java.util.concurrent.ThreadFactory; */ public class ExecutorFactory { - public static final class Managed { private static final String DEFAULT_NAMESPACE = "dynamic.thread-pool"; diff --git a/common/src/main/java/io/dynamic/threadpool/common/executor/ThreadPoolManager.java b/common/src/main/java/io/dynamic/threadpool/common/executor/ThreadPoolManager.java index 01035d92..a1c8f47a 100644 --- a/common/src/main/java/io/dynamic/threadpool/common/executor/ThreadPoolManager.java +++ b/common/src/main/java/io/dynamic/threadpool/common/executor/ThreadPoolManager.java @@ -42,6 +42,7 @@ public class ThreadPoolManager { lockers.put(namespace, new Object()); } } + final Object monitor = lockers.get(namespace); synchronized (monitor) { Map> map = resourcesManager.get(namespace); diff --git a/common/src/main/java/io/dynamic/threadpool/common/model/PoolParameter.java b/common/src/main/java/io/dynamic/threadpool/common/model/PoolParameter.java index 5172b16c..3eac8ae6 100644 --- a/common/src/main/java/io/dynamic/threadpool/common/model/PoolParameter.java +++ b/common/src/main/java/io/dynamic/threadpool/common/model/PoolParameter.java @@ -8,26 +8,81 @@ package io.dynamic.threadpool.common.model; */ public interface PoolParameter { + /** + * namespace + * + * @return + */ String getNamespace(); + /** + * itemId + * + * @return + */ String getItemId(); + /** + * tpId + * + * @return + */ String getTpId(); + /** + * coreSize + * + * @return + */ Integer getCoreSize(); + /** + * maxSize + * + * @return + */ Integer getMaxSize(); + /** + * queueType + * + * @return + */ Integer getQueueType(); + /** + * capacity + * + * @return + */ Integer getCapacity(); + /** + * keepAliveTime + * + * @return + */ Integer getKeepAliveTime(); + /** + * isAlarm + * + * @return + */ Integer getIsAlarm(); + /** + * capacityAlarm + * + * @return + */ Integer getCapacityAlarm(); + /** + * livenessAlarm + * + * @return + */ Integer getLivenessAlarm(); } diff --git a/common/src/main/java/io/dynamic/threadpool/common/toolkit/Md5Util.java b/common/src/main/java/io/dynamic/threadpool/common/toolkit/Md5Util.java index a5c81fb3..e1d1964f 100644 --- a/common/src/main/java/io/dynamic/threadpool/common/toolkit/Md5Util.java +++ b/common/src/main/java/io/dynamic/threadpool/common/toolkit/Md5Util.java @@ -45,7 +45,6 @@ public class Md5Util { public static String encodeHexString(byte[] bytes) { int l = bytes.length; - char[] out = new char[l << 1]; for (int i = 0, j = 0; i < l; i++) { @@ -56,12 +55,6 @@ public class Md5Util { return new String(out); } - /** - * 获取 ThreadPool 相关内容 Md5 值 - * - * @param config - * @return - */ public static String getTpContentMd5(PoolParameter config) { return Md5Util.md5Hex(ContentUtil.getPoolContent(config), "UTF-8"); } diff --git a/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java b/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java index b583630b..2ff76df4 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java +++ b/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java @@ -1,15 +1,15 @@ package io.dynamic.threadpool.server.controller; +import io.dynamic.threadpool.common.web.base.Result; +import io.dynamic.threadpool.common.web.base.Results; import io.dynamic.threadpool.server.constant.Constants; -import io.dynamic.threadpool.server.event.ConfigDataChangeEvent; +import io.dynamic.threadpool.server.event.LocalDataChangeEvent; import io.dynamic.threadpool.server.model.ConfigAllInfo; import io.dynamic.threadpool.server.model.ConfigInfoBase; import io.dynamic.threadpool.server.service.ConfigChangePublisher; import io.dynamic.threadpool.server.service.ConfigService; import io.dynamic.threadpool.server.service.ConfigServletInner; import io.dynamic.threadpool.server.toolkit.Md5ConfigUtil; -import io.dynamic.threadpool.common.web.base.Result; -import io.dynamic.threadpool.common.web.base.Results; import lombok.SneakyThrows; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; @@ -51,8 +51,11 @@ public class ConfigController { configService.insertOrUpdate(config); long gmtModified = new Timestamp(System.currentTimeMillis()).getTime(); + /*ConfigChangePublisher + .notifyConfigChange(new ConfigDataChangeEvent(config.getNamespace(), config.getItemId(), config.getTpId(), gmtModified));*/ + ConfigChangePublisher - .notifyConfigChange(new ConfigDataChangeEvent(config.getNamespace(), config.getItemId(), config.getTpId(), gmtModified)); + .notifyConfigChange(new LocalDataChangeEvent("")); return Results.success(true); } diff --git a/server/src/main/java/io/dynamic/threadpool/server/event/SlowEvent.java b/server/src/main/java/io/dynamic/threadpool/server/event/SlowEvent.java index 0f014c33..8e3b3edb 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/event/SlowEvent.java +++ b/server/src/main/java/io/dynamic/threadpool/server/event/SlowEvent.java @@ -7,8 +7,10 @@ package io.dynamic.threadpool.server.event; * @date 2021/6/23 19:05 */ public abstract class SlowEvent extends Event { + @Override public long sequence() { return 0; } + } diff --git a/server/src/main/java/io/dynamic/threadpool/server/notify/DefaultPublisher.java b/server/src/main/java/io/dynamic/threadpool/server/notify/DefaultPublisher.java index 5a554678..df6d55d9 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/notify/DefaultPublisher.java +++ b/server/src/main/java/io/dynamic/threadpool/server/notify/DefaultPublisher.java @@ -127,15 +127,7 @@ public class DefaultPublisher extends Thread implements EventPublisher { } void receiveEvent(Event event) { - final long currentEventSequence = event.sequence(); - for (Subscriber subscriber : subscribers) { - /*if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { - LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", - event.getClass()); - continue; - }*/ - notifySubscriber(subscriber, event); } } diff --git a/server/src/main/java/io/dynamic/threadpool/server/notify/DefaultSharePublisher.java b/server/src/main/java/io/dynamic/threadpool/server/notify/DefaultSharePublisher.java index 31f44b1d..f37b0e44 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/notify/DefaultSharePublisher.java +++ b/server/src/main/java/io/dynamic/threadpool/server/notify/DefaultSharePublisher.java @@ -33,7 +33,7 @@ public class DefaultSharePublisher extends DefaultPublisher { try { Set sets = subMappings.get(subSlowEventType); if (sets == null) { - Set newSet = new ConcurrentHashSet(); + Set newSet = new ConcurrentHashSet(); newSet.add(subscriber); subMappings.put(subSlowEventType, newSet); return; diff --git a/server/src/main/java/io/dynamic/threadpool/server/notify/NotifyCenter.java b/server/src/main/java/io/dynamic/threadpool/server/notify/NotifyCenter.java index f19144b0..5bd3bcf8 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/notify/NotifyCenter.java +++ b/server/src/main/java/io/dynamic/threadpool/server/notify/NotifyCenter.java @@ -8,9 +8,7 @@ import io.dynamic.threadpool.server.toolkit.ClassUtil; import io.dynamic.threadpool.server.toolkit.MapUtil; import lombok.extern.slf4j.Slf4j; -import java.util.Iterator; import java.util.Map; -import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; @@ -33,23 +31,16 @@ public class NotifyCenter { private static Class clazz = null; + private static EventPublisher eventPublisher = new DefaultPublisher(); + private static BiFunction, Integer, EventPublisher> publisherFactory = null; private final Map publisherMap = new ConcurrentHashMap(16); static { - final ServiceLoader loader = ServiceLoader.load(EventPublisher.class); - Iterator iterator = loader.iterator(); - - if (iterator.hasNext()) { - clazz = iterator.next().getClass(); - } else { - clazz = DefaultPublisher.class; - } - publisherFactory = (cls, buffer) -> { try { - EventPublisher publisher = clazz.newInstance(); + EventPublisher publisher = eventPublisher; publisher.init(cls, buffer); return publisher; } catch (Throwable ex) { @@ -62,7 +53,7 @@ public class NotifyCenter { INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize); } - public static void registerSubscriber(final Subscriber consumer) { + public static void registerSubscriber(final Subscriber consumer) { if (consumer instanceof SmartSubscriber) { for (Class subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) { if (ClassUtil.isAssignableFrom(SlowEvent.class, subscribeType)) { @@ -84,7 +75,6 @@ public class NotifyCenter { } private static void addSubscriber(final Subscriber consumer, Class subscribeType) { - final String topic = ClassUtil.getCanonicalName(subscribeType); synchronized (NotifyCenter.class) { MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize); @@ -116,4 +106,16 @@ public class NotifyCenter { log.warn("There are no [{}] publishers for this event, please register", topic); return false; } + + public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) { + if (ClassUtil.isAssignableFrom(SlowEvent.class, eventType)) { + return INSTANCE.sharePublisher; + } + + final String topic = ClassUtil.getCanonicalName(eventType); + synchronized (NotifyCenter.class) { + MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize); + } + return INSTANCE.publisherMap.get(topic); + } } diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigCacheService.java b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigCacheService.java index 93f3dbb0..d55a09a0 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigCacheService.java +++ b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigCacheService.java @@ -3,8 +3,10 @@ package io.dynamic.threadpool.server.service; import io.dynamic.threadpool.common.config.ApplicationContextHolder; import io.dynamic.threadpool.common.toolkit.Md5Util; import io.dynamic.threadpool.server.constant.Constants; +import io.dynamic.threadpool.server.event.LocalDataChangeEvent; import io.dynamic.threadpool.server.model.CacheItem; import io.dynamic.threadpool.server.model.ConfigAllInfo; +import io.dynamic.threadpool.server.notify.NotifyCenter; import org.springframework.util.StringUtils; import java.util.Objects; @@ -56,4 +58,22 @@ public class ConfigCacheService { return (cacheItem != null) ? cacheItem.md5 : Constants.NULL; } + public static void updateMd5(String groupKey, String md5, long lastModifiedTs) { + CacheItem cache = makeSure(groupKey); + if (cache.md5 == null || !cache.md5.equals(md5)) { + cache.md5 = md5; + cache.lastModifiedTs = lastModifiedTs; + NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey)); + } + } + + static CacheItem makeSure(final String groupKey) { + CacheItem item = CACHE.get(groupKey); + if (null != item) { + return item; + } + CacheItem tmp = new CacheItem(groupKey); + item = CACHE.putIfAbsent(groupKey, tmp); + return (null == item) ? tmp : item; + } } diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigChangePublisher.java b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigChangePublisher.java index 546f751c..b08cef07 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigChangePublisher.java +++ b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigChangePublisher.java @@ -1,6 +1,6 @@ package io.dynamic.threadpool.server.service; -import io.dynamic.threadpool.server.event.ConfigDataChangeEvent; +import io.dynamic.threadpool.server.event.LocalDataChangeEvent; import io.dynamic.threadpool.server.notify.NotifyCenter; /** @@ -14,9 +14,10 @@ public class ConfigChangePublisher { /** * Notify ConfigChange. * - * @param event ConfigDataChangeEvent instance. + * @param event */ - public static void notifyConfigChange(ConfigDataChangeEvent event) { + public static void notifyConfigChange(LocalDataChangeEvent event) { NotifyCenter.publishEvent(event); } + } diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/LongPollingService.java b/server/src/main/java/io/dynamic/threadpool/server/service/LongPollingService.java index 25fb6331..93945464 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/service/LongPollingService.java +++ b/server/src/main/java/io/dynamic/threadpool/server/service/LongPollingService.java @@ -1,14 +1,14 @@ package io.dynamic.threadpool.server.service; import com.alibaba.fastjson.JSON; -import io.dynamic.threadpool.server.event.LocalDataChangeEvent; +import io.dynamic.threadpool.common.web.base.Results; import io.dynamic.threadpool.server.event.Event; +import io.dynamic.threadpool.server.event.LocalDataChangeEvent; import io.dynamic.threadpool.server.notify.NotifyCenter; import io.dynamic.threadpool.server.notify.listener.Subscriber; import io.dynamic.threadpool.server.toolkit.ConfigExecutor; import io.dynamic.threadpool.server.toolkit.Md5ConfigUtil; import io.dynamic.threadpool.server.toolkit.RequestUtil; -import io.dynamic.threadpool.common.web.base.Results; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -45,6 +45,8 @@ public class LongPollingService { ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS); + NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); + NotifyCenter.registerSubscriber(new Subscriber() { @Override @@ -74,6 +76,8 @@ public class LongPollingService { } } + final Queue allSubs; + class DataChangeTask implements Runnable { final String groupKey; @@ -97,7 +101,6 @@ public class LongPollingService { } catch (Exception ex) { log.error("Data change error :: {}", ex.getMessage(), ex); } - } } @@ -138,8 +141,6 @@ public class LongPollingService { ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName)); } - final Queue allSubs; - class ClientLongPolling implements Runnable { final AsyncContext asyncContext; diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java b/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java index 0ed4f040..6d2f864f 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java +++ b/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java @@ -1,5 +1,7 @@ package io.dynamic.threadpool.server.service.impl; +import io.dynamic.threadpool.common.toolkit.ContentUtil; +import io.dynamic.threadpool.common.toolkit.Md5Util; import io.dynamic.threadpool.server.mapper.RowMapperManager; import io.dynamic.threadpool.server.model.ConfigAllInfo; import io.dynamic.threadpool.server.service.ConfigService; @@ -30,7 +32,7 @@ public class ConfigServiceImpl implements ConfigService { @Override public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String namespace) { ConfigAllInfo configAllInfo = jdbcTemplate.queryForObject( - "select * from config_info where tp_id = ? and item_id = ? and namespace = ?", + "select * from config_info where tp_id = ? and item_id = ? and tenant_id = ?", new Object[]{tpId, itemId, namespace}, RowMapperManager.CONFIG_ALL_INFO_ROW_MAPPER); @@ -47,7 +49,8 @@ public class ConfigServiceImpl implements ConfigService { } private Long addConfigInfo(ConfigAllInfo config) { - final String sql = "INSERT INTO `config_info` (`tenant_id`, `item_id`, `tp_id`, `core_size`, `max_size`, `queue_type`, `capacity`, `keep_alive_time`, `content`, `md5`, `is_alarm`, `capacity_alarm`, `liveness_alarm`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);"; + final String sql = "INSERT INTO `config_info` (`tenant_id`, `item_id`, `tp_id`, `core_size`, `max_size`, `queue_type`, `capacity`, `keep_alive_time`, `content`, `md5`, `is_alarm`, `capacity_alarm`, `liveness_alarm`) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"; KeyHolder keyHolder = new GeneratedKeyHolder(); try { @@ -82,10 +85,12 @@ public class ConfigServiceImpl implements ConfigService { private void updateConfigInfo(ConfigAllInfo config) { try { + String poolContent = ContentUtil.getPoolContent(config); + String md5 = Md5Util.md5Hex(poolContent, "UTF-8"); jdbcTemplate.update("update config_info set core_size = ?, max_size = ?, queue_type = ?, capacity = ?, keep_alive_time = ?, content = ?, md5 = ?, is_alarm = ?, capacity_alarm = ?, liveness_alarm = ? " + - "where tenant_id = ?, item_id = ?, tp_id = ?", + "where tenant_id = ? and item_id = ? and tp_id = ?", config.getCoreSize(), config.getMaxSize(), config.getQueueType(), config.getCapacity(), config.getKeepAliveTime(), - config.getContent(), Md5ConfigUtil.getTpContentMd5(config), config.getIsAlarm(), config.getCapacityAlarm(), + poolContent, md5, config.getIsAlarm(), config.getCapacityAlarm(), config.getLivenessAlarm(), config.getNamespace(), config.getItemId(), config.getTpId()); } catch (Exception ex) { log.error("[db-error] message :: {}", ex.getMessage(), ex); diff --git a/server/src/main/java/io/dynamic/threadpool/server/toolkit/RequestUtil.java b/server/src/main/java/io/dynamic/threadpool/server/toolkit/RequestUtil.java index 3861a30d..741d3c68 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/toolkit/RequestUtil.java +++ b/server/src/main/java/io/dynamic/threadpool/server/toolkit/RequestUtil.java @@ -18,8 +18,6 @@ public class RequestUtil { private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ","; - public static final String CLIENT_APPNAME_HEADER = "Client-AppName"; - public static String getRemoteIp(HttpServletRequest request) { String xForwardedFor = request.getHeader(X_FORWARDED_FOR); if (!StringUtils.isEmpty(xForwardedFor)) { diff --git a/server/src/main/java/io/dynamic/threadpool/server/toolkit/SimpleReadWriteLock.java b/server/src/main/java/io/dynamic/threadpool/server/toolkit/SimpleReadWriteLock.java index 5ffaf448..2aac5ad8 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/toolkit/SimpleReadWriteLock.java +++ b/server/src/main/java/io/dynamic/threadpool/server/toolkit/SimpleReadWriteLock.java @@ -8,9 +8,6 @@ package io.dynamic.threadpool.server.toolkit; */ public class SimpleReadWriteLock { - /** - * Try read lock. - */ public synchronized boolean tryReadLock() { if (isWriteLocked()) { return false; @@ -20,16 +17,10 @@ public class SimpleReadWriteLock { } } - /** - * Release the read lock. - */ public synchronized void releaseReadLock() { status--; } - /** - * Try write lock. - */ public synchronized boolean tryWriteLock() { if (!isFree()) { return false; @@ -51,9 +42,5 @@ public class SimpleReadWriteLock { return status == 0; } - /** - * Zero means no lock; Negative Numbers mean write locks; Positive Numbers mean read locks, and the numeric value - * represents the number of read locks. - */ private int status = 0; } diff --git a/server/src/main/java/io/dynamic/threadpool/server/toolkit/SingletonRepository.java b/server/src/main/java/io/dynamic/threadpool/server/toolkit/SingletonRepository.java index 03f78044..b42079fe 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/toolkit/SingletonRepository.java +++ b/server/src/main/java/io/dynamic/threadpool/server/toolkit/SingletonRepository.java @@ -11,7 +11,6 @@ import java.util.concurrent.ConcurrentHashMap; public class SingletonRepository { public SingletonRepository() { - // Initializing size 2^16, the container itself use about 50K of memory, avoiding constant expansion shared = new ConcurrentHashMap(1 << 16); } @@ -24,19 +23,12 @@ public class SingletonRepository { return shared.size(); } - /** - * Be careful use. - */ public void remove(Object obj) { shared.remove(obj); } private final ConcurrentHashMap shared; - - /** - * Cache of DataId and Group. - */ public static class DataIdGroupIdCache { public static String getSingleton(String str) {