feat: 功能持续更新.

pull/161/head
chen.ma 3 years ago
parent 0a315a0537
commit 68f7653558

@ -12,7 +12,6 @@ import java.util.concurrent.ThreadFactory;
*/
public class ExecutorFactory {
public static final class Managed {
private static final String DEFAULT_NAMESPACE = "dynamic.thread-pool";

@ -42,6 +42,7 @@ public class ThreadPoolManager {
lockers.put(namespace, new Object());
}
}
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);

@ -8,26 +8,81 @@ package io.dynamic.threadpool.common.model;
*/
public interface PoolParameter {
/**
* namespace
*
* @return
*/
String getNamespace();
/**
* itemId
*
* @return
*/
String getItemId();
/**
* tpId
*
* @return
*/
String getTpId();
/**
* coreSize
*
* @return
*/
Integer getCoreSize();
/**
* maxSize
*
* @return
*/
Integer getMaxSize();
/**
* queueType
*
* @return
*/
Integer getQueueType();
/**
* capacity
*
* @return
*/
Integer getCapacity();
/**
* keepAliveTime
*
* @return
*/
Integer getKeepAliveTime();
/**
* isAlarm
*
* @return
*/
Integer getIsAlarm();
/**
* capacityAlarm
*
* @return
*/
Integer getCapacityAlarm();
/**
* livenessAlarm
*
* @return
*/
Integer getLivenessAlarm();
}

@ -45,7 +45,6 @@ public class Md5Util {
public static String encodeHexString(byte[] bytes) {
int l = bytes.length;
char[] out = new char[l << 1];
for (int i = 0, j = 0; i < l; i++) {
@ -56,12 +55,6 @@ public class Md5Util {
return new String(out);
}
/**
* ThreadPool Md5
*
* @param config
* @return
*/
public static String getTpContentMd5(PoolParameter config) {
return Md5Util.md5Hex(ContentUtil.getPoolContent(config), "UTF-8");
}

@ -1,15 +1,15 @@
package io.dynamic.threadpool.server.controller;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.common.web.base.Results;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.event.ConfigDataChangeEvent;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.model.ConfigInfoBase;
import io.dynamic.threadpool.server.service.ConfigChangePublisher;
import io.dynamic.threadpool.server.service.ConfigService;
import io.dynamic.threadpool.server.service.ConfigServletInner;
import io.dynamic.threadpool.server.toolkit.Md5ConfigUtil;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.common.web.base.Results;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
@ -51,8 +51,11 @@ public class ConfigController {
configService.insertOrUpdate(config);
long gmtModified = new Timestamp(System.currentTimeMillis()).getTime();
/*ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(config.getNamespace(), config.getItemId(), config.getTpId(), gmtModified));*/
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(config.getNamespace(), config.getItemId(), config.getTpId(), gmtModified));
.notifyConfigChange(new LocalDataChangeEvent(""));
return Results.success(true);
}

@ -7,8 +7,10 @@ package io.dynamic.threadpool.server.event;
* @date 2021/6/23 19:05
*/
public abstract class SlowEvent extends Event {
@Override
public long sequence() {
return 0;
}
}

@ -127,15 +127,7 @@ public class DefaultPublisher extends Thread implements EventPublisher {
}
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
for (Subscriber subscriber : subscribers) {
/*if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}*/
notifySubscriber(subscriber, event);
}
}

@ -33,7 +33,7 @@ public class DefaultSharePublisher extends DefaultPublisher {
try {
Set<Subscriber> sets = subMappings.get(subSlowEventType);
if (sets == null) {
Set<Subscriber> newSet = new ConcurrentHashSet<Subscriber>();
Set<Subscriber> newSet = new ConcurrentHashSet();
newSet.add(subscriber);
subMappings.put(subSlowEventType, newSet);
return;

@ -8,9 +8,7 @@ import io.dynamic.threadpool.server.toolkit.ClassUtil;
import io.dynamic.threadpool.server.toolkit.MapUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
@ -33,23 +31,16 @@ public class NotifyCenter {
private static Class<? extends EventPublisher> clazz = null;
private static EventPublisher eventPublisher = new DefaultPublisher();
private static BiFunction<Class<? extends Event>, Integer, EventPublisher> publisherFactory = null;
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap(16);
static {
final ServiceLoader<EventPublisher> loader = ServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = loader.iterator();
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
clazz = DefaultPublisher.class;
}
publisherFactory = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
EventPublisher publisher = eventPublisher;
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
@ -62,7 +53,7 @@ public class NotifyCenter {
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
}
public static <T> void registerSubscriber(final Subscriber consumer) {
public static void registerSubscriber(final Subscriber consumer) {
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
if (ClassUtil.isAssignableFrom(SlowEvent.class, subscribeType)) {
@ -84,7 +75,6 @@ public class NotifyCenter {
}
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
final String topic = ClassUtil.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
@ -116,4 +106,16 @@ public class NotifyCenter {
log.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
if (ClassUtil.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher;
}
final String topic = ClassUtil.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize);
}
return INSTANCE.publisherMap.get(topic);
}
}

@ -3,8 +3,10 @@ package io.dynamic.threadpool.server.service;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.model.CacheItem;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.notify.NotifyCenter;
import org.springframework.util.StringUtils;
import java.util.Objects;
@ -56,4 +58,22 @@ public class ConfigCacheService {
return (cacheItem != null) ? cacheItem.md5 : Constants.NULL;
}
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
}
static CacheItem makeSure(final String groupKey) {
CacheItem item = CACHE.get(groupKey);
if (null != item) {
return item;
}
CacheItem tmp = new CacheItem(groupKey);
item = CACHE.putIfAbsent(groupKey, tmp);
return (null == item) ? tmp : item;
}
}

@ -1,6 +1,6 @@
package io.dynamic.threadpool.server.service;
import io.dynamic.threadpool.server.event.ConfigDataChangeEvent;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.notify.NotifyCenter;
/**
@ -14,9 +14,10 @@ public class ConfigChangePublisher {
/**
* Notify ConfigChange.
*
* @param event ConfigDataChangeEvent instance.
* @param event
*/
public static void notifyConfigChange(ConfigDataChangeEvent event) {
public static void notifyConfigChange(LocalDataChangeEvent event) {
NotifyCenter.publishEvent(event);
}
}

@ -1,14 +1,14 @@
package io.dynamic.threadpool.server.service;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.common.web.base.Results;
import io.dynamic.threadpool.server.event.Event;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.notify.NotifyCenter;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
import io.dynamic.threadpool.server.toolkit.ConfigExecutor;
import io.dynamic.threadpool.server.toolkit.Md5ConfigUtil;
import io.dynamic.threadpool.server.toolkit.RequestUtil;
import io.dynamic.threadpool.common.web.base.Results;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -45,6 +45,8 @@ public class LongPollingService {
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
@ -74,6 +76,8 @@ public class LongPollingService {
}
}
final Queue<ClientLongPolling> allSubs;
class DataChangeTask implements Runnable {
final String groupKey;
@ -97,7 +101,6 @@ public class LongPollingService {
} catch (Exception ex) {
log.error("Data change error :: {}", ex.getMessage(), ex);
}
}
}
@ -138,8 +141,6 @@ public class LongPollingService {
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName));
}
final Queue<ClientLongPolling> allSubs;
class ClientLongPolling implements Runnable {
final AsyncContext asyncContext;

@ -1,5 +1,7 @@
package io.dynamic.threadpool.server.service.impl;
import io.dynamic.threadpool.common.toolkit.ContentUtil;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.server.mapper.RowMapperManager;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.service.ConfigService;
@ -30,7 +32,7 @@ public class ConfigServiceImpl implements ConfigService {
@Override
public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String namespace) {
ConfigAllInfo configAllInfo = jdbcTemplate.queryForObject(
"select * from config_info where tp_id = ? and item_id = ? and namespace = ?",
"select * from config_info where tp_id = ? and item_id = ? and tenant_id = ?",
new Object[]{tpId, itemId, namespace},
RowMapperManager.CONFIG_ALL_INFO_ROW_MAPPER);
@ -47,7 +49,8 @@ public class ConfigServiceImpl implements ConfigService {
}
private Long addConfigInfo(ConfigAllInfo config) {
final String sql = "INSERT INTO `config_info` (`tenant_id`, `item_id`, `tp_id`, `core_size`, `max_size`, `queue_type`, `capacity`, `keep_alive_time`, `content`, `md5`, `is_alarm`, `capacity_alarm`, `liveness_alarm`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);";
final String sql = "INSERT INTO `config_info` (`tenant_id`, `item_id`, `tp_id`, `core_size`, `max_size`, `queue_type`, `capacity`, `keep_alive_time`, `content`, `md5`, `is_alarm`, `capacity_alarm`, `liveness_alarm`) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);";
KeyHolder keyHolder = new GeneratedKeyHolder();
try {
@ -82,10 +85,12 @@ public class ConfigServiceImpl implements ConfigService {
private void updateConfigInfo(ConfigAllInfo config) {
try {
String poolContent = ContentUtil.getPoolContent(config);
String md5 = Md5Util.md5Hex(poolContent, "UTF-8");
jdbcTemplate.update("update config_info set core_size = ?, max_size = ?, queue_type = ?, capacity = ?, keep_alive_time = ?, content = ?, md5 = ?, is_alarm = ?, capacity_alarm = ?, liveness_alarm = ? " +
"where tenant_id = ?, item_id = ?, tp_id = ?",
"where tenant_id = ? and item_id = ? and tp_id = ?",
config.getCoreSize(), config.getMaxSize(), config.getQueueType(), config.getCapacity(), config.getKeepAliveTime(),
config.getContent(), Md5ConfigUtil.getTpContentMd5(config), config.getIsAlarm(), config.getCapacityAlarm(),
poolContent, md5, config.getIsAlarm(), config.getCapacityAlarm(),
config.getLivenessAlarm(), config.getNamespace(), config.getItemId(), config.getTpId());
} catch (Exception ex) {
log.error("[db-error] message :: {}", ex.getMessage(), ex);

@ -18,8 +18,6 @@ public class RequestUtil {
private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";
public static final String CLIENT_APPNAME_HEADER = "Client-AppName";
public static String getRemoteIp(HttpServletRequest request) {
String xForwardedFor = request.getHeader(X_FORWARDED_FOR);
if (!StringUtils.isEmpty(xForwardedFor)) {

@ -8,9 +8,6 @@ package io.dynamic.threadpool.server.toolkit;
*/
public class SimpleReadWriteLock {
/**
* Try read lock.
*/
public synchronized boolean tryReadLock() {
if (isWriteLocked()) {
return false;
@ -20,16 +17,10 @@ public class SimpleReadWriteLock {
}
}
/**
* Release the read lock.
*/
public synchronized void releaseReadLock() {
status--;
}
/**
* Try write lock.
*/
public synchronized boolean tryWriteLock() {
if (!isFree()) {
return false;
@ -51,9 +42,5 @@ public class SimpleReadWriteLock {
return status == 0;
}
/**
* Zero means no lock; Negative Numbers mean write locks; Positive Numbers mean read locks, and the numeric value
* represents the number of read locks.
*/
private int status = 0;
}

@ -11,7 +11,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class SingletonRepository<T> {
public SingletonRepository() {
// Initializing size 2^16, the container itself use about 50K of memory, avoiding constant expansion
shared = new ConcurrentHashMap(1 << 16);
}
@ -24,19 +23,12 @@ public class SingletonRepository<T> {
return shared.size();
}
/**
* Be careful use.
*/
public void remove(Object obj) {
shared.remove(obj);
}
private final ConcurrentHashMap<T, T> shared;
/**
* Cache of DataId and Group.
*/
public static class DataIdGroupIdCache {
public static String getSingleton(String str) {

Loading…
Cancel
Save