Update operation partitions

pull/172/head
M66B 5 years ago
parent 70392bdeb1
commit 3bf94520cb

@ -1122,45 +1122,58 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
db.operation().liveOperations(folder.id).observe(cowner, new Observer<List<TupleOperationEx>>() {
private List<Long> handling = new ArrayList<>();
private final Map<TupleOperationEx.PartitionKey, List<TupleOperationEx>> partitions =
new TreeMap<>(new Comparator<TupleOperationEx.PartitionKey>() {
@Override
public int compare(TupleOperationEx.PartitionKey k1, TupleOperationEx.PartitionKey k2) {
Integer p1 = k1.getPriority();
Integer p2 = k2.getPriority();
return p1.compareTo(p2);
}
});
private final PowerManager.WakeLock wlFolder = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":folder." + folder.id);
@Override
public void onChanged(final List<TupleOperationEx> _operations) {
// Get new operations
List<Long> ops = new ArrayList<>();
List<TupleOperationEx> submit = new ArrayList<>();
List<TupleOperationEx> added = new ArrayList<>();
for (TupleOperationEx op : _operations) {
if (!handling.contains(op.id))
submit.add(op);
added.add(op);
ops.add(op.id);
}
handling = ops;
if (submit.size() > 0) {
Log.i(folder.name + " queuing operations=" + submit.size() +
if (added.size() > 0) {
Log.i(folder.name + " queuing operations=" + added.size() +
" init=" + folder.initialize + " poll=" + folder.poll);
// Partition operations by priority
Map<TupleOperationEx.PartitionKey, List<TupleOperationEx>> partitions = new TreeMap<>(new Comparator<TupleOperationEx.PartitionKey>() {
@Override
public int compare(TupleOperationEx.PartitionKey k1, TupleOperationEx.PartitionKey k2) {
Integer p1 = k1.getPriority();
Integer p2 = k2.getPriority();
return p1.compareTo(p2);
}
});
boolean offline = (mapFolders.get(folder) == null);
for (TupleOperationEx op : submit) {
TupleOperationEx.PartitionKey key = op.getPartitionKey(offline);
if (!partitions.containsKey(key))
partitions.put(key, new ArrayList<>());
partitions.get(key).add(op);
List<TupleOperationEx.PartitionKey> keys = new ArrayList<>();
synchronized (partitions) {
for (TupleOperationEx op : added) {
TupleOperationEx.PartitionKey key = op.getPartitionKey(offline);
if (!partitions.containsKey(key)) {
partitions.put(key, new ArrayList<>());
keys.add(key);
}
partitions.get(key).add(op);
}
}
for (TupleOperationEx.PartitionKey key : partitions.keySet()) {
List<TupleOperationEx> partition = partitions.get(key);
Log.i(folder.name + " queuing operations=" + partition.size() + " key=" + key);
for (TupleOperationEx.PartitionKey key : keys) {
synchronized (partitions) {
Log.i(folder.name +
" queuing partition=" + key +
" operations=" + partitions.get(key).size());
}
executor.submit(new Helper.PriorityRunnable(key.getPriority()) {
@Override
@ -1168,7 +1181,16 @@ public class ServiceSynchronize extends ServiceBase implements SharedPreferences
super.run();
try {
wlFolder.acquire();
Log.i(folder.name + " process");
List<TupleOperationEx> partition;
synchronized (partitions) {
partition = partitions.get(key);
partitions.remove(key);
}
Log.i(folder.name +
" executing partition=" + key +
" operations=" + partition.size());
// Get folder
Folder ifolder = mapFolders.get(folder); // null when polling

Loading…
Cancel
Save