fix: merge batches (#16866)

whenever a batch is committed, we essentially 'rebase' other pending batches on top of the newly applied state
pull/16848/head
Rich Harris 3 days ago committed by GitHub
parent 87f7e97963
commit 25cbdc8cb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,5 @@
---
'svelte': patch
---
fix: rebase pending batches when other batches are committed

@ -1,4 +1,4 @@
/** @import { Derived, Effect, Source } from '#client' */
/** @import { Derived, Effect, Source, Value } from '#client' */
import {
BLOCK_EFFECT,
BRANCH_EFFECT,
@ -10,10 +10,11 @@ import {
INERT,
RENDER_EFFECT,
ROOT_EFFECT,
MAYBE_DIRTY
MAYBE_DIRTY,
DERIVED
} from '#client/constants';
import { async_mode_flag } from '../../flags/index.js';
import { deferred, define_property } from '../../shared/utils.js';
import { deferred, define_property, noop } from '../../shared/utils.js';
import {
active_effect,
is_dirty,
@ -97,22 +98,8 @@ export class Batch {
#deferred = null;
/**
* True if an async effect inside this batch resolved and
* its parent branch was already deleted
*/
#neutered = false;
/**
* Async effects (created inside `async_derived`) encountered during processing.
* These run after the rest of the batch has updated, since they should
* always have the latest values
* @type {Effect[]}
*/
#async_effects = [];
/**
* The same as `#async_effects`, but for effects inside a newly-created
* `<svelte:boundary>` these do not prevent the batch from committing
* Async effects inside a newly-created `<svelte:boundary>`
* these do not prevent the batch from committing
* @type {Effect[]}
*/
#boundary_async_effects = [];
@ -165,32 +152,7 @@ export class Batch {
previous_batch = null;
/** @type {Map<Source, { v: unknown, wv: number }> | null} */
var current_values = null;
// if there are multiple batches, we are 'time travelling' —
// we need to undo the changes belonging to any batch
// other than the current one
if (async_mode_flag && batches.size > 1) {
current_values = new Map();
batch_deriveds = new Map();
for (const [source, current] of this.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}
for (const batch of batches) {
if (batch === this) continue;
for (const [source, previous] of batch.#previous) {
if (!current_values.has(source)) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = previous;
}
}
}
}
var revert = Batch.apply(this);
for (const root of root_effects) {
this.#traverse_effect_tree(root);
@ -198,7 +160,7 @@ export class Batch {
// if we didn't start any new async work, and no async work
// is outstanding from a previous flush, commit
if (this.#async_effects.length === 0 && this.#pending === 0) {
if (this.#pending === 0) {
this.#commit();
var render_effects = this.#render_effects;
@ -210,7 +172,7 @@ export class Batch {
// If sources are written to, then work needs to happen in a separate batch, else prior sources would be mixed with
// newly updated sources, which could lead to infinite loops when effects run over and over again.
previous_batch = current_batch;
previous_batch = this;
current_batch = null;
flush_queued_effects(render_effects);
@ -223,27 +185,12 @@ export class Batch {
this.#defer_effects(this.#block_effects);
}
if (current_values) {
for (const [source, { v, wv }] of current_values) {
// reset the source to the current value (unless
// it got a newer value as a result of effects running)
if (source.wv <= wv) {
source.v = v;
}
}
batch_deriveds = null;
}
for (const effect of this.#async_effects) {
update_effect(effect);
}
revert();
for (const effect of this.#boundary_async_effects) {
update_effect(effect);
}
this.#async_effects = [];
this.#boundary_async_effects = [];
}
@ -272,12 +219,8 @@ export class Batch {
} else if (async_mode_flag && (flags & RENDER_EFFECT) !== 0) {
this.#render_effects.push(effect);
} else if ((flags & CLEAN) === 0) {
if ((flags & ASYNC) !== 0) {
var effects = effect.b?.is_pending()
? this.#boundary_async_effects
: this.#async_effects;
effects.push(effect);
if ((flags & ASYNC) !== 0 && effect.b?.is_pending()) {
this.#boundary_async_effects.push(effect);
} else if (is_dirty(effect)) {
if ((effect.f & BLOCK_EFFECT) !== 0) this.#block_effects.push(effect);
update_effect(effect);
@ -350,10 +293,6 @@ export class Batch {
}
}
neuter() {
this.#neutered = true;
}
flush() {
if (queued_root_effects.length > 0) {
this.activate();
@ -374,13 +313,58 @@ export class Batch {
* Append and remove branches to/from the DOM
*/
#commit() {
if (!this.#neutered) {
for (const fn of this.#callbacks) {
fn();
}
for (const fn of this.#callbacks) {
fn();
}
this.#callbacks.clear();
// If there are other pending batches, they now need to be 'rebased' —
// in other words, we re-run block/async effects with the newly
// committed state, unless the batch in question has a more
// recent value for a given source
if (batches.size > 1) {
this.#previous.clear();
let is_earlier = true;
for (const batch of batches) {
if (batch === this) {
is_earlier = false;
continue;
}
for (const [source, value] of this.current) {
if (batch.current.has(source)) {
if (is_earlier) {
// bring the value up to date
batch.current.set(source, value);
} else {
// later batch has more recent value,
// no need to re-run these effects
continue;
}
}
mark_effects(source);
}
if (queued_root_effects.length > 0) {
current_batch = batch;
const revert = Batch.apply(batch);
for (const root of queued_root_effects) {
batch.#traverse_effect_tree(root);
}
queued_root_effects = [];
revert();
}
}
current_batch = null;
}
batches.delete(this);
}
@ -402,9 +386,6 @@ export class Batch {
schedule_effect(e);
}
this.#render_effects = [];
this.#effects = [];
this.flush();
} else {
this.deactivate();
@ -444,6 +425,51 @@ export class Batch {
static enqueue(task) {
queue_micro_task(task);
}
/**
* @param {Batch} current_batch
*/
static apply(current_batch) {
if (!async_mode_flag || batches.size === 1) {
return noop;
}
// if there are multiple batches, we are 'time travelling' —
// we need to undo the changes belonging to any batch
// other than the current one
/** @type {Map<Source, { v: unknown, wv: number }>} */
var current_values = new Map();
batch_deriveds = new Map();
for (const [source, current] of current_batch.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}
for (const batch of batches) {
if (batch === current_batch) continue;
for (const [source, previous] of batch.#previous) {
if (!current_values.has(source)) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = previous;
}
}
}
return () => {
for (const [source, { v, wv }] of current_values) {
// reset the source to the current value (unless
// it got a newer value as a result of effects running)
if (source.wv <= wv) {
source.v = v;
}
}
batch_deriveds = null;
};
}
}
/**
@ -615,6 +641,26 @@ function flush_queued_effects(effects) {
eager_block_effects = null;
}
/**
* This is similar to `mark_reactions`, but it only marks async/block effects
* so that these can re-run after another batch has been committed
* @param {Value} value
*/
function mark_effects(value) {
if (value.reactions !== null) {
for (const reaction of value.reactions) {
const flags = reaction.f;
if ((flags & DERIVED) !== 0) {
mark_effects(/** @type {Derived} */ (reaction));
} else if ((flags & (ASYNC | BLOCK_EFFECT)) !== 0) {
set_signal_status(reaction, DIRTY);
schedule_effect(/** @type {Effect} */ (reaction));
}
}
}
}
/**
* @param {Effect} signal
* @returns {void}

@ -26,7 +26,7 @@ import {
import { equals, safe_equals } from './equality.js';
import * as e from '../errors.js';
import * as w from '../warnings.js';
import { async_effect, destroy_effect } from './effects.js';
import { async_effect, destroy_effect, teardown } from './effects.js';
import { inspect_effects, internal_set, set_inspect_effects, source } from './sources.js';
import { get_stack } from '../dev/tracing.js';
import { async_mode_flag, tracing_mode_flag } from '../../flags/index.js';
@ -35,6 +35,7 @@ import { component_context } from '../context.js';
import { UNINITIALIZED } from '../../../constants.js';
import { batch_deriveds, current_batch } from './batch.js';
import { unset_context } from './async.js';
import { deferred } from '../../shared/utils.js';
/** @type {Effect | null} */
export let current_async_effect = null;
@ -109,37 +110,40 @@ export function async_derived(fn, location) {
var promise = /** @type {Promise<V>} */ (/** @type {unknown} */ (undefined));
var signal = source(/** @type {V} */ (UNINITIALIZED));
/** @type {Promise<V> | null} */
var prev = null;
// only suspend in async deriveds created on initialisation
var should_suspend = !active_reaction;
/** @type {Map<Batch, ReturnType<typeof deferred<V>>>} */
var deferreds = new Map();
async_effect(() => {
if (DEV) current_async_effect = active_effect;
/** @type {ReturnType<typeof deferred<V>>} */
var d = deferred();
promise = d.promise;
try {
var p = fn();
// Make sure to always access the then property to read any signals
// it might access, so that we track them as dependencies.
if (prev) Promise.resolve(p).catch(() => {}); // avoid unhandled rejection
// If this code is changed at some point, make sure to still access the then property
// of fn() to read any signals it might access, so that we track them as dependencies.
Promise.resolve(fn()).then(d.resolve, d.reject);
} catch (error) {
p = Promise.reject(error);
d.reject(error);
}
if (DEV) current_async_effect = null;
var r = () => p;
promise = prev?.then(r, r) ?? Promise.resolve(p);
prev = promise;
var batch = /** @type {Batch} */ (current_batch);
var pending = boundary.is_pending();
if (should_suspend) {
boundary.update_pending_count(1);
if (!pending) batch.increment();
if (!pending) {
batch.increment();
deferreds.get(batch)?.reject(STALE_REACTION);
deferreds.set(batch, d);
}
}
/**
@ -147,8 +151,6 @@ export function async_derived(fn, location) {
* @param {unknown} error
*/
const handler = (value, error = undefined) => {
prev = null;
current_async_effect = null;
if (!pending) batch.activate();
@ -187,12 +189,12 @@ export function async_derived(fn, location) {
unset_context();
};
promise.then(handler, (e) => handler(null, e || 'unknown'));
d.promise.then(handler, (e) => handler(null, e || 'unknown'));
});
if (batch) {
return () => {
queueMicrotask(() => batch.neuter());
};
teardown(() => {
for (const d of deferreds.values()) {
d.reject(STALE_REACTION);
}
});

@ -14,17 +14,6 @@ export default test({
const [reset, a, b, increment] = target.querySelectorAll('button');
a.click();
// TODO why is this necessary? why isn't `await tick()` enough?
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
flushSync();
await tick();
assert.htmlEqual(
target.innerHTML,

@ -2,30 +2,46 @@ import { tick } from 'svelte';
import { test } from '../../test';
export default test({
html: `<button>reset</button><button>true</button><button>false</button><p>pending</p>`,
html: `<button>shift</button><button>true</button><button>false</button><p>pending</p>`,
async test({ assert, target }) {
const [reset, t, f] = target.querySelectorAll('button');
const [shift, t, f] = target.querySelectorAll('button');
shift.click();
await tick();
assert.htmlEqual(
target.innerHTML,
'<button>shift</button><button>true</button><button>false</button><h1>yes</h1>'
);
f.click();
await tick();
t.click();
await tick();
f.click();
await tick();
shift.click();
await tick();
assert.htmlEqual(
target.innerHTML,
'<button>reset</button><button>true</button><button>false</button><h1>yes</h1>'
'<button>shift</button><button>true</button><button>false</button><h1>no</h1>'
);
reset.click();
shift.click();
await tick();
assert.htmlEqual(
target.innerHTML,
'<button>reset</button><button>true</button><button>false</button><h1>yes</h1>'
'<button>shift</button><button>true</button><button>false</button><h1>yes</h1>'
);
f.click();
shift.click();
await tick();
assert.htmlEqual(
target.innerHTML,
'<button>reset</button><button>true</button><button>false</button><h1>no</h1>'
'<button>shift</button><button>true</button><button>false</button><h1>no</h1>'
);
}
});

@ -1,13 +1,24 @@
<script>
let deferred = $state(Promise.withResolvers());
let condition = $state(true);
let deferreds = [];
function push(value) {
const deferred = Promise.withResolvers();
deferreds.push({ deferred, value });
return deferred.promise;
}
</script>
<button onclick={() => deferred = Promise.withResolvers()}>reset</button>
<button onclick={() => deferred.resolve(true)}>true</button>
<button onclick={() => deferred.resolve(false)}>false</button>
<button onclick={() => {
const d = deferreds.shift();
d?.deferred.resolve(d.value);
}}>shift</button>
<button onclick={() => condition = true}>true</button>
<button onclick={() => condition = false}>false</button>
<svelte:boundary>
{#if await deferred.promise}
{#if await push(condition)}
<h1>yes</h1>
{:else}
<h1>no</h1>

@ -3,26 +3,24 @@ import { test } from '../../test';
export default test({
async test({ assert, target }) {
const [a, b, reset1, reset2, resolve1, resolve2] = target.querySelectorAll('button');
const [a, b, shift, pop] = target.querySelectorAll('button');
resolve1.click();
shift.click();
await tick();
const p = /** @type {HTMLElement} */ (target.querySelector('#test'));
assert.htmlEqual(p.innerHTML, '1 + 2 = 3');
flushSync(() => reset1.click());
flushSync(() => a.click());
flushSync(() => reset2.click());
flushSync(() => b.click());
resolve2.click();
pop.click();
await tick();
assert.htmlEqual(p.innerHTML, '1 + 2 = 3');
assert.htmlEqual(p.innerHTML, '1 + 3 = 4');
resolve1.click();
pop.click();
await tick();
assert.htmlEqual(p.innerHTML, '2 + 3 = 5');

@ -1,14 +1,14 @@
<script>
let delay = 1000;
let deferreds = [];
let a = $state(1);
let b = $state(2);
let d1 = Promise.withResolvers();
let d2 = Promise.withResolvers();
let deferred = d1;
async function push(a, b) {
var d = Promise.withResolvers();
deferreds.push(d);
await d.promise;
async function add(a, b) {
await deferred.promise;
return a + b;
}
</script>
@ -16,14 +16,11 @@
<button onclick={() => a++}>a++</button>
<button onclick={() => b++}>b++</button>
<button onclick={() => deferred = d1 = Promise.withResolvers()}>reset 1</button>
<button onclick={() => deferred = d2 = Promise.withResolvers()}>reset 2</button>
<button onclick={() => d1.resolve()}>resolve 1</button>
<button onclick={() => d2.resolve()}>resolve 2</button>
<button onclick={() => deferreds.shift()?.resolve()}>shift</button>
<button onclick={() => deferreds.pop()?.resolve()}>pop</button>
<svelte:boundary>
<p id="test">{a} + {b} = {await add(a, b)}</p>
<p id="test">{a} + {b} = {await push(a, b)}</p>
{#snippet pending()}
<p>loading...</p>

Loading…
Cancel
Save