fix: run batch until complete (#16971)

at present we only call batch.increment() when something async happens if we're not inside a pending boundary, but that's incorrect — it means that a batch is committed before everything resolves. When work inside a pending boundary does resolve, the batch becomes a zombie.

At the same time, we don't handle effects inside pending boundaries correctly. They should be deferred until the boundary (and all its parents) are ready.

This PR attempts to fix that — during traversal, when we exit a pending boundary, any effects that were collected get deferred until the next flush. We also distinguish between batch.#pending (any ongoing async work) and batch.#blocking_pending (any async work that should prevent effects outside pending boundaries from being flushed).
pull/16992/head
Rich Harris 2 weeks ago committed by GitHub
parent 4b32d6d8b3
commit 7e40186a5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,5 @@
---
'svelte': patch
---
fix: keep batches alive until all async work is complete

@ -1,5 +1,6 @@
/** @import { TemplateNode, Value } from '#client' */ /** @import { TemplateNode, Value } from '#client' */
import { flatten } from '../../reactivity/async.js'; import { flatten } from '../../reactivity/async.js';
import { Batch, current_batch } from '../../reactivity/batch.js';
import { get } from '../../runtime.js'; import { get } from '../../runtime.js';
import { import {
hydrate_next, hydrate_next,
@ -18,8 +19,11 @@ import { get_boundary } from './boundary.js';
*/ */
export function async(node, expressions, fn) { export function async(node, expressions, fn) {
var boundary = get_boundary(); var boundary = get_boundary();
var batch = /** @type {Batch} */ (current_batch);
var blocking = !boundary.is_pending();
boundary.update_pending_count(1); boundary.update_pending_count(1);
batch.increment(blocking);
var was_hydrating = hydrating; var was_hydrating = hydrating;
@ -44,6 +48,7 @@ export function async(node, expressions, fn) {
fn(node, ...values); fn(node, ...values);
} finally { } finally {
boundary.update_pending_count(-1); boundary.update_pending_count(-1);
batch.decrement(blocking);
} }
if (was_hydrating) { if (was_hydrating) {

@ -291,13 +291,6 @@ export class Boundary {
this.#anchor.before(this.#offscreen_fragment); this.#anchor.before(this.#offscreen_fragment);
this.#offscreen_fragment = null; this.#offscreen_fragment = null;
} }
// TODO this feels like a little bit of a kludge, but until we
// overhaul the boundary/batch relationship it's probably
// the most pragmatic solution available to us
queue_micro_task(() => {
Batch.ensure().flush();
});
} }
} }

@ -218,10 +218,10 @@ export function unset_context() {
export async function async_body(anchor, fn) { export async function async_body(anchor, fn) {
var boundary = get_boundary(); var boundary = get_boundary();
var batch = /** @type {Batch} */ (current_batch); var batch = /** @type {Batch} */ (current_batch);
var pending = boundary.is_pending(); var blocking = !boundary.is_pending();
boundary.update_pending_count(1); boundary.update_pending_count(1);
if (!pending) batch.increment(); batch.increment(blocking);
var active = /** @type {Effect} */ (active_effect); var active = /** @type {Effect} */ (active_effect);
@ -254,12 +254,7 @@ export async function async_body(anchor, fn) {
} }
boundary.update_pending_count(-1); boundary.update_pending_count(-1);
batch.decrement(blocking);
if (pending) {
batch.flush();
} else {
batch.decrement();
}
unset_context(); unset_context();
} }

@ -11,7 +11,8 @@ import {
RENDER_EFFECT, RENDER_EFFECT,
ROOT_EFFECT, ROOT_EFFECT,
MAYBE_DIRTY, MAYBE_DIRTY,
DERIVED DERIVED,
BOUNDARY_EFFECT
} from '#client/constants'; } from '#client/constants';
import { async_mode_flag } from '../../flags/index.js'; import { async_mode_flag } from '../../flags/index.js';
import { deferred, define_property } from '../../shared/utils.js'; import { deferred, define_property } from '../../shared/utils.js';
@ -31,6 +32,16 @@ import { invoke_error_boundary } from '../error-handling.js';
import { old_values, source, update } from './sources.js'; import { old_values, source, update } from './sources.js';
import { inspect_effect, unlink_effect } from './effects.js'; import { inspect_effect, unlink_effect } from './effects.js';
/**
* @typedef {{
* parent: EffectTarget | null;
* effect: Effect | null;
* effects: Effect[];
* render_effects: Effect[];
* block_effects: Effect[];
* }} EffectTarget
*/
/** @type {Set<Batch>} */ /** @type {Set<Batch>} */
const batches = new Set(); const batches = new Set();
@ -65,6 +76,8 @@ let is_flushing = false;
export let is_flushing_sync = false; export let is_flushing_sync = false;
export class Batch { export class Batch {
committed = false;
/** /**
* The current values of any sources that are updated in this batch * The current values of any sources that are updated in this batch
* They keys of this map are identical to `this.#previous` * They keys of this map are identical to `this.#previous`
@ -91,6 +104,11 @@ export class Batch {
*/ */
#pending = 0; #pending = 0;
/**
* The number of async effects that are currently in flight, _not_ inside a pending boundary
*/
#blocking_pending = 0;
/** /**
* A deferred that resolves when the batch is committed, used with `settled()` * A deferred that resolves when the batch is committed, used with `settled()`
* TODO replace with Promise.withResolvers once supported widely enough * TODO replace with Promise.withResolvers once supported widely enough
@ -98,26 +116,6 @@ export class Batch {
*/ */
#deferred = null; #deferred = null;
/**
* Template effects and `$effect.pre` effects, which run when
* a batch is committed
* @type {Effect[]}
*/
#render_effects = [];
/**
* The same as `#render_effects`, but for `$effect` (which runs after)
* @type {Effect[]}
*/
#effects = [];
/**
* Block effects, which may need to re-run on subsequent flushes
* in order to update internal sources (e.g. each block items)
* @type {Effect[]}
*/
#block_effects = [];
/** /**
* Deferred effects (which run after async work has completed) that are DIRTY * Deferred effects (which run after async work has completed) that are DIRTY
* @type {Effect[]} * @type {Effect[]}
@ -148,41 +146,37 @@ export class Batch {
this.apply(); this.apply();
/** @type {EffectTarget} */
var target = {
parent: null,
effect: null,
effects: [],
render_effects: [],
block_effects: []
};
for (const root of root_effects) { for (const root of root_effects) {
this.#traverse_effect_tree(root); this.#traverse_effect_tree(root, target);
} }
// if there is no outstanding async work, commit this.#resolve();
if (this.#pending === 0) {
// TODO we need this because we commit _then_ flush effects...
// maybe there's a way we can reverse the order?
var previous_batch_sources = batch_values;
this.#commit();
var render_effects = this.#render_effects; if (this.#blocking_pending > 0) {
var effects = this.#effects; this.#defer_effects(target.effects);
this.#defer_effects(target.render_effects);
this.#render_effects = []; this.#defer_effects(target.block_effects);
this.#effects = []; } else {
this.#block_effects = []; // TODO append/detach blocks here, not in #commit
// If sources are written to, then work needs to happen in a separate batch, else prior sources would be mixed with // If sources are written to, then work needs to happen in a separate batch, else prior sources would be mixed with
// newly updated sources, which could lead to infinite loops when effects run over and over again. // newly updated sources, which could lead to infinite loops when effects run over and over again.
previous_batch = this; previous_batch = this;
current_batch = null; current_batch = null;
batch_values = previous_batch_sources; flush_queued_effects(target.render_effects);
flush_queued_effects(render_effects); flush_queued_effects(target.effects);
flush_queued_effects(effects);
previous_batch = null; previous_batch = null;
this.#deferred?.resolve();
} else {
this.#defer_effects(this.#render_effects);
this.#defer_effects(this.#effects);
this.#defer_effects(this.#block_effects);
} }
batch_values = null; batch_values = null;
@ -192,8 +186,9 @@ export class Batch {
* Traverse the effect tree, executing effects or stashing * Traverse the effect tree, executing effects or stashing
* them for later execution as appropriate * them for later execution as appropriate
* @param {Effect} root * @param {Effect} root
* @param {EffectTarget} target
*/ */
#traverse_effect_tree(root) { #traverse_effect_tree(root, target) {
root.f ^= CLEAN; root.f ^= CLEAN;
var effect = root.first; var effect = root.first;
@ -205,15 +200,25 @@ export class Batch {
var skip = is_skippable_branch || (flags & INERT) !== 0 || this.skipped_effects.has(effect); var skip = is_skippable_branch || (flags & INERT) !== 0 || this.skipped_effects.has(effect);
if ((effect.f & BOUNDARY_EFFECT) !== 0 && effect.b?.is_pending()) {
target = {
parent: target,
effect,
effects: [],
render_effects: [],
block_effects: []
};
}
if (!skip && effect.fn !== null) { if (!skip && effect.fn !== null) {
if (is_branch) { if (is_branch) {
effect.f ^= CLEAN; effect.f ^= CLEAN;
} else if ((flags & EFFECT) !== 0) { } else if ((flags & EFFECT) !== 0) {
this.#effects.push(effect); target.effects.push(effect);
} else if (async_mode_flag && (flags & RENDER_EFFECT) !== 0) { } else if (async_mode_flag && (flags & RENDER_EFFECT) !== 0) {
this.#render_effects.push(effect); target.render_effects.push(effect);
} else if (is_dirty(effect)) { } else if (is_dirty(effect)) {
if ((effect.f & BLOCK_EFFECT) !== 0) this.#block_effects.push(effect); if ((effect.f & BLOCK_EFFECT) !== 0) target.block_effects.push(effect);
update_effect(effect); update_effect(effect);
} }
@ -229,6 +234,17 @@ export class Batch {
effect = effect.next; effect = effect.next;
while (effect === null && parent !== null) { while (effect === null && parent !== null) {
if (parent === target.effect) {
// TODO rather than traversing into pending boundaries and deferring the effects,
// could we just attach the effects _to_ the pending boundary and schedule them
// once the boundary is ready?
this.#defer_effects(target.effects);
this.#defer_effects(target.render_effects);
this.#defer_effects(target.block_effects);
target = /** @type {EffectTarget} */ (target.parent);
}
effect = parent.next; effect = parent.next;
parent = parent.parent; parent = parent.parent;
} }
@ -246,8 +262,6 @@ export class Batch {
// mark as clean so they get scheduled if they depend on pending async state // mark as clean so they get scheduled if they depend on pending async state
set_signal_status(e, CLEAN); set_signal_status(e, CLEAN);
} }
effects.length = 0;
} }
/** /**
@ -283,8 +297,8 @@ export class Batch {
// this can happen if a new batch was created during `flush_effects()` // this can happen if a new batch was created during `flush_effects()`
return; return;
} }
} else if (this.#pending === 0) { } else {
this.#commit(); this.#resolve();
} }
this.deactivate(); this.deactivate();
@ -300,16 +314,19 @@ export class Batch {
} }
} }
/** #resolve() {
* Append and remove branches to/from the DOM if (this.#blocking_pending === 0) {
*/ // append/remove branches
#commit() { for (const fn of this.#callbacks) fn();
for (const fn of this.#callbacks) { this.#callbacks.clear();
fn();
} }
this.#callbacks.clear(); if (this.#pending === 0) {
this.#commit();
}
}
#commit() {
// If there are other pending batches, they now need to be 'rebased' — // If there are other pending batches, they now need to be 'rebased' —
// in other words, we re-run block/async effects with the newly // in other words, we re-run block/async effects with the newly
// committed state, unless the batch in question has a more // committed state, unless the batch in question has a more
@ -317,7 +334,17 @@ export class Batch {
if (batches.size > 1) { if (batches.size > 1) {
this.#previous.clear(); this.#previous.clear();
let is_earlier = true; var previous_batch_values = batch_values;
var is_earlier = true;
/** @type {EffectTarget} */
var dummy_target = {
parent: null,
effect: null,
effects: [],
render_effects: [],
block_effects: []
};
for (const batch of batches) { for (const batch of batches) {
if (batch === this) { if (batch === this) {
@ -359,9 +386,11 @@ export class Batch {
batch.apply(); batch.apply();
for (const root of queued_root_effects) { for (const root of queued_root_effects) {
batch.#traverse_effect_tree(root); batch.#traverse_effect_tree(root, dummy_target);
} }
// TODO do we need to do anything with `target`? defer block effects?
queued_root_effects = []; queued_root_effects = [];
batch.deactivate(); batch.deactivate();
} }
@ -369,17 +398,31 @@ export class Batch {
} }
current_batch = null; current_batch = null;
batch_values = previous_batch_values;
} }
this.committed = true;
batches.delete(this); batches.delete(this);
this.#deferred?.resolve();
} }
increment() { /**
*
* @param {boolean} blocking
*/
increment(blocking) {
this.#pending += 1; this.#pending += 1;
if (blocking) this.#blocking_pending += 1;
} }
decrement() { /**
*
* @param {boolean} blocking
*/
decrement(blocking) {
this.#pending -= 1; this.#pending -= 1;
if (blocking) this.#blocking_pending -= 1;
for (const e of this.#dirty_effects) { for (const e of this.#dirty_effects) {
set_signal_status(e, DIRTY); set_signal_status(e, DIRTY);
@ -391,6 +434,9 @@ export class Batch {
schedule_effect(e); schedule_effect(e);
} }
this.#dirty_effects = [];
this.#maybe_dirty_effects = [];
this.flush(); this.flush();
} }

@ -127,7 +127,17 @@ export function async_derived(fn, location) {
// If this code is changed at some point, make sure to still access the then property // If this code is changed at some point, make sure to still access the then property
// of fn() to read any signals it might access, so that we track them as dependencies. // of fn() to read any signals it might access, so that we track them as dependencies.
// We call `unset_context` to undo any `save` calls that happen inside `fn()` // We call `unset_context` to undo any `save` calls that happen inside `fn()`
Promise.resolve(fn()).then(d.resolve, d.reject).then(unset_context); Promise.resolve(fn())
.then(d.resolve, d.reject)
.then(() => {
if (batch === current_batch && batch.committed) {
// if the batch was rejected as stale, we need to cleanup
// after any `$.save(...)` calls inside `fn()`
batch.deactivate();
}
unset_context();
});
} catch (error) { } catch (error) {
d.reject(error); d.reject(error);
unset_context(); unset_context();
@ -136,18 +146,17 @@ export function async_derived(fn, location) {
if (DEV) current_async_effect = null; if (DEV) current_async_effect = null;
var batch = /** @type {Batch} */ (current_batch); var batch = /** @type {Batch} */ (current_batch);
var pending = boundary.is_pending();
if (should_suspend) { if (should_suspend) {
var blocking = !boundary.is_pending();
boundary.update_pending_count(1); boundary.update_pending_count(1);
if (!pending) { batch.increment(blocking);
batch.increment();
deferreds.get(batch)?.reject(STALE_REACTION); deferreds.get(batch)?.reject(STALE_REACTION);
deferreds.delete(batch); // delete to ensure correct order in Map iteration below deferreds.delete(batch); // delete to ensure correct order in Map iteration below
deferreds.set(batch, d); deferreds.set(batch, d);
} }
}
/** /**
* @param {any} value * @param {any} value
@ -156,7 +165,7 @@ export function async_derived(fn, location) {
const handler = (value, error = undefined) => { const handler = (value, error = undefined) => {
current_async_effect = null; current_async_effect = null;
if (!pending) batch.activate(); batch.activate();
if (error) { if (error) {
if (error !== STALE_REACTION) { if (error !== STALE_REACTION) {
@ -193,7 +202,7 @@ export function async_derived(fn, location) {
if (should_suspend) { if (should_suspend) {
boundary.update_pending_count(-1); boundary.update_pending_count(-1);
if (!pending) batch.decrement(); batch.decrement(blocking);
} }
}; };

@ -6,7 +6,7 @@ export default test({
const [reset, resolve] = target.querySelectorAll('button'); const [reset, resolve] = target.querySelectorAll('button');
reset.click(); reset.click();
await settled(); await tick();
assert.deepEqual(logs, ['aborted']); assert.deepEqual(logs, ['aborted']);
resolve.click(); resolve.click();

@ -12,7 +12,7 @@
</script> </script>
<button onclick={() => { <button onclick={() => {
input.focus(); input?.focus();
resolvers.shift()?.(); resolvers.shift()?.();
}}>shift</button> }}>shift</button>

@ -12,7 +12,7 @@
</script> </script>
<button onclick={() => { <button onclick={() => {
select.focus(); select?.focus();
resolvers.shift()?.(); resolvers.shift()?.();
}}>shift</button> }}>shift</button>

@ -24,5 +24,7 @@ export default test({
<p>1</p> <p>1</p>
` `
); );
} },
expect_unhandled_rejections: true
}); });

@ -0,0 +1,63 @@
import { tick } from 'svelte';
import { test } from '../../test';
export default test({
async test({ assert, target }) {
const [increment, shift] = target.querySelectorAll('button');
shift.click();
await tick();
shift.click();
await tick();
assert.htmlEqual(
target.innerHTML,
`
<button>0</button>
<button>shift</button>
<p>even</p>
<p>0</p>
`
);
increment.click();
await tick();
assert.htmlEqual(
target.innerHTML,
`
<button>1</button>
<button>shift</button>
<p>even</p>
<p>0</p>
`
);
shift.click();
await tick();
assert.htmlEqual(
target.innerHTML,
`
<button>1</button>
<button>shift</button>
<p>odd</p>
<p>loading...</p>
`
);
shift.click();
await tick();
assert.htmlEqual(
target.innerHTML,
`
<button>1</button>
<button>shift</button>
<p>odd</p>
<p>1</p>
`
);
}
});

@ -0,0 +1,36 @@
<script>
let resolvers = [];
function push(value) {
const { promise, resolve } = Promise.withResolvers();
resolvers.push(() => resolve(value));
return promise;
}
let count = $state(0);
</script>
<button onclick={() => count += 1}>{$state.eager(count)}</button>
<button onclick={() => resolvers.shift()?.()}>shift</button>
<svelte:boundary>
{#if await push(count) % 2 === 0}
<p>even</p>
{:else}
<p>odd</p>
{/if}
{#key count}
<svelte:boundary>
<p>{await push(count)}</p>
{#snippet pending()}
<p>loading...</p>
{/snippet}
</svelte:boundary>
{/key}
{#snippet pending()}
<p>loading...</p>
{/snippet}
</svelte:boundary>

@ -1,7 +1,11 @@
<script> <script>
$effect(() => {
console.log('before');
});
await 1; await 1;
$effect(() => { $effect(() => {
console.log('hello'); console.log('after');
}); });
</script> </script>

@ -3,7 +3,8 @@ import { test } from '../../test';
export default test({ export default test({
async test({ assert, logs }) { async test({ assert, logs }) {
assert.deepEqual(logs, []);
await tick(); await tick();
assert.deepEqual(logs, ['hello']); assert.deepEqual(logs, ['before', 'after']);
} }
}); });

@ -0,0 +1,5 @@
<script>
$effect(() => {
console.log('in effect')
});
</script>

@ -0,0 +1,16 @@
import { tick } from 'svelte';
import { test } from '../../test';
export default test({
async test({ assert, target, logs }) {
const [shift] = target.querySelectorAll('button');
await tick();
assert.deepEqual(logs, []);
shift.click();
await tick();
assert.deepEqual(logs, ['in effect']);
}
});

@ -0,0 +1,22 @@
<script>
import Child from './Child.svelte';
let resolvers = [];
function push(value) {
const { promise, resolve } = Promise.withResolvers();
resolvers.push(() => resolve(value));
return promise;
}
</script>
<button onclick={() => resolvers.shift()?.()}>shift</button>
<svelte:boundary>
<p>{await push('hello')}</p>
<Child />
{#snippet pending()}
<p>loading...</p>
{/snippet}
</svelte:boundary>

@ -3,23 +3,28 @@ import { test } from '../../test';
export default test({ export default test({
async test({ assert, target }) { async test({ assert, target }) {
// We gotta wait a bit more in this test because of the macrotasks in App.svelte // We gotta wait a bit more in this test because of the macrotasks in App.svelte
function macrotask(t = 3) { function sleep(t = 50) {
return new Promise((r) => setTimeout(r, t)); return new Promise((r) => setTimeout(r, t));
} }
await macrotask(); await sleep();
assert.htmlEqual(target.innerHTML, '<input> 1 | '); assert.htmlEqual(target.innerHTML, '<input> 1 | ');
const [input] = target.querySelectorAll('input'); const [input] = target.querySelectorAll('input');
input.value = '1'; input.value = '1';
input.dispatchEvent(new Event('input', { bubbles: true })); input.dispatchEvent(new Event('input', { bubbles: true }));
await macrotask(); await sleep();
assert.htmlEqual(target.innerHTML, '<input> 1 | '); assert.htmlEqual(target.innerHTML, '<input> 1 | ');
input.value = '12'; input.value = '12';
input.dispatchEvent(new Event('input', { bubbles: true })); input.dispatchEvent(new Event('input', { bubbles: true }));
await macrotask(6); await sleep();
assert.htmlEqual(target.innerHTML, '<input> 3 | 12'); assert.htmlEqual(target.innerHTML, '<input> 3 | 12');
input.value = '';
input.dispatchEvent(new Event('input', { bubbles: true }));
await sleep();
assert.htmlEqual(target.innerHTML, '<input> 4 | ');
} }
}); });

@ -1,26 +1,31 @@
<script> <script>
let count = $state(0); let count = $state(0);
let value = $state(''); let value = $state('');
let prev;
let resolver;
function asd(v) { function asd(v) {
const r = Promise.withResolvers(); let r = Promise.withResolvers();
if (prev || v === '') { function update_and_resolve() {
Promise.resolve().then(async () => {
count++; count++;
r.resolve(v); r.resolve(v);
await new Promise(r => setTimeout(r, 0)); }
// TODO with a microtask like below it still throws a mutation error
// await Promise.resolve(); // make sure the second promise resolve before the first one
prev?.resolve(); if (resolver){
new Promise(r => {
setTimeout(r);
}).then(update_and_resolve).then(() => {
setTimeout(() => {
resolver();
resolver = null;
}); });
} else {
prev = Promise.withResolvers();
prev.promise.then(() => {
count++;
r.resolve(v)
}); });
} else if (v) {
resolver = update_and_resolve;
} else {
Promise.resolve().then(update_and_resolve);
} }
return r.promise; return r.promise;

Loading…
Cancel
Save