Rewritten operation processing

pull/12/merge
M66B 6 years ago
parent dde022a4c1
commit a980b0a76f

@ -37,6 +37,12 @@ public interface DaoOperation {
" ORDER BY operation.id") " ORDER BY operation.id")
List<TupleOperationEx> getOperations(long folder); List<TupleOperationEx> getOperations(long folder);
@Query("SELECT COUNT(operation.id) FROM operation" +
" JOIN message ON message.id = operation.message" +
" WHERE folder = :folder" +
" ORDER BY operation.id")
int getOperationCount(long folder);
@Query("DELETE FROM operation WHERE id = :id") @Query("DELETE FROM operation WHERE id = :id")
void deleteOperation(long id); void deleteOperation(long id);

@ -99,6 +99,9 @@ public class EntityOperation {
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(context); LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(context);
lbm.sendBroadcast( lbm.sendBroadcast(
new Intent(ServiceSynchronize.ACTION_PROCESS_OPERATIONS + message.folder)); new Intent(SEND.equals(name)
? ServiceSynchronize.ACTION_PROCESS_OUTBOX
: ServiceSynchronize.ACTION_PROCESS_FOLDER)
.putExtra("folder", message.folder));
} }
} }

@ -56,7 +56,9 @@ import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -98,7 +100,8 @@ public class ServiceSynchronize extends LifecycleService {
private static final int FETCH_BATCH_SIZE = 10; private static final int FETCH_BATCH_SIZE = 10;
private static final int DOWNLOAD_BUFFER_SIZE = 8192; // bytes private static final int DOWNLOAD_BUFFER_SIZE = 8192; // bytes
static final String ACTION_PROCESS_OPERATIONS = BuildConfig.APPLICATION_ID + ".PROCESS_OPERATIONS."; static final String ACTION_PROCESS_OUTBOX = BuildConfig.APPLICATION_ID + ".PROCESS_OUTBOX";
static final String ACTION_PROCESS_FOLDER = BuildConfig.APPLICATION_ID + ".PROCESS_FOLDER";
private class ServiceState { private class ServiceState {
boolean running = false; boolean running = false;
@ -278,6 +281,7 @@ public class ServiceSynchronize extends LifecycleService {
// Listen for connection changes // Listen for connection changes
istore.addConnectionListener(new ConnectionAdapter() { istore.addConnectionListener(new ConnectionAdapter() {
List<Thread> folderThreads = new ArrayList<>(); List<Thread> folderThreads = new ArrayList<>();
Map<Long, IMAPFolder> mapFolder = new HashMap<>();
@Override @Override
public void opened(ConnectionEvent e) { public void opened(ConnectionEvent e) {
@ -291,8 +295,25 @@ public class ServiceSynchronize extends LifecycleService {
Thread thread = new Thread(new Runnable() { Thread thread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
IMAPFolder ifolder = null;
try { try {
monitorFolder(account, folder, fstore); Log.i(Helper.TAG, folder.name + " start");
ifolder = (IMAPFolder) fstore.getFolder(folder.name);
ifolder.open(Folder.READ_WRITE);
synchronized (mapFolder) {
mapFolder.put(folder.id, ifolder);
}
monitorFolder(account, folder, fstore, ifolder);
} catch (FolderNotFoundException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
// Disable synchronization
folder.synchronize = false;
DB.getInstance(ServiceSynchronize.this).folder().updateFolder(folder);
} catch (Throwable ex) { } catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex); reportError(account.name, folder.name, ex);
@ -303,12 +324,30 @@ public class ServiceSynchronize extends LifecycleService {
} catch (MessagingException e1) { } catch (MessagingException e1) {
Log.w(Helper.TAG, account.name + " " + e1 + "\n" + Log.getStackTraceString(e1)); Log.w(Helper.TAG, account.name + " " + e1 + "\n" + Log.getStackTraceString(e1));
} }
} finally {
if (ifolder != null && ifolder.isOpen()) {
try {
ifolder.close(false);
} catch (MessagingException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
}
}
Log.i(Helper.TAG, folder.name + " stop");
} }
} }
}, "sync.folder." + folder.id); }, "sync.folder." + folder.id);
folderThreads.add(thread); folderThreads.add(thread);
thread.start(); thread.start();
} }
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this);
lbm.registerReceiver(offline, new IntentFilter(ACTION_PROCESS_FOLDER));
Log.i(Helper.TAG, "listen process folder");
for (final EntityFolder folder : db.folder().getFolders(account.id, false))
if (!EntityFolder.TYPE_OUTBOX.equals(folder.type) &&
db.operation().getOperationCount(folder.id) > 0)
lbm.sendBroadcast(new Intent(ACTION_PROCESS_FOLDER).putExtra("folder", folder.id));
} catch (Throwable ex) { } catch (Throwable ex) {
Log.e(Helper.TAG, account.name + " " + ex + "\n" + Log.getStackTraceString(ex)); Log.e(Helper.TAG, account.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, null, ex); reportError(account.name, null, ex);
@ -326,6 +365,13 @@ public class ServiceSynchronize extends LifecycleService {
public void disconnected(ConnectionEvent e) { public void disconnected(ConnectionEvent e) {
Log.e(Helper.TAG, account.name + " disconnected"); Log.e(Helper.TAG, account.name + " disconnected");
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this);
lbm.unregisterReceiver(offline);
synchronized (mapFolder) {
mapFolder.clear();
}
// Check connection // Check connection
synchronized (state) { synchronized (state) {
state.notifyAll(); state.notifyAll();
@ -336,11 +382,71 @@ public class ServiceSynchronize extends LifecycleService {
public void closed(ConnectionEvent e) { public void closed(ConnectionEvent e) {
Log.e(Helper.TAG, account.name + " closed"); Log.e(Helper.TAG, account.name + " closed");
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this);
lbm.unregisterReceiver(offline);
synchronized (mapFolder) {
mapFolder.clear();
}
// Check connection // Check connection
synchronized (state) { synchronized (state) {
state.notifyAll(); state.notifyAll();
} }
} }
BroadcastReceiver offline = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
final long fid = intent.getLongExtra("folder", -1);
IMAPFolder x;
synchronized (mapFolder) {
x = mapFolder.get(fid);
}
final boolean shouldClose = (x == null);
final IMAPFolder ffolder = x;
Log.i(Helper.TAG, "run operations folder=" + fid + " offline=" + shouldClose);
executor.submit(new Runnable() {
@Override
public void run() {
EntityFolder folder = DB.getInstance(ServiceSynchronize.this).folder().getFolder(fid);
IMAPFolder ifolder = ffolder;
try {
Log.i(Helper.TAG, folder.name + " start operations");
if (ifolder == null) {
ifolder = (IMAPFolder) fstore.getFolder(folder.name);
ifolder.open(Folder.READ_WRITE);
}
processOperations(folder, fstore, ifolder);
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
// Cascade up
try {
fstore.close();
} catch (MessagingException e1) {
Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1));
}
} finally {
Log.i(Helper.TAG, folder.name + " start operations");
if (shouldClose)
if (ifolder != null && ifolder.isOpen()) {
try {
ifolder.close(false);
} catch (MessagingException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
}
}
Log.i(Helper.TAG, folder.name + " stop operations");
}
}
});
}
};
}); });
// Initiate connection // Initiate connection
@ -392,15 +498,7 @@ public class ServiceSynchronize extends LifecycleService {
Log.i(Helper.TAG, account.name + " stopped"); Log.i(Helper.TAG, account.name + " stopped");
} }
private void monitorFolder(final EntityAccount account, final EntityFolder folder, final IMAPStore istore) throws MessagingException, JSONException, IOException { private void monitorFolder(final EntityAccount account, final EntityFolder folder, final IMAPStore istore, final IMAPFolder ifolder) throws MessagingException, JSONException, IOException {
IMAPFolder ifolder = null;
try {
Log.i(Helper.TAG, folder.name + " start");
ifolder = (IMAPFolder) istore.getFolder(folder.name);
final IMAPFolder ffolder = ifolder;
ifolder.open(Folder.READ_WRITE);
// Listen for new and deleted messages // Listen for new and deleted messages
ifolder.addMessageCountListener(new MessageCountAdapter() { ifolder.addMessageCountListener(new MessageCountAdapter() {
@Override @Override
@ -408,7 +506,7 @@ public class ServiceSynchronize extends LifecycleService {
try { try {
Log.i(Helper.TAG, folder.name + " messages added"); Log.i(Helper.TAG, folder.name + " messages added");
for (Message imessage : e.getMessages()) for (Message imessage : e.getMessages())
synchronizeMessage(folder, ffolder, (IMAPMessage) imessage); synchronizeMessage(folder, ifolder, (IMAPMessage) imessage);
} catch (MessageRemovedException ex) { } catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
} catch (Throwable ex) { } catch (Throwable ex) {
@ -430,7 +528,7 @@ public class ServiceSynchronize extends LifecycleService {
Log.i(Helper.TAG, folder.name + " messages removed"); Log.i(Helper.TAG, folder.name + " messages removed");
for (Message imessage : e.getMessages()) for (Message imessage : e.getMessages())
try { try {
long uid = ffolder.getUID(imessage); long uid = ifolder.getUID(imessage);
DB db = DB.getInstance(ServiceSynchronize.this); DB db = DB.getInstance(ServiceSynchronize.this);
db.message().deleteMessage(folder.id, uid); db.message().deleteMessage(folder.id, uid);
Log.i(Helper.TAG, "Deleted uid=" + uid); Log.i(Helper.TAG, "Deleted uid=" + uid);
@ -462,7 +560,7 @@ public class ServiceSynchronize extends LifecycleService {
public void messageChanged(MessageChangedEvent e) { public void messageChanged(MessageChangedEvent e) {
try { try {
Log.i(Helper.TAG, folder.name + " message changed"); Log.i(Helper.TAG, folder.name + " message changed");
synchronizeMessage(folder, ffolder, (IMAPMessage) e.getMessage()); synchronizeMessage(folder, ifolder, (IMAPMessage) e.getMessage());
} catch (MessageRemovedException ex) { } catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
} catch (Throwable ex) { } catch (Throwable ex) {
@ -479,40 +577,9 @@ public class ServiceSynchronize extends LifecycleService {
} }
}); });
BroadcastReceiver receiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
Log.i(Helper.TAG, folder.name + " submit process id=" + folder.id);
executor.submit(new Runnable() {
@Override
public void run() {
try {
synchronized (folder) {
processOperations(folder, istore, ffolder);
}
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
// Cascade up
try {
istore.close();
} catch (MessagingException e1) {
Log.w(Helper.TAG, folder.name + " " + e1 + "\n" + Log.getStackTraceString(e1));
}
}
}
});
}
};
// Listen for process operations requests // Listen for process operations requests
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(this); Log.i(Helper.TAG, folder.name + " start");
lbm.registerReceiver(receiver, new IntentFilter(ACTION_PROCESS_OPERATIONS + folder.id));
Log.i(Helper.TAG, folder.name + " listen process id=" + folder.id);
try { try {
lbm.sendBroadcast(new Intent(ACTION_PROCESS_OPERATIONS + folder.id));
// Keep alive // Keep alive
Thread thread = new Thread(new Runnable() { Thread thread = new Thread(new Runnable() {
@Override @Override
@ -525,9 +592,9 @@ public class ServiceSynchronize extends LifecycleService {
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex.toString()); Log.w(Helper.TAG, folder.name + " " + ex.toString());
} }
open = ffolder.isOpen(); open = ifolder.isOpen();
if (open) if (open)
noop(folder, ffolder); noop(folder, ifolder);
} while (open); } while (open);
} catch (Throwable ex) { } catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex)); Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
@ -551,22 +618,7 @@ public class ServiceSynchronize extends LifecycleService {
Log.i(Helper.TAG, folder.name + " end idle"); Log.i(Helper.TAG, folder.name + " end idle");
} }
} finally { } finally {
lbm.unregisterReceiver(receiver); Log.i(Helper.TAG, folder.name + " end");
Log.i(Helper.TAG, folder.name + " unlisten process id=" + folder.id);
}
} catch (FolderNotFoundException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
folder.synchronize = false;
DB.getInstance(this).folder().updateFolder(folder);
} finally {
if (ifolder != null && ifolder.isOpen()) {
try {
ifolder.close(false);
} catch (MessagingException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
}
}
Log.i(Helper.TAG, folder.name + " stop");
} }
} }
@ -693,6 +745,7 @@ public class ServiceSynchronize extends LifecycleService {
} finally { } finally {
itransport.close(); itransport.close();
} }
// TODO: cache transport?
} else if (EntityOperation.ATTACHMENT.equals(op.name)) { } else if (EntityOperation.ATTACHMENT.equals(op.name)) {
int sequence = jargs.getInt(0); int sequence = jargs.getInt(0);
@ -1026,9 +1079,9 @@ public class ServiceSynchronize extends LifecycleService {
outbox = db.folder().getOutbox(); outbox = db.folder().getOutbox();
if (outbox != null) { if (outbox != null) {
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this); LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this);
lbm.registerReceiver(receiverOutbox, new IntentFilter(ACTION_PROCESS_OPERATIONS + outbox.id)); lbm.registerReceiver(receiverOutbox, new IntentFilter(ACTION_PROCESS_OUTBOX));
Log.i(Helper.TAG, outbox.name + " listen process id=" + outbox.id); Log.i(Helper.TAG, outbox.name + " listen operations");
lbm.sendBroadcast(new Intent(ACTION_PROCESS_OPERATIONS + outbox.id)); lbm.sendBroadcast(new Intent(ACTION_PROCESS_OUTBOX));
} }
} }
@ -1052,24 +1105,27 @@ public class ServiceSynchronize extends LifecycleService {
if (outbox != null) { if (outbox != null) {
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this); LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this);
lbm.unregisterReceiver(receiverOutbox); lbm.unregisterReceiver(receiverOutbox);
Log.i(Helper.TAG, outbox.name + " unlisten process id=" + outbox.id); Log.i(Helper.TAG, outbox.name + " unlisten operations");
} }
} }
BroadcastReceiver receiverOutbox = new BroadcastReceiver() { BroadcastReceiver receiverOutbox = new BroadcastReceiver() {
@Override @Override
public void onReceive(Context context, Intent intent) { public void onReceive(Context context, Intent intent) {
Log.i(Helper.TAG, outbox.name + " submit process id=" + outbox.id); Log.i(Helper.TAG, outbox.name + " run operations");
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
Log.i(Helper.TAG, outbox.name + " start operations");
synchronized (outbox) { synchronized (outbox) {
processOperations(outbox, null, null); processOperations(outbox, null, null);
} }
} catch (Throwable ex) { } catch (Throwable ex) {
Log.e(Helper.TAG, outbox.name + " " + ex + "\n" + Log.getStackTraceString(ex)); Log.e(Helper.TAG, outbox.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(null, outbox.name, ex); reportError(null, outbox.name, ex);
} finally {
Log.i(Helper.TAG, outbox.name + " end operations");
} }
} }
}); });

Loading…
Cancel
Save