Improved send logic

pull/182/head
M66B 5 years ago
parent d063ead7ee
commit 6867f9ab6b

@ -145,4 +145,7 @@ public interface DaoOperation {
@Query("DELETE FROM operation WHERE id = :id") @Query("DELETE FROM operation WHERE id = :id")
int deleteOperation(long id); int deleteOperation(long id);
@Query("DELETE FROM operation WHERE folder = :folder")
int deleteOperations(long folder);
} }

@ -482,4 +482,9 @@ public class EntityOperation {
} else } else
return false; return false;
} }
@Override
public String toString() {
return Long.toString(id);
}
} }

@ -69,12 +69,11 @@ public class ServiceSend extends ServiceBase {
private PowerManager.WakeLock wlOutbox; private PowerManager.WakeLock wlOutbox;
private TwoStateOwner owner = new TwoStateOwner("send"); private TwoStateOwner owner = new TwoStateOwner("send");
private List<Long> handling = new ArrayList<>();
private static ExecutorService executor = Helper.getBackgroundExecutor(1, "send"); private static ExecutorService executor = Helper.getBackgroundExecutor(1, "send");
private static final int PI_SEND = 1; private static final int PI_SEND = 1;
private static final long CONNECTIVITY_DELAY = 5000L; // milliseconds private static final long CONNECTIVITY_DELAY = 5000L; // milliseconds
private static final int IDENTITY_ERROR_AFTER = 30; // minutes
private static final int RETRY_MAX = 3; private static final int RETRY_MAX = 3;
@Override @Override
@ -106,33 +105,42 @@ public class ServiceSend extends ServiceBase {
// Observe send operations // Observe send operations
db.operation().liveOperations(null).observe(owner, new Observer<List<TupleOperationEx>>() { db.operation().liveOperations(null).observe(owner, new Observer<List<TupleOperationEx>>() {
private List<Long> handling = new ArrayList<>();
@Override @Override
public void onChanged(final List<TupleOperationEx> operations) { public void onChanged(List<TupleOperationEx> operations) {
boolean process = false; if (operations == null)
operations = new ArrayList<>();
if (operations.size() == 0)
stopSelf();
final List<TupleOperationEx> process = new ArrayList<>();
List<Long> ops = new ArrayList<>(); List<Long> ops = new ArrayList<>();
for (EntityOperation op : operations) { for (TupleOperationEx op : operations) {
if (!handling.contains(op.id)) if (!handling.contains(op.id))
process = true; process.add(op);
ops.add(op.id); ops.add(op.id);
} }
handling = ops; handling = ops;
if (process) { if (process.size() > 0) {
Log.i("OUTBOX operations=" + operations.size()); Log.i("OUTBOX process=" + TextUtils.join(",", process) +
" handling=" + TextUtils.join(",", handling));
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
processOperations(); processOperations(process);
} }
}); });
} }
} }
}); });
lastSuitable = ConnectionHelper.getNetworkState(this).isSuitable();
if (lastSuitable)
owner.start();
ConnectivityManager cm = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE); ConnectivityManager cm = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkRequest.Builder builder = new NetworkRequest.Builder(); NetworkRequest.Builder builder = new NetworkRequest.Builder();
builder.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET); builder.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET);
@ -153,6 +161,9 @@ public class ServiceSend extends ServiceBase {
ConnectivityManager cm = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE); ConnectivityManager cm = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE);
cm.unregisterNetworkCallback(networkCallback); cm.unregisterNetworkCallback(networkCallback);
owner.stop();
handling.clear();
stopForeground(true); stopForeground(true);
NotificationManager nm = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE); NotificationManager nm = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
@ -252,36 +263,29 @@ public class ServiceSend extends ServiceBase {
if (suitable) if (suitable)
owner.start(); owner.start();
else else {
owner.stop(); owner.stop();
handling.clear();
} }
if (suitable)
executor.submit(new Runnable() {
@Override
public void run() {
processOperations();
} }
});
} }
private void processOperations() { private void processOperations(List<TupleOperationEx> ops) {
try { try {
wlOutbox.acquire(); wlOutbox.acquire();
if (!ConnectionHelper.getNetworkState(this).isSuitable())
return;
DB db = DB.getInstance(this); DB db = DB.getInstance(this);
EntityFolder outbox = db.folder().getOutbox(); EntityFolder outbox = db.folder().getOutbox();
try { try {
db.folder().setFolderError(outbox.id, null); db.folder().setFolderError(outbox.id, null);
db.folder().setFolderSyncState(outbox.id, "syncing"); db.folder().setFolderSyncState(outbox.id, "syncing");
List<TupleOperationEx> ops = db.operation().getOperations(outbox.id); Log.i(outbox.name + " processing operations=" + ops.size());
Log.i(outbox.name + " pending operations=" + ops.size());
while (ops.size() > 0) { while (ops.size() > 0) {
if (!ConnectionHelper.getNetworkState(this).isSuitable())
break;
TupleOperationEx op = ops.get(0); TupleOperationEx op = ops.get(0);
EntityMessage message = null; EntityMessage message = null;
@ -292,6 +296,7 @@ public class ServiceSend extends ServiceBase {
Log.i(outbox.name + Log.i(outbox.name +
" start op=" + op.id + "/" + op.name + " start op=" + op.id + "/" + op.name +
" msg=" + op.message + " msg=" + op.message +
" tries=" + op.tries +
" args=" + op.args); " args=" + op.args);
db.operation().setOperationTries(op.id, ++op.tries); db.operation().setOperationTries(op.id, ++op.tries);
@ -364,41 +369,14 @@ public class ServiceSend extends ServiceBase {
} }
continue; continue;
} else { } else
if (message != null) {
String title = MessageHelper.formatAddresses(message.to);
PendingIntent pi = getPendingIntent(this);
EntityLog.log(this, title + " last attempt: " + new Date(message.last_attempt));
long now = new Date().getTime();
long delayed = now - message.last_attempt;
if (delayed > IDENTITY_ERROR_AFTER * 60 * 1000L) {
Log.i("Reporting send error after=" + delayed);
try {
NotificationManager nm = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
nm.notify("send:" + message.id, 1,
Core.getNotificationError(this, "warning", title, ex, pi).build());
} catch (Throwable ex1) {
Log.w(ex1);
}
}
}
throw ex; throw ex;
}
} finally { } finally {
Log.i(outbox.name + " end op=" + op.id + "/" + op.name); Log.i(outbox.name + " end op=" + op.id + "/" + op.name);
db.operation().setOperationState(op.id, null); db.operation().setOperationState(op.id, null);
} }
if (!ConnectionHelper.getNetworkState(this).isSuitable())
break;
} }
if (db.operation().getOperations(outbox.id).size() == 0)
stopSelf();
} catch (Throwable ex) { } catch (Throwable ex) {
Log.e(outbox.name, ex); Log.e(outbox.name, ex);
db.folder().setFolderError(outbox.id, Log.formatThrowable(ex)); db.folder().setFolderError(outbox.id, Log.formatThrowable(ex));
@ -414,20 +392,27 @@ public class ServiceSend extends ServiceBase {
private void onSync(EntityFolder outbox) { private void onSync(EntityFolder outbox) {
DB db = DB.getInstance(this); DB db = DB.getInstance(this);
try {
db.beginTransaction();
db.folder().setFolderError(outbox.id, null); db.folder().setFolderError(outbox.id, null);
// Restore snooze timers // Delete pending operations
for (EntityMessage message : db.message().getSnoozed(outbox.id)) db.operation().deleteOperations(outbox.id);
EntityMessage.snooze(this, message.id, message.ui_snoozed);
// Retry failed message // Requeue operations
for (long id : db.message().getMessageByFolder(outbox.id)) { for (long id : db.message().getMessageByFolder(outbox.id)) {
int ops = db.operation().getOperationCount(outbox.id, id, EntityOperation.SEND);
if (ops == 0) {
EntityMessage message = db.message().getMessage(id); EntityMessage message = db.message().getMessage(id);
if (message != null && message.ui_snoozed == null) if (message != null)
if (message.ui_snoozed == null)
EntityOperation.queue(this, message, EntityOperation.SEND); EntityOperation.queue(this, message, EntityOperation.SEND);
else
EntityMessage.snooze(this, message.id, message.ui_snoozed);
} }
db.setTransactionSuccessful();
} finally {
db.endTransaction();
} }
} }
@ -558,8 +543,6 @@ public class ServiceSend extends ServiceBase {
// Send message // Send message
EntityLog.log(this, "Sending " + via); EntityLog.log(this, "Sending " + via);
if (BuildConfig.DEBUG && false)
throw new SendFailedException("Test");
iservice.getTransport().sendMessage(imessage, to); iservice.getTransport().sendMessage(imessage, to);
long time = new Date().getTime(); long time = new Date().getTime();
EntityLog.log(this, "Sent " + via); EntityLog.log(this, "Sent " + via);

Loading…
Cancel
Save