Merge branch 'opengoofy:develop' into develop

pull/1567/head
Pan_Yujie 1 year ago committed by GitHub
commit 472effafb8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -62,7 +62,7 @@ jobs:
version: 17
}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Build with Maven
run: echo y | mvn clean install -Dskip.gpg=true -Dspotless.apply.skip=true -Dmaven.javadoc.skip=true
@ -71,7 +71,7 @@ jobs:
name: Test coverage report
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Cache Maven Repos
uses: actions/cache@v3
with:

@ -58,7 +58,7 @@
<gmaven-plugin.version>1.5</gmaven-plugin.version>
<checkstyle.fails.on.error>true</checkstyle.fails.on.error>
<slf4j.version>1.7.25</slf4j.version>
<guava.version>30.1.1-jre</guava.version>
<guava.version>32.0.0-jre</guava.version>
<wiremock.version>2.16.0</wiremock.version>
</properties>

@ -34,6 +34,7 @@ public final class ConfigParserHandler {
}
PARSERS.add(new PropertiesConfigParser());
PARSERS.add(new YamlConfigParser());
PARSERS.add(new JsonConfigParser());
}
public Map<Object, Object> parseConfig(String content, String type) throws IOException {

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.threadpool.dynamic.mode.config.parser;
import cn.hippo4j.common.toolkit.CollectionUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Json config parser.
*/
public class JsonConfigParser extends AbstractConfigParser {
private static final ObjectMapper MAPPER;
private static final String DOT = ".";
private static final String LEFT_BRACE = "{";
private static final String RIGHT_BRACE = "{";
private static final String LEFT_BRACKET = "[";
private static final String RIGHT_BRACKET = "]";
static {
MAPPER = new ObjectMapper();
}
public Map<Object, Object> doParse(String content, String prefix) throws IOException {
Map<String, Object> originMap = MAPPER.readValue(content, LinkedHashMap.class);
Map<Object, Object> result = new HashMap<>();
flatMap(result, originMap, prefix);
return result;
}
private void flatMap(Map<Object, Object> result, Map<String, Object> dataMap, String prefix) {
if (MapUtils.isEmpty(dataMap)) {
return;
}
dataMap.forEach((k, v) -> {
String fullKey = genFullKey(prefix, k);
if (v instanceof Map) {
flatMap(result, (Map<String, Object>) v, fullKey);
return;
} else if (v instanceof Collection) {
int count = 0;
for (Object obj : (Collection<Object>) v) {
String kk = LEFT_BRACKET + (count++) + RIGHT_BRACKET;
flatMap(result, Collections.singletonMap(kk, obj), fullKey);
}
return;
}
result.put(fullKey, v);
});
}
private String genFullKey(String prefix, String key) {
if (StringUtils.isEmpty(prefix)) {
return key;
}
return key.startsWith(LEFT_BRACE) ? prefix.concat(key) : prefix.concat(DOT).concat(key);
}
@Override
public Map<Object, Object> doParse(String content) throws IOException {
if (StringUtils.isEmpty(content)) {
return new HashMap<>(1);
}
return doParse(content,"");
}
@Override
public List<ConfigFileTypeEnum> getConfigFileTypes() {
return CollectionUtil.newArrayList(ConfigFileTypeEnum.JSON, ConfigFileTypeEnum.JSON);
}
}

@ -24,7 +24,6 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.MonitorProperties;
import cn.hippo4j.threadpool.monitor.api.DynamicThreadPoolMonitor;
import cn.hippo4j.threadpool.monitor.api.ThreadPoolMonitor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -53,6 +52,10 @@ public class ThreadPoolMonitorExecutor implements ApplicationRunner, DisposableB
private List<ThreadPoolMonitor> threadPoolMonitors;
static {
ServiceLoaderRegistry.register(ThreadPoolMonitor.class);
}
@Override
public void run(ApplicationArguments args) throws Exception {
MonitorProperties monitor = properties.getMonitor();
@ -70,8 +73,8 @@ public class ThreadPoolMonitorExecutor implements ApplicationRunner, DisposableB
// Get dynamic thread pool monitoring component.
List<String> collectTypes = Arrays.asList(monitor.getCollectTypes().split(","));
ApplicationContextHolder.getBeansOfType(ThreadPoolMonitor.class).forEach((beanName, bean) -> threadPoolMonitors.add(bean));
Collection<DynamicThreadPoolMonitor> dynamicThreadPoolMonitors =
ServiceLoaderRegistry.getSingletonServiceInstances(DynamicThreadPoolMonitor.class);
Collection<ThreadPoolMonitor> dynamicThreadPoolMonitors =
ServiceLoaderRegistry.getSingletonServiceInstances(ThreadPoolMonitor.class);
dynamicThreadPoolMonitors.stream().filter(each -> collectTypes.contains(each.getType())).forEach(each -> threadPoolMonitors.add(each));
// Execute dynamic thread pool monitoring component.
collectScheduledExecutor.scheduleWithFixedDelay(

@ -41,6 +41,8 @@ public class ConsulRefresherHandler extends AbstractConfigThreadPoolDynamicRefre
private static final int INITIAL_CAPACITY = 64;
private static final String DEFAULT_CONTEXT = "spring.cloud.consul.config.default-context";
@EventListener(EnvironmentChangeEvent.class)
public void refreshed(EnvironmentChangeEvent event) {
Map<String, Object> configInfo = extractLatestConfigInfo(event);
@ -70,7 +72,7 @@ public class ConsulRefresherHandler extends AbstractConfigThreadPoolDynamicRefre
private CharSequence getApplicationConfigDefaultContext(AbstractEnvironment environment) {
return environment.getPropertySources().stream()
.filter(propertySource -> propertySource instanceof OriginTrackedMapPropertySource)
.map(propertySource -> ((Map<String, CharSequence>) propertySource.getSource()).get("spring.cloud.consul.config.default-context"))
.map(propertySource -> ((Map<String, CharSequence>) propertySource.getSource()).get(DEFAULT_CONTEXT))
.findFirst().orElse(StringUtils.EMPTY);
}

@ -17,7 +17,6 @@
package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import io.etcd.jetcd.ByteSequence;
@ -65,7 +64,6 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh
String key = etcd.get(KEY);
Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET));
initClient(etcd, charset);
// TODO Currently only supports json
GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get();
KeyValue keyValue = getResponse.getKvs().get(0);
if (Objects.isNull(keyValue)) {
@ -77,12 +75,10 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh
public void onNext(WatchResponse response) {
WatchEvent watchEvent = response.getEvents().get(0);
WatchEvent.EventType eventType = watchEvent.getEventType();
// todo Currently only supports json
if (Objects.equals(eventType, WatchEvent.EventType.PUT)) {
KeyValue keyValue1 = watchEvent.getKeyValue();
String value = keyValue1.getValue().toString(charset);
Map map = JSONUtil.parseObject(value, Map.class);
dynamicRefresh(keyValue1.getKey().toString(charset), map);
dynamicRefresh(value);
}
}
@ -105,7 +101,6 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh
* @param charset charset
*/
private void initClient(Map<String, String> etcd, Charset charset) {
// TODO
if (Objects.isNull(client)) {
String user = etcd.get(USER);
String password = etcd.get(PASSWORD);

@ -166,7 +166,6 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
private void checkNotifyConsistencyAndReplace(ExecutorProperties executorProperties) {
boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false;
List<String> changeKeys = new ArrayList<>();
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap =
configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs();
@ -179,7 +178,6 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
for (NotifyConfigDTO notifyConfig : each.getValue()) {
if (!notifyConfigDTOS.contains(notifyConfig)) {
checkNotifyConfig = true;
changeKeys.add(each.getKey());
break;
}
}
@ -194,10 +192,9 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm();
// FIXME Compare using Objects.equals
if ((isAlarm != null && isAlarm != threadPoolNotifyAlarm.getAlarm())
|| (activeAlarm != null && activeAlarm != threadPoolNotifyAlarm.getActiveAlarm())
|| (capacityAlarm != null && capacityAlarm != threadPoolNotifyAlarm.getCapacityAlarm())) {
if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm()))
|| (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm()))
|| (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) {
checkNotifyAlarm = true;
threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm()));
threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm()));

@ -87,6 +87,10 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
*/
private ScheduledThreadPoolExecutor collectVesselExecutor;
static {
ServiceLoaderRegistry.register(ThreadPoolMonitor.class);
}
@SneakyThrows
@Override
public void run() {

@ -46,7 +46,7 @@ public class BeforeCheckConfiguration {
// TODO test
boolean checkFlag = properties != null && properties.getEnable();
if (checkFlag) {
String propertiesClassName = properties.getClass().getName();
String propertiesClassName = properties.getClass().getSuperclass() == Object.class ? properties.getClass().getName() : properties.getClass().getSuperclass().getName();
switch (propertiesClassName) {
case bootstrapPropertiesClassName: {
String namespace = properties.getNamespace();

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package brave.internal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public abstract class WrappingExecutorService implements ExecutorService {
protected WrappingExecutorService() {
}
protected abstract ExecutorService delegate();
protected abstract <R> Callable<R> wrap(Callable<R> var1);
protected abstract Runnable wrap(Runnable var1);
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return this.delegate().awaitTermination(timeout, unit);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return this.delegate().invokeAll(this.wrap(tasks));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return this.delegate().invokeAll(this.wrap(tasks), timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return this.delegate().invokeAny(this.wrap(tasks));
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.delegate().invokeAny(this.wrap(tasks), timeout, unit);
}
public boolean isShutdown() {
return this.delegate().isShutdown();
}
public boolean isTerminated() {
return this.delegate().isTerminated();
}
public void shutdown() {
this.delegate().shutdown();
}
public List<Runnable> shutdownNow() {
return this.delegate().shutdownNow();
}
public void execute(Runnable task) {
this.delegate().execute(this.wrap(task));
}
public <T> Future<T> submit(Callable<T> task) {
return this.delegate().submit(this.wrap(task));
}
public Future<?> submit(Runnable task) {
return this.delegate().submit(this.wrap(task));
}
public <T> Future<T> submit(Runnable task, T result) {
return this.delegate().submit(this.wrap(task), result);
}
<T> Collection<? extends Callable<T>> wrap(Collection<? extends Callable<T>> tasks) {
ArrayList<Callable<T>> result = new ArrayList(tasks.size());
Iterator var3 = tasks.iterator();
while (var3.hasNext()) {
Callable<T> task = (Callable) var3.next();
result.add(this.wrap(task));
}
return result;
}
}

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core;
import brave.internal.WrappingExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
public class CustomWrappingExecutorService extends WrappingExecutorService {
ExecutorService delegate;
public CustomWrappingExecutorService() {
}
public CustomWrappingExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}
@Override
public ExecutorService delegate() {
return delegate;
}
@Override
protected <R> Callable<R> wrap(Callable<R> callable) {
return callable;
}
@Override
protected Runnable wrap(Runnable runnable) {
return runnable;
}
}

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.adapter;
import cn.hippo4j.common.executor.support.RunsOldestTaskPolicy;
import cn.hippo4j.common.handler.ZipkinExecutorAdapter;
import cn.hippo4j.core.CustomWrappingExecutorService;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.scheduling.concurrent.DefaultManagedAwareThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for ${@link ZipkinExecutorAdapter}
* */
@Slf4j
public class ZipkinExecutorAdapterTest {
ZipkinExecutorAdapter zipkinExecutorAdapter = new ZipkinExecutorAdapter();
Executor dynamicThreadPool = ThreadPoolBuilder.builder()
.threadPoolId("test")
.dynamicPool()
.corePoolSize(1)
.maximumPoolSize(2)
.keepAliveTime(1000)
.timeUnit(TimeUnit.MICROSECONDS)
.threadFactory(new DefaultManagedAwareThreadFactory())
.workQueue(new SynchronousQueue<>())
.rejected(new RunsOldestTaskPolicy())
.build();
@Test
public void testMatch() {
Object executor = new CustomWrappingExecutorService(Executors.newCachedThreadPool());
Assert.assertTrue(zipkinExecutorAdapter.match(executor));
}
@Test
public void testUnwrap() {
Object executor = new CustomWrappingExecutorService(Executors.newCachedThreadPool());
ThreadPoolExecutor unwrap = zipkinExecutorAdapter.unwrap(executor);
Assert.assertNull(unwrap);
}
@Test
public void testReplace() {
Object executor = new CustomWrappingExecutorService(Executors.newCachedThreadPool());
CustomWrappingExecutorService executorChange = (CustomWrappingExecutorService)executor;
ExecutorService beforeReplace = executorChange.delegate();
zipkinExecutorAdapter.replace(executor, dynamicThreadPool);
ExecutorService afterReplace = executorChange.delegate();
Assert.assertNotSame(beforeReplace, afterReplace);
Assert.assertSame(afterReplace, dynamicThreadPool);
}
}

@ -17,12 +17,14 @@
package cn.hippo4j.monitor.micrometer;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.monitor.base.AbstractDynamicThreadPoolMonitor;
import cn.hippo4j.threadpool.monitor.api.ThreadPoolMonitor;
import cn.hippo4j.threadpool.monitor.support.MonitorTypeEnum;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
@ -48,6 +50,10 @@ public class DynamicThreadPoolMicrometerMonitorHandler extends AbstractDynamicTh
super(handler);
}
static {
ServiceLoaderRegistry.register(ThreadPoolMonitor.class);
}
@Override
protected void execute(ThreadPoolRunStateInfo poolRunStateInfo) {
ThreadPoolRunStateInfo stateInfo = runStateCache.get(poolRunStateInfo.getTpId());

Loading…
Cancel
Save