Monitor operations per account

pull/187/head
M66B 5 years ago
parent f7e4a6236c
commit 29bad58778

@ -48,57 +48,47 @@ public interface DaoOperation {
@Query("SELECT operation.*" + @Query("SELECT operation.*" +
", " + priority + " AS priority" + ", " + priority + " AS priority" +
", account.name AS accountName, folder.name AS folderName" + ", account.name AS accountName, folder.name AS folderName" +
" ,((account.synchronize IS NULL OR account.synchronize)" + ", (account.synchronize IS NULL OR account.synchronize) AS synchronize" +
" AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)) AS synchronize" +
" FROM operation" + " FROM operation" +
" JOIN folder ON folder.id = operation.folder" + " JOIN folder ON folder.id = operation.folder" +
" LEFT JOIN message ON message.id = operation.message" +
" LEFT JOIN account ON account.id = operation.account" + " LEFT JOIN account ON account.id = operation.account" +
" LEFT JOIN identity ON identity.id = message.identity" +
" ORDER BY " + priority + ", id") " ORDER BY " + priority + ", id")
LiveData<List<TupleOperationEx>> liveOperations(); LiveData<List<TupleOperationEx>> liveOperations();
String GET_OPS_FOLDER = "SELECT operation.*" + @Transaction
@Query("SELECT operation.*" +
", " + priority + " AS priority" + ", " + priority + " AS priority" +
", account.name AS accountName, folder.name AS folderName" + ", account.name AS accountName, folder.name AS folderName" +
" ,((account.synchronize IS NULL OR account.synchronize)" + ", account.synchronize" +
" AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)) AS synchronize" +
" FROM operation" + " FROM operation" +
" JOIN folder ON folder.id = operation.folder" + " JOIN folder ON folder.id = operation.folder" +
" LEFT JOIN message ON message.id = operation.message" + " JOIN account ON account.id = operation.account" +
" LEFT JOIN account ON account.id = operation.account" + " WHERE operation.account = :account" +
" LEFT JOIN identity ON identity.id = message.identity" + " AND account.synchronize" +
" WHERE CASE WHEN :folder IS NULL THEN folder.account IS NULL ELSE operation.folder = :folder END" + " AND folder.account IS NOT NULL" + // not outbox
" AND (account.synchronize IS NULL OR account.synchronize)" + " ORDER BY " + priority + ", id")
" AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)" + LiveData<List<TupleOperationEx>> liveOperations(long account);
" ORDER BY " + priority + ", id";
@Query(GET_OPS_FOLDER)
List<TupleOperationEx> getOperations(Long folder);
@Transaction @Transaction
@Query(GET_OPS_FOLDER) @Query("SELECT operation.*" +
LiveData<List<TupleOperationEx>> liveOperations(Long folder); " FROM operation" +
" JOIN folder ON folder.id = operation.folder" +
" WHERE folder.account IS NULL" + // outbox
" ORDER BY id")
LiveData<List<EntityOperation>> liveSend();
@Query("SELECT COUNT(operation.id) AS pending" + @Query("SELECT COUNT(operation.id) AS pending" +
", SUM(CASE WHEN operation.error IS NULL THEN 0 ELSE 1 END) AS errors" + ", SUM(CASE WHEN operation.error IS NULL THEN 0 ELSE 1 END) AS errors" +
" FROM operation" + " FROM operation" +
" JOIN folder ON folder.id = operation.folder" +
" LEFT JOIN message ON message.id = operation.message" +
" LEFT JOIN account ON account.id = operation.account" + " LEFT JOIN account ON account.id = operation.account" +
" LEFT JOIN identity ON identity.id = message.identity" + " WHERE (account.synchronize IS NULL OR account.synchronize)")
" WHERE (account.synchronize IS NULL OR account.synchronize)" +
" AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)")
LiveData<TupleOperationStats> liveStats(); LiveData<TupleOperationStats> liveStats();
@Query("SELECT" + @Query("SELECT" +
" COUNT(operation.id) AS count" + " COUNT(operation.id) AS count" +
", SUM(CASE WHEN operation.state = 'executing' THEN 1 ELSE 0 END) AS busy" + ", SUM(CASE WHEN operation.state = 'executing' THEN 1 ELSE 0 END) AS busy" +
" FROM operation" + " FROM operation" +
" JOIN message ON message.id = operation.message" + " WHERE operation.name = '" + EntityOperation.SEND + "'")
" JOIN identity ON identity.id = message.identity" +
" WHERE operation.name = '" + EntityOperation.SEND + "'" +
" AND identity.synchronize")
LiveData<TupleUnsent> liveUnsent(); LiveData<TupleUnsent> liveUnsent();
@Query("SELECT * FROM operation ORDER BY id") @Query("SELECT * FROM operation ORDER BY id")

@ -0,0 +1,24 @@
package eu.faircode.email;
/*
This file is part of FairEmail.
FairEmail is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
FairEmail is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with FairEmail. If not, see <http://www.gnu.org/licenses/>.
Copyright 2018-2020 by Marcel Bokhorst (M66B)
*/
public class ObjectHolder<T> {
public T value;
}

@ -112,16 +112,16 @@ public class ServiceSend extends ServiceBase implements SharedPreferences.OnShar
}); });
// Observe send operations // Observe send operations
db.operation().liveOperations(null).observe(owner, new Observer<List<TupleOperationEx>>() { db.operation().liveSend().observe(owner, new Observer<List<EntityOperation>>() {
@Override @Override
public void onChanged(List<TupleOperationEx> operations) { public void onChanged(List<EntityOperation> operations) {
if (operations == null) if (operations == null)
operations = new ArrayList<>(); operations = new ArrayList<>();
final List<TupleOperationEx> process = new ArrayList<>(); final List<EntityOperation> process = new ArrayList<>();
List<Long> ops = new ArrayList<>(); List<Long> ops = new ArrayList<>();
for (TupleOperationEx op : operations) { for (EntityOperation op : operations) {
if (!handling.contains(op.id)) if (!handling.contains(op.id))
process.add(op); process.add(op);
ops.add(op.id); ops.add(op.id);
@ -322,7 +322,7 @@ public class ServiceSend extends ServiceBase implements SharedPreferences.OnShar
} }
}; };
private void processOperations(List<TupleOperationEx> ops) { private void processOperations(List<EntityOperation> ops) {
try { try {
wlOutbox.acquire(); wlOutbox.acquire();
@ -338,7 +338,7 @@ public class ServiceSend extends ServiceBase implements SharedPreferences.OnShar
if (!ConnectionHelper.getNetworkState(this).isSuitable()) if (!ConnectionHelper.getNetworkState(this).isSuitable())
break; break;
TupleOperationEx op = ops.get(0); EntityOperation op = ops.get(0);
EntityMessage message = null; EntityMessage message = null;
if (op.message != null) if (op.message != null)
@ -707,7 +707,7 @@ public class ServiceSend extends ServiceBase implements SharedPreferences.OnShar
EntityFolder outbox = db.folder().getOutbox(); EntityFolder outbox = db.folder().getOutbox();
if (outbox != null) { if (outbox != null) {
int operations = db.operation().getOperations(outbox.id).size(); int operations = db.operation().getOperations(EntityOperation.SEND).size();
if (operations > 0) if (operations > 0)
start(context); start(context);
else { else {

@ -908,7 +908,7 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
state.reset(); state.reset();
Log.i(account.name + " run thread=" + currentThread); Log.i(account.name + " run thread=" + currentThread);
final List<TwoStateOwner> cowners = new ArrayList<>(); final ObjectHolder<TwoStateOwner> cowner = new ObjectHolder<>();
final ExecutorService executor = final ExecutorService executor =
Helper.getBackgroundExecutor(1, "account_" + account.id); Helper.getBackgroundExecutor(1, "account_" + account.id);
@ -1220,179 +1220,189 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
EntityOperation.sync(this, folder.id, false); EntityOperation.sync(this, folder.id, false);
} else } else
mapFolders.put(folder, null); mapFolders.put(folder, null);
}
Log.d(folder.name + " observing"); Log.i(account.name + " observing operations");
getMainHandler().post(new Runnable() { getMainHandler().post(new Runnable() {
@Override @Override
public void run() { public void run() {
TwoStateOwner cowner = new TwoStateOwner(ServiceSynchronize.this, folder.name); cowner.value = new TwoStateOwner(ServiceSynchronize.this, account.name);
cowners.add(cowner); cowner.value.start();
cowner.start();
db.operation().liveOperations(folder.id).observe(cowner, new Observer<List<TupleOperationEx>>() {
private List<Long> handling = new ArrayList<>();
private final Map<TupleOperationEx.PartitionKey, List<TupleOperationEx>> partitions = new HashMap<>();
private final PowerManager.WakeLock wlFolder = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":folder." + folder.id);
@Override
public void onChanged(final List<TupleOperationEx> _operations) {
// Get new operations
List<Long> ops = new ArrayList<>();
List<TupleOperationEx> added = new ArrayList<>();
for (TupleOperationEx op : _operations) {
if (!handling.contains(op.id))
added.add(op);
ops.add(op.id);
}
handling = ops;
if (added.size() > 0) {
Log.i(folder.name + " queuing operations=" + added.size() +
" init=" + folder.initialize + " poll=" + folder.poll);
// Partition operations by priority db.operation().liveOperations(account.id).observe(cowner.value, new Observer<List<TupleOperationEx>>() {
boolean offline = (mapFolders.get(folder) == null); private List<Long> handling = new ArrayList<>();
List<TupleOperationEx.PartitionKey> keys = new ArrayList<>(); private final Map<TupleOperationEx.PartitionKey, List<TupleOperationEx>> partitions = new HashMap<>();
synchronized (partitions) {
for (TupleOperationEx op : added) {
TupleOperationEx.PartitionKey key = op.getPartitionKey(offline);
if (!partitions.containsKey(key)) { private final PowerManager.WakeLock wlOperations = pm.newWakeLock(
partitions.put(key, new ArrayList<>()); PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":operations." + account.id);
keys.add(key);
}
partitions.get(key).add(op); @Override
public void onChanged(final List<TupleOperationEx> _operations) {
// Get new operations
List<Long> ops = new ArrayList<>();
Map<EntityFolder, List<TupleOperationEx>> added = new HashMap<>();
for (TupleOperationEx op : _operations) {
if (!handling.contains(op.id)) {
boolean found = false;
for (EntityFolder folder : mapFolders.keySet())
if (Objects.equals(folder.id, op.folder)) {
found = true;
if (!added.containsKey(folder))
added.put(folder, new ArrayList<>());
added.get(folder).add(op);
break;
} }
} if (!found)
Log.e(account.name + " folder not found operation=" + op.name);
Collections.sort(keys, new Comparator<TupleOperationEx.PartitionKey>() { }
@Override ops.add(op.id);
public int compare(TupleOperationEx.PartitionKey k1, TupleOperationEx.PartitionKey k2) { }
Integer p1 = k1.getPriority(); handling = ops;
Integer p2 = k2.getPriority();
int priority = p1.compareTo(p2); for (EntityFolder folder : added.keySet()) {
if (priority == 0) { Log.i(folder.name + " queuing operations=" + added.size() +
Long o1 = k1.getOrder(); " init=" + folder.initialize + " poll=" + folder.poll);
Long o2 = k2.getOrder();
return o1.compareTo(o2); // Partition operations by priority
} else boolean offline = (mapFolders.get(folder) == null);
return priority; List<TupleOperationEx.PartitionKey> keys = new ArrayList<>();
synchronized (partitions) {
for (TupleOperationEx op : added.get(folder)) {
TupleOperationEx.PartitionKey key = op.getPartitionKey(offline);
if (!partitions.containsKey(key)) {
partitions.put(key, new ArrayList<>());
keys.add(key);
} }
});
for (TupleOperationEx.PartitionKey key : keys) { partitions.get(key).add(op);
synchronized (partitions) { }
Log.i(folder.name + }
" queuing partition=" + key +
" operations=" + partitions.get(key).size());
}
final long sequence = state.getSequence(folder.id, key.getPriority()); Collections.sort(keys, new Comparator<TupleOperationEx.PartitionKey>() {
@Override
public int compare(TupleOperationEx.PartitionKey k1, TupleOperationEx.PartitionKey k2) {
Integer p1 = k1.getPriority();
Integer p2 = k2.getPriority();
int priority = p1.compareTo(p2);
if (priority == 0) {
Long o1 = k1.getOrder();
Long o2 = k2.getOrder();
return o1.compareTo(o2);
} else
return priority;
}
});
executor.submit(new Helper.PriorityRunnable(key.getPriority(), key.getOrder()) { for (TupleOperationEx.PartitionKey key : keys) {
@Override synchronized (partitions) {
public void run() { Log.i(folder.name +
super.run(); " queuing partition=" + key +
try { " operations=" + partitions.get(key).size());
wlFolder.acquire(); }
List<TupleOperationEx> partition; final long sequence = state.getSequence(folder.id, key.getPriority());
synchronized (partitions) {
partition = partitions.get(key);
partitions.remove(key);
}
Log.i(folder.name + executor.submit(new Helper.PriorityRunnable(key.getPriority(), key.getOrder()) {
" executing partition=" + key + @Override
" operations=" + partition.size()); public void run() {
super.run();
try {
wlOperations.acquire();
List<TupleOperationEx> partition;
synchronized (partitions) {
partition = partitions.get(key);
partitions.remove(key);
}
// Get folder Log.i(folder.name +
Folder ifolder = mapFolders.get(folder); // null when polling " executing partition=" + key +
boolean canOpen = (account.protocol == EntityAccount.TYPE_IMAP || EntityFolder.INBOX.equals(folder.type)); " operations=" + partition.size());
final boolean shouldClose = (ifolder == null && canOpen);
try { // Get folder
Log.i(folder.name + " run " + (shouldClose ? "offline" : "online")); Folder ifolder = mapFolders.get(folder); // null when polling
boolean canOpen = (account.protocol == EntityAccount.TYPE_IMAP || EntityFolder.INBOX.equals(folder.type));
final boolean shouldClose = (ifolder == null && canOpen);
if (shouldClose) { try {
// Prevent unnecessary folder connections Log.i(folder.name + " run " + (shouldClose ? "offline" : "online"));
if (db.operation().getOperationCount(folder.id, null) == 0)
return;
db.folder().setFolderState(folder.id, "connecting"); if (shouldClose) {
// Prevent unnecessary folder connections
if (db.operation().getOperationCount(folder.id, null) == 0)
return;
try { db.folder().setFolderState(folder.id, "connecting");
ifolder = iservice.getStore().getFolder(folder.name);
} catch (IllegalStateException ex) {
if ("Not connected".equals(ex.getMessage()))
return; // Store closed
else
throw ex;
}
try { try {
ifolder.open(Folder.READ_WRITE); ifolder = iservice.getStore().getFolder(folder.name);
if (ifolder instanceof IMAPFolder) } catch (IllegalStateException ex) {
db.folder().setFolderReadOnly(folder.id, ((IMAPFolder) ifolder).getUIDNotSticky()); if ("Not connected".equals(ex.getMessage()))
} catch (ReadOnlyFolderException ex) { return; // Store closed
Log.w(folder.name + " read only"); else
ifolder.open(Folder.READ_ONLY); throw ex;
db.folder().setFolderReadOnly(folder.id, true); }
}
db.folder().setFolderState(folder.id, "connected"); try {
db.folder().setFolderError(folder.id, null); ifolder.open(Folder.READ_WRITE);
if (ifolder instanceof IMAPFolder)
db.folder().setFolderReadOnly(folder.id, ((IMAPFolder) ifolder).getUIDNotSticky());
} catch (ReadOnlyFolderException ex) {
Log.w(folder.name + " read only");
ifolder.open(Folder.READ_ONLY);
db.folder().setFolderReadOnly(folder.id, true);
}
int count = MessageHelper.getMessageCount(ifolder); db.folder().setFolderState(folder.id, "connected");
db.folder().setFolderTotal(folder.id, count < 0 ? null : count); db.folder().setFolderError(folder.id, null);
Log.i(account.name + " folder " + folder.name + " flags=" + ifolder.getPermanentFlags()); int count = MessageHelper.getMessageCount(ifolder);
} db.folder().setFolderTotal(folder.id, count < 0 ? null : count);
Core.processOperations(ServiceSynchronize.this, Log.i(account.name + " folder " + folder.name + " flags=" + ifolder.getPermanentFlags());
account, folder, }
partition,
iservice.getStore(), ifolder, Core.processOperations(ServiceSynchronize.this,
state, key.getPriority(), sequence); account, folder,
partition,
} catch (Throwable ex) { iservice.getStore(), ifolder,
Log.e(folder.name, ex); state, key.getPriority(), sequence);
EntityLog.log(
ServiceSynchronize.this, } catch (Throwable ex) {
folder.name + " " + Log.formatThrowable(ex, false)); Log.e(folder.name, ex);
db.folder().setFolderError(folder.id, Log.formatThrowable(ex)); EntityLog.log(
state.error(new OperationCanceledException("Process")); ServiceSynchronize.this,
} finally { folder.name + " " + Log.formatThrowable(ex, false));
if (shouldClose) { db.folder().setFolderError(folder.id, Log.formatThrowable(ex));
if (ifolder != null && ifolder.isOpen()) { state.error(new OperationCanceledException("Process"));
db.folder().setFolderState(folder.id, "closing"); } finally {
try { if (shouldClose) {
ifolder.close(false); if (ifolder != null && ifolder.isOpen()) {
} catch (Throwable ex) { db.folder().setFolderState(folder.id, "closing");
Log.w(folder.name, ex); try {
} ifolder.close(false);
} catch (Throwable ex) {
Log.w(folder.name, ex);
} }
if (folder.synchronize && (folder.poll || !capIdle))
db.folder().setFolderState(folder.id, "waiting");
else
db.folder().setFolderState(folder.id, null);
} }
if (folder.synchronize && (folder.poll || !capIdle))
db.folder().setFolderState(folder.id, "waiting");
else
db.folder().setFolderState(folder.id, null);
} }
} finally {
wlFolder.release();
} }
} finally {
wlOperations.release();
} }
}); }
} });
} }
} }
}); }
} });
}); }
} });
// Keep alive // Keep alive
boolean first = true; boolean first = true;
@ -1603,8 +1613,11 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
getMainHandler().post(new Runnable() { getMainHandler().post(new Runnable() {
@Override @Override
public void run() { public void run() {
for (TwoStateOwner owner : cowners) try {
owner.destroy(); cowner.value.destroy();
} catch (Throwable ex) {
Log.e(ex);
}
} }
}); });

Loading…
Cancel
Save