From 29bad587780725b0d75d7f0962ececd263265809 Mon Sep 17 00:00:00 2001 From: M66B Date: Sun, 4 Oct 2020 16:57:19 +0200 Subject: [PATCH] Monitor operations per account --- .../java/eu/faircode/email/DaoOperation.java | 46 +-- .../java/eu/faircode/email/ObjectHolder.java | 24 ++ .../java/eu/faircode/email/ServiceSend.java | 14 +- .../eu/faircode/email/ServiceSynchronize.java | 309 +++++++++--------- 4 files changed, 210 insertions(+), 183 deletions(-) create mode 100644 app/src/main/java/eu/faircode/email/ObjectHolder.java diff --git a/app/src/main/java/eu/faircode/email/DaoOperation.java b/app/src/main/java/eu/faircode/email/DaoOperation.java index 01ca9be1a5..bdb8979c81 100644 --- a/app/src/main/java/eu/faircode/email/DaoOperation.java +++ b/app/src/main/java/eu/faircode/email/DaoOperation.java @@ -48,57 +48,47 @@ public interface DaoOperation { @Query("SELECT operation.*" + ", " + priority + " AS priority" + ", account.name AS accountName, folder.name AS folderName" + - " ,((account.synchronize IS NULL OR account.synchronize)" + - " AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)) AS synchronize" + + ", (account.synchronize IS NULL OR account.synchronize) AS synchronize" + " FROM operation" + " JOIN folder ON folder.id = operation.folder" + - " LEFT JOIN message ON message.id = operation.message" + " LEFT JOIN account ON account.id = operation.account" + - " LEFT JOIN identity ON identity.id = message.identity" + " ORDER BY " + priority + ", id") LiveData> liveOperations(); - String GET_OPS_FOLDER = "SELECT operation.*" + + @Transaction + @Query("SELECT operation.*" + ", " + priority + " AS priority" + ", account.name AS accountName, folder.name AS folderName" + - " ,((account.synchronize IS NULL OR account.synchronize)" + - " AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)) AS synchronize" + + ", account.synchronize" + " FROM operation" + " JOIN folder ON folder.id = operation.folder" + - " LEFT JOIN message ON message.id = operation.message" + - " LEFT JOIN account ON account.id = operation.account" + - " LEFT JOIN identity ON identity.id = message.identity" + - " WHERE CASE WHEN :folder IS NULL THEN folder.account IS NULL ELSE operation.folder = :folder END" + - " AND (account.synchronize IS NULL OR account.synchronize)" + - " AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)" + - " ORDER BY " + priority + ", id"; - - @Query(GET_OPS_FOLDER) - List getOperations(Long folder); + " JOIN account ON account.id = operation.account" + + " WHERE operation.account = :account" + + " AND account.synchronize" + + " AND folder.account IS NOT NULL" + // not outbox + " ORDER BY " + priority + ", id") + LiveData> liveOperations(long account); @Transaction - @Query(GET_OPS_FOLDER) - LiveData> liveOperations(Long folder); + @Query("SELECT operation.*" + + " FROM operation" + + " JOIN folder ON folder.id = operation.folder" + + " WHERE folder.account IS NULL" + // outbox + " ORDER BY id") + LiveData> liveSend(); @Query("SELECT COUNT(operation.id) AS pending" + ", SUM(CASE WHEN operation.error IS NULL THEN 0 ELSE 1 END) AS errors" + " FROM operation" + - " JOIN folder ON folder.id = operation.folder" + - " LEFT JOIN message ON message.id = operation.message" + " LEFT JOIN account ON account.id = operation.account" + - " LEFT JOIN identity ON identity.id = message.identity" + - " WHERE (account.synchronize IS NULL OR account.synchronize)" + - " AND (NOT folder.account IS NULL OR identity.synchronize IS NULL OR identity.synchronize)") + " WHERE (account.synchronize IS NULL OR account.synchronize)") LiveData liveStats(); @Query("SELECT" + " COUNT(operation.id) AS count" + ", SUM(CASE WHEN operation.state = 'executing' THEN 1 ELSE 0 END) AS busy" + " FROM operation" + - " JOIN message ON message.id = operation.message" + - " JOIN identity ON identity.id = message.identity" + - " WHERE operation.name = '" + EntityOperation.SEND + "'" + - " AND identity.synchronize") + " WHERE operation.name = '" + EntityOperation.SEND + "'") LiveData liveUnsent(); @Query("SELECT * FROM operation ORDER BY id") diff --git a/app/src/main/java/eu/faircode/email/ObjectHolder.java b/app/src/main/java/eu/faircode/email/ObjectHolder.java new file mode 100644 index 0000000000..76b85f6b05 --- /dev/null +++ b/app/src/main/java/eu/faircode/email/ObjectHolder.java @@ -0,0 +1,24 @@ +package eu.faircode.email; + +/* + This file is part of FairEmail. + + FairEmail is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + FairEmail is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with FairEmail. If not, see . + + Copyright 2018-2020 by Marcel Bokhorst (M66B) +*/ + +public class ObjectHolder { + public T value; +} diff --git a/app/src/main/java/eu/faircode/email/ServiceSend.java b/app/src/main/java/eu/faircode/email/ServiceSend.java index 5226c2daaa..74fe0a46be 100644 --- a/app/src/main/java/eu/faircode/email/ServiceSend.java +++ b/app/src/main/java/eu/faircode/email/ServiceSend.java @@ -112,16 +112,16 @@ public class ServiceSend extends ServiceBase implements SharedPreferences.OnShar }); // Observe send operations - db.operation().liveOperations(null).observe(owner, new Observer>() { + db.operation().liveSend().observe(owner, new Observer>() { @Override - public void onChanged(List operations) { + public void onChanged(List operations) { if (operations == null) operations = new ArrayList<>(); - final List process = new ArrayList<>(); + final List process = new ArrayList<>(); List ops = new ArrayList<>(); - for (TupleOperationEx op : operations) { + for (EntityOperation op : operations) { if (!handling.contains(op.id)) process.add(op); ops.add(op.id); @@ -322,7 +322,7 @@ public class ServiceSend extends ServiceBase implements SharedPreferences.OnShar } }; - private void processOperations(List ops) { + private void processOperations(List ops) { try { wlOutbox.acquire(); @@ -338,7 +338,7 @@ public class ServiceSend extends ServiceBase implements SharedPreferences.OnShar if (!ConnectionHelper.getNetworkState(this).isSuitable()) break; - TupleOperationEx op = ops.get(0); + EntityOperation op = ops.get(0); EntityMessage message = null; if (op.message != null) @@ -707,7 +707,7 @@ public class ServiceSend extends ServiceBase implements SharedPreferences.OnShar EntityFolder outbox = db.folder().getOutbox(); if (outbox != null) { - int operations = db.operation().getOperations(outbox.id).size(); + int operations = db.operation().getOperations(EntityOperation.SEND).size(); if (operations > 0) start(context); else { diff --git a/app/src/main/java/eu/faircode/email/ServiceSynchronize.java b/app/src/main/java/eu/faircode/email/ServiceSynchronize.java index 7fecb7489a..3d9db5d68f 100644 --- a/app/src/main/java/eu/faircode/email/ServiceSynchronize.java +++ b/app/src/main/java/eu/faircode/email/ServiceSynchronize.java @@ -908,7 +908,7 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences state.reset(); Log.i(account.name + " run thread=" + currentThread); - final List cowners = new ArrayList<>(); + final ObjectHolder cowner = new ObjectHolder<>(); final ExecutorService executor = Helper.getBackgroundExecutor(1, "account_" + account.id); @@ -1220,179 +1220,189 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences EntityOperation.sync(this, folder.id, false); } else mapFolders.put(folder, null); + } - Log.d(folder.name + " observing"); - getMainHandler().post(new Runnable() { - @Override - public void run() { - TwoStateOwner cowner = new TwoStateOwner(ServiceSynchronize.this, folder.name); - cowners.add(cowner); - cowner.start(); - - db.operation().liveOperations(folder.id).observe(cowner, new Observer>() { - private List handling = new ArrayList<>(); - private final Map> partitions = new HashMap<>(); - - private final PowerManager.WakeLock wlFolder = pm.newWakeLock( - PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":folder." + folder.id); - - @Override - public void onChanged(final List _operations) { - // Get new operations - List ops = new ArrayList<>(); - List added = new ArrayList<>(); - for (TupleOperationEx op : _operations) { - if (!handling.contains(op.id)) - added.add(op); - ops.add(op.id); - } - handling = ops; - - if (added.size() > 0) { - Log.i(folder.name + " queuing operations=" + added.size() + - " init=" + folder.initialize + " poll=" + folder.poll); + Log.i(account.name + " observing operations"); + getMainHandler().post(new Runnable() { + @Override + public void run() { + cowner.value = new TwoStateOwner(ServiceSynchronize.this, account.name); + cowner.value.start(); - // Partition operations by priority - boolean offline = (mapFolders.get(folder) == null); - List keys = new ArrayList<>(); - synchronized (partitions) { - for (TupleOperationEx op : added) { - TupleOperationEx.PartitionKey key = op.getPartitionKey(offline); + db.operation().liveOperations(account.id).observe(cowner.value, new Observer>() { + private List handling = new ArrayList<>(); + private final Map> partitions = new HashMap<>(); - if (!partitions.containsKey(key)) { - partitions.put(key, new ArrayList<>()); - keys.add(key); - } + private final PowerManager.WakeLock wlOperations = pm.newWakeLock( + PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":operations." + account.id); - partitions.get(key).add(op); + @Override + public void onChanged(final List _operations) { + // Get new operations + List ops = new ArrayList<>(); + Map> added = new HashMap<>(); + for (TupleOperationEx op : _operations) { + if (!handling.contains(op.id)) { + boolean found = false; + for (EntityFolder folder : mapFolders.keySet()) + if (Objects.equals(folder.id, op.folder)) { + found = true; + if (!added.containsKey(folder)) + added.put(folder, new ArrayList<>()); + added.get(folder).add(op); + break; } - } - - Collections.sort(keys, new Comparator() { - @Override - public int compare(TupleOperationEx.PartitionKey k1, TupleOperationEx.PartitionKey k2) { - Integer p1 = k1.getPriority(); - Integer p2 = k2.getPriority(); - int priority = p1.compareTo(p2); - if (priority == 0) { - Long o1 = k1.getOrder(); - Long o2 = k2.getOrder(); - return o1.compareTo(o2); - } else - return priority; + if (!found) + Log.e(account.name + " folder not found operation=" + op.name); + } + ops.add(op.id); + } + handling = ops; + + for (EntityFolder folder : added.keySet()) { + Log.i(folder.name + " queuing operations=" + added.size() + + " init=" + folder.initialize + " poll=" + folder.poll); + + // Partition operations by priority + boolean offline = (mapFolders.get(folder) == null); + List keys = new ArrayList<>(); + synchronized (partitions) { + for (TupleOperationEx op : added.get(folder)) { + TupleOperationEx.PartitionKey key = op.getPartitionKey(offline); + + if (!partitions.containsKey(key)) { + partitions.put(key, new ArrayList<>()); + keys.add(key); } - }); - for (TupleOperationEx.PartitionKey key : keys) { - synchronized (partitions) { - Log.i(folder.name + - " queuing partition=" + key + - " operations=" + partitions.get(key).size()); - } + partitions.get(key).add(op); + } + } - final long sequence = state.getSequence(folder.id, key.getPriority()); + Collections.sort(keys, new Comparator() { + @Override + public int compare(TupleOperationEx.PartitionKey k1, TupleOperationEx.PartitionKey k2) { + Integer p1 = k1.getPriority(); + Integer p2 = k2.getPriority(); + int priority = p1.compareTo(p2); + if (priority == 0) { + Long o1 = k1.getOrder(); + Long o2 = k2.getOrder(); + return o1.compareTo(o2); + } else + return priority; + } + }); - executor.submit(new Helper.PriorityRunnable(key.getPriority(), key.getOrder()) { - @Override - public void run() { - super.run(); - try { - wlFolder.acquire(); + for (TupleOperationEx.PartitionKey key : keys) { + synchronized (partitions) { + Log.i(folder.name + + " queuing partition=" + key + + " operations=" + partitions.get(key).size()); + } - List partition; - synchronized (partitions) { - partition = partitions.get(key); - partitions.remove(key); - } + final long sequence = state.getSequence(folder.id, key.getPriority()); - Log.i(folder.name + - " executing partition=" + key + - " operations=" + partition.size()); + executor.submit(new Helper.PriorityRunnable(key.getPriority(), key.getOrder()) { + @Override + public void run() { + super.run(); + try { + wlOperations.acquire(); + + List partition; + synchronized (partitions) { + partition = partitions.get(key); + partitions.remove(key); + } - // Get folder - Folder ifolder = mapFolders.get(folder); // null when polling - boolean canOpen = (account.protocol == EntityAccount.TYPE_IMAP || EntityFolder.INBOX.equals(folder.type)); - final boolean shouldClose = (ifolder == null && canOpen); + Log.i(folder.name + + " executing partition=" + key + + " operations=" + partition.size()); - try { - Log.i(folder.name + " run " + (shouldClose ? "offline" : "online")); + // Get folder + Folder ifolder = mapFolders.get(folder); // null when polling + boolean canOpen = (account.protocol == EntityAccount.TYPE_IMAP || EntityFolder.INBOX.equals(folder.type)); + final boolean shouldClose = (ifolder == null && canOpen); - if (shouldClose) { - // Prevent unnecessary folder connections - if (db.operation().getOperationCount(folder.id, null) == 0) - return; + try { + Log.i(folder.name + " run " + (shouldClose ? "offline" : "online")); - db.folder().setFolderState(folder.id, "connecting"); + if (shouldClose) { + // Prevent unnecessary folder connections + if (db.operation().getOperationCount(folder.id, null) == 0) + return; - try { - ifolder = iservice.getStore().getFolder(folder.name); - } catch (IllegalStateException ex) { - if ("Not connected".equals(ex.getMessage())) - return; // Store closed - else - throw ex; - } + db.folder().setFolderState(folder.id, "connecting"); - try { - ifolder.open(Folder.READ_WRITE); - if (ifolder instanceof IMAPFolder) - db.folder().setFolderReadOnly(folder.id, ((IMAPFolder) ifolder).getUIDNotSticky()); - } catch (ReadOnlyFolderException ex) { - Log.w(folder.name + " read only"); - ifolder.open(Folder.READ_ONLY); - db.folder().setFolderReadOnly(folder.id, true); - } + try { + ifolder = iservice.getStore().getFolder(folder.name); + } catch (IllegalStateException ex) { + if ("Not connected".equals(ex.getMessage())) + return; // Store closed + else + throw ex; + } - db.folder().setFolderState(folder.id, "connected"); - db.folder().setFolderError(folder.id, null); + try { + ifolder.open(Folder.READ_WRITE); + if (ifolder instanceof IMAPFolder) + db.folder().setFolderReadOnly(folder.id, ((IMAPFolder) ifolder).getUIDNotSticky()); + } catch (ReadOnlyFolderException ex) { + Log.w(folder.name + " read only"); + ifolder.open(Folder.READ_ONLY); + db.folder().setFolderReadOnly(folder.id, true); + } - int count = MessageHelper.getMessageCount(ifolder); - db.folder().setFolderTotal(folder.id, count < 0 ? null : count); + db.folder().setFolderState(folder.id, "connected"); + db.folder().setFolderError(folder.id, null); - Log.i(account.name + " folder " + folder.name + " flags=" + ifolder.getPermanentFlags()); - } + int count = MessageHelper.getMessageCount(ifolder); + db.folder().setFolderTotal(folder.id, count < 0 ? null : count); - Core.processOperations(ServiceSynchronize.this, - account, folder, - partition, - iservice.getStore(), ifolder, - state, key.getPriority(), sequence); - - } catch (Throwable ex) { - Log.e(folder.name, ex); - EntityLog.log( - ServiceSynchronize.this, - folder.name + " " + Log.formatThrowable(ex, false)); - db.folder().setFolderError(folder.id, Log.formatThrowable(ex)); - state.error(new OperationCanceledException("Process")); - } finally { - if (shouldClose) { - if (ifolder != null && ifolder.isOpen()) { - db.folder().setFolderState(folder.id, "closing"); - try { - ifolder.close(false); - } catch (Throwable ex) { - Log.w(folder.name, ex); - } + Log.i(account.name + " folder " + folder.name + " flags=" + ifolder.getPermanentFlags()); + } + + Core.processOperations(ServiceSynchronize.this, + account, folder, + partition, + iservice.getStore(), ifolder, + state, key.getPriority(), sequence); + + } catch (Throwable ex) { + Log.e(folder.name, ex); + EntityLog.log( + ServiceSynchronize.this, + folder.name + " " + Log.formatThrowable(ex, false)); + db.folder().setFolderError(folder.id, Log.formatThrowable(ex)); + state.error(new OperationCanceledException("Process")); + } finally { + if (shouldClose) { + if (ifolder != null && ifolder.isOpen()) { + db.folder().setFolderState(folder.id, "closing"); + try { + ifolder.close(false); + } catch (Throwable ex) { + Log.w(folder.name, ex); } - if (folder.synchronize && (folder.poll || !capIdle)) - db.folder().setFolderState(folder.id, "waiting"); - else - db.folder().setFolderState(folder.id, null); } + if (folder.synchronize && (folder.poll || !capIdle)) + db.folder().setFolderState(folder.id, "waiting"); + else + db.folder().setFolderState(folder.id, null); } - } finally { - wlFolder.release(); } + } finally { + wlOperations.release(); } - }); - } + } + }); } } - }); - } - }); - } + } + }); + } + }); // Keep alive boolean first = true; @@ -1603,8 +1613,11 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences getMainHandler().post(new Runnable() { @Override public void run() { - for (TwoStateOwner owner : cowners) - owner.destroy(); + try { + cowner.value.destroy(); + } catch (Throwable ex) { + Log.e(ex); + } } });