merge-batches
Rich Harris 2 days ago
parent 12e1c81fe6
commit e286d30859

@ -1,4 +1,4 @@
/** @import { Derived, Effect, Source } from '#client' */ /** @import { Derived, Effect, Source, Value } from '#client' */
import { import {
BLOCK_EFFECT, BLOCK_EFFECT,
BRANCH_EFFECT, BRANCH_EFFECT,
@ -10,10 +10,11 @@ import {
INERT, INERT,
RENDER_EFFECT, RENDER_EFFECT,
ROOT_EFFECT, ROOT_EFFECT,
MAYBE_DIRTY MAYBE_DIRTY,
DERIVED
} from '#client/constants'; } from '#client/constants';
import { async_mode_flag } from '../../flags/index.js'; import { async_mode_flag } from '../../flags/index.js';
import { deferred, define_property } from '../../shared/utils.js'; import { deferred, define_property, noop } from '../../shared/utils.js';
import { import {
active_effect, active_effect,
is_dirty, is_dirty,
@ -165,32 +166,7 @@ export class Batch {
previous_batch = null; previous_batch = null;
/** @type {Map<Source, { v: unknown, wv: number }> | null} */ var revert = Batch.apply(this);
var current_values = null;
// if there are multiple batches, we are 'time travelling' —
// we need to undo the changes belonging to any batch
// other than the current one
if (async_mode_flag && batches.size > 1) {
current_values = new Map();
batch_deriveds = new Map();
for (const [source, current] of this.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}
for (const batch of batches) {
if (batch === this) continue;
for (const [source, previous] of batch.#previous) {
if (!current_values.has(source)) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = previous;
}
}
}
}
for (const root of root_effects) { for (const root of root_effects) {
this.#traverse_effect_tree(root); this.#traverse_effect_tree(root);
@ -221,29 +197,20 @@ export class Batch {
this.#defer_effects(this.#render_effects); this.#defer_effects(this.#render_effects);
this.#defer_effects(this.#effects); this.#defer_effects(this.#effects);
this.#defer_effects(this.#block_effects); this.#defer_effects(this.#block_effects);
}
if (current_values) { for (const effect of this.#async_effects) {
for (const [source, { v, wv }] of current_values) { update_effect(effect);
// reset the source to the current value (unless
// it got a newer value as a result of effects running)
if (source.wv <= wv) {
source.v = v;
}
} }
batch_deriveds = null; this.#async_effects = [];
} }
for (const effect of this.#async_effects) { revert();
update_effect(effect);
}
for (const effect of this.#boundary_async_effects) { for (const effect of this.#boundary_async_effects) {
update_effect(effect); update_effect(effect);
} }
this.#async_effects = [];
this.#boundary_async_effects = []; this.#boundary_async_effects = [];
} }
@ -381,6 +348,51 @@ export class Batch {
} }
this.#callbacks.clear(); this.#callbacks.clear();
/**
* @param {Value} value
* @param {Set<Effect>} effects
*/
function get_async_effects(value, effects) {
if (value.reactions !== null) {
for (const reaction of value.reactions) {
const flags = reaction.f;
if ((flags & DERIVED) !== 0) {
get_async_effects(/** @type {Derived} */ (reaction), effects);
} else if ((flags & ASYNC) !== 0) {
effects.add(/** @type {Effect} */ (reaction));
}
}
}
}
if (batches.size > 1) {
const effects = new Set();
for (const source of this.current.keys()) {
// TODO do we also need block effects?
get_async_effects(source, effects);
}
this.#previous.clear();
for (const batch of batches) {
if (batch === this) {
continue;
}
current_batch = batch;
const revert = Batch.apply(batch);
for (const e of effects) {
update_effect(e);
}
revert();
}
current_batch = null;
}
batches.delete(this); batches.delete(this);
} }
@ -444,6 +456,56 @@ export class Batch {
static enqueue(task) { static enqueue(task) {
queue_micro_task(task); queue_micro_task(task);
} }
/**
* @param {Batch} current_batch
*/
static apply(current_batch) {
if (!async_mode_flag || batches.size === 1) {
return noop;
}
/** @type {Map<Source, { v: unknown, wv: number }> | null} */
var current_values = null;
// if there are multiple batches, we are 'time travelling' —
// we need to undo the changes belonging to any batch
// other than the current one
if (async_mode_flag && batches.size > 1) {
current_values = new Map();
batch_deriveds = new Map();
for (const [source, current] of current_batch.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}
for (const batch of batches) {
if (batch === current_batch) continue;
for (const [source, previous] of batch.#previous) {
if (!current_values.has(source)) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = previous;
}
}
}
}
return () => {
if (current_values) {
for (const [source, { v, wv }] of current_values) {
// reset the source to the current value (unless
// it got a newer value as a result of effects running)
if (source.wv <= wv) {
source.v = v;
}
}
batch_deriveds = null;
}
};
}
} }
/** /**

@ -109,31 +109,26 @@ export function async_derived(fn, location) {
var promise = /** @type {Promise<V>} */ (/** @type {unknown} */ (undefined)); var promise = /** @type {Promise<V>} */ (/** @type {unknown} */ (undefined));
var signal = source(/** @type {V} */ (UNINITIALIZED)); var signal = source(/** @type {V} */ (UNINITIALIZED));
/** @type {Promise<V> | null} */
var prev = null;
// only suspend in async deriveds created on initialisation // only suspend in async deriveds created on initialisation
var should_suspend = !active_reaction; var should_suspend = !active_reaction;
/** @type {Map<Batch, Promise<V>>} */
var promises = new Map();
async_effect(() => { async_effect(() => {
if (DEV) current_async_effect = active_effect; if (DEV) current_async_effect = active_effect;
/** @type {Promise<V>} */
var current;
try { try {
var p = fn(); current = promise = Promise.resolve(fn());
// Make sure to always access the then property to read any signals
// it might access, so that we track them as dependencies.
if (prev) Promise.resolve(p).catch(() => {}); // avoid unhandled rejection
} catch (error) { } catch (error) {
p = Promise.reject(error); current = promise = Promise.reject(error);
} }
if (DEV) current_async_effect = null; if (DEV) current_async_effect = null;
var r = () => p;
promise = prev?.then(r, r) ?? Promise.resolve(p);
prev = promise;
var batch = /** @type {Batch} */ (current_batch); var batch = /** @type {Batch} */ (current_batch);
var pending = boundary.is_pending(); var pending = boundary.is_pending();
@ -142,40 +137,42 @@ export function async_derived(fn, location) {
if (!pending) batch.increment(); if (!pending) batch.increment();
} }
promises.set(batch, promise);
/** /**
* @param {any} value * @param {any} value
* @param {unknown} error * @param {unknown} error
*/ */
const handler = (value, error = undefined) => { const handler = (value, error = undefined) => {
prev = null;
current_async_effect = null; current_async_effect = null;
if (!pending) batch.activate(); if (!pending) batch.activate();
if (error) { if (current === promises.get(batch)) {
if (error !== STALE_REACTION) { if (error) {
signal.f |= ERROR_VALUE; if (error !== STALE_REACTION) {
signal.f |= ERROR_VALUE;
// @ts-expect-error the error is the wrong type, but we don't care // @ts-expect-error the error is the wrong type, but we don't care
internal_set(signal, error); internal_set(signal, error);
} }
} else { } else {
if ((signal.f & ERROR_VALUE) !== 0) { if ((signal.f & ERROR_VALUE) !== 0) {
signal.f ^= ERROR_VALUE; signal.f ^= ERROR_VALUE;
} }
internal_set(signal, value); internal_set(signal, value);
if (DEV && location !== undefined) { if (DEV && location !== undefined) {
recent_async_deriveds.add(signal); recent_async_deriveds.add(signal);
setTimeout(() => { setTimeout(() => {
if (recent_async_deriveds.has(signal)) { if (recent_async_deriveds.has(signal)) {
w.await_waterfall(/** @type {string} */ (signal.label), location); w.await_waterfall(/** @type {string} */ (signal.label), location);
recent_async_deriveds.delete(signal); recent_async_deriveds.delete(signal);
} }
}); });
}
} }
} }

Loading…
Cancel
Save