Observe messages to send

pull/156/head
M66B 5 years ago
parent 978843f134
commit 0654ed2eb0

@ -49,7 +49,7 @@ public interface DaoOperation {
" LEFT JOIN message ON message.id = operation.message" +
" LEFT JOIN account ON account.id = operation.account" +
" LEFT JOIN identity ON identity.id = message.identity" +
" WHERE operation.folder = :folder" +
" WHERE CASE WHEN :folder IS NULL THEN folder.account IS NULL ELSE operation.folder = :folder END" +
" AND (account.synchronize IS NULL OR account.synchronize)" +
" AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)" +
" ORDER BY" +
@ -60,10 +60,10 @@ public interface DaoOperation {
", id";
@Query(GET_OPS_FOLDER)
List<EntityOperation> getOperations(long folder);
List<EntityOperation> getOperations(Long folder);
@Query(GET_OPS_FOLDER)
LiveData<List<EntityOperation>> liveOperations(long folder);
LiveData<List<EntityOperation>> liveOperations(Long folder);
@Query("SELECT COUNT(operation.id) AS pending" +
", SUM(CASE WHEN operation.error IS NULL THEN 0 ELSE 1 END) AS errors" +

@ -47,6 +47,8 @@ import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.mail.Address;
import javax.mail.Message;
@ -62,8 +64,11 @@ import static android.os.Process.THREAD_PRIORITY_BACKGROUND;
public class ServiceSend extends LifecycleService {
private int lastUnsent = 0;
private boolean lastSuitable = false;
private TwoStateOwner cowner;
private static boolean booted = false;
private ExecutorService executor = Executors.newSingleThreadExecutor(Helper.backgroundThreadFactory);
private static final int IDENTITY_ERROR_AFTER = 30; // minutes
@ -72,20 +77,125 @@ public class ServiceSend extends LifecycleService {
Log.i("Service send create");
super.onCreate();
ConnectivityManager cm = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkRequest.Builder builder = new NetworkRequest.Builder();
builder.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET);
cm.registerNetworkCallback(builder.build(), networkCallback);
DB db = DB.getInstance(this);
cowner = new TwoStateOwner(ServiceSend.this, "send");
final DB db = DB.getInstance(this);
final PowerManager pm = (PowerManager) getSystemService(Context.POWER_SERVICE);
// Observe unsent count
db.operation().liveUnsent().observe(this, new Observer<Integer>() {
@Override
public void onChanged(Integer unsent) {
NotificationManager nm = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
nm.notify(Helper.NOTIFICATION_SEND, getNotificationService(unsent).build());
nm.notify(Helper.NOTIFICATION_SEND, getNotificationService(unsent, null).build());
}
});
// Observe send operations
db.operation().liveOperations(null).observe(cowner, new Observer<List<EntityOperation>>() {
private List<Long> handling = new ArrayList<>();
private PowerManager.WakeLock wlFolder = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":send");
@Override
public void onChanged(final List<EntityOperation> operations) {
boolean process = false;
List<Long> ops = new ArrayList<>();
for (EntityOperation op : operations) {
if (!handling.contains(op.id))
process = true;
ops.add(op.id);
}
handling = ops;
if (handling.size() > 0 && process) {
Log.i("OUTBOX operations=" + operations.size());
executor.submit(new Runnable() {
@Override
public void run() {
try {
wlFolder.acquire();
EntityFolder outbox = db.folder().getOutbox();
try {
db.folder().setFolderError(outbox.id, null);
db.folder().setFolderSyncState(outbox.id, "syncing");
List<EntityOperation> ops = db.operation().getOperations(outbox.id);
Log.i(outbox.name + " pending operations=" + ops.size());
for (EntityOperation op : ops) {
EntityMessage message = null;
try {
Log.i(outbox.name +
" start op=" + op.id + "/" + op.name +
" msg=" + op.message +
" args=" + op.args);
switch (op.name) {
case EntityOperation.SYNC:
db.folder().setFolderError(outbox.id, null);
break;
case EntityOperation.SEND:
message = db.message().getMessage(op.message);
if (message == null)
throw new MessageRemovedException();
send(message);
break;
default:
throw new IllegalArgumentException("Unknown operation=" + op.name);
}
db.operation().deleteOperation(op.id);
} catch (Throwable ex) {
Log.e(outbox.name, ex);
Core.reportError(ServiceSend.this, null, outbox, ex);
db.operation().setOperationError(op.id, Helper.formatThrowable(ex));
if (message != null)
db.message().setMessageError(message.id, Helper.formatThrowable(ex));
if (ex instanceof OutOfMemoryError ||
ex instanceof MessageRemovedException ||
ex instanceof SendFailedException ||
ex instanceof IllegalArgumentException) {
Log.w("Unrecoverable");
db.operation().deleteOperation(op.id);
continue;
} else
throw ex;
} finally {
Log.i(outbox.name + " end op=" + op.id + "/" + op.name);
}
if (!ConnectionHelper.getNetworkState(ServiceSend.this).isSuitable())
break;
}
} catch (Throwable ex) {
Log.e(outbox.name, ex);
db.folder().setFolderError(outbox.id, Helper.formatThrowable(ex, true));
} finally {
db.folder().setFolderState(outbox.id, null);
db.folder().setFolderSyncState(outbox.id, null);
}
} finally {
wlFolder.release();
}
}
});
}
if (operations.size() == 0)
stopSelf();
}
});
ConnectivityManager cm = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkRequest.Builder builder = new NetworkRequest.Builder();
builder.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET);
cm.registerNetworkCallback(builder.build(), networkCallback);
}
@Override
@ -105,16 +215,18 @@ public class ServiceSend extends LifecycleService {
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
startForeground(Helper.NOTIFICATION_SEND, getNotificationService(null).build());
startForeground(Helper.NOTIFICATION_SEND, getNotificationService(null, null).build());
super.onStartCommand(intent, flags, startId);
return START_STICKY;
}
NotificationCompat.Builder getNotificationService(Integer unsent) {
NotificationCompat.Builder getNotificationService(Integer unsent, Boolean suitable) {
if (unsent != null)
lastUnsent = unsent;
if (suitable != null)
lastSuitable = suitable;
// Build pending intent
Intent intent = new Intent(this, ActivityView.class);
@ -137,13 +249,13 @@ public class ServiceSend extends LifecycleService {
if (lastUnsent > 0)
builder.setContentText(getResources().getQuantityString(
R.plurals.title_notification_unsent, lastUnsent, lastUnsent));
if (!lastSuitable)
builder.setSubText(getString(R.string.title_notification_waiting));
return builder;
}
ConnectivityManager.NetworkCallback networkCallback = new ConnectivityManager.NetworkCallback() {
private Thread thread = null;
@Override
public void onAvailable(Network network) {
Log.i("Service send available=" + network);
@ -157,95 +269,16 @@ public class ServiceSend extends LifecycleService {
}
private void check() {
if (!ConnectionHelper.getNetworkState(ServiceSend.this).isSuitable())
return;
if (thread != null && thread.isAlive())
return;
thread = new Thread(new Runnable() {
@Override
public void run() {
PowerManager pm = (PowerManager) getSystemService(Context.POWER_SERVICE);
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":send");
try {
wl.acquire();
DB db = DB.getInstance(ServiceSend.this);
EntityFolder outbox = db.folder().getOutbox();
try {
db.folder().setFolderError(outbox.id, null);
db.folder().setFolderSyncState(outbox.id, "syncing");
List<EntityOperation> ops = db.operation().getOperations(outbox.id);
Log.i(outbox.name + " pending operations=" + ops.size());
for (EntityOperation op : ops) {
EntityMessage message = null;
try {
Log.i(outbox.name +
" start op=" + op.id + "/" + op.name +
" msg=" + op.message +
" args=" + op.args);
switch (op.name) {
case EntityOperation.SYNC:
db.folder().setFolderError(outbox.id, null);
break;
case EntityOperation.SEND:
message = db.message().getMessage(op.message);
if (message == null)
throw new MessageRemovedException();
send(message);
break;
boolean suitable = ConnectionHelper.getNetworkState(ServiceSend.this).isSuitable();
Log.i("OUTBOX suitable=" + suitable);
default:
throw new IllegalArgumentException("Unknown operation=" + op.name);
}
db.operation().deleteOperation(op.id);
} catch (Throwable ex) {
Log.e(outbox.name, ex);
Core.reportError(ServiceSend.this, null, outbox, ex);
db.operation().setOperationError(op.id, Helper.formatThrowable(ex));
if (message != null)
db.message().setMessageError(message.id, Helper.formatThrowable(ex));
if (ex instanceof OutOfMemoryError ||
ex instanceof MessageRemovedException ||
ex instanceof SendFailedException ||
ex instanceof IllegalArgumentException) {
Log.w("Unrecoverable");
db.operation().deleteOperation(op.id);
continue;
} else
throw ex;
} finally {
Log.i(outbox.name + " end op=" + op.id + "/" + op.name);
}
if (!ConnectionHelper.getNetworkState(ServiceSend.this).isSuitable())
break;
}
NotificationManager nm = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
nm.notify(Helper.NOTIFICATION_SEND, getNotificationService(null, suitable).build());
if (db.operation().getOperations(outbox.id).size() == 0)
stopSelf();
} catch (Throwable ex) {
Log.e(outbox.name, ex);
db.folder().setFolderError(outbox.id, Helper.formatThrowable(ex, true));
} finally {
db.folder().setFolderState(outbox.id, null);
db.folder().setFolderSyncState(outbox.id, null);
}
} finally {
wl.release();
}
}
}, "send:connectivity");
thread.setPriority(THREAD_PRIORITY_BACKGROUND);
thread.start();
if (suitable)
cowner.start();
else
cowner.stop();
}
};

@ -65,6 +65,7 @@
</plurals>
<string name="title_ask_spam_who">Report message from %1$s as spam?</string>
<string name="title_notification_waiting">Waiting for suitable connection</string>
<string name="title_notification_sending">Sending messages</string>
<string name="title_notification_failed">\'%1$s\' failed</string>

Loading…
Cancel
Save