support MetadataContext transfer in thread poll

pull/422/head
wulingxiao 3 years ago
parent e30851b4d8
commit 4bc6279705

@ -18,9 +18,6 @@
package com.tencent.cloud.metadata.concurrent;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -28,6 +25,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
/**
* {@link MetadataCallable} decorate {@link Callable} to get {@link MetadataContext} value
* and transfer it to the time of {@link Callable} execution, needed when use {@link Callable} to thread pool.
@ -35,7 +35,6 @@ import java.util.stream.Collectors;
* Use factory methods {@link #get} / {@link #gets} to create instance.
*
* @author wlx
* @date 2022/7/8 9:31
*/
public final class MetadataCallable<V> implements Callable<V>,
MetadataWrap<Callable<V>> {
@ -56,7 +55,8 @@ public final class MetadataCallable<V> implements Callable<V>,
MetadataContextHolder.set(metadataContext);
try {
return delegate.call();
} finally {
}
finally {
MetadataContextHolder.set(metadataContextBackup);
}
}
@ -71,7 +71,8 @@ public final class MetadataCallable<V> implements Callable<V>,
public static <V> Callable<V> get(Callable<V> delegate) {
if (null == delegate || delegate instanceof MetadataCallable) {
return delegate;
} else {
}
else {
return new MetadataCallable<>(delegate);
}
}

@ -18,15 +18,15 @@
package com.tencent.cloud.metadata.concurrent;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
/**
* {@link MetadataRunnable} decorate {@link Runnable} to get {@link MetadataContext} value
* and transfer it to the time of {@link Runnable} execution, needed when use {@link Runnable} to thread pool.
@ -35,7 +35,6 @@ import java.util.stream.Collectors;
* <p>
*
* @author wlx
* @date 2022/7/8 9:16
*/
public final class MetadataRunnable implements Runnable,
MetadataWrap<Runnable> {
@ -56,7 +55,8 @@ public final class MetadataRunnable implements Runnable,
MetadataContextHolder.set(metadataContext);
try {
delegate.run();
} finally {
}
finally {
MetadataContextHolder.set(metadataContextBackup);
}
}
@ -70,7 +70,8 @@ public final class MetadataRunnable implements Runnable,
public static Runnable get(Runnable delegate) {
if (null == delegate || delegate instanceof MetadataRunnable) {
return delegate;
} else {
}
else {
return new MetadataRunnable(delegate);
}
}

