Retry failed operations

pull/174/head
M66B 6 years ago
parent 2274d4870e
commit 904b5a3920

@ -126,33 +126,33 @@ class Core {
private static final int DOWNLOAD_BATCH_SIZE = 20; private static final int DOWNLOAD_BATCH_SIZE = 20;
private static final long YIELD_DURATION = 200L; // milliseconds private static final long YIELD_DURATION = 200L; // milliseconds
private static final long FUTURE_RECEIVED = 30 * 24 * 3600 * 1000L; // milliseconds private static final long FUTURE_RECEIVED = 30 * 24 * 3600 * 1000L; // milliseconds
private static final int OPERATION_RETRY_MAX = 10;
private static final long OPERATION_RETRY_DELAY = 15 * 1000L; // milliseconds
static void processOperations( static void processOperations(
Context context, Context context,
EntityAccount account, EntityFolder folder, List<TupleOperationEx> ops, EntityAccount account, EntityFolder folder, List<TupleOperationEx> ops,
Store istore, Folder ifolder, Store istore, Folder ifolder,
State state) State state)
throws MessagingException, JSONException, IOException { throws JSONException {
try { try {
Log.i(folder.name + " start process"); Log.i(folder.name + " start process");
DB db = DB.getInstance(context); DB db = DB.getInstance(context);
List<Long> processed = new ArrayList<>(); int retry = 0;
boolean group = true;
Log.i(folder.name + " executing operations=" + ops.size()); Log.i(folder.name + " executing operations=" + ops.size());
for (int i = 0; i < ops.size() && state.isRunning() && state.isRecoverable(); i++) { while (retry < OPERATION_RETRY_MAX && ops.size() > 0 && state.isRunning() && state.isRecoverable()) {
EntityOperation op = ops.get(i); TupleOperationEx op = ops.get(0);
if (processed.contains(op.id)) {
Log.i(folder.name + " already processed op=" + op.id + "/" + op.name);
continue;
}
try { try {
Log.i(folder.name + Log.i(folder.name +
" start op=" + op.id + "/" + op.name + " start op=" + op.id + "/" + op.name +
" folder=" + op.folder + " folder=" + op.folder +
" msg=" + op.message + " msg=" + op.message +
" args=" + op.args); " args=" + op.args +
" retry=" + retry);
// Fetch most recent copy of message // Fetch most recent copy of message
EntityMessage message = null; EntityMessage message = null;
@ -160,7 +160,7 @@ class Core {
message = db.message().getMessage(op.message); message = db.message().getMessage(op.message);
JSONArray jargs = new JSONArray(op.args); JSONArray jargs = new JSONArray(op.args);
Map<EntityOperation, EntityMessage> similar = new HashMap<>(); Map<TupleOperationEx, EntityMessage> similar = new HashMap<>();
try { try {
// Operations should use database transaction when needed // Operations should use database transaction when needed
@ -173,8 +173,8 @@ class Core {
// Process similar operations // Process similar operations
boolean skip = false; boolean skip = false;
for (int j = i + 1; j < ops.size(); j++) { for (int j = 1; j < ops.size(); j++) {
EntityOperation next = ops.get(j); TupleOperationEx next = ops.get(j);
switch (op.name) { switch (op.name) {
case EntityOperation.ADD: case EntityOperation.ADD:
@ -195,16 +195,15 @@ class Core {
break; break;
case EntityOperation.MOVE: case EntityOperation.MOVE:
if (message.uid != null && if (group &&
message.uid != null &&
EntityOperation.MOVE.equals(next.name)) { EntityOperation.MOVE.equals(next.name)) {
JSONArray jnext = new JSONArray(next.args); JSONArray jnext = new JSONArray(next.args);
// Same target // Same target
if (jargs.getLong(0) == jnext.getLong(0)) { if (jargs.getLong(0) == jnext.getLong(0)) {
EntityMessage m = db.message().getMessage(next.message); EntityMessage m = db.message().getMessage(next.message);
if (m != null && m.uid != null) { if (m != null && m.uid != null)
processed.add(next.id);
similar.put(next, m); similar.put(next, m);
}
} }
} }
break; break;
@ -216,11 +215,12 @@ class Core {
" skipping op=" + op.id + "/" + op.name + " skipping op=" + op.id + "/" + op.name +
" msg=" + op.message + " args=" + op.args); " msg=" + op.message + " args=" + op.args);
db.operation().deleteOperation(op.id); db.operation().deleteOperation(op.id);
ops.remove(op);
continue; continue;
} }
List<Long> sids = new ArrayList<>(); List<Long> sids = new ArrayList<>();
for (EntityOperation s : similar.keySet()) for (TupleOperationEx s : similar.keySet())
sids.add(s.id); sids.add(s.id);
if (similar.size() > 0) if (similar.size() > 0)
@ -242,7 +242,7 @@ class Core {
db.beginTransaction(); db.beginTransaction();
db.operation().setOperationError(op.id, null); db.operation().setOperationError(op.id, null);
for (EntityOperation s : similar.keySet()) for (TupleOperationEx s : similar.keySet())
db.operation().setOperationError(s.id, null); db.operation().setOperationError(s.id, null);
if (message != null) { if (message != null) {
@ -253,7 +253,7 @@ class Core {
if (!EntityOperation.SYNC.equals(op.name)) { if (!EntityOperation.SYNC.equals(op.name)) {
db.operation().setOperationState(op.id, "executing"); db.operation().setOperationState(op.id, "executing");
for (EntityOperation s : similar.keySet()) for (TupleOperationEx s : similar.keySet())
db.operation().setOperationState(s.id, "executing"); db.operation().setOperationState(s.id, "executing");
} }
@ -374,13 +374,17 @@ class Core {
db.beginTransaction(); db.beginTransaction();
db.operation().deleteOperation(op.id); db.operation().deleteOperation(op.id);
for (EntityOperation s : similar.keySet()) for (TupleOperationEx s : similar.keySet())
db.operation().deleteOperation(s.id); db.operation().deleteOperation(s.id);
db.setTransactionSuccessful(); db.setTransactionSuccessful();
} finally { } finally {
db.endTransaction(); db.endTransaction();
} }
ops.remove(op);
for (TupleOperationEx s : similar.keySet())
ops.remove(s);
} catch (Throwable ex) { } catch (Throwable ex) {
Log.e(folder.name, ex); Log.e(folder.name, ex);
EntityLog.log(context, folder.name + " " + Log.formatThrowable(ex, false)); EntityLog.log(context, folder.name + " " + Log.formatThrowable(ex, false));
@ -389,7 +393,7 @@ class Core {
db.beginTransaction(); db.beginTransaction();
db.operation().setOperationError(op.id, Log.formatThrowable(ex)); db.operation().setOperationError(op.id, Log.formatThrowable(ex));
for (EntityOperation s : similar.keySet()) for (TupleOperationEx s : similar.keySet())
db.operation().setOperationError(s.id, Log.formatThrowable(ex)); db.operation().setOperationError(s.id, Log.formatThrowable(ex));
if (message != null && !(ex instanceof IllegalArgumentException)) { if (message != null && !(ex instanceof IllegalArgumentException)) {
@ -406,6 +410,12 @@ class Core {
db.endTransaction(); db.endTransaction();
} }
if (similar.size() > 0) {
// Retry individually
group = false;
continue;
}
if (ex instanceof OutOfMemoryError || if (ex instanceof OutOfMemoryError ||
ex instanceof MessageRemovedException || ex instanceof MessageRemovedException ||
ex instanceof MessageRemovedIOException || ex instanceof MessageRemovedIOException ||
@ -433,8 +443,6 @@ class Core {
// There is no use in repeating // There is no use in repeating
db.operation().deleteOperation(op.id); db.operation().deleteOperation(op.id);
for (EntityOperation s : similar.keySet())
db.operation().deleteOperation(s.id);
// Cleanup folder // Cleanup folder
if (EntityOperation.SYNC.equals(op.name)) if (EntityOperation.SYNC.equals(op.name))
@ -445,34 +453,32 @@ class Core {
(ex instanceof MessageRemovedException || (ex instanceof MessageRemovedException ||
ex instanceof MessageRemovedIOException || ex instanceof MessageRemovedIOException ||
ex.getCause() instanceof MessageRemovedException || ex.getCause() instanceof MessageRemovedException ||
ex.getCause() instanceof MessageRemovedIOException)) { ex.getCause() instanceof MessageRemovedIOException))
// Failsafe: retry batch
if (similar.size() > 0)
throw ex;
db.message().deleteMessage(message.id); db.message().deleteMessage(message.id);
}
// Cleanup operations // Cleanup operations
op.cleanup(context); op.cleanup(context);
for (EntityOperation s : similar.keySet())
s.cleanup(context);
db.setTransactionSuccessful(); db.setTransactionSuccessful();
} finally { } finally {
db.endTransaction(); db.endTransaction();
} }
continue; ops.remove(op);
} else {
retry++;
try {
Thread.sleep(OPERATION_RETRY_DELAY);
} catch (InterruptedException ex1) {
Log.w(ex1);
}
} }
throw ex;
} finally { } finally {
try { try {
db.beginTransaction(); db.beginTransaction();
db.operation().setOperationState(op.id, null); db.operation().setOperationState(op.id, null);
for (EntityOperation s : similar.keySet()) for (TupleOperationEx s : similar.keySet())
db.operation().setOperationState(s.id, null); db.operation().setOperationState(s.id, null);
db.setTransactionSuccessful(); db.setTransactionSuccessful();
@ -484,6 +490,10 @@ class Core {
Log.i(folder.name + " end op=" + op.id + "/" + op.name); Log.i(folder.name + " end op=" + op.id + "/" + op.name);
} }
} }
if (ops.size() > 0)
Log.w("Operations failed=" + ops.size());
} finally { } finally {
Log.i(folder.name + " end process state=" + state); Log.i(folder.name + " end process state=" + state);
} }

Loading…
Cancel
Save