blocker/reject/merge work

async-blocking-and-merging
Simon Holthausen 1 month ago
parent 5695f5a6d9
commit 3ab8601957

@ -16,7 +16,8 @@ import {
EAGER_EFFECT,
ERROR_VALUE,
MANAGED_EFFECT,
REACTION_RAN
REACTION_RAN,
STALE_REACTION
} from '#client/constants';
import { async_mode_flag } from '../../flags/index.js';
import { deferred, define_property, includes } from '../../shared/utils.js';
@ -139,6 +140,13 @@ export class Batch {
*/
#deferred = null;
/**
* Async derived reject handlers currently associated with this batch.
* Value indicates whether the corresponding async derived is outdated.
* @type {Map<(reason: unknown) => void, boolean>}
*/
async_deriveds = new Map();
/**
* The root effects that need to be flushed
* @type {Effect[]}
@ -162,7 +170,7 @@ export class Batch {
* is committed we skip over these during `process`.
* The value contains child effects that were dirty/maybe_dirty before being reset,
* so they can be rescheduled if the branch survives.
* @type {Map<Effect, { d: Effect[], m: Effect[] }>}
* @type {Map<Effect, { d: Set<Effect>, m: Set<Effect> }>}
*/
#skipped_branches = new Map();
@ -177,8 +185,10 @@ export class Batch {
return this.is_fork || this.#blocking_pending.size > 0;
}
#is_blocked() {
for (const batch of this.blockers) {
#get_blockers() {
const blockers = [];
for (const batch of [...this.blockers].sort((a, b) => b.id - a.id)) {
for (const effect of batch.#blocking_pending.keys()) {
var skipped = false;
var e = effect;
@ -193,12 +203,13 @@ export class Batch {
}
if (!skipped) {
return true;
blockers.push(batch);
break;
}
}
}
return false;
return blockers;
}
/**
@ -207,7 +218,7 @@ export class Batch {
*/
skip_effect(effect) {
if (!this.#skipped_branches.has(effect)) {
this.#skipped_branches.set(effect, { d: [], m: [] });
this.#skipped_branches.set(effect, { d: new Set(), m: new Set() });
}
}
@ -257,6 +268,7 @@ export class Batch {
const roots = this.#roots;
this.#roots = [];
// For #traverse we want to ignore previous values of prior batches, i.e. we want to see the latest values up to this batch
this.apply(false);
/** @type {Effect[]} */
@ -293,16 +305,26 @@ export class Batch {
collected_effects = null;
legacy_updates = null;
if (this.#is_deferred() || this.#is_blocked()) {
const blockers = this.#get_blockers();
const is_deferred = this.#is_deferred();
if (is_deferred || blockers.length > 0) {
this.#defer_effects(render_effects);
this.#defer_effects(effects);
for (const [e, t] of this.#skipped_branches) {
reset_branch(e, t);
}
if (!is_deferred) {
Promise.all(blockers.map((b) => b.settled())).then(() => {
this.flush();
});
}
} else {
// During deferred effect flushing, also account for prior batches'
// previous values. Traversal intentionally sees latest values up to this batch.
// During deferred effect flushing, also account for prior batches' previous values.
// This is necessary because an earlier batch could be independent to this one with
// respects to the sources etc it touches, so the later one can resolve before the earlier one.
this.apply(true);
if (this.#pending.size === 0) {
@ -323,6 +345,10 @@ export class Batch {
previous_batch = null;
this.#deferred?.resolve();
// TODO can a source within a branch contributing to this.#pending (instead of this.#blocking_pending) be the reason for blocking the batch?
for (const batch of batches) {
batch.blockers.delete(this);
}
}
var next_batch = /** @type {Batch | null} */ (/** @type {unknown} */ (current_batch));
@ -346,10 +372,6 @@ export class Batch {
next_batch.#process();
}
if (!batches.has(this)) {
this.#commit();
}
}
/**
@ -477,87 +499,6 @@ export class Batch {
this.#deferred?.resolve();
}
#commit() {
// If there are other pending batches, they now need to be 'rebased' —
// in other words, we re-run block/async effects with the newly
// committed state, unless the batch in question has a more
// recent value for a given source
// return;
for (const batch of batches) {
var is_earlier = batch.id < this.id;
/** @type {Source[]} */
var sources = [];
for (const [source, [value, is_derived]] of this.current) {
if (batch.current.has(source)) {
var batch_value = /** @type {[any, boolean]} */ (batch.current.get(source))[0]; // faster than destructuring
if (is_earlier && value !== batch_value) {
// bring the value up to date
batch.current.set(source, [value, is_derived]);
} else {
// same value or later batch has more recent value,
// no need to re-run these effects
continue;
}
}
sources.push(source);
}
// Re-run async/block effects that depend on distinct values changed in both batches
var others = [...batch.current.keys()].filter((s) => !this.current.has(s));
if (others.length === 0) {
if (is_earlier) {
// this batch is now obsolete and can be discarded
batch.discard();
}
} else if (sources.length > 0) {
if (DEV) {
invariant(batch.#roots.length === 0, 'Batch has scheduled roots');
}
batch.activate();
/** @type {Set<Value>} */
var marked = new Set();
/** @type {Map<Reaction, boolean>} */
var checked = new Map();
for (var source of sources) {
mark_effects(source, others, marked, checked);
}
// Only apply and traverse when we know we triggered async work with marking the effects
if (batch.#roots.length > 0) {
batch.apply(false);
for (var root of batch.#roots) {
batch.#traverse(root, [], []);
}
batch.#roots = [];
}
batch.deactivate();
}
}
for (const batch of batches) {
if (batch.blockers.has(this)) {
batch.blockers.delete(this);
if (batch.blockers.size === 0 && !batch.#is_deferred()) {
batch.activate();
batch.#process();
}
}
}
}
/**
* @param {boolean} blocking
* @param {Effect} effect
@ -596,14 +537,15 @@ export class Batch {
}
}
// We want to flush when skip=true but this was the last pending effect, because
// otherwise we will never flush the batch and it hangs around indefinitely.
if (this.#decrement_queued || (skip && this.#pending.size > 0)) return;
if (this.#decrement_queued || skip) return;
this.#decrement_queued = true;
queue_micro_task(() => {
this.#decrement_queued = false;
this.flush();
// skip=false does not necessarily mean that this wasn't supposed to be a skip
// (various callsites cannot reliably tell without bloating code), therefore
// check if the batch was made obsolete in the meantime before flushing.
if (batches.has(this)) this.flush();
});
}
@ -634,6 +576,120 @@ export class Batch {
this.#discard_callbacks.add(fn);
}
/**
* @param {(reason: unknown) => void} reject
*/
register_async_derived(reject) {
this.async_deriveds.set(reject, false);
}
/**
* Merge this batch's state into a newer superseding batch.
* @param {Batch} target
*/
#merge_into(target) {
if (target === this) return;
for (const [source, info] of this.current) {
if (!target.current.has(source)) {
target.current.set(source, info);
}
}
for (const [source, value] of this.previous) {
target.previous.set(source, value);
}
target.transfer_effects(this.#dirty_effects, this.#maybe_dirty_effects);
for (const fn of this.#commit_callbacks) {
target.#commit_callbacks.add(fn);
}
this.#commit_callbacks.clear();
for (const fn of this.#discard_callbacks) {
target.#discard_callbacks.add(fn);
}
this.#discard_callbacks.clear();
for (const [effect, tracked] of this.#skipped_branches) {
var existing = target.#skipped_branches.get(effect);
if (existing === undefined) {
target.#skipped_branches.set(effect, tracked);
} else {
for (const e of tracked.d) {
existing.d.add(e);
}
for (const e of tracked.m) {
existing.m.add(e);
}
}
}
this.#skipped_branches.clear();
for (const root of this.#roots) {
if (!target.#roots.includes(root)) {
target.#roots.push(root);
}
}
this.#roots = [];
// No need to merge pending/block_pending, these are already at 0 and obsolete else we couldn't merge into the target batch
for (const blocker of this.blockers) {
if (blocker !== target) {
target.blockers.add(blocker);
}
}
this.blockers.clear();
batches.delete(this);
for (const batch of batches) {
if (!batch.blockers.has(this)) continue;
batch.blockers.delete(this);
if (batch !== target) {
batch.blockers.add(target);
}
}
this.#deferred?.resolve();
}
/**
* Marks an async run as outdated. If all async runs are outdated, either merges this batch into a newer one
* that is blocked on this one, or if noone is blocked by this batch we know a later batch is a superset
* so this one is obsolete and we discard it.
* @param {(reason: unknown) => void} reject
*/
mark_async_derived_outdated(reject) {
if (!this.async_deriveds.has(reject)) return;
this.async_deriveds.set(reject, true);
for (const outdated of this.async_deriveds.values()) {
if (!outdated) return;
}
for (const reject of this.async_deriveds.keys()) {
reject(STALE_REACTION);
}
this.async_deriveds.clear();
// Merge target is the first successor that is blocked on this one.
// This ensure working through async work in linear order where they depend on each other.
var merge_target = [...batches].find((batch) => batch.blockers.has(this));
if (merge_target !== undefined) {
this.#merge_into(merge_target);
} else {
this.discard();
}
}
settled() {
return (this.#deferred ??= deferred()).promise;
}
@ -710,17 +766,7 @@ export class Batch {
}
}
}
} /*
apply() needs to happen piece by piece "on the fly" when traversing the graph (is_dirty, get, set etc)
to check if it's blocked by an earlier batch
during traversal async/block effects get the latest value across batches,
and if they read a source that the earlier batch has or a derived which has source of earlier batch in its deps, we know it's blocked
-> also for newly created and eagerly executed (render) effects ($effect.pre executing with new values already possibly before other; problem? feels like we should always eagerly execute $effect.pre)
id -> batch.id -> upsert (check if id still in batches) -> collect(ids) -> if one smaller than current and still exists, it's blocked
if we want to go into a skipped branch it's a sign of intersection too - is it also a sign if the branch is pending (about to be shown when prior batch commits)?
*/
}
/**
*
@ -929,37 +975,6 @@ function flush_queued_effects(effects) {
eager_block_effects = null;
}
/**
* This is similar to `mark_reactions`, but it only marks async/block effects
* depending on `value` and at least one of the other `sources`, so that
* these effects can re-run after another batch has been committed
* @param {Value} value
* @param {Source[]} sources
* @param {Set<Value>} marked
* @param {Map<Reaction, boolean>} checked
*/
function mark_effects(value, sources, marked, checked) {
if (marked.has(value)) return;
marked.add(value);
if (value.reactions !== null) {
for (const reaction of value.reactions) {
const flags = reaction.f;
if ((flags & DERIVED) !== 0) {
mark_effects(/** @type {Derived} */ (reaction), sources, marked, checked);
} else if (
(flags & (ASYNC | BLOCK_EFFECT)) !== 0 &&
(flags & DIRTY) === 0 &&
depends_on(reaction, sources, checked)
) {
set_signal_status(reaction, DIRTY);
schedule_effect(/** @type {Effect} */ (reaction));
}
}
}
}
/**
* When committing a fork, we need to trigger eager effects so that
* any `$state.eager(...)` expressions update immediately. This
@ -982,33 +997,6 @@ function mark_eager_effects(value, effects) {
}
}
/**
* @param {Reaction} reaction
* @param {Source[]} sources
* @param {Map<Reaction, boolean>} checked
*/
function depends_on(reaction, sources, checked) {
const depends = checked.get(reaction);
if (depends !== undefined) return depends;
if (reaction.deps !== null) {
for (const dep of reaction.deps) {
if (includes.call(sources, dep)) {
return true;
}
if ((dep.f & DERIVED) !== 0 && depends_on(/** @type {Derived} */ (dep), sources, checked)) {
checked.set(/** @type {Derived} */ (dep), true);
return true;
}
}
}
checked.set(reaction, false);
return false;
}
/**
* @param {Effect} effect
* @returns {void}
@ -1100,7 +1088,7 @@ export function eager(fn) {
* they can be correctly rescheduled later. Tracks dirty and maybe_dirty
* effects so they can be rescheduled if the branch survives.
* @param {Effect} effect
* @param {{ d: Effect[], m: Effect[] }} tracked
* @param {{ d: Set<Effect>, m: Set<Effect> }} tracked
*/
function reset_branch(effect, tracked) {
// clean branch = nothing dirty inside, no need to traverse further
@ -1109,9 +1097,9 @@ function reset_branch(effect, tracked) {
}
if ((effect.f & DIRTY) !== 0) {
tracked.d.push(effect);
tracked.d.add(effect);
} else if ((effect.f & MAYBE_DIRTY) !== 0) {
tracked.m.push(effect);
tracked.m.add(effect);
}
set_signal_status(effect, CLEAN);

@ -165,18 +165,32 @@ export function async_derived(fn, label, location) {
var decrement_pending = increment_pending();
}
batch.async_deriveds.delete(d.reject);
if (/** @type {Boundary} */ (parent.b).is_rendered()) {
deferreds.get(batch)?.reject(STALE_REACTION);
// Reject own batch directly without calling mark_async_derived_outdated,
// we don't want it to check if it needs to merge into some other batch.
var stale = deferreds.get(batch);
if (stale) {
stale.reject(STALE_REACTION);
batch.async_deriveds.delete(stale.reject);
}
deferreds.delete(batch); // delete to ensure correct order in Map iteration below
} else {
// While the boundary is still showing pending, a new run supersedes all older in-flight runs
// for this async expression. Cancel eagerly so resolution cannot commit stale values.
for (const d of deferreds.values()) {
for (const [b, d] of deferreds) {
d.reject(STALE_REACTION);
if (b === batch) {
batch.async_deriveds.delete(d.reject);
} else {
b.mark_async_derived_outdated(d.reject);
}
}
deferreds.clear();
}
batch.register_async_derived(d.reject);
deferreds.set(batch, d);
}
@ -192,18 +206,17 @@ export function async_derived(fn, label, location) {
if (decrement_pending) {
var skip = error === STALE_REACTION;
debugger;
if (!skip) {
/** @type {Promise<unknown>[]} */
const waits = [];
// All prior async derived runs are now stale,
// but we have to wait for the corresponding batch to resolve before proceeding
// All prior async derived runs are now stale, but we have to
// wait for the corresponding batches to resolve before proceeding
for (const [b, d] of deferreds) {
if (b === batch) break;
deferreds.delete(b);
waits.push(b.settled());
d.reject(STALE_REACTION);
b.mark_async_derived_outdated(d.reject);
}
if (waits.length > 0) {
@ -219,6 +232,7 @@ export function async_derived(fn, label, location) {
deferreds.delete(batch);
if (error === STALE_REACTION || (effect.f & DESTROYED) !== 0) {
batch.mark_async_derived_outdated(d.reject);
return;
}
@ -255,8 +269,9 @@ export function async_derived(fn, label, location) {
});
teardown(() => {
for (const d of deferreds.values()) {
d.reject(STALE_REACTION);
for (const [batch, d] of deferreds) {
batch.mark_async_derived_outdated(d.reject);
d.reject(STALE_REACTION); // reject directly, prevent handler above from succeeding
}
});

@ -18,7 +18,7 @@ export default test({
pop.click();
await tick();
assert.htmlEqual(p.innerHTML, '1 + 3 = 4');
assert.htmlEqual(p.innerHTML, '2 + 3 = 5');
pop.click();
await tick();

@ -2,36 +2,26 @@ import { tick } from 'svelte';
import { test } from '../../test';
export default test({
skip: true, // this fails on main, too; skip for now
async test({ assert, target, logs }) {
const [x, y, resolve] = target.querySelectorAll('button');
x.click();
await tick();
assert.deepEqual(logs, ['universe']);
y.click();
await tick();
assert.deepEqual(logs, ['universe', 'world', '$effect: world']);
assert.htmlEqual(
target.innerHTML,
`
<button>x</button>
<button>y++</button>
<button>resolve</button>
world
`
);
resolve.click();
await tick();
assert.deepEqual(logs, [
'universe',
'world',
'$effect: world',
'$effect: universe',
'$effect: universe'
]);
assert.deepEqual(logs, ['universe', 'universe', '$effect: universe', '$effect: universe']);
assert.htmlEqual(
target.innerHTML,
`

Loading…
Cancel
Save