|
|
|
@ -17,11 +17,10 @@
|
|
|
|
|
|
|
|
|
|
package com.tencent.cloud.polaris.router;
|
|
|
|
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
import java.util.function.Function;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
import com.tencent.cloud.common.metadata.MetadataContext;
|
|
|
|
@ -30,6 +29,7 @@ import com.tencent.polaris.api.pojo.DefaultServiceInstances;
|
|
|
|
|
import com.tencent.polaris.api.pojo.Instance;
|
|
|
|
|
import com.tencent.polaris.api.pojo.ServiceInstances;
|
|
|
|
|
import com.tencent.polaris.api.pojo.ServiceKey;
|
|
|
|
|
import org.reactivestreams.Publisher;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
@ -57,33 +57,19 @@ public final class RouterUtils {
|
|
|
|
|
* @return ServiceInstances
|
|
|
|
|
*/
|
|
|
|
|
public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) {
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
|
|
|
|
|
|
AtomicReference<List<Instance>> instancesRef = new AtomicReference<>();
|
|
|
|
|
servers.subscribe(serviceInstances -> {
|
|
|
|
|
instancesRef.set(serviceInstances
|
|
|
|
|
.stream()
|
|
|
|
|
List<Instance> instanceList = Collections.synchronizedList(new ArrayList<>());
|
|
|
|
|
servers.flatMap((Function<List<ServiceInstance>, Publisher<?>>) serviceInstances ->
|
|
|
|
|
Flux.fromIterable(serviceInstances.stream()
|
|
|
|
|
.map(instanceTransformer::transform)
|
|
|
|
|
.collect(Collectors.toList()));
|
|
|
|
|
|
|
|
|
|
latch.countDown();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
latch.await(WAIT_TIME, TimeUnit.SECONDS);
|
|
|
|
|
}
|
|
|
|
|
catch (InterruptedException e) {
|
|
|
|
|
LOGGER.error("Wait get instance result error. ", e);
|
|
|
|
|
}
|
|
|
|
|
.collect(Collectors.toList()))).subscribe(instance -> instanceList.add((Instance) instance));
|
|
|
|
|
|
|
|
|
|
String serviceName = "";
|
|
|
|
|
if (!CollectionUtils.isEmpty(instancesRef.get())) {
|
|
|
|
|
serviceName = instancesRef.get().get(0).getService();
|
|
|
|
|
if (!CollectionUtils.isEmpty(instanceList)) {
|
|
|
|
|
serviceName = instanceList.get(0).getService();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ServiceKey serviceKey = new ServiceKey(MetadataContext.LOCAL_NAMESPACE, serviceName);
|
|
|
|
|
List<Instance> instances = instancesRef.get() == null ? Collections.emptyList() : instancesRef.get();
|
|
|
|
|
|
|
|
|
|
return new DefaultServiceInstances(serviceKey, instances);
|
|
|
|
|
return new DefaultServiceInstances(serviceKey, instanceList);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|