diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java index 2e7127ba..1915ad04 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java @@ -19,6 +19,9 @@ package cn.hippo4j.config.service; import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.design.observer.AbstractSubjectCenter; +import cn.hippo4j.common.design.observer.Observer; +import cn.hippo4j.common.design.observer.ObserverMessage; import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.StringUtil; @@ -32,9 +35,8 @@ import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; +import java.util.stream.Collectors; import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; @@ -47,42 +49,55 @@ import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; public class ThreadPoolAdapterService { /** - * Map>>> + * Map>>> */ - private final Map>>> THREAD_POOL_ADAPTER_MAP = Maps.newConcurrentMap(); + private static final Map>>> THREAD_POOL_ADAPTER_MAP = Maps.newConcurrentMap(); - public synchronized void register(List requestParameter) { - for (ThreadPoolAdapterCacheConfig each : requestParameter) { - String mark = each.getMark(); - Map>> actual = THREAD_POOL_ADAPTER_MAP.get(mark); - if (CollectionUtil.isEmpty(actual)) { - actual = Maps.newHashMap(); - THREAD_POOL_ADAPTER_MAP.put(mark, actual); - } - Map> tenantItemMap = actual.get(each.getTenantItemKey()); - if (CollectionUtil.isEmpty(tenantItemMap)) { - tenantItemMap = Maps.newHashMap(); - actual.put(each.getTenantItemKey(), tenantItemMap); - } - List threadPoolAdapterStates = each.getThreadPoolAdapterStates(); - for (ThreadPoolAdapterState adapterState : threadPoolAdapterStates) { - List threadPoolKeyList = tenantItemMap.get(adapterState.getThreadPoolKey()); - if (CollectionUtil.isEmpty(threadPoolKeyList)) { - threadPoolKeyList = Lists.newArrayList(); - tenantItemMap.put(adapterState.getThreadPoolKey(), threadPoolKeyList); + static { + AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, new ClearThreadPoolAdapterCache()); + } + + public void register(List requestParameter) { + synchronized (ThreadPoolAdapterService.class) { + for (ThreadPoolAdapterCacheConfig each : requestParameter) { + String mark = each.getMark(); + Map>> actual = THREAD_POOL_ADAPTER_MAP.get(mark); + if (CollectionUtil.isEmpty(actual)) { + actual = Maps.newHashMap(); + THREAD_POOL_ADAPTER_MAP.put(mark, actual); + } + Map> tenantItemMap = actual.get(each.getTenantItemKey()); + if (CollectionUtil.isEmpty(tenantItemMap)) { + tenantItemMap = Maps.newHashMap(); + actual.put(each.getTenantItemKey(), tenantItemMap); + } + List threadPoolAdapterStates = each.getThreadPoolAdapterStates(); + for (ThreadPoolAdapterState adapterState : threadPoolAdapterStates) { + List adapterStateList = tenantItemMap.get(adapterState.getThreadPoolKey()); + if (CollectionUtil.isEmpty(adapterStateList)) { + adapterStateList = Lists.newArrayList(); + tenantItemMap.put(adapterState.getThreadPoolKey(), adapterStateList); + } + Optional first = adapterStateList.stream().filter(state -> Objects.equals(state.getClientAddress(), each.getClientAddress())).findFirst(); + if (!first.isPresent()) { + ThreadPoolAdapterState state = new ThreadPoolAdapterState(); + state.setClientAddress(each.getClientAddress()); + state.setIdentify(each.getClientIdentify()); + adapterStateList.add(state); + } } - threadPoolKeyList.add(each.getClientAddress()); } } } public List query(ThreadPoolAdapterReqDTO requestParameter) { - List actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark())) + List actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark())) .map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem())) .map(each -> each.get(requestParameter.getThreadPoolKey())) .orElse(Lists.newArrayList()); + List addressList = actual.stream().map(ThreadPoolAdapterState::getClientAddress).collect(Collectors.toList()); List result = Lists.newCopyOnWriteArrayList(); - actual.parallelStream().forEach(each -> { + addressList.parallelStream().forEach(each -> { String urlString = StrBuilder.create("http://", each, "/adapter/thread-pool/info").toString(); Map param = Maps.newHashMap(); param.put("mark", requestParameter.getMark()); @@ -99,4 +114,26 @@ public class ThreadPoolAdapterService { }); return result; } + + static class ClearThreadPoolAdapterCache implements Observer { + + @Override + public void accept(ObserverMessage observerMessage) { + log.info("Clean up the configuration cache. Key :: {}", observerMessage.message()); + String identify = observerMessage.message(); + synchronized (ThreadPoolAdapterService.class) { + THREAD_POOL_ADAPTER_MAP.values().forEach(each -> each.forEach((key, val) -> { + val.forEach((threadPoolKey, states) -> { + Iterator iterator = states.iterator(); + while (iterator.hasNext()) { + ThreadPoolAdapterState adapterState = iterator.next(); + if (Objects.equals(adapterState.getIdentify(), identify)) { + iterator.remove(); + } + } + }); + })); + } + } + } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index 64283b1e..a0ba1ac9 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -22,6 +22,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.remote.HttpAgent; @@ -63,6 +64,7 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { cacheConfig.setMark(val.mark()); String tenantItemKey = properties.getNamespace() + IDENTIFY_SLICER_SYMBOL + properties.getItemId(); cacheConfig.setTenantItemKey(tenantItemKey); + cacheConfig.setClientIdentify(IdentifyUtil.getIdentify()); String clientAddress = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils); cacheConfig.setClientAddress(clientAddress); cacheConfig.setThreadPoolAdapterStates(val.getThreadPoolStates());