|
|
|
@ -1,11 +1,11 @@
|
|
|
|
|
package cn.hippo4j.config.notify;
|
|
|
|
|
|
|
|
|
|
import cn.hippo4j.config.notify.listener.SmartSubscriber;
|
|
|
|
|
import cn.hippo4j.config.notify.listener.Subscriber;
|
|
|
|
|
import cn.hippo4j.config.event.AbstractEvent;
|
|
|
|
|
import cn.hippo4j.config.notify.listener.AbstractSmartSubscriber;
|
|
|
|
|
import cn.hippo4j.config.notify.listener.AbstractSubscriber;
|
|
|
|
|
import cn.hippo4j.config.toolkit.ClassUtil;
|
|
|
|
|
import cn.hippo4j.config.toolkit.MapUtil;
|
|
|
|
|
import cn.hippo4j.config.event.Event;
|
|
|
|
|
import cn.hippo4j.config.event.SlowEvent;
|
|
|
|
|
import cn.hippo4j.config.event.AbstractSlowEvent;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
|
|
|
import java.util.Map;
|
|
|
|
@ -33,7 +33,7 @@ public class NotifyCenter {
|
|
|
|
|
|
|
|
|
|
private static EventPublisher eventPublisher = new DefaultPublisher();
|
|
|
|
|
|
|
|
|
|
private static BiFunction<Class<? extends Event>, Integer, EventPublisher> publisherFactory = null;
|
|
|
|
|
private static BiFunction<Class<? extends AbstractEvent>, Integer, EventPublisher> publisherFactory = null;
|
|
|
|
|
|
|
|
|
|
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap(16);
|
|
|
|
|
|
|
|
|
@ -50,13 +50,13 @@ public class NotifyCenter {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
INSTANCE.sharePublisher = new DefaultSharePublisher();
|
|
|
|
|
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
|
|
|
|
|
INSTANCE.sharePublisher.init(AbstractSlowEvent.class, shareBufferSize);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static void registerSubscriber(final Subscriber consumer) {
|
|
|
|
|
if (consumer instanceof SmartSubscriber) {
|
|
|
|
|
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
|
|
|
|
|
if (ClassUtil.isAssignableFrom(SlowEvent.class, subscribeType)) {
|
|
|
|
|
public static void registerSubscriber(final AbstractSubscriber consumer) {
|
|
|
|
|
if (consumer instanceof AbstractSmartSubscriber) {
|
|
|
|
|
for (Class<? extends AbstractEvent> subscribeType : ((AbstractSmartSubscriber) consumer).subscribeTypes()) {
|
|
|
|
|
if (ClassUtil.isAssignableFrom(AbstractSlowEvent.class, subscribeType)) {
|
|
|
|
|
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
|
|
|
|
|
} else {
|
|
|
|
|
addSubscriber(consumer, subscribeType);
|
|
|
|
@ -65,8 +65,8 @@ public class NotifyCenter {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final Class<? extends Event> subscribeType = consumer.subscribeType();
|
|
|
|
|
if (ClassUtil.isAssignableFrom(SlowEvent.class, subscribeType)) {
|
|
|
|
|
final Class<? extends AbstractEvent> subscribeType = consumer.subscribeType();
|
|
|
|
|
if (ClassUtil.isAssignableFrom(AbstractSlowEvent.class, subscribeType)) {
|
|
|
|
|
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -74,7 +74,7 @@ public class NotifyCenter {
|
|
|
|
|
addSubscriber(consumer, subscribeType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
|
|
|
|
|
private static void addSubscriber(final AbstractSubscriber consumer, Class<? extends AbstractEvent> subscribeType) {
|
|
|
|
|
final String topic = ClassUtil.getCanonicalName(subscribeType);
|
|
|
|
|
synchronized (NotifyCenter.class) {
|
|
|
|
|
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
|
|
|
|
@ -83,7 +83,7 @@ public class NotifyCenter {
|
|
|
|
|
publisher.addSubscriber(consumer);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static boolean publishEvent(final Event event) {
|
|
|
|
|
public static boolean publishEvent(final AbstractEvent event) {
|
|
|
|
|
try {
|
|
|
|
|
return publishEvent(event.getClass(), event);
|
|
|
|
|
} catch (Throwable ex) {
|
|
|
|
@ -92,8 +92,8 @@ public class NotifyCenter {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
|
|
|
|
|
if (ClassUtil.isAssignableFrom(SlowEvent.class, eventType)) {
|
|
|
|
|
private static boolean publishEvent(final Class<? extends AbstractEvent> eventType, final AbstractEvent event) {
|
|
|
|
|
if (ClassUtil.isAssignableFrom(AbstractSlowEvent.class, eventType)) {
|
|
|
|
|
return INSTANCE.sharePublisher.publish(event);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -107,8 +107,8 @@ public class NotifyCenter {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
|
|
|
|
|
if (ClassUtil.isAssignableFrom(SlowEvent.class, eventType)) {
|
|
|
|
|
public static EventPublisher registerToPublisher(final Class<? extends AbstractEvent> eventType, final int queueMaxSize) {
|
|
|
|
|
if (ClassUtil.isAssignableFrom(AbstractSlowEvent.class, eventType)) {
|
|
|
|
|
return INSTANCE.sharePublisher;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|