feat: 功能持续更新.

pull/161/head
chen.ma 4 years ago
parent e02e528a88
commit 847f87f790

@ -24,6 +24,8 @@ public class Constants {
public static final String WORD_SEPARATOR = Character.toString((char) 2);
public static final String GENERAL_SPLIT_SYMBOL = ",";
public static final String LONGPOLLING_LINE_SEPARATOR = "\r\n";
public static final String BASE_PATH = "/v1/cs";

@ -98,20 +98,25 @@ public class ClientWorker {
*/
class LongPollingRunnable implements Runnable {
@SneakyThrows
private void checkStatus() {
// 服务端状态不正常睡眠 30s
if (!isHealthServer) {
log.error("[Check config] Error. exception message, Thread sleep 30 s.");
Thread.sleep(30000);
}
}
@Override
@SneakyThrows
public void run() {
checkStatus();
List<CacheData> cacheDataList = new ArrayList();
List<CacheData> queryCacheDataList = cacheMap.entrySet()
.stream().map(each -> each.getValue()).collect(Collectors.toList());
List<String> changedTpIds = null;
try {
changedTpIds = checkUpdateDataIds(queryCacheDataList);
} catch (Exception ex) {
log.error("[Long polling] Error. exception message :: {}, Thread sleep 30 s.", ex.getMessage(), ex);
Thread.sleep(30000);
}
List<String> changedTpIds = checkUpdateDataIds(queryCacheDataList);
if (!CollectionUtils.isEmpty(changedTpIds)) {
log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds);
for (String each : changedTpIds) {
@ -180,7 +185,6 @@ public class ClientWorker {
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
Result result = agent.httpPost(Constants.LISTENER_PATH, headers, params, readTimeoutMs);
if (result == null || result.isFail()) {
setHealthServer(false);
log.warn("[check-update] get changed dataId error, code: {}", result == null ? "error" : result.getCode());
} else {
setHealthServer(true);
@ -188,7 +192,7 @@ public class ClientWorker {
}
} catch (Exception ex) {
setHealthServer(false);
log.error("[check-update] get changed dataId exception.", ex);
log.error("[check-update] get changed dataId exception. error message :: {}", ex.getMessage());
}
return Collections.emptyList();
@ -236,6 +240,7 @@ public class ClientWorker {
log.error("[polling-resp] decode modifiedDataIdsString error", e);
}
List<String> updateList = new LinkedList();
for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {
if (!StringUtils.isEmpty(dataIdAndGroup)) {
@ -289,10 +294,14 @@ public class ClientWorker {
cacheData = new CacheData(namespace, itemId, tpId);
CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData);
if (lastCacheData == null) {
// TODO 连接不到 server 端报错
String serverConfig = getServerConfig(namespace, itemId, tpId, 3000L);
PoolParameterInfo poolInfo = JSON.parseObject(serverConfig, PoolParameterInfo.class);
cacheData.setContent(ContentUtil.getPoolContent(poolInfo));
String serverConfig = null;
try {
serverConfig = getServerConfig(namespace, itemId, tpId, 3000L);
PoolParameterInfo poolInfo = JSON.parseObject(serverConfig, PoolParameterInfo.class);
cacheData.setContent(ContentUtil.getPoolContent(poolInfo));
} catch (Exception ex) {
log.error("[Cache Data] Error. Service Unavailable :: {}", ex.getMessage());
}
int taskId = cacheMap.size() / Constants.CONFIG_LONG_POLL_TIMEOUT;
cacheData.setTaskId(taskId);

@ -67,7 +67,7 @@ public class ThreadPoolRunListener {
val.setPool(CommonThreadPool.getInstance(val.getTpId()));
}
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
val.setPool(CommonThreadPool.getInstance(val.getTpId()));
}

@ -2,6 +2,7 @@ package io.dynamic.threadpool.starter.toolkit;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
@ -13,19 +14,27 @@ import java.util.concurrent.TimeUnit;
* @author chen.ma
* @date 2021/6/25 17:19
*/
@Slf4j
public class ThreadPoolChangeUtil {
public static void changePool(ThreadPoolExecutor executor, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime) {
if (coreSize != null) {
executor.setCorePoolSize(coreSize);
}
if (maxSize != null) {
executor.setMaximumPoolSize(maxSize);
}
if (capacity != null && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type, queueType)) {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(capacity);
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(capacity);
} else {
log.warn("[Pool change] The queue length cannot be modified. Queue type mismatch.");
}
}
if (keepAliveTime != null) {
executor.setKeepAliveTime(keepAliveTime, TimeUnit.SECONDS);
}

@ -11,6 +11,8 @@ import lombok.Data;
@Data
public class ItemUpdateReqDTO {
private String namespace;
private String itemId;
private String itemName;

@ -246,7 +246,8 @@ public class LongPollingService {
private void generateResponse(HttpServletResponse response, List<String> changedGroups) {
if (!CollectionUtils.isEmpty(changedGroups)) {
try {
final String respString = JSON.toJSONString(Results.success(changedGroups));
final String changedGroupKeStr = Md5ConfigUtil.compareMd5ResultString(changedGroups);
final String respString = JSON.toJSONString(Results.success(changedGroupKeStr));
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");

@ -86,7 +86,10 @@ public class ItemServiceImpl implements ItemService {
public void updateItem(ItemUpdateReqDTO reqDTO) {
ItemInfo itemInfo = BeanUtil.convert(reqDTO, ItemInfo.class);
int updateResult = itemInfoMapper.update(itemInfo, Wrappers
.lambdaUpdate(ItemInfo.class).eq(ItemInfo::getItemId, reqDTO.getItemId()));
.lambdaUpdate(ItemInfo.class)
.eq(ItemInfo::getTenantId, reqDTO.getNamespace())
.eq(ItemInfo::getItemId, reqDTO.getItemId()));
boolean retBool = SqlHelper.retBool(updateResult);
if (!retBool) {
throw new RuntimeException("修改失败.");

@ -1,17 +1,22 @@
package io.dynamic.threadpool.server.toolkit;
import io.dynamic.threadpool.common.toolkit.GroupKey;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.service.ConfigCacheService;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import org.springframework.util.StringUtils;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static io.dynamic.threadpool.common.constant.Constants.LINE_SEPARATOR;
import static io.dynamic.threadpool.common.constant.Constants.WORD_SEPARATOR;
/**
* Md5
@ -120,4 +125,28 @@ public class Md5ConfigUtil {
return sb.toString();
}
public static String compareMd5ResultString(List<String> changedGroupKeys) throws IOException {
if (null == changedGroupKeys) {
return "";
}
StringBuilder sb = new StringBuilder();
for (String groupKey : changedGroupKeys) {
String[] dataIdGroupId = GroupKey.parseKey(groupKey);
sb.append(dataIdGroupId[0]);
sb.append(WORD_SEPARATOR);
sb.append(dataIdGroupId[1]);
// if have tenant, then set it
if (dataIdGroupId.length == 3) {
if (org.apache.commons.lang3.StringUtils.isNotBlank(dataIdGroupId[2])) {
sb.append(WORD_SEPARATOR);
sb.append(dataIdGroupId[2]);
}
}
sb.append(LINE_SEPARATOR);
}
return URLEncoder.encode(sb.toString(), "UTF-8");
}
}

Loading…
Cancel
Save