Merge pull request #507 from mabaiwan/develop

Ali TTL framework thread pool adaptation (#506)
pull/511/head
小马哥 2 years ago committed by GitHub
commit e489651ab2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support.adpter;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import java.util.concurrent.Executor;
/**
* Dynamic thread pool adapter.
*/
public interface DynamicThreadPoolAdapter {
/**
* Match.
*
* @param executor
* @return
*/
boolean match(Object executor);
/**
* Unwrap.
*
* @param executor
* @return
*/
DynamicThreadPoolExecutor unwrap(Object executor);
/**
* Replace.
*
* @param executor
* @param dynamicThreadPoolExecutor
*/
void replace(Object executor, Executor dynamicThreadPoolExecutor);
}

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support.adpter;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
/**
* Dynamic thread pool adapter choose.
*/
public class DynamicThreadPoolAdapterChoose {
private static List<DynamicThreadPoolAdapter> DYNAMIC_THREAD_POOL_ADAPTERS = new ArrayList<>();
static {
DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorAdapter());
DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorServiceAdapter());
}
/**
* Match.
*
* @param executor
* @return
*/
public static boolean match(Object executor) {
return DYNAMIC_THREAD_POOL_ADAPTERS.stream().anyMatch(each -> each.match(executor));
}
/**
* Unwrap.
*
* @param executor
* @return
*/
public static DynamicThreadPoolExecutor unwrap(Object executor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();
return dynamicThreadPoolAdapterOptional.map(each -> each.unwrap(executor)).orElse(null);
}
/**
* Replace.
*
* @param executor
*/
public static void replace(Object executor, Executor dynamicThreadPoolExecutor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();
if (dynamicThreadPoolAdapterOptional.isPresent()) {
dynamicThreadPoolAdapterOptional.get().replace(executor, dynamicThreadPoolExecutor);
}
}
}

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support.adpter;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import java.util.Objects;
import java.util.concurrent.Executor;
/**
* Transmittable thread local executor adapter.
*/
public class TransmittableThreadLocalExecutorAdapter implements DynamicThreadPoolAdapter {
private static String MATCH_CLASS_NAME = "ExecutorTtlWrapper";
private static String FIELD_NAME = "executor";
@Override
public boolean match(Object executor) {
return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName());
}
@Override
public DynamicThreadPoolExecutor unwrap(Object executor) {
Object unwrap = ReflectUtil.getFieldValue(executor, FIELD_NAME);
if (unwrap != null && unwrap instanceof DynamicThreadPoolExecutor) {
return (DynamicThreadPoolExecutor) unwrap;
}
return null;
}
@Override
public void replace(Object executor, Executor dynamicThreadPoolExecutor) {
ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor);
}
}

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support.adpter;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import java.util.Objects;
import java.util.concurrent.Executor;
/**
* Transmittable thread local executor service adapter.
*/
public class TransmittableThreadLocalExecutorServiceAdapter implements DynamicThreadPoolAdapter {
private static String MATCH_CLASS_NAME = "ExecutorServiceTtlWrapper";
private static String FIELD_NAME = "executorService";
@Override
public boolean match(Object executor) {
return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName());
}
@Override
public DynamicThreadPoolExecutor unwrap(Object executor) {
Object unwrap = ReflectUtil.getFieldValue(executor, FIELD_NAME);
if (unwrap != null && unwrap instanceof DynamicThreadPoolExecutor) {
return (DynamicThreadPoolExecutor) unwrap;
}
return null;
}
@Override
public void replace(Object executor, Executor dynamicThreadPoolExecutor) {
ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor);
}
}

@ -30,6 +30,7 @@ import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*; import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil; import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
@ -69,7 +70,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
@Override @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolExecutor) { if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) {
DynamicThreadPool dynamicThreadPool; DynamicThreadPool dynamicThreadPool;
try { try {
dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class); dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);
@ -84,9 +85,13 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
log.error("Failed to create dynamic thread pool in annotation mode.", ex); log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean; return bean;
} }
DynamicThreadPoolExecutor dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; DynamicThreadPoolExecutor dynamicThreadPoolExecutor;
if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
}
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
subscribeConfig(dynamicThreadPoolWrapper); subscribeConfig(dynamicThreadPoolWrapper);
return remoteThreadPoolExecutor; return remoteThreadPoolExecutor;
} }

Loading…
Cancel
Save