From cb371069e5b2f2dd421713f44f5acf0690fbaaf2 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Mon, 8 Aug 2022 23:20:45 +0800 Subject: [PATCH] Ali TTL framework thread pool adaptation (#506) --- .../support/DynamicThreadPoolService.java | 17 +++++ .../adpter/DynamicThreadPoolAdapter.java | 52 ++++++++++++++ .../DynamicThreadPoolAdapterChoose.java | 71 +++++++++++++++++++ ...ansmittableThreadLocalExecutorAdapter.java | 53 ++++++++++++++ ...ableThreadLocalExecutorServiceAdapter.java | 53 ++++++++++++++ .../core/DynamicThreadPoolPostProcessor.java | 9 ++- 6 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapter.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/TransmittableThreadLocalExecutorAdapter.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/TransmittableThreadLocalExecutorServiceAdapter.java diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java index e0dd1544..b967a08c 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.core.executor.support; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapter.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapter.java new file mode 100644 index 00000000..b16f3d74 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support.adpter; + +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; + +import java.util.concurrent.Executor; + +/** + * Dynamic thread pool adapter. + */ +public interface DynamicThreadPoolAdapter { + + /** + * Match. + * + * @param executor + * @return + */ + boolean match(Object executor); + + /** + * Unwrap. + * + * @param executor + * @return + */ + DynamicThreadPoolExecutor unwrap(Object executor); + + /** + * Replace. + * + * @param executor + * @param dynamicThreadPoolExecutor + */ + void replace(Object executor, Executor dynamicThreadPoolExecutor); +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java new file mode 100644 index 00000000..c203ee5d --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support.adpter; + +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executor; + +/** + * Dynamic thread pool adapter choose. + */ +public class DynamicThreadPoolAdapterChoose { + + private static List DYNAMIC_THREAD_POOL_ADAPTERS = new ArrayList<>(); + + static { + DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorAdapter()); + DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorServiceAdapter()); + } + + /** + * Match. + * + * @param executor + * @return + */ + public static boolean match(Object executor) { + return DYNAMIC_THREAD_POOL_ADAPTERS.stream().anyMatch(each -> each.match(executor)); + } + + /** + * Unwrap. + * + * @param executor + * @return + */ + public static DynamicThreadPoolExecutor unwrap(Object executor) { + Optional dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst(); + return dynamicThreadPoolAdapterOptional.map(each -> each.unwrap(executor)).orElse(null); + } + + /** + * Replace. + * + * @param executor + */ + public static void replace(Object executor, Executor dynamicThreadPoolExecutor) { + Optional dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst(); + if (dynamicThreadPoolAdapterOptional.isPresent()) { + dynamicThreadPoolAdapterOptional.get().replace(executor, dynamicThreadPoolExecutor); + } + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/TransmittableThreadLocalExecutorAdapter.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/TransmittableThreadLocalExecutorAdapter.java new file mode 100644 index 00000000..f51c809c --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/TransmittableThreadLocalExecutorAdapter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support.adpter; + +import cn.hippo4j.common.toolkit.ReflectUtil; +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; + +import java.util.Objects; +import java.util.concurrent.Executor; + +/** + * Transmittable thread local executor adapter. + */ +public class TransmittableThreadLocalExecutorAdapter implements DynamicThreadPoolAdapter { + + private static String MATCH_CLASS_NAME = "ExecutorTtlWrapper"; + + private static String FIELD_NAME = "executor"; + + @Override + public boolean match(Object executor) { + return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName()); + } + + @Override + public DynamicThreadPoolExecutor unwrap(Object executor) { + Object unwrap = ReflectUtil.getFieldValue(executor, FIELD_NAME); + if (unwrap != null && unwrap instanceof DynamicThreadPoolExecutor) { + return (DynamicThreadPoolExecutor) unwrap; + } + return null; + } + + @Override + public void replace(Object executor, Executor dynamicThreadPoolExecutor) { + ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor); + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/TransmittableThreadLocalExecutorServiceAdapter.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/TransmittableThreadLocalExecutorServiceAdapter.java new file mode 100644 index 00000000..575c9725 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/TransmittableThreadLocalExecutorServiceAdapter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support.adpter; + +import cn.hippo4j.common.toolkit.ReflectUtil; +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; + +import java.util.Objects; +import java.util.concurrent.Executor; + +/** + * Transmittable thread local executor service adapter. + */ +public class TransmittableThreadLocalExecutorServiceAdapter implements DynamicThreadPoolAdapter { + + private static String MATCH_CLASS_NAME = "ExecutorServiceTtlWrapper"; + + private static String FIELD_NAME = "executorService"; + + @Override + public boolean match(Object executor) { + return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName()); + } + + @Override + public DynamicThreadPoolExecutor unwrap(Object executor) { + Object unwrap = ReflectUtil.getFieldValue(executor, FIELD_NAME); + if (unwrap != null && unwrap instanceof DynamicThreadPoolExecutor) { + return (DynamicThreadPoolExecutor) unwrap; + } + return null; + } + + @Override + public void replace(Object executor, Executor dynamicThreadPoolExecutor) { + ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor); + } +} diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java index 9d6c2fa4..21821849 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java @@ -30,6 +30,7 @@ import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.*; +import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.springboot.starter.config.BootstrapProperties; @@ -69,7 +70,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - if (bean instanceof DynamicThreadPoolExecutor) { + if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) { DynamicThreadPool dynamicThreadPool; try { dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class); @@ -84,9 +85,13 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { log.error("Failed to create dynamic thread pool in annotation mode.", ex); return bean; } - DynamicThreadPoolExecutor dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; + DynamicThreadPoolExecutor dynamicThreadPoolExecutor; + if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) { + dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; + } DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper); + DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); subscribeConfig(dynamicThreadPoolWrapper); return remoteThreadPoolExecutor; }