- add state changes resulting from an $effect to a separate new batch

- schedule rerunning effects based on the sources that are dirty, not just rerunning them all blindly (excempting async effects which will have run by that time already)
async-fixes-2
Simon Holthausen 2 months ago
parent 9412c5861c
commit b8893f95d9

@ -28,7 +28,7 @@ import * as e from '../errors.js';
import { flush_tasks } from '../dom/task.js';
import { DEV } from 'esm-env';
import { invoke_error_boundary } from '../error-handling.js';
import { old_values } from './sources.js';
import { mark_reactions, old_values } from './sources.js';
import { unlink_effect } from './effects.js';
import { unset_context } from './async.js';
@ -70,13 +70,15 @@ let last_scheduled_effect = null;
let is_flushing = false;
let flushing_sync = false;
export class Batch {
/**
* The current values of any sources that are updated in this batch
* They keys of this map are identical to `this.#previous`
* @type {Map<Source, any>}
*/
#current = new Map();
current = new Map();
/**
* The values of any sources that are updated in this batch _before_ those updates took place.
@ -156,7 +158,7 @@ export class Batch {
*
* @param {Effect[]} root_effects
*/
#process(root_effects) {
process(root_effects) {
queued_root_effects = [];
/** @type {Map<Source, { v: unknown, wv: number }> | null} */
@ -169,7 +171,7 @@ export class Batch {
current_values = new Map();
batch_deriveds = new Map();
for (const [source, current] of this.#current) {
for (const [source, current] of this.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}
@ -300,7 +302,7 @@ export class Batch {
this.#previous.set(source, value);
}
this.#current.set(source, source.v);
this.current.set(source, source.v);
}
activate() {
@ -346,49 +348,7 @@ export class Batch {
}
flush_effects() {
var was_updating_effect = is_updating_effect;
is_flushing = true;
try {
var flush_count = 0;
set_is_updating_effect(true);
while (queued_root_effects.length > 0) {
if (flush_count++ > 1000) {
if (DEV) {
var updates = new Map();
for (const source of this.#current.keys()) {
for (const [stack, update] of source.updated ?? []) {
var entry = updates.get(stack);
if (!entry) {
entry = { error: update.error, count: 0 };
updates.set(stack, entry);
}
entry.count += update.count;
}
}
for (const update of updates.values()) {
// eslint-disable-next-line no-console
console.error(update.error);
}
}
infinite_loop_guard();
}
this.#process(queued_root_effects);
old_values.clear();
}
} finally {
is_flushing = false;
set_is_updating_effect(was_updating_effect);
last_scheduled_effect = null;
}
flush_effects();
}
/**
@ -412,19 +372,8 @@ export class Batch {
this.#pending -= 1;
if (this.#pending === 0) {
for (const e of this.#render_effects) {
set_signal_status(e, DIRTY);
schedule_effect(e);
}
for (const e of this.#effects) {
set_signal_status(e, DIRTY);
schedule_effect(e);
}
for (const e of this.#block_effects) {
set_signal_status(e, DIRTY);
schedule_effect(e);
for (const source of this.current.keys()) {
mark_reactions(source, DIRTY, false);
}
this.#render_effects = [];
@ -487,32 +436,88 @@ export function flushSync(fn) {
e.flush_sync_in_effect();
}
var result;
var prev_flushing_sync = flushing_sync;
flushing_sync = true;
const batch = Batch.ensure(false);
try {
var result;
if (fn) {
batch.flush_effects();
const batch = Batch.ensure(false);
result = fn();
}
if (fn) {
batch.flush_effects();
while (true) {
flush_tasks();
result = fn();
}
while (true) {
flush_tasks();
if (queued_root_effects.length === 0) {
// TODO this might need adjustment
if (batch === current_batch) {
batch.flush();
}
// this would be reset in `batch.flush_effects()` but since we are early returning here,
// we need to reset it here as well in case the first time there's 0 queued root effects
last_scheduled_effect = null;
if (queued_root_effects.length === 0) {
if (batch === current_batch) {
batch.flush();
return /** @type {T} */ (result);
}
// this would be reset in `batch.flush_effects()` but since we are early returning here,
// we need to reset it here as well in case the first time there's 0 queued root effects
last_scheduled_effect = null;
batch.flush_effects();
}
} finally {
flushing_sync = prev_flushing_sync;
}
}
return /** @type {T} */ (result);
function flush_effects() {
var was_updating_effect = is_updating_effect;
is_flushing = true;
try {
var flush_count = 0;
var batch = /** @type {Batch} */ (current_batch);
set_is_updating_effect(true);
while (queued_root_effects.length > 0) {
if (flush_count++ > 1000) {
if (DEV) {
var updates = new Map();
for (const source of batch.current.keys()) {
for (const [stack, update] of source.updated ?? []) {
var entry = updates.get(stack);
if (!entry) {
entry = { error: update.error, count: 0 };
updates.set(stack, entry);
}
entry.count += update.count;
}
}
for (const update of updates.values()) {
// eslint-disable-next-line no-console
console.error(update.error);
}
}
infinite_loop_guard();
}
batch = /** @type {Batch} */ (current_batch);
batch.process(queued_root_effects);
old_values.clear();
}
} finally {
is_flushing = false;
set_is_updating_effect(was_updating_effect);
batch.flush_effects();
last_scheduled_effect = null;
}
}
@ -545,6 +550,7 @@ function flush_queued_effects(effects) {
if ((effect.f & (DESTROYED | INERT)) === 0) {
if (is_dirty(effect)) {
var wv = write_version;
var current_size = /** @type {Batch} */ (current_batch).current.size;
update_effect(effect);
@ -568,6 +574,22 @@ function flush_queued_effects(effects) {
// if state is written in a user effect, abort and re-schedule, lest we run
// effects that should be removed as a result of the state change
if (write_version > wv && (effect.f & USER_EFFECT) !== 0) {
// Work needs to happen in a separate batch, else prior sources would be mixed with
// newly updated sources, which could lead to infinite loops when effects run over and over again.
// We need to bring over the just written sources though to correctly mark the right reactions as dirty.
var old_batch = /** @type {Batch} */ (current_batch);
batches.delete(old_batch);
current_batch = null;
var new_batch = Batch.ensure(!flushing_sync);
var current_idx = 0;
// We're taking advantage of the spec here which says that entries in a Map are traversed by insertion order
for (const source of old_batch.current) {
if (current_idx >= current_size) {
new_batch.capture(source[0], source[1]);
}
current_idx++;
}
i++;
break;
}
}

@ -301,9 +301,10 @@ export function increment(source) {
/**
* @param {Value} signal
* @param {number} status should be DIRTY or MAYBE_DIRTY
* @param {boolean} schedule_async
* @returns {void}
*/
function mark_reactions(signal, status) {
export function mark_reactions(signal, status, schedule_async = true) {
var reactions = signal.reactions;
if (reactions === null) return;
@ -324,13 +325,13 @@ function mark_reactions(signal, status) {
}
// don't set a DIRTY reaction to MAYBE_DIRTY
if ((flags & DIRTY) === 0) {
if ((flags & DIRTY) === 0 && (schedule_async || (flags & ASYNC) === 0)) {
set_signal_status(reaction, status);
}
if ((flags & DERIVED) !== 0) {
mark_reactions(/** @type {Derived} */ (reaction), MAYBE_DIRTY);
} else if ((flags & DIRTY) === 0) {
} else if ((flags & DIRTY) === 0 && (schedule_async || (flags & ASYNC) === 0)) {
schedule_effect(/** @type {Effect} */ (reaction));
}
}

Loading…
Cancel
Save