From 62e2e269950e6c7f014eb87d0c0ee1b70e4b9f86 Mon Sep 17 00:00:00 2001 From: hailang Date: Fri, 25 Feb 2022 09:51:02 +0800 Subject: [PATCH] add MemoryLimitedLinkedBlockingQueue --- hippo4j-core/pom.xml | 4 + .../MemoryLimitedLinkedBlockingQueue.java | 322 ++++++++++++++++++ 2 files changed, 326 insertions(+) create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/MemoryLimitedLinkedBlockingQueue.java diff --git a/hippo4j-core/pom.xml b/hippo4j-core/pom.xml index 7b146444..130bee48 100644 --- a/hippo4j-core/pom.xml +++ b/hippo4j-core/pom.xml @@ -22,6 +22,10 @@ cn.hippo4j hippo4j-common + + net.bytebuddy + byte-buddy-agent + diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/MemoryLimitedLinkedBlockingQueue.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/MemoryLimitedLinkedBlockingQueue.java new file mode 100644 index 00000000..3e893c67 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/MemoryLimitedLinkedBlockingQueue.java @@ -0,0 +1,322 @@ +package cn.hippo4j.core.executor.support; + +import net.bytebuddy.agent.ByteBuddyAgent; + +import java.lang.instrument.Instrumentation; +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Can completely solve the OOM problem caused by {@link LinkedBlockingQueue}. + * + * @author ZhangZiCheng + * @date 2022/2/24 20:47 + */ +public class MemoryLimitedLinkedBlockingQueue extends LinkedBlockingQueue { + + private static final long serialVersionUID = 9066384065603805582L; + + private final Instrumentation inst; + + private long memoryLimit; + + private final LongAdder memory = new LongAdder(); + + private final ReentrantLock acquireLock = new ReentrantLock(); + + private final Condition notLimited = acquireLock.newCondition(); + + private final ReentrantLock releaseLock = new ReentrantLock(); + + private final Condition notEmpty = releaseLock.newCondition(); + + static { + //init ByteBuddyAgent + ByteBuddyAgent.install(); + } + + public MemoryLimitedLinkedBlockingQueue() { + this(Integer.MAX_VALUE); + } + + public MemoryLimitedLinkedBlockingQueue(long memoryLimit) { + super(Integer.MAX_VALUE); + this.memoryLimit = memoryLimit; + this.inst = ByteBuddyAgent.getInstrumentation(); + } + + public MemoryLimitedLinkedBlockingQueue(Collection c, long memoryLimit) { + super(c); + this.memoryLimit = memoryLimit; + this.inst = ByteBuddyAgent.getInstrumentation(); + } + + public void setMemoryLimit(long memoryLimit) { + if (memoryLimit <= 0) { + throw new IllegalArgumentException(); + } + this.memoryLimit = memoryLimit; + } + + public long getMemoryLimit() { + return memoryLimit; + } + + public long getCurrentMemory() { + return memory.sum(); + } + + public long getCurrentRemainMemory() { + return getMemoryLimit() - getCurrentMemory(); + } + + /** + * Signals a waiting take. Called only from put/offer (which do not + * otherwise ordinarily lock takeLock.) + */ + private void signalNotEmpty() { + releaseLock.lock(); + try { + notEmpty.signal(); + } finally { + releaseLock.unlock(); + } + } + + /** + * Signals a waiting put. Called only from take/poll. + */ + private void signalNotLimited() { + acquireLock.lock(); + try { + notLimited.signal(); + } finally { + acquireLock.unlock(); + } + } + + /** + * Locks to prevent both puts and takes. + */ + private void fullyLock() { + acquireLock.lock(); + releaseLock.lock(); + } + + /** + * Unlocks to allow both puts and takes. + */ + private void fullyUnlock() { + releaseLock.unlock(); + acquireLock.unlock(); + } + + @Override + public void put(E e) throws InterruptedException { + acquireInterruptibly(e); + super.put(e); + } + + private void acquireInterruptibly(E e) throws InterruptedException { + if (e == null) { + throw new NullPointerException(); + } + final long objectSize = inst.getObjectSize(e); + acquireLock.lockInterruptibly(); + try { + while (memory.sum() + objectSize >= memoryLimit) { + notLimited.await(); + } + memory.add(objectSize); + if (memory.sum() < memoryLimit) { + notLimited.signal(); + } + } finally { + acquireLock.unlock(); + } + if (memory.sum() > 0) { + signalNotEmpty(); + } + } + + @Override + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + return acquire(e, timeout, unit) && super.offer(e, timeout, unit); + } + + private boolean acquire(E e, long timeout, TimeUnit unit) throws InterruptedException { + if (e == null) { + throw new NullPointerException(); + } + long nanos = unit.toNanos(timeout); + final long objectSize = inst.getObjectSize(e); + acquireLock.lockInterruptibly(); + try { + while (memory.sum() + objectSize >= memoryLimit) { + if (nanos <= 0) { + return false; + } + nanos = notLimited.awaitNanos(nanos); + } + memory.add(objectSize); + if (memory.sum() < memoryLimit) { + notLimited.signal(); + } + } finally { + acquireLock.unlock(); + } + if (memory.sum() > 0) { + signalNotEmpty(); + } + return true; + } + + @Override + public boolean offer(E e) { + return acquire(e) && super.offer(e); + } + + private boolean acquire(E e) { + if (e == null) { + throw new NullPointerException(); + } + if (memory.sum() >= memoryLimit) { + return false; + } + acquireLock.lock(); + try { + final long objectSize = inst.getObjectSize(e); + if (memory.sum() + objectSize < memoryLimit) { + memory.add(objectSize); + if (memory.sum() < memoryLimit) { + notLimited.signal(); + } + } + } finally { + acquireLock.unlock(); + } + if (memory.sum() > 0) { + signalNotEmpty(); + } + return true; + } + + @Override + public E take() throws InterruptedException { + final E e = super.take(); + releaseInterruptibly(e); + return e; + } + + private void releaseInterruptibly(E e) throws InterruptedException { + if (null == e) { + return; + } + final long objectSize = inst.getObjectSize(e); + releaseLock.lockInterruptibly(); + try { + while (memory.sum() == 0) { + notEmpty.await(); + } + memory.add(-objectSize); + if (memory.sum() > 0) { + notEmpty.signal(); + } + } finally { + releaseLock.unlock(); + } + if (memory.sum() < memoryLimit) { + signalNotLimited(); + } + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + final E e = super.poll(timeout, unit); + releaseInterruptibly(e, timeout, unit); + return e; + } + + private void releaseInterruptibly(E e, long timeout, TimeUnit unit) throws InterruptedException { + if (null == e) { + return; + } + final long objectSize = inst.getObjectSize(e); + long nanos = unit.toNanos(timeout); + releaseLock.lockInterruptibly(); + try { + while (memory.sum() == 0) { + if (nanos <= 0) { + return; + } + nanos = notEmpty.awaitNanos(nanos); + } + memory.add(-objectSize); + if (memory.sum() > 0) { + notEmpty.signal(); + } + } finally { + releaseLock.unlock(); + } + if (memory.sum() < memoryLimit) { + signalNotLimited(); + } + } + + @Override + public E poll() { + final E e = super.poll(); + release(e); + return e; + } + + private void release(Object e) { + if (null == e) { + return; + } + if (memory.sum() == 0) { + return; + } + final long objectSize = inst.getObjectSize(e); + releaseLock.lock(); + try { + if (memory.sum() > 0) { + memory.add(-objectSize); + if (memory.sum() > 0) { + notEmpty.signal(); + } + } + } finally { + releaseLock.unlock(); + } + if (memory.sum() < memoryLimit) { + signalNotLimited(); + } + } + + @Override + public boolean remove(Object o) { + final boolean success = super.remove(o); + if (success) { + release(o); + } + return success; + } + + @Override + public void clear() { + super.clear(); + fullyLock(); + try { + if (memory.sumThenReset() < memoryLimit) { + notLimited.signal(); + } + } finally { + fullyUnlock(); + } + } +}