@ -23,14 +23,12 @@ package com.tencent.cloud.metadata.concurrent;
* <p>
* Used to mark wrapper types, for example:
* <ul>
* <li/ {@link MetadataCallable}>
* <li/ {@link MetadataRunnable}>
* <li>{@link MetadataCallable}</li>
* <li>{@link MetadataRunnable}</li>
* </ul>
*
* @author wlx
* @date 2022/7/9 9:17
*/
public interface MetadataWrap<T> {
/**

@ -18,13 +18,14 @@
package com.tencent.cloud.metadata.concurrent.executor;
import java.util.Objects;
import java.util.concurrent.Executor;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.metadata.concurrent.MetadataRunnable;
import com.tencent.cloud.metadata.concurrent.MetadataWrap;
import org.springframework.lang.NonNull;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.springframework.lang.NonNull;
/**
* {@link MetadataContext} Wrapper of {@link Executor},
@ -32,13 +33,12 @@ import java.util.concurrent.Executor;
* to the execution time of {@link Runnable}.
*
* @author wlx
* * @date 2022/7/8 9:35
*/
class MetadataExecutor implements Executor, MetadataWrap<Executor> {
private final Executor delegate;
public MetadataExecutor(Executor delegate) {
MetadataExecutor(Executor delegate) {
this.delegate = delegate;
}

@ -18,11 +18,6 @@
package com.tencent.cloud.metadata.concurrent.executor;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.metadata.concurrent.MetadataCallable;
import com.tencent.cloud.metadata.concurrent.MetadataRunnable;
import org.springframework.lang.NonNull;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
@ -33,19 +28,24 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.metadata.concurrent.MetadataCallable;
import com.tencent.cloud.metadata.concurrent.MetadataRunnable;
import org.springframework.lang.NonNull;
/**
* {@link MetadataContext} Wrapper of {@link ExecutorService},
* transfer the {@link MetadataContext} from the task submit time of {@link Runnable} or {@link Callable}
* to the execution time of {@link Runnable} or {@link Callable}.
*
* @author wlx
* @date 2022/7/8 9:36
*/
class MetadataExecutorService extends MetadataExecutor implements ExecutorService {
private final ExecutorService delegate;
public MetadataExecutorService(ExecutorService delegate) {
MetadataExecutorService(ExecutorService delegate) {
super(delegate);
this.delegate = delegate;
}

@ -18,20 +18,20 @@
package com.tencent.cloud.metadata.concurrent.executor;
import com.tencent.cloud.metadata.concurrent.MetadataWrap;
import org.springframework.lang.Nullable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import com.tencent.cloud.metadata.concurrent.MetadataWrap;
import org.springframework.lang.Nullable;
/**
* Util methods for Metadata wrapper of jdk executors.
*
* @author wlx
* @date 2022/7/8 11:58
*/
public class MetadataExecutors {
public final class MetadataExecutors {
/**
* wrap Executor instance to MetadataExecutorService instance.
@ -65,8 +65,7 @@ public class MetadataExecutors {
* @param scheduledExecutorService scheduledExecutorService
* @return MetadataScheduledExecutorService instance
*/
public static ScheduledExecutorService getMetadataScheduledExecutorService(ScheduledExecutorService
scheduledExecutorService) {
public static ScheduledExecutorService getMetadataScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
if (null == scheduledExecutorService || isMetadataWrap(scheduledExecutorService)) {
return scheduledExecutorService;
}
@ -98,8 +97,14 @@ public class MetadataExecutors {
*
* @param executor input executor
* @param <T> Executor type
* @return if the parameter executor is MetadataExecutor wrapper
*/
public static <T extends Executor> boolean isMetadataWrap(@Nullable T executor) {
return executor instanceof MetadataWrap;
}
private MetadataExecutors() {
}
}

@ -18,31 +18,31 @@
package com.tencent.cloud.metadata.concurrent.executor;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.metadata.concurrent.MetadataCallable;
import com.tencent.cloud.metadata.concurrent.MetadataRunnable;
import org.springframework.lang.NonNull;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.metadata.concurrent.MetadataCallable;
import com.tencent.cloud.metadata.concurrent.MetadataRunnable;
import org.springframework.lang.NonNull;
/**
* {@link MetadataContext} Wrapper of {@link ScheduledExecutorService},
* transfer the {@link MetadataContext} from the task submit time of {@link Runnable} or {@link Callable}
* to the execution time of {@link Runnable} or {@link Callable}.
*
* @author wlx
* @date 2022/7/8 9:40
*/
class MetadataScheduledExecutorService extends MetadataExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService delegate;
public MetadataScheduledExecutorService(ScheduledExecutorService delegate) {
MetadataScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
this.delegate = delegate;
}
@ -61,15 +61,13 @@ class MetadataScheduledExecutorService extends MetadataExecutorService
@Override
@NonNull
public ScheduledFuture<?> scheduleAtFixedRate(@NonNull Runnable command, long initialDelay,
long period, @NonNull TimeUnit unit) {
public ScheduledFuture<?> scheduleAtFixedRate(@NonNull Runnable command, long initialDelay, long period, @NonNull TimeUnit unit) {
return this.delegate.scheduleAtFixedRate(MetadataRunnable.get(command), initialDelay, period, unit);
}
@Override
@NonNull
public ScheduledFuture<?> scheduleWithFixedDelay(@NonNull Runnable command, long initialDelay,
long delay, @NonNull TimeUnit unit) {
public ScheduledFuture<?> scheduleWithFixedDelay(@NonNull Runnable command, long initialDelay, long delay, @NonNull TimeUnit unit) {
return this.delegate.scheduleAtFixedRate(MetadataRunnable.get(command), initialDelay, delay, unit);
}

@ -18,32 +18,32 @@
package com.tencent.cloud.metadata.concurrent;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* Test for {@link MetadataCallable}.
*
* @author wlx
* @date 2022/7/9 12:11
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,

@ -18,27 +18,27 @@
package com.tencent.cloud.metadata.concurrent;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* Test for {@link MetadataRunnable}.
*
* @author wlx
* @date 2022/7/9 3:28
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,

@ -18,18 +18,20 @@
package com.tencent.cloud.metadata.concurrent;
import java.util.HashMap;
import java.util.Map;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import org.assertj.core.api.Assertions;
import java.util.HashMap;
import java.util.Map;
/**
* @author wlx
* @date 2022/7/9 3:23
*/
public class MetadataTestUtil {
public final class MetadataTestUtil {
private MetadataTestUtil() {
}
public static void initMetadataContext() {
Map<String, String> customMetadata = new HashMap<>();

@ -18,16 +18,6 @@
package com.tencent.cloud.metadata.concurrent.executor;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -40,13 +30,23 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* Test for {@link MetadataExecutorService}.
*
* @author wlx
* @date 2022/7/9 4:04
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,
@ -89,7 +89,8 @@ public class MetadataExecutorServiceTest {
future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
}
catch (InterruptedException | ExecutionException e) {
return null;
}
}
@ -109,10 +110,6 @@ public class MetadataExecutorServiceTest {
executorService.shutdownNow();
}
@SpringBootApplication
protected static class TestApplication {
}
private List<Callable<Map<String, String>>> getCallableList() {
List<Callable<Map<String, String>>> callableList = new ArrayList<>();
callableList.add(() -> {
@ -123,4 +120,8 @@ public class MetadataExecutorServiceTest {
});
return callableList;
}
@SpringBootApplication
protected static class TestApplication {
}
}

@ -18,6 +18,12 @@
package com.tencent.cloud.metadata.concurrent.executor;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.metadata.concurrent.MetadataTestUtil;
@ -25,23 +31,17 @@ import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* Test for {@link MetadataExecutor}.
*
* @author wlx
* @date 2022/7/9 3:44
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,

@ -18,19 +18,18 @@
package com.tencent.cloud.metadata.concurrent.executor;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.assertj.core.api.Assertions;
import org.junit.Test;
/**
* Test for {@link MetadataExecutors}.
*
* @author wlx
* @date 2022/7/9 4:05
*/
public class MetadataExecutorsTest {

@ -18,30 +18,30 @@
package com.tencent.cloud.metadata.concurrent.executor;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* Test for {@link MetadataScheduledExecutorService}.
*
* @author wlx
* @date 2022/7/9 4:04
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,

@ -38,7 +38,7 @@ import static org.mockito.Mockito.when;
/**
* test for {@link StaticMetadataManager}
* test for {@link StaticMetadataManager}.
*@author lepdou 2022-06-27
*/
@RunWith(MockitoJUnitRunner.class)

@ -30,7 +30,6 @@ import static org.assertj.core.api.Assertions.assertThat;
* Test for {@link PolarisRibbonClientConfiguration}.
*
* @author wlx
* @date 2022/7/2 10:36
*/
public class PolarisRibbonClientConfigurationTest {

Loading…
Cancel
Save