From a26eed0f1ca1865d8d9e18f6781ddc12a33cad5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E9=A9=AC=E5=93=A5?= Date: Thu, 11 Aug 2022 22:06:39 +0800 Subject: [PATCH] Core code refactoring (#528) --- .../core/executor/support/QueueTypeEnum.java | 1 - .../executor/support/RejectedPolicies.java | 65 ------------------- .../executor/support/RejectedTypeEnum.java | 4 +- .../support/RunsOldestTaskPolicy.java | 44 +++++++++++++ .../executor/support/SyncPutQueuePolicy.java | 42 ++++++++++++ 5 files changed, 88 insertions(+), 68 deletions(-) delete mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RunsOldestTaskPolicy.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/SyncPutQueuePolicy.java diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java index f605d6ea..283d4618 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java @@ -105,7 +105,6 @@ public enum QueueTypeEnum { if (capacity == null || capacity <= 0) { temCapacity = 1024; } - return new LinkedBlockingQueue(temCapacity); })); return blockingQueue; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java deleted file mode 100644 index 931d06da..00000000 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.core.executor.support; - -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; - -/** - * Rejected policies. - */ -@Slf4j -public class RejectedPolicies { - - public static class RunsOldestTaskPolicy implements RejectedExecutionHandler { - - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - if (executor.isShutdown()) { - return; - } - BlockingQueue workQueue = executor.getQueue(); - Runnable firstWork = workQueue.poll(); - boolean newTaskAdd = workQueue.offer(r); - if (firstWork != null) { - firstWork.run(); - } - if (!newTaskAdd) { - executor.execute(r); - } - } - } - - public static class SyncPutQueuePolicy implements RejectedExecutionHandler { - - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - if (executor.isShutdown()) { - return; - } - try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - log.error("Adding Queue task to thread pool failed.", e); - } - } - } -} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java index ac8e8e43..5dd8d61c 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java @@ -40,9 +40,9 @@ public enum RejectedTypeEnum { DISCARD_OLDEST_POLICY(4, "DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy()), - RUNS_OLDEST_TASK_POLICY(5, "RunsOldestTaskPolicy", new RejectedPolicies.RunsOldestTaskPolicy()), + RUNS_OLDEST_TASK_POLICY(5, "RunsOldestTaskPolicy", new RunsOldestTaskPolicy()), - SYNC_PUT_QUEUE_POLICY(6, "SyncPutQueuePolicy", new RejectedPolicies.SyncPutQueuePolicy()); + SYNC_PUT_QUEUE_POLICY(6, "SyncPutQueuePolicy", new SyncPutQueuePolicy()); public Integer type; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RunsOldestTaskPolicy.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RunsOldestTaskPolicy.java new file mode 100644 index 00000000..0e115865 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RunsOldestTaskPolicy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Run the oldest task policy. + */ +public class RunsOldestTaskPolicy implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (executor.isShutdown()) { + return; + } + BlockingQueue workQueue = executor.getQueue(); + Runnable firstWork = workQueue.poll(); + boolean newTaskAdd = workQueue.offer(r); + if (firstWork != null) { + firstWork.run(); + } + if (!newTaskAdd) { + executor.execute(r); + } + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/SyncPutQueuePolicy.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/SyncPutQueuePolicy.java new file mode 100644 index 00000000..ef803bee --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/SyncPutQueuePolicy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Synchronous put queue policy. + */ +@Slf4j +public class SyncPutQueuePolicy implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (executor.isShutdown()) { + return; + } + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + log.error("Adding Queue task to thread pool failed.", e); + } + } +}