Fixed skipping invalidations

pull/210/head
M66B 2 years ago
parent 4ddd93f595
commit e0011221cf

@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* A LiveData implementation that closely works with {@link InvalidationTracker} to implement * A LiveData implementation that closely works with {@link InvalidationTracker} to implement
@ -59,11 +60,8 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
final InvalidationTracker.Observer mObserver; final InvalidationTracker.Observer mObserver;
@SuppressWarnings("WeakerAccess") final AtomicInteger queued = new AtomicInteger(0);
final AtomicBoolean mInvalid = new AtomicBoolean(true); final AtomicInteger running = new AtomicInteger(0);
@SuppressWarnings("WeakerAccess")
final AtomicBoolean mComputing = new AtomicBoolean(false);
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
final AtomicBoolean mRegisteredObserver = new AtomicBoolean(false); final AtomicBoolean mRegisteredObserver = new AtomicBoolean(false);
@ -76,49 +74,37 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
if (mRegisteredObserver.compareAndSet(false, true)) { if (mRegisteredObserver.compareAndSet(false, true)) {
mDatabase.getInvalidationTracker().addWeakObserver(mObserver); mDatabase.getInvalidationTracker().addWeakObserver(mObserver);
} }
boolean computed; try {
do { running.incrementAndGet();
computed = false;
// compute can happen only in 1 thread but no reason to lock others. T value = null;
if (mComputing.compareAndSet(false, true)) { boolean computed = false;
// as long as it is invalid, keep computing. synchronized (mComputeFunction) {
try { int retry = 0;
T value = null; while (!computed) {
while (mInvalid.compareAndSet(true, false)) { try {
int retry = 0; value = mComputeFunction.call();
while (!computed) { computed = true;
try { } catch (Throwable e) {
value = mComputeFunction.call(); if (++retry > 5) {
computed = true; eu.faircode.email.Log.e(e);
} catch (Throwable e) { break;
if (++retry > 5) { }
eu.faircode.email.Log.e(e); eu.faircode.email.Log.w(e);
break; try {
} Thread.sleep(2000L);
eu.faircode.email.Log.w(e); } catch (InterruptedException ignored) {
try {
Thread.sleep(2000L);
} catch (InterruptedException ignored) {
}
}
} }
} }
if (computed) {
postValue(value);
}
} finally {
// release compute lock
mComputing.set(false);
} }
} }
// check invalid after releasing compute lock to avoid the following scenario. if (computed) {
// Thread A runs compute() postValue(value);
// Thread A checks invalid, it is false }
// Main thread sets invalid to true } finally {
// Thread B runs, fails to acquire compute lock and skips queued.decrementAndGet();
// Thread A releases compute lock running.decrementAndGet();
// We've left invalid in set state. The check below recovers. }
} while (computed && mInvalid.get());
} }
}; };
@ -127,14 +113,19 @@ class RoomTrackingLiveData<T> extends LiveData<T> {
@MainThread @MainThread
@Override @Override
public void run() { public void run() {
if (running.get() == 0 && queued.get() > 0) {
eu.faircode.email.Log.i(mComputeFunction +
" running=" + running + " queued=" + queued);
return;
}
boolean isActive = hasActiveObservers(); boolean isActive = hasActiveObservers();
if (mInvalid.compareAndSet(false, true)) { if (isActive) {
if (isActive) { queued.incrementAndGet();
getQueryExecutor().execute(mRefreshRunnable); getQueryExecutor().execute(mRefreshRunnable);
}
} }
} }
}; };
@SuppressLint("RestrictedApi") @SuppressLint("RestrictedApi")
RoomTrackingLiveData( RoomTrackingLiveData(
RoomDatabase database, RoomDatabase database,

Loading…
Cancel
Save