Simplified operation batch logic

pull/205/head
M66B 3 years ago
parent d3048fe62a
commit bd98f62d7b

@ -178,7 +178,7 @@ class Core {
Context context, Context context,
EntityAccount account, EntityFolder folder, List<TupleOperationEx> ops, EntityAccount account, EntityFolder folder, List<TupleOperationEx> ops,
Store istore, Folder ifolder, Store istore, Folder ifolder,
State state, int priority, long sequence) State state, long serial)
throws JSONException { throws JSONException {
try { try {
Log.i(folder.name + " start process"); Log.i(folder.name + " start process");
@ -190,10 +190,10 @@ class Core {
int retry = 0; int retry = 0;
boolean group = true; boolean group = true;
Log.i(folder.name + " executing sequence=" + sequence + " operations=" + ops.size()); Log.i(folder.name + " executing serial=" + serial + " operations=" + ops.size());
while (retry < LOCAL_RETRY_MAX && ops.size() > 0 && while (retry < LOCAL_RETRY_MAX && ops.size() > 0 &&
state.isRunning() && state.isRunning() &&
state.batchCanRun(folder.id, priority, sequence)) { state.getSerial() == serial) {
TupleOperationEx op = ops.get(0); TupleOperationEx op = ops.get(0);
try { try {
@ -648,7 +648,7 @@ class Core {
retry++; retry++;
if (retry < LOCAL_RETRY_MAX && if (retry < LOCAL_RETRY_MAX &&
state.isRunning() && state.isRunning() &&
state.batchCanRun(folder.id, priority, sequence)) state.getSerial() == serial)
try { try {
Thread.sleep(LOCAL_RETRY_DELAY); Thread.sleep(LOCAL_RETRY_DELAY);
} catch (InterruptedException ex1) { } catch (InterruptedException ex1) {
@ -674,18 +674,8 @@ class Core {
} }
} }
if (ops.size() == 0) if (ops.size() != 0 && state.getSerial() == serial)
state.batchCompleted(folder.id, priority, sequence);
else {
if (state.batchCanRun(folder.id, priority, sequence))
state.error(new OperationCanceledException("Processing")); state.error(new OperationCanceledException("Processing"));
else {
if (state.isProcessing())
Log.e(folder.name + " cannot run" +
" sequence=" + sequence +
" batch=" + state.getBatch(folder.id, priority));
}
}
} finally { } finally {
Log.i(folder.name + " end process state=" + state + " pending=" + ops.size()); Log.i(folder.name + " end process state=" + state + " pending=" + ops.size());
} }
@ -5143,9 +5133,7 @@ class Core {
private Throwable unrecoverable = null; private Throwable unrecoverable = null;
private Long lastActivity = null; private Long lastActivity = null;
private boolean process = false; private long serial = 0;
private Map<FolderPriority, Long> sequence = new HashMap<>();
private Map<FolderPriority, Long> batch = new HashMap<>();
State(ConnectionHelper.NetworkState networkState) { State(ConnectionHelper.NetworkState networkState) {
this.networkState = networkState; this.networkState = networkState;
@ -5229,19 +5217,10 @@ class Core {
void reset() { void reset() {
recoverable = true; recoverable = true;
lastActivity = null; lastActivity = null;
resetBatches();
process = true;
} }
void resetBatches() { void nextSerial() {
process = false; serial++;
synchronized (this) {
for (FolderPriority key : sequence.keySet()) {
batch.put(key, sequence.get(key));
if (BuildConfig.DEBUG)
Log.i("=== Reset " + key.folder + ":" + key.priority + " batch=" + batch.get(key));
}
}
} }
private void yield() { private void yield() {
@ -5325,55 +5304,8 @@ class Core {
return (last == null ? 0 : SystemClock.elapsedRealtime() - last); return (last == null ? 0 : SystemClock.elapsedRealtime() - last);
} }
long getSequence(long folder, int priority) { long getSerial() {
synchronized (this) { return serial;
FolderPriority key = new FolderPriority(folder, priority);
if (!sequence.containsKey(key)) {
sequence.put(key, 0L);
batch.put(key, 0L);
}
long result = sequence.get(key);
sequence.put(key, result + 1);
if (BuildConfig.DEBUG)
Log.i("=== Get " + folder + ":" + priority + " sequence=" + result);
return result;
}
}
boolean batchCanRun(long folder, int priority, long current) {
if (!process) {
Log.i("=== Can " + folder + ":" + priority + " process=" + process);
return false;
}
synchronized (this) {
FolderPriority key = new FolderPriority(folder, priority);
boolean can = batch.get(key).equals(current);
if (!can || BuildConfig.DEBUG)
Log.i("=== Can " + folder + ":" + priority + " batch=" + batch.get(key) + " current=" + current + " can=" + can);
return can;
}
}
void batchCompleted(long folder, int priority, long current) {
synchronized (this) {
FolderPriority key = new FolderPriority(folder, priority);
if (batch.get(key).equals(current))
batch.put(key, batch.get(key) + 1);
if (BuildConfig.DEBUG)
Log.i("=== Completed " + folder + ":" + priority + " next=" + batch.get(key));
}
}
boolean isProcessing() {
return process;
}
Long getBatch(long folder, int priority) {
synchronized (this) {
FolderPriority key = new FolderPriority(folder, priority);
return batch.get(key);
}
} }
@NonNull @NonNull
@ -5382,7 +5314,7 @@ class Core {
return "[running=" + running + return "[running=" + running +
",recoverable=" + recoverable + ",recoverable=" + recoverable +
",idle=" + getIdleTime() + "" + ",idle=" + getIdleTime() + "" +
",process=" + process + "]"; ",serial=" + serial + "]";
} }
private static class FolderPriority { private static class FolderPriority {

@ -24,9 +24,7 @@ import static android.os.Process.THREAD_PRIORITY_BACKGROUND;
import android.app.AlarmManager; import android.app.AlarmManager;
import android.app.NotificationManager; import android.app.NotificationManager;
import android.app.PendingIntent; import android.app.PendingIntent;
import android.appwidget.AppWidgetManager;
import android.content.BroadcastReceiver; import android.content.BroadcastReceiver;
import android.content.ComponentName;
import android.content.Context; import android.content.Context;
import android.content.Intent; import android.content.Intent;
import android.content.IntentFilter; import android.content.IntentFilter;
@ -1861,14 +1859,14 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
" operations=" + partitions.get(key).size()); " operations=" + partitions.get(key).size());
} }
final long sequence = state.getSequence(folder.id, key.getPriority()); final long serial = state.getSerial();
Map<String, String> crumb = new HashMap<>(); Map<String, String> crumb = new HashMap<>();
crumb.put("account", folder.account == null ? null : Long.toString(folder.account)); crumb.put("account", folder.account == null ? null : Long.toString(folder.account));
crumb.put("folder", folder.name + "/" + folder.type + ":" + folder.id); crumb.put("folder", folder.name + "/" + folder.type + ":" + folder.id);
crumb.put("partition", key.toString()); crumb.put("partition", key.toString());
crumb.put("operations", Integer.toString(partitions.get(key).size())); crumb.put("operations", Integer.toString(partitions.get(key).size()));
crumb.put("sequence", Long.toString(sequence)); crumb.put("serial", Long.toString(serial));
Log.breadcrumb("Queuing", crumb); Log.breadcrumb("Queuing", crumb);
executor.submit(new Helper.PriorityRunnable(key.getPriority(), key.getOrder()) { executor.submit(new Helper.PriorityRunnable(key.getPriority(), key.getOrder()) {
@ -1886,7 +1884,7 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
Log.i(folder.name + Log.i(folder.name +
" executing partition=" + key + " executing partition=" + key +
" sequence=" + sequence + " serial=" + serial +
" operations=" + partition.size()); " operations=" + partition.size());
Map<String, String> crumb = new HashMap<>(); Map<String, String> crumb = new HashMap<>();
@ -1894,7 +1892,7 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
crumb.put("folder", folder.name + "/" + folder.type + ":" + folder.id); crumb.put("folder", folder.name + "/" + folder.type + ":" + folder.id);
crumb.put("partition", key.toString()); crumb.put("partition", key.toString());
crumb.put("operations", Integer.toString(partition.size())); crumb.put("operations", Integer.toString(partition.size()));
crumb.put("sequence", Long.toString(sequence)); crumb.put("serial", Long.toString(serial));
Log.breadcrumb("Executing", crumb); Log.breadcrumb("Executing", crumb);
// Get folder // Get folder
@ -1946,7 +1944,7 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
account, folder, account, folder,
partition, partition,
iservice.getStore(), ifolder, iservice.getStore(), ifolder,
state, key.getPriority(), sequence); state, serial);
} catch (Throwable ex) { } catch (Throwable ex) {
Log.e(folder.name, ex); Log.e(folder.name, ex);
@ -2186,7 +2184,7 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
// Stop executing operations // Stop executing operations
Log.i(account.name + " stop executing operations"); Log.i(account.name + " stop executing operations");
state.resetBatches(); state.nextSerial();
((ThreadPoolExecutor) executor).getQueue().clear(); ((ThreadPoolExecutor) executor).getQueue().clear();
// Close store // Close store

Loading…
Cancel
Save