|
|
@ -48,9 +48,6 @@ import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* Long polling service.
|
|
|
|
* Long polling service.
|
|
|
|
*
|
|
|
|
|
|
|
|
* @author chen.ma
|
|
|
|
|
|
|
|
* @date 2021/6/22 23:14
|
|
|
|
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
@Slf4j
|
|
|
|
@Slf4j
|
|
|
|
@Service
|
|
|
|
@Service
|
|
|
@ -70,11 +67,8 @@ public class LongPollingService {
|
|
|
|
|
|
|
|
|
|
|
|
public LongPollingService() {
|
|
|
|
public LongPollingService() {
|
|
|
|
allSubs = new ConcurrentLinkedQueue();
|
|
|
|
allSubs = new ConcurrentLinkedQueue();
|
|
|
|
|
|
|
|
|
|
|
|
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 30L, TimeUnit.SECONDS);
|
|
|
|
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 30L, TimeUnit.SECONDS);
|
|
|
|
|
|
|
|
|
|
|
|
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
|
|
|
|
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
|
|
|
|
|
|
|
|
|
|
|
|
NotifyCenter.registerSubscriber(new AbstractSubscriber() {
|
|
|
|
NotifyCenter.registerSubscriber(new AbstractSubscriber() {
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
@ -122,13 +116,11 @@ public class LongPollingService {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
|
|
|
|
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
|
|
|
|
ClientLongPolling clientSub = iter.next();
|
|
|
|
ClientLongPolling clientSub = iter.next();
|
|
|
|
|
|
|
|
|
|
|
|
String identity = groupKey + GROUP_KEY_DELIMITER + identify;
|
|
|
|
String identity = groupKey + GROUP_KEY_DELIMITER + identify;
|
|
|
|
List<String> parseMapForFilter = Lists.newArrayList(identity);
|
|
|
|
List<String> parseMapForFilter = Lists.newArrayList(identity);
|
|
|
|
if (StrUtil.isBlank(identify)) {
|
|
|
|
if (StrUtil.isBlank(identify)) {
|
|
|
|
parseMapForFilter = MapUtil.parseMapForFilter(clientSub.clientMd5Map, groupKey);
|
|
|
|
parseMapForFilter = MapUtil.parseMapForFilter(clientSub.clientMd5Map, groupKey);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
parseMapForFilter.forEach(each -> {
|
|
|
|
parseMapForFilter.forEach(each -> {
|
|
|
|
if (clientSub.clientMd5Map.containsKey(each)) {
|
|
|
|
if (clientSub.clientMd5Map.containsKey(each)) {
|
|
|
|
getRetainIps().put(clientSub.clientIdentify, System.currentTimeMillis());
|
|
|
|
getRetainIps().put(clientSub.clientIdentify, System.currentTimeMillis());
|
|
|
@ -158,7 +150,6 @@ public class LongPollingService {
|
|
|
|
String appName = req.getHeader(CLIENT_APP_NAME_HEADER);
|
|
|
|
String appName = req.getHeader(CLIENT_APP_NAME_HEADER);
|
|
|
|
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
|
|
|
|
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
|
|
|
|
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
|
|
|
|
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
|
|
|
|
|
|
|
|
|
|
|
|
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
|
|
|
|
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
|
|
|
|
if (isFixedPolling()) {
|
|
|
|
if (isFixedPolling()) {
|
|
|
|
timeout = Math.max(10000, getFixedPollingInterval());
|
|
|
|
timeout = Math.max(10000, getFixedPollingInterval());
|
|
|
@ -172,12 +163,9 @@ public class LongPollingService {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
String clientIdentify = RequestUtil.getClientIdentify(req);
|
|
|
|
String clientIdentify = RequestUtil.getClientIdentify(req);
|
|
|
|
|
|
|
|
|
|
|
|
final AsyncContext asyncContext = req.startAsync();
|
|
|
|
final AsyncContext asyncContext = req.startAsync();
|
|
|
|
asyncContext.setTimeout(0L);
|
|
|
|
asyncContext.setTimeout(0L);
|
|
|
|
|
|
|
|
|
|
|
|
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize, timeout - delayTime, appName));
|
|
|
|
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize, timeout - delayTime, appName));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -218,7 +206,6 @@ public class LongPollingService {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
getRetainIps().put(ClientLongPolling.this.clientIdentify, System.currentTimeMillis());
|
|
|
|
getRetainIps().put(ClientLongPolling.this.clientIdentify, System.currentTimeMillis());
|
|
|
|
allSubs.remove(ClientLongPolling.this);
|
|
|
|
allSubs.remove(ClientLongPolling.this);
|
|
|
|
|
|
|
|
|
|
|
|
if (isFixedPolling()) {
|
|
|
|
if (isFixedPolling()) {
|
|
|
|
List<String> changedGroups = Md5ConfigUtil.compareMd5((HttpServletRequest) asyncContext.getRequest(), clientMd5Map);
|
|
|
|
List<String> changedGroups = Md5ConfigUtil.compareMd5((HttpServletRequest) asyncContext.getRequest(), clientMd5Map);
|
|
|
|
if (changedGroups.size() > 0) {
|
|
|
|
if (changedGroups.size() > 0) {
|
|
|
@ -232,9 +219,7 @@ public class LongPollingService {
|
|
|
|
} catch (Exception ex) {
|
|
|
|
} catch (Exception ex) {
|
|
|
|
log.error("Long polling error :: {}", ex.getMessage(), ex);
|
|
|
|
log.error("Long polling error :: {}", ex.getMessage(), ex);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}, timeoutTime, TimeUnit.MILLISECONDS);
|
|
|
|
}, timeoutTime, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
|
|
allSubs.add(this);
|
|
|
|
allSubs.add(this);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -262,9 +247,7 @@ public class LongPollingService {
|
|
|
|
asyncContext.complete();
|
|
|
|
asyncContext.complete();
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
|
|
|
|
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
String respStr = buildRespStr(changedGroups);
|
|
|
|
String respStr = buildRespStr(changedGroups);
|
|
|
|
response.setHeader("Pragma", "no-cache");
|
|
|
|
response.setHeader("Pragma", "no-cache");
|
|
|
@ -278,7 +261,6 @@ public class LongPollingService {
|
|
|
|
asyncContext.complete();
|
|
|
|
asyncContext.complete();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public Map<String, Long> getRetainIps() {
|
|
|
|
public Map<String, Long> getRetainIps() {
|
|
|
@ -346,5 +328,4 @@ public class LongPollingService {
|
|
|
|
private static int getFixedPollingInterval() {
|
|
|
|
private static int getFixedPollingInterval() {
|
|
|
|
return SwitchService.getSwitchInteger(SwitchService.FIXED_POLLING_INTERVAL, FIXED_POLLING_INTERVAL_MS);
|
|
|
|
return SwitchService.getSwitchInteger(SwitchService.FIXED_POLLING_INTERVAL, FIXED_POLLING_INTERVAL_MS);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|