diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ff473387..39d46e4d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,7 +62,7 @@ jobs: version: 17 } steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Build with Maven run: echo y | mvn clean install -Dskip.gpg=true -Dspotless.apply.skip=true -Dmaven.javadoc.skip=true @@ -71,7 +71,7 @@ jobs: name: Test coverage report runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Cache Maven Repos uses: actions/cache@v3 with: diff --git a/agent/pom.xml b/agent/pom.xml index 0d899eab..ac0db601 100644 --- a/agent/pom.xml +++ b/agent/pom.xml @@ -58,7 +58,7 @@ 1.5 true 1.7.25 - 30.1.1-jre + 32.0.0-jre 2.16.0 diff --git a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/ConfigParserHandler.java b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/ConfigParserHandler.java index ea61255a..60d1f752 100644 --- a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/ConfigParserHandler.java +++ b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/ConfigParserHandler.java @@ -34,6 +34,7 @@ public final class ConfigParserHandler { } PARSERS.add(new PropertiesConfigParser()); PARSERS.add(new YamlConfigParser()); + PARSERS.add(new JsonConfigParser()); } public Map parseConfig(String content, String type) throws IOException { diff --git a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/JsonConfigParser.java b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/JsonConfigParser.java new file mode 100644 index 00000000..c0acf1d9 --- /dev/null +++ b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/parser/JsonConfigParser.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.threadpool.dynamic.mode.config.parser; + +import cn.hippo4j.common.toolkit.CollectionUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Json config parser. + */ +public class JsonConfigParser extends AbstractConfigParser { + private static final ObjectMapper MAPPER; + private static final String DOT = "."; + private static final String LEFT_BRACE = "{"; + private static final String RIGHT_BRACE = "{"; + private static final String LEFT_BRACKET = "["; + private static final String RIGHT_BRACKET = "]"; + + static { + MAPPER = new ObjectMapper(); + } + + public Map doParse(String content, String prefix) throws IOException { + + Map originMap = MAPPER.readValue(content, LinkedHashMap.class); + Map result = new HashMap<>(); + + flatMap(result, originMap, prefix); + return result; + } + + private void flatMap(Map result, Map dataMap, String prefix) { + + if (MapUtils.isEmpty(dataMap)) { + return; + } + + dataMap.forEach((k, v) -> { + String fullKey = genFullKey(prefix, k); + if (v instanceof Map) { + flatMap(result, (Map) v, fullKey); + return; + } else if (v instanceof Collection) { + int count = 0; + for (Object obj : (Collection) v) { + String kk = LEFT_BRACKET + (count++) + RIGHT_BRACKET; + flatMap(result, Collections.singletonMap(kk, obj), fullKey); + } + return; + } + + result.put(fullKey, v); + }); + } + + private String genFullKey(String prefix, String key) { + if (StringUtils.isEmpty(prefix)) { + return key; + } + return key.startsWith(LEFT_BRACE) ? prefix.concat(key) : prefix.concat(DOT).concat(key); + } + + @Override + public Map doParse(String content) throws IOException { + if (StringUtils.isEmpty(content)) { + return new HashMap<>(1); + } + + return doParse(content,""); + } + + @Override + public List getConfigFileTypes() { + return CollectionUtil.newArrayList(ConfigFileTypeEnum.JSON, ConfigFileTypeEnum.JSON); + } +} diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/monitor/ThreadPoolMonitorExecutor.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/monitor/ThreadPoolMonitorExecutor.java index a9cce9b0..8cdec513 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/monitor/ThreadPoolMonitorExecutor.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/monitor/ThreadPoolMonitorExecutor.java @@ -24,7 +24,6 @@ import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.core.config.ApplicationContextHolder; import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; import cn.hippo4j.threadpool.dynamic.mode.config.properties.MonitorProperties; -import cn.hippo4j.threadpool.monitor.api.DynamicThreadPoolMonitor; import cn.hippo4j.threadpool.monitor.api.ThreadPoolMonitor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -53,6 +52,10 @@ public class ThreadPoolMonitorExecutor implements ApplicationRunner, DisposableB private List threadPoolMonitors; + static { + ServiceLoaderRegistry.register(ThreadPoolMonitor.class); + } + @Override public void run(ApplicationArguments args) throws Exception { MonitorProperties monitor = properties.getMonitor(); @@ -70,8 +73,8 @@ public class ThreadPoolMonitorExecutor implements ApplicationRunner, DisposableB // Get dynamic thread pool monitoring component. List collectTypes = Arrays.asList(monitor.getCollectTypes().split(",")); ApplicationContextHolder.getBeansOfType(ThreadPoolMonitor.class).forEach((beanName, bean) -> threadPoolMonitors.add(bean)); - Collection dynamicThreadPoolMonitors = - ServiceLoaderRegistry.getSingletonServiceInstances(DynamicThreadPoolMonitor.class); + Collection dynamicThreadPoolMonitors = + ServiceLoaderRegistry.getSingletonServiceInstances(ThreadPoolMonitor.class); dynamicThreadPoolMonitors.stream().filter(each -> collectTypes.contains(each.getType())).forEach(each -> threadPoolMonitors.add(each)); // Execute dynamic thread pool monitoring component. collectScheduledExecutor.scheduleWithFixedDelay( diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ConsulRefresherHandler.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ConsulRefresherHandler.java index ea2cca4e..0f0dd23b 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ConsulRefresherHandler.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ConsulRefresherHandler.java @@ -41,6 +41,8 @@ public class ConsulRefresherHandler extends AbstractConfigThreadPoolDynamicRefre private static final int INITIAL_CAPACITY = 64; + private static final String DEFAULT_CONTEXT = "spring.cloud.consul.config.default-context"; + @EventListener(EnvironmentChangeEvent.class) public void refreshed(EnvironmentChangeEvent event) { Map configInfo = extractLatestConfigInfo(event); @@ -70,7 +72,7 @@ public class ConsulRefresherHandler extends AbstractConfigThreadPoolDynamicRefre private CharSequence getApplicationConfigDefaultContext(AbstractEnvironment environment) { return environment.getPropertySources().stream() .filter(propertySource -> propertySource instanceof OriginTrackedMapPropertySource) - .map(propertySource -> ((Map) propertySource.getSource()).get("spring.cloud.consul.config.default-context")) + .map(propertySource -> ((Map) propertySource.getSource()).get(DEFAULT_CONTEXT)) .findFirst().orElse(StringUtils.EMPTY); } diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java index b6979c9b..004c1f12 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java @@ -17,7 +17,6 @@ package cn.hippo4j.config.springboot.starter.refresher; -import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; import io.etcd.jetcd.ByteSequence; @@ -65,7 +64,6 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh String key = etcd.get(KEY); Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET)); initClient(etcd, charset); - // TODO Currently only supports json GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get(); KeyValue keyValue = getResponse.getKvs().get(0); if (Objects.isNull(keyValue)) { @@ -77,12 +75,10 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh public void onNext(WatchResponse response) { WatchEvent watchEvent = response.getEvents().get(0); WatchEvent.EventType eventType = watchEvent.getEventType(); - // todo Currently only supports json if (Objects.equals(eventType, WatchEvent.EventType.PUT)) { KeyValue keyValue1 = watchEvent.getKeyValue(); String value = keyValue1.getValue().toString(charset); - Map map = JSONUtil.parseObject(value, Map.class); - dynamicRefresh(keyValue1.getKey().toString(charset), map); + dynamicRefresh(value); } } @@ -105,7 +101,6 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh * @param charset charset */ private void initClient(Map etcd, Charset charset) { - // TODO if (Objects.isNull(client)) { String user = etcd.get(USER); String password = etcd.get(PASSWORD); diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java index 25bb91e6..1fbac959 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java @@ -166,7 +166,6 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener changeKeys = new ArrayList<>(); Map> newDynamicThreadPoolNotifyMap = configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties); Map> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs(); @@ -179,7 +178,6 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener Callable wrap(Callable var1); + + protected abstract Runnable wrap(Runnable var1); + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return this.delegate().awaitTermination(timeout, unit); + } + + public List> invokeAll(Collection> tasks) throws InterruptedException { + return this.delegate().invokeAll(this.wrap(tasks)); + } + + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return this.delegate().invokeAll(this.wrap(tasks), timeout, unit); + } + + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return this.delegate().invokeAny(this.wrap(tasks)); + } + + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.delegate().invokeAny(this.wrap(tasks), timeout, unit); + } + + public boolean isShutdown() { + return this.delegate().isShutdown(); + } + + public boolean isTerminated() { + return this.delegate().isTerminated(); + } + + public void shutdown() { + this.delegate().shutdown(); + } + + public List shutdownNow() { + return this.delegate().shutdownNow(); + } + + public void execute(Runnable task) { + this.delegate().execute(this.wrap(task)); + } + + public Future submit(Callable task) { + return this.delegate().submit(this.wrap(task)); + } + + public Future submit(Runnable task) { + return this.delegate().submit(this.wrap(task)); + } + + public Future submit(Runnable task, T result) { + return this.delegate().submit(this.wrap(task), result); + } + + Collection> wrap(Collection> tasks) { + ArrayList> result = new ArrayList(tasks.size()); + Iterator var3 = tasks.iterator(); + + while (var3.hasNext()) { + Callable task = (Callable) var3.next(); + result.add(this.wrap(task)); + } + + return result; + } +} diff --git a/threadpool/core/src/test/java/cn/hippo4j/core/CustomWrappingExecutorService.java b/threadpool/core/src/test/java/cn/hippo4j/core/CustomWrappingExecutorService.java new file mode 100644 index 00000000..ebc6a622 --- /dev/null +++ b/threadpool/core/src/test/java/cn/hippo4j/core/CustomWrappingExecutorService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core; + +import brave.internal.WrappingExecutorService; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +public class CustomWrappingExecutorService extends WrappingExecutorService { + + ExecutorService delegate; + + public CustomWrappingExecutorService() { + } + + public CustomWrappingExecutorService(ExecutorService delegate) { + this.delegate = delegate; + } + + @Override + public ExecutorService delegate() { + return delegate; + } + + @Override + protected Callable wrap(Callable callable) { + return callable; + } + + @Override + protected Runnable wrap(Runnable runnable) { + return runnable; + } +} diff --git a/threadpool/core/src/test/java/cn/hippo4j/core/adapter/ZipkinExecutorAdapterTest.java b/threadpool/core/src/test/java/cn/hippo4j/core/adapter/ZipkinExecutorAdapterTest.java new file mode 100644 index 00000000..eb27eec4 --- /dev/null +++ b/threadpool/core/src/test/java/cn/hippo4j/core/adapter/ZipkinExecutorAdapterTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.adapter; + +import cn.hippo4j.common.executor.support.RunsOldestTaskPolicy; +import cn.hippo4j.common.handler.ZipkinExecutorAdapter; +import cn.hippo4j.core.CustomWrappingExecutorService; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; +import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.scheduling.concurrent.DefaultManagedAwareThreadFactory; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * test for ${@link ZipkinExecutorAdapter} + * */ +@Slf4j +public class ZipkinExecutorAdapterTest { + + ZipkinExecutorAdapter zipkinExecutorAdapter = new ZipkinExecutorAdapter(); + Executor dynamicThreadPool = ThreadPoolBuilder.builder() + .threadPoolId("test") + .dynamicPool() + .corePoolSize(1) + .maximumPoolSize(2) + .keepAliveTime(1000) + .timeUnit(TimeUnit.MICROSECONDS) + .threadFactory(new DefaultManagedAwareThreadFactory()) + .workQueue(new SynchronousQueue<>()) + .rejected(new RunsOldestTaskPolicy()) + .build(); + + @Test + public void testMatch() { + Object executor = new CustomWrappingExecutorService(Executors.newCachedThreadPool()); + Assert.assertTrue(zipkinExecutorAdapter.match(executor)); + } + + @Test + public void testUnwrap() { + Object executor = new CustomWrappingExecutorService(Executors.newCachedThreadPool()); + ThreadPoolExecutor unwrap = zipkinExecutorAdapter.unwrap(executor); + Assert.assertNull(unwrap); + } + + @Test + public void testReplace() { + Object executor = new CustomWrappingExecutorService(Executors.newCachedThreadPool()); + CustomWrappingExecutorService executorChange = (CustomWrappingExecutorService)executor; + ExecutorService beforeReplace = executorChange.delegate(); + zipkinExecutorAdapter.replace(executor, dynamicThreadPool); + ExecutorService afterReplace = executorChange.delegate(); + + Assert.assertNotSame(beforeReplace, afterReplace); + Assert.assertSame(afterReplace, dynamicThreadPool); + } +} \ No newline at end of file diff --git a/threadpool/monitor/micrometer/src/main/java/cn/hippo4j/monitor/micrometer/DynamicThreadPoolMicrometerMonitorHandler.java b/threadpool/monitor/micrometer/src/main/java/cn/hippo4j/monitor/micrometer/DynamicThreadPoolMicrometerMonitorHandler.java index 5bfe747c..2263b324 100644 --- a/threadpool/monitor/micrometer/src/main/java/cn/hippo4j/monitor/micrometer/DynamicThreadPoolMicrometerMonitorHandler.java +++ b/threadpool/monitor/micrometer/src/main/java/cn/hippo4j/monitor/micrometer/DynamicThreadPoolMicrometerMonitorHandler.java @@ -17,12 +17,14 @@ package cn.hippo4j.monitor.micrometer; +import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry; import cn.hippo4j.core.config.ApplicationContextHolder; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor; +import cn.hippo4j.threadpool.monitor.api.ThreadPoolMonitor; import cn.hippo4j.threadpool.monitor.support.MonitorTypeEnum; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tag; @@ -48,6 +50,10 @@ public class DynamicThreadPoolMicrometerMonitorHandler extends AbstractDynamicTh super(handler); } + static { + ServiceLoaderRegistry.register(ThreadPoolMonitor.class); + } + @Override protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) { ThreadPoolRunStateInfo stateInfo = runStateCache.get(poolRunStateInfo.getTpId());