diff --git a/packages/svelte/src/internal/client/reactivity/batch.js b/packages/svelte/src/internal/client/reactivity/batch.js index 3b10d6ebe6..b7b994176b 100644 --- a/packages/svelte/src/internal/client/reactivity/batch.js +++ b/packages/svelte/src/internal/client/reactivity/batch.js @@ -171,14 +171,14 @@ export class Batch { #decrement_queued = false; /** @type {Set} */ - #blockers = new Set(); + blockers = new Set(); #is_deferred() { return this.is_fork || this.#blocking_pending.size > 0; } #is_blocked() { - for (const batch of this.#blockers) { + for (const batch of this.blockers) { for (const effect of batch.#blocking_pending.keys()) { var skipped = false; var e = effect; @@ -257,7 +257,7 @@ export class Batch { const roots = this.#roots; this.#roots = []; - this.apply(); + this.apply(false); /** @type {Effect[]} */ var effects = (collected_effects = []); @@ -301,6 +301,10 @@ export class Batch { reset_branch(e, t); } } else { + // During deferred effect flushing, also account for prior batches' + // previous values. Traversal intentionally sees latest values up to this batch. + this.apply(true); + if (this.#pending.size === 0) { batches.delete(this); } @@ -470,6 +474,7 @@ export class Batch { this.#discard_callbacks.clear(); batches.delete(this); + this.#deferred?.resolve(); } #commit() { @@ -477,6 +482,7 @@ export class Batch { // in other words, we re-run block/async effects with the newly // committed state, unless the batch in question has a more // recent value for a given source + // return; for (const batch of batches) { var is_earlier = batch.id < this.id; @@ -527,7 +533,7 @@ export class Batch { // Only apply and traverse when we know we triggered async work with marking the effects if (batch.#roots.length > 0) { - batch.apply(); + batch.apply(false); for (var root of batch.#roots) { batch.#traverse(root, [], []); @@ -541,10 +547,10 @@ export class Batch { } for (const batch of batches) { - if (batch.#blockers.has(this)) { - batch.#blockers.delete(this); + if (batch.blockers.has(this)) { + batch.blockers.delete(this); - if (batch.#blockers.size === 0 && !batch.#is_deferred()) { + if (batch.blockers.size === 0 && !batch.#is_deferred()) { batch.activate(); batch.#process(); } @@ -590,7 +596,9 @@ export class Batch { } } - if (this.#decrement_queued || skip) return; + // We want to flush when skip=true but this was the last pending effect, because + // otherwise we will never flush the batch and it hangs around indefinitely. + if (this.#decrement_queued || (skip && this.#pending.size > 0)) return; this.#decrement_queued = true; queue_micro_task(() => { @@ -653,7 +661,10 @@ export class Batch { return current_batch; } - apply() { + /** + * @param {boolean} include_prior_previous + */ + apply(include_prior_previous) { if (!async_mode_flag || (!this.is_fork && batches.size === 1)) { batch_values = null; return; @@ -686,8 +697,12 @@ export class Batch { } if (intersects && differs) { - this.#blockers.add(batch); + this.blockers.add(batch); } else { + if (batch.id < this.id && !include_prior_previous) { + continue; + } + for (const [source, previous] of batch.previous) { if (!batch_values.has(source)) { batch_values.set(source, previous); @@ -695,7 +710,17 @@ export class Batch { } } } - } + } /* + apply() needs to happen piece by piece "on the fly" when traversing the graph (is_dirty, get, set etc) + to check if it's blocked by an earlier batch + during traversal async/block effects get the latest value across batches, + and if they read a source that the earlier batch has or a derived which has source of earlier batch in its deps, we know it's blocked + -> also for newly created and eagerly executed (render) effects ($effect.pre executing with new values already possibly before other; problem? feels like we should always eagerly execute $effect.pre) + + id -> batch.id -> upsert (check if id still in batches) -> collect(ids) -> if one smaller than current and still exists, it's blocked + + if we want to go into a skipped branch it's a sign of intersection too - is it also a sign if the branch is pending (about to be shown when prior batch commits)? + */ /** * @@ -992,6 +1017,25 @@ export function schedule_effect(effect) { /** @type {Batch} */ (current_batch).schedule(effect); } +/** + * If an effect/derived is running for the first time and reads a signal that + * belongs to an earlier batch, this batch must wait for that earlier batch. + * @param {Value} signal + */ +export function mark_current_batch_blocked_by_prior_signal(signal) { + if (current_batch === null) return; + + for (const batch of batches) { + if (batch === current_batch || batch.is_fork || batch.id >= current_batch.id) { + continue; + } + + if (batch.current.has(signal)) { + current_batch.blockers.add(batch); + } + } +} + /** @type {Source[]} */ let eager_versions = []; diff --git a/packages/svelte/src/internal/client/reactivity/deriveds.js b/packages/svelte/src/internal/client/reactivity/deriveds.js index 5da0df0670..6a833d358e 100644 --- a/packages/svelte/src/internal/client/reactivity/deriveds.js +++ b/packages/svelte/src/internal/client/reactivity/deriveds.js @@ -184,18 +184,40 @@ export function async_derived(fn, label, location) { * @param {any} value * @param {unknown} error */ - const handler = (value, error = undefined) => { + const handler = async (value, error = undefined) => { if (DEV) { reactivity_loss_tracker = null; } if (decrement_pending) { + var skip = error === STALE_REACTION; + + debugger; + if (!skip) { + /** @type {Promise[]} */ + const waits = []; + + // All prior async derived runs are now stale, + // but we have to wait for the corresponding batch to resolve before proceeding + for (const [b, d] of deferreds) { + if (b === batch) break; + deferreds.delete(b); + waits.push(b.settled()); + d.reject(STALE_REACTION); + } + + if (waits.length > 0) { + await Promise.all(waits); + } + } + // don't trigger an update if we're only here because // the promise was superseded before it could resolve - var skip = error === STALE_REACTION; decrement_pending(skip); } + deferreds.delete(batch); + if (error === STALE_REACTION || (effect.f & DESTROYED) !== 0) { return; } @@ -214,13 +236,6 @@ export function async_derived(fn, label, location) { internal_set(signal, value); - // All prior async derived runs are now stale - for (const [b, d] of deferreds) { - deferreds.delete(b); - if (b === batch) break; - d.reject(STALE_REACTION); - } - if (DEV && location !== undefined) { recent_async_deriveds.add(signal); diff --git a/packages/svelte/src/internal/client/runtime.js b/packages/svelte/src/internal/client/runtime.js index 906d68fbf0..4a65a8e361 100644 --- a/packages/svelte/src/internal/client/runtime.js +++ b/packages/svelte/src/internal/client/runtime.js @@ -51,6 +51,7 @@ import { batch_values, current_batch, flushSync, + mark_current_batch_blocked_by_prior_signal, schedule_effect } from './reactivity/batch.js'; import { handle_error } from './error-handling.js'; @@ -526,6 +527,17 @@ export function get(signal) { captured_signals?.add(signal); + // A first-run reaction that touches a signal from an earlier batch + // introduces a cross-batch dependency and must wait on that batch. + if ( + async_mode_flag && + current_batch !== null && + active_reaction !== null && + (active_reaction.f & REACTION_RAN) === 0 + ) { + mark_current_batch_blocked_by_prior_signal(signal); + } + // Register the dependency on the current reaction signal. if (active_reaction !== null && !untracking) { // if we're in a derived that is being read inside an _async_ derived,