Continuous iterative rocketmq function (#215)

pull/233/head
chen.ma 2 years ago
parent 29baf830e4
commit 8da0063c0e

@ -19,6 +19,9 @@ package cn.hippo4j.config.service;
import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.design.observer.AbstractSubjectCenter;
import cn.hippo4j.common.design.observer.Observer;
import cn.hippo4j.common.design.observer.ObserverMessage;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
@ -32,9 +35,8 @@ import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.*;
import java.util.Map; import java.util.stream.Collectors;
import java.util.Optional;
import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT; import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
@ -47,42 +49,55 @@ import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
public class ThreadPoolAdapterService { public class ThreadPoolAdapterService {
/** /**
* Map<mark, Map<tenantItem, Map<threadPoolKey, List<String>>>> * Map<mark, Map<tenantItem, Map<threadPoolKey, List<ThreadPoolAdapterState>>>>
*/ */
private final Map<String, Map<String, Map<String, List<String>>>> THREAD_POOL_ADAPTER_MAP = Maps.newConcurrentMap(); private static final Map<String, Map<String, Map<String, List<ThreadPoolAdapterState>>>> THREAD_POOL_ADAPTER_MAP = Maps.newConcurrentMap();
public synchronized void register(List<ThreadPoolAdapterCacheConfig> requestParameter) { static {
for (ThreadPoolAdapterCacheConfig each : requestParameter) { AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, new ClearThreadPoolAdapterCache());
String mark = each.getMark(); }
Map<String, Map<String, List<String>>> actual = THREAD_POOL_ADAPTER_MAP.get(mark);
if (CollectionUtil.isEmpty(actual)) { public void register(List<ThreadPoolAdapterCacheConfig> requestParameter) {
actual = Maps.newHashMap(); synchronized (ThreadPoolAdapterService.class) {
THREAD_POOL_ADAPTER_MAP.put(mark, actual); for (ThreadPoolAdapterCacheConfig each : requestParameter) {
} String mark = each.getMark();
Map<String, List<String>> tenantItemMap = actual.get(each.getTenantItemKey()); Map<String, Map<String, List<ThreadPoolAdapterState>>> actual = THREAD_POOL_ADAPTER_MAP.get(mark);
if (CollectionUtil.isEmpty(tenantItemMap)) { if (CollectionUtil.isEmpty(actual)) {
tenantItemMap = Maps.newHashMap(); actual = Maps.newHashMap();
actual.put(each.getTenantItemKey(), tenantItemMap); THREAD_POOL_ADAPTER_MAP.put(mark, actual);
} }
List<ThreadPoolAdapterState> threadPoolAdapterStates = each.getThreadPoolAdapterStates(); Map<String, List<ThreadPoolAdapterState>> tenantItemMap = actual.get(each.getTenantItemKey());
for (ThreadPoolAdapterState adapterState : threadPoolAdapterStates) { if (CollectionUtil.isEmpty(tenantItemMap)) {
List<String> threadPoolKeyList = tenantItemMap.get(adapterState.getThreadPoolKey()); tenantItemMap = Maps.newHashMap();
if (CollectionUtil.isEmpty(threadPoolKeyList)) { actual.put(each.getTenantItemKey(), tenantItemMap);
threadPoolKeyList = Lists.newArrayList(); }
tenantItemMap.put(adapterState.getThreadPoolKey(), threadPoolKeyList); List<ThreadPoolAdapterState> threadPoolAdapterStates = each.getThreadPoolAdapterStates();
for (ThreadPoolAdapterState adapterState : threadPoolAdapterStates) {
List<ThreadPoolAdapterState> adapterStateList = tenantItemMap.get(adapterState.getThreadPoolKey());
if (CollectionUtil.isEmpty(adapterStateList)) {
adapterStateList = Lists.newArrayList();
tenantItemMap.put(adapterState.getThreadPoolKey(), adapterStateList);
}
Optional<ThreadPoolAdapterState> first = adapterStateList.stream().filter(state -> Objects.equals(state.getClientAddress(), each.getClientAddress())).findFirst();
if (!first.isPresent()) {
ThreadPoolAdapterState state = new ThreadPoolAdapterState();
state.setClientAddress(each.getClientAddress());
state.setIdentify(each.getClientIdentify());
adapterStateList.add(state);
}
} }
threadPoolKeyList.add(each.getClientAddress());
} }
} }
} }
public List<ThreadPoolAdapterRespDTO> query(ThreadPoolAdapterReqDTO requestParameter) { public List<ThreadPoolAdapterRespDTO> query(ThreadPoolAdapterReqDTO requestParameter) {
List<String> actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark())) List<ThreadPoolAdapterState> actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark()))
.map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem())) .map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem()))
.map(each -> each.get(requestParameter.getThreadPoolKey())) .map(each -> each.get(requestParameter.getThreadPoolKey()))
.orElse(Lists.newArrayList()); .orElse(Lists.newArrayList());
List<String> addressList = actual.stream().map(ThreadPoolAdapterState::getClientAddress).collect(Collectors.toList());
List<ThreadPoolAdapterRespDTO> result = Lists.newCopyOnWriteArrayList(); List<ThreadPoolAdapterRespDTO> result = Lists.newCopyOnWriteArrayList();
actual.parallelStream().forEach(each -> { addressList.parallelStream().forEach(each -> {
String urlString = StrBuilder.create("http://", each, "/adapter/thread-pool/info").toString(); String urlString = StrBuilder.create("http://", each, "/adapter/thread-pool/info").toString();
Map<String, Object> param = Maps.newHashMap(); Map<String, Object> param = Maps.newHashMap();
param.put("mark", requestParameter.getMark()); param.put("mark", requestParameter.getMark());
@ -99,4 +114,26 @@ public class ThreadPoolAdapterService {
}); });
return result; return result;
} }
static class ClearThreadPoolAdapterCache implements Observer<String> {
@Override
public void accept(ObserverMessage<String> observerMessage) {
log.info("Clean up the configuration cache. Key :: {}", observerMessage.message());
String identify = observerMessage.message();
synchronized (ThreadPoolAdapterService.class) {
THREAD_POOL_ADAPTER_MAP.values().forEach(each -> each.forEach((key, val) -> {
val.forEach((threadPoolKey, states) -> {
Iterator<ThreadPoolAdapterState> iterator = states.iterator();
while (iterator.hasNext()) {
ThreadPoolAdapterState adapterState = iterator.next();
if (Objects.equals(adapterState.getIdentify(), identify)) {
iterator.remove();
}
}
});
}));
}
}
}
} }

@ -22,6 +22,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.web.base.Result; import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
@ -63,6 +64,7 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner {
cacheConfig.setMark(val.mark()); cacheConfig.setMark(val.mark());
String tenantItemKey = properties.getNamespace() + IDENTIFY_SLICER_SYMBOL + properties.getItemId(); String tenantItemKey = properties.getNamespace() + IDENTIFY_SLICER_SYMBOL + properties.getItemId();
cacheConfig.setTenantItemKey(tenantItemKey); cacheConfig.setTenantItemKey(tenantItemKey);
cacheConfig.setClientIdentify(IdentifyUtil.getIdentify());
String clientAddress = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils); String clientAddress = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils);
cacheConfig.setClientAddress(clientAddress); cacheConfig.setClientAddress(clientAddress);
cacheConfig.setThreadPoolAdapterStates(val.getThreadPoolStates()); cacheConfig.setThreadPoolAdapterStates(val.getThreadPoolStates());

Loading…
Cancel
Save