run batch.traverse without `previous` of prior batches, only render effects, and block on overlaps in new branches

async-blocking-and-merging
Simon Holthausen 11 hours ago
parent 0adc22c9ae
commit 5695f5a6d9

@ -171,14 +171,14 @@ export class Batch {
#decrement_queued = false;
/** @type {Set<Batch>} */
#blockers = new Set();
blockers = new Set();
#is_deferred() {
return this.is_fork || this.#blocking_pending.size > 0;
}
#is_blocked() {
for (const batch of this.#blockers) {
for (const batch of this.blockers) {
for (const effect of batch.#blocking_pending.keys()) {
var skipped = false;
var e = effect;
@ -257,7 +257,7 @@ export class Batch {
const roots = this.#roots;
this.#roots = [];
this.apply();
this.apply(false);
/** @type {Effect[]} */
var effects = (collected_effects = []);
@ -301,6 +301,10 @@ export class Batch {
reset_branch(e, t);
}
} else {
// During deferred effect flushing, also account for prior batches'
// previous values. Traversal intentionally sees latest values up to this batch.
this.apply(true);
if (this.#pending.size === 0) {
batches.delete(this);
}
@ -470,6 +474,7 @@ export class Batch {
this.#discard_callbacks.clear();
batches.delete(this);
this.#deferred?.resolve();
}
#commit() {
@ -477,6 +482,7 @@ export class Batch {
// in other words, we re-run block/async effects with the newly
// committed state, unless the batch in question has a more
// recent value for a given source
// return;
for (const batch of batches) {
var is_earlier = batch.id < this.id;
@ -527,7 +533,7 @@ export class Batch {
// Only apply and traverse when we know we triggered async work with marking the effects
if (batch.#roots.length > 0) {
batch.apply();
batch.apply(false);
for (var root of batch.#roots) {
batch.#traverse(root, [], []);
@ -541,10 +547,10 @@ export class Batch {
}
for (const batch of batches) {
if (batch.#blockers.has(this)) {
batch.#blockers.delete(this);
if (batch.blockers.has(this)) {
batch.blockers.delete(this);
if (batch.#blockers.size === 0 && !batch.#is_deferred()) {
if (batch.blockers.size === 0 && !batch.#is_deferred()) {
batch.activate();
batch.#process();
}
@ -590,7 +596,9 @@ export class Batch {
}
}
if (this.#decrement_queued || skip) return;
// We want to flush when skip=true but this was the last pending effect, because
// otherwise we will never flush the batch and it hangs around indefinitely.
if (this.#decrement_queued || (skip && this.#pending.size > 0)) return;
this.#decrement_queued = true;
queue_micro_task(() => {
@ -653,7 +661,10 @@ export class Batch {
return current_batch;
}
apply() {
/**
* @param {boolean} include_prior_previous
*/
apply(include_prior_previous) {
if (!async_mode_flag || (!this.is_fork && batches.size === 1)) {
batch_values = null;
return;
@ -686,8 +697,12 @@ export class Batch {
}
if (intersects && differs) {
this.#blockers.add(batch);
this.blockers.add(batch);
} else {
if (batch.id < this.id && !include_prior_previous) {
continue;
}
for (const [source, previous] of batch.previous) {
if (!batch_values.has(source)) {
batch_values.set(source, previous);
@ -695,7 +710,17 @@ export class Batch {
}
}
}
}
} /*
apply() needs to happen piece by piece "on the fly" when traversing the graph (is_dirty, get, set etc)
to check if it's blocked by an earlier batch
during traversal async/block effects get the latest value across batches,
and if they read a source that the earlier batch has or a derived which has source of earlier batch in its deps, we know it's blocked
-> also for newly created and eagerly executed (render) effects ($effect.pre executing with new values already possibly before other; problem? feels like we should always eagerly execute $effect.pre)
id -> batch.id -> upsert (check if id still in batches) -> collect(ids) -> if one smaller than current and still exists, it's blocked
if we want to go into a skipped branch it's a sign of intersection too - is it also a sign if the branch is pending (about to be shown when prior batch commits)?
*/
/**
*
@ -992,6 +1017,25 @@ export function schedule_effect(effect) {
/** @type {Batch} */ (current_batch).schedule(effect);
}
/**
* If an effect/derived is running for the first time and reads a signal that
* belongs to an earlier batch, this batch must wait for that earlier batch.
* @param {Value} signal
*/
export function mark_current_batch_blocked_by_prior_signal(signal) {
if (current_batch === null) return;
for (const batch of batches) {
if (batch === current_batch || batch.is_fork || batch.id >= current_batch.id) {
continue;
}
if (batch.current.has(signal)) {
current_batch.blockers.add(batch);
}
}
}
/** @type {Source<number>[]} */
let eager_versions = [];

@ -184,18 +184,40 @@ export function async_derived(fn, label, location) {
* @param {any} value
* @param {unknown} error
*/
const handler = (value, error = undefined) => {
const handler = async (value, error = undefined) => {
if (DEV) {
reactivity_loss_tracker = null;
}
if (decrement_pending) {
var skip = error === STALE_REACTION;
debugger;
if (!skip) {
/** @type {Promise<unknown>[]} */
const waits = [];
// All prior async derived runs are now stale,
// but we have to wait for the corresponding batch to resolve before proceeding
for (const [b, d] of deferreds) {
if (b === batch) break;
deferreds.delete(b);
waits.push(b.settled());
d.reject(STALE_REACTION);
}
if (waits.length > 0) {
await Promise.all(waits);
}
}
// don't trigger an update if we're only here because
// the promise was superseded before it could resolve
var skip = error === STALE_REACTION;
decrement_pending(skip);
}
deferreds.delete(batch);
if (error === STALE_REACTION || (effect.f & DESTROYED) !== 0) {
return;
}
@ -214,13 +236,6 @@ export function async_derived(fn, label, location) {
internal_set(signal, value);
// All prior async derived runs are now stale
for (const [b, d] of deferreds) {
deferreds.delete(b);
if (b === batch) break;
d.reject(STALE_REACTION);
}
if (DEV && location !== undefined) {
recent_async_deriveds.add(signal);

@ -51,6 +51,7 @@ import {
batch_values,
current_batch,
flushSync,
mark_current_batch_blocked_by_prior_signal,
schedule_effect
} from './reactivity/batch.js';
import { handle_error } from './error-handling.js';
@ -526,6 +527,17 @@ export function get(signal) {
captured_signals?.add(signal);
// A first-run reaction that touches a signal from an earlier batch
// introduces a cross-batch dependency and must wait on that batch.
if (
async_mode_flag &&
current_batch !== null &&
active_reaction !== null &&
(active_reaction.f & REACTION_RAN) === 0
) {
mark_current_batch_blocked_by_prior_signal(signal);
}
// Register the dependency on the current reaction signal.
if (active_reaction !== null && !untracking) {
// if we're in a derived that is being read inside an _async_ derived,

Loading…
Cancel
Save