WIP sorta working but very messy, tests failing, huge memory leaks

async-changeset
Rich Harris 7 months ago
parent 4a89744830
commit 8c2106efbc

@ -1,6 +1,8 @@
/** @import { TemplateNode, Value } from '#client' */
/** @import { Effect, TemplateNode, Value } from '#client' */
import { DESTROYED } from '../../constants.js';
import { async_derived } from '../../reactivity/deriveds.js';
import { active_effect } from '../../runtime.js';
import { capture, suspend } from './boundary.js';
/**
@ -11,11 +13,18 @@ import { capture, suspend } from './boundary.js';
export function async(node, expressions, fn) {
// TODO handle hydration
var effect = /** @type {Effect} */ (active_effect);
var restore = capture();
var unsuspend = suspend();
var { unsuspend } = suspend();
Promise.all(expressions.map((fn) => async_derived(fn))).then((result) => {
restore();
if ((effect.f & DESTROYED) !== 0) {
return;
}
fn(node, ...result);
unsuspend();
});

@ -1,4 +1,4 @@
/** @import { Effect, TemplateNode, } from '#client' */
/** @import { Effect, Source, TemplateNode, } from '#client' */
import {
BOUNDARY_EFFECT,
@ -15,8 +15,6 @@ import {
set_active_effect,
set_active_reaction,
reset_is_throwing_error,
schedule_effect,
check_dirtiness,
update_effect
} from '../../runtime.js';
import {
@ -43,6 +41,9 @@ import { from_async_derived, set_from_async_derived } from '../../reactivity/der
var flags = EFFECT_TRANSPARENT | EFFECT_PRESERVED | BOUNDARY_EFFECT;
/** @type {Fork | null} */
export var active_fork = null;
/**
* @param {TemplateNode} node
* @param {BoundaryProps} props
@ -73,16 +74,7 @@ export class Boundary {
#children;
/** @type {Effect} */
#effect;
/** @type {Set<() => void>} */
#callbacks = new Set();
/** @type {Effect[]} */
#render_effects = [];
/** @type {Effect[]} */
#effects = [];
effect;
/** @type {Effect | null} */
#main_effect = null;
@ -96,6 +88,12 @@ export class Boundary {
/** @type {DocumentFragment | null} */
#offscreen_fragment = null;
/** @type {Set<Fork>} */
#forks = new Set();
/** @type {Map<Source, any>} */
values = new Map();
#pending_count = 0;
#is_creating_fallback = false;
@ -105,6 +103,8 @@ export class Boundary {
* @param {((anchor: Node) => void)} children
*/
constructor(node, props, children) {
window.boundary = this;
this.#anchor = node;
this.#props = props;
this.#children = children;
@ -113,7 +113,7 @@ export class Boundary {
this.parent = /** @type {Effect} */ (active_effect).b;
this.#effect = block(() => {
this.effect = block(() => {
/** @type {Effect} */ (active_effect).b = this;
if (hydrating) {
@ -146,7 +146,7 @@ export class Boundary {
if (this.#pending_count > 0) {
this.suspended = true;
this.#show_pending_snippet(true);
this.#show_pending_snippet();
}
}
@ -172,9 +172,9 @@ export class Boundary {
var previous_reaction = active_reaction;
var previous_ctx = component_context;
set_active_effect(this.#effect);
set_active_reaction(this.#effect);
set_component_context(this.#effect.ctx);
set_active_effect(this.effect);
set_active_reaction(this.effect);
set_component_context(this.effect.ctx);
try {
return fn();
@ -185,10 +185,7 @@ export class Boundary {
}
}
/**
* @param {boolean} initial
*/
#show_pending_snippet(initial) {
#show_pending_snippet() {
const pending = this.#props.pending;
if (pending !== undefined) {
@ -208,36 +205,7 @@ export class Boundary {
}
}
/** @param {() => void} fn */
add_callback(fn) {
this.#callbacks.add(fn);
}
/** @param {Effect} effect */
add_effect(effect) {
((effect.f & RENDER_EFFECT) !== 0 ? this.#render_effects : this.#effects).push(effect);
}
commit() {
if (this.#pending_count > 0) {
return;
}
this.suspended = false;
for (const e of this.#render_effects) {
try {
if (check_dirtiness(e)) {
update_effect(e);
}
} catch (error) {
handle_error(error, e, null, e.ctx);
}
}
for (const fn of this.#callbacks) fn();
this.#callbacks.clear();
hide_pending_snippet() {
if (this.#pending_effect) {
pause_effect(this.#pending_effect, () => {
this.#pending_effect = null;
@ -248,31 +216,21 @@ export class Boundary {
this.#anchor.before(this.#offscreen_fragment);
this.#offscreen_fragment = null;
}
for (const e of this.#effects) {
try {
if (check_dirtiness(e)) {
update_effect(e);
}
} catch (error) {
handle_error(error, e, null, e.ctx);
}
}
}
increment() {
this.suspended = true;
this.#pending_count++;
if (active_fork) {
active_fork.increment();
} else if (this.#pending_count++ === 0) {
this.#show_pending_snippet();
}
}
decrement() {
if (--this.#pending_count === 0) {
this.commit();
if (this.#main_effect !== null) {
// TODO do we also need to `resume_effect` here?
schedule_effect(this.#main_effect);
}
if (active_fork) {
active_fork.decrement();
} else if (--this.#pending_count === 0) {
this.hide_pending_snippet();
}
}
@ -303,7 +261,7 @@ export class Boundary {
if (this.#pending_count > 0) {
this.suspended = true;
this.#show_pending_snippet(true);
this.#show_pending_snippet();
}
};
@ -350,7 +308,7 @@ export class Boundary {
);
});
} catch (error) {
handle_error(error, this.#effect, null, this.#effect.ctx);
handle_error(error, this.effect, null, this.effect.ctx);
return null;
} finally {
reset_is_throwing_error();
@ -360,6 +318,168 @@ export class Boundary {
});
}
}
/**
* @param {Set<Source>} changeset
* @param {(fork: Fork) => void} fn
*/
fork(changeset, fn) {
if (!active_fork || !this.#forks.has(active_fork)) {
active_fork = new Fork(this, changeset);
this.#forks.add(active_fork);
}
fn(active_fork);
if (!active_fork.suspended) {
active_fork.commit();
}
active_fork = null;
}
/**
* @param {Source} source
*/
get(source) {
if (!this.values.has(source)) {
this.values.set(source, source.v);
}
return this.values.get(source);
}
/**
* @param {Fork} fork
*/
commit_fork(fork) {
for (const source of fork.changeset) {
this.values.set(source, source.v);
}
this.delete_fork(fork);
}
/**
* @param {Fork} fork
*/
delete_fork(fork) {
this.#forks.delete(fork);
if (this.#forks.size === 0) {
// TODO we need to clear this at some point otherwise
// it's a huge memory leak that will make dominic mad
}
}
}
export class Fork {
/** @type {Boundary} */
#boundary;
/** @type {Set<Source>} */
changeset; // TODO make private
/** @type {Set<() => void>} */
#callbacks = new Set();
/** @type {Effect[]} */
#render_effects = [];
/** @type {Effect[]} */
#effects = [];
#pending_count = 0;
/**
*
* @param {Boundary} boundary
* @param {Set<Source>} changeset
*/
constructor(boundary, changeset) {
this.#boundary = boundary;
this.changeset = new Set(changeset);
}
/**
*
* @param {Source} source
*/
get(source) {
// console.log(
// 'fork#get',
// [...this.changeset].map((s) => s.v),
// this.changeset.has(source),
// source.v
// );
if (this.changeset.has(source)) {
return source.v;
}
return this.#boundary.get(source);
}
/** @param {() => void} fn */
add_callback(fn) {
this.#callbacks.add(fn);
}
/** @param {Effect} effect */
add_effect(effect) {
((effect.f & RENDER_EFFECT) !== 0 ? this.#render_effects : this.#effects).push(effect);
}
commit() {
if (this.#pending_count > 0) {
return;
}
this.suspended = false;
for (const e of this.#render_effects) {
try {
// if (check_dirtiness(e)) {
update_effect(e);
// }
} catch (error) {
handle_error(error, e, null, e.ctx);
}
}
for (const fn of this.#callbacks) fn();
this.#callbacks.clear();
this.#boundary.hide_pending_snippet();
for (const e of this.#effects) {
try {
// if (check_dirtiness(e)) {
update_effect(e);
// }
} catch (error) {
handle_error(error, e, null, e.ctx);
}
}
this.#boundary.commit_fork(this);
}
increment() {
this.#pending_count++;
this.suspended = true;
}
decrement() {
if (--this.#pending_count === 0) {
this.suspended = false;
// this.commit();
}
}
discard() {
this.#boundary.delete_fork(this);
}
}
/**
@ -381,6 +501,7 @@ function move_effect(effect, fragment) {
}
export function capture(track = true) {
var previous_fork = active_fork;
var previous_effect = active_effect;
var previous_reaction = active_reaction;
var previous_component_context = component_context;
@ -391,6 +512,7 @@ export function capture(track = true) {
return function restore() {
if (track) {
active_fork = previous_fork;
set_active_effect(previous_effect);
set_active_reaction(previous_reaction);
set_component_context(previous_component_context);
@ -419,10 +541,30 @@ export function suspend() {
e.await_outside_boundary();
}
boundary.increment();
let fork = active_fork;
if (fork) {
fork.increment();
} else {
boundary.increment();
}
return {
discard() {
if (fork) {
fork.discard();
} else {
// TODO ???
}
},
return function unsuspend() {
boundary.decrement();
unsuspend() {
if (fork) {
fork.decrement();
} else {
boundary.decrement();
}
}
};
}

@ -39,6 +39,7 @@ import { queue_micro_task } from '../task.js';
import { active_effect, get } from '../../runtime.js';
import { DEV } from 'esm-env';
import { derived_safe_equal } from '../../reactivity/deriveds.js';
import { active_fork } from './boundary.js';
/**
* The row of a keyed each block that is currently updating. We track this
@ -267,7 +268,7 @@ export function each(node, flags, get_collection, get_key, render_fn, fallback_f
fallback = branch(() => fallback_fn(anchor));
}
} else {
if (boundary !== null && should_defer_append()) {
if (active_fork !== null && should_defer_append()) {
for (i = 0; i < length; i += 1) {
value = array[i];
key = get_key(value, i);
@ -298,7 +299,7 @@ export function each(node, flags, get_collection, get_key, render_fn, fallback_f
}
}
boundary?.add_callback(commit);
active_fork.add_callback(commit);
} else {
commit();
}

@ -12,6 +12,7 @@ import { block, branch, pause_effect, resume_effect } from '../../reactivity/eff
import { HYDRATION_START_ELSE, UNINITIALIZED } from '../../../../constants.js';
import { create_text, should_defer_append } from '../operations.js';
import { active_effect } from '../../runtime.js';
import { active_fork } from './boundary.js';
/**
* @param {TemplateNode} node
@ -109,7 +110,7 @@ export function if_block(node, fn, elseif = false) {
}
}
var defer = boundary !== null && should_defer_append();
var defer = active_fork !== null && should_defer_append();
var target = anchor;
if (defer) {
@ -122,7 +123,7 @@ export function if_block(node, fn, elseif = false) {
}
if (defer) {
boundary?.add_callback(commit);
active_fork?.add_callback(commit);
target.remove();
} else {
commit();

@ -6,6 +6,7 @@ import { is_runes } from '../../context.js';
import { hydrate_next, hydrate_node, hydrating } from '../hydration.js';
import { create_text, should_defer_append } from '../operations.js';
import { active_effect } from '../../runtime.js';
import { active_fork } from './boundary.js';
/**
* @template V
@ -57,7 +58,7 @@ export function key_block(node, get_key, render_fn) {
if (changed(key, (key = get_key()))) {
var target = anchor;
var defer = boundary !== null && should_defer_append();
var defer = active_fork !== null && should_defer_append();
if (defer) {
offscreen_fragment = document.createDocumentFragment();
@ -67,7 +68,7 @@ export function key_block(node, get_key, render_fn) {
pending_effect = branch(() => render_fn(target));
if (defer) {
boundary?.add_callback(commit);
active_fork?.add_callback(commit);
target.remove();
} else {
commit();

@ -4,6 +4,7 @@ import { block, branch, pause_effect } from '../../reactivity/effects.js';
import { active_effect } from '../../runtime.js';
import { hydrate_next, hydrate_node, hydrating } from '../hydration.js';
import { create_text, should_defer_append } from '../operations.js';
import { active_fork } from './boundary.js';
/**
* @template P
@ -51,7 +52,7 @@ export function component(node, get_component, render_fn) {
block(() => {
if (component === (component = get_component())) return;
var defer = boundary !== null && should_defer_append();
var defer = active_fork !== null && should_defer_append();
if (component) {
var target = anchor;
@ -69,7 +70,7 @@ export function component(node, get_component, render_fn) {
}
if (defer) {
boundary?.add_callback(commit);
active_fork?.add_callback(commit);
} else {
commit();
}

@ -26,7 +26,7 @@ import { block, destroy_effect } from './effects.js';
import { inspect_effects, internal_set, set_inspect_effects, source } from './sources.js';
import { get_stack } from '../dev/tracing.js';
import { tracing_mode_flag } from '../../flags/index.js';
import { capture, suspend } from '../dom/blocks/boundary.js';
import { active_fork, capture, suspend } from '../dom/blocks/boundary.js';
import { component_context } from '../context.js';
import { noop } from '../../shared/utils.js';
import { UNINITIALIZED } from '../../../constants.js';
@ -107,6 +107,9 @@ export function async_derived(fn, location) {
/** @type {(() => void) | null} */
var unsuspend = null;
/** @type {(() => void) | null} */
var discard = null;
// TODO this isn't a block
block(() => {
if (DEV) from_async_derived = active_effect;
@ -114,7 +117,11 @@ export function async_derived(fn, location) {
if (DEV) from_async_derived = null;
var restore = capture();
if (should_suspend) unsuspend ??= suspend();
if (should_suspend) {
discard?.();
({ discard, unsuspend } = suspend());
}
promise.then(
(v) => {
@ -126,7 +133,13 @@ export function async_derived(fn, location) {
restore();
from_async_derived = null;
internal_set(signal, v);
if (signal.v === UNINITIALIZED) {
signal.v = v;
} else {
internal_set(signal, v);
}
active_fork?.changeset.add(signal);
if (DEV && location !== undefined) {
recent_async_deriveds.add(signal);

@ -349,7 +349,7 @@ export function template_effect(fn, sync = [], async = [], d = derived) {
if (async.length > 0) {
var restore = capture();
var unsuspend = suspend();
var { unsuspend } = suspend();
Promise.all(async.map((expression) => async_derived(expression))).then((result) => {
restore();

@ -34,6 +34,7 @@ import * as e from '../errors.js';
import { legacy_mode_flag, tracing_mode_flag } from '../../flags/index.js';
import { get_stack } from '../dev/tracing.js';
import { component_context, is_runes } from '../context.js';
import { active_fork } from '../dom/blocks/boundary.js';
export let inspect_effects = new Set();
@ -184,7 +185,7 @@ export function internal_set(source, value) {
}
}
if (!changeset.has(source)) {
if (!active_fork && !changeset.has(source)) {
changeset.add(source);
}

@ -49,8 +49,9 @@ import {
set_component_context,
set_dev_current_component_function
} from './context.js';
import { Boundary } from './dom/blocks/boundary.js';
import { active_fork, Boundary, Fork } from './dom/blocks/boundary.js';
import * as w from './warnings.js';
import { log_effect_tree } from './dev/debug.js';
const FLUSH_MICROTASK = 0;
const FLUSH_SYNC = 1;
@ -806,10 +807,10 @@ export function schedule_effect(signal) {
* effects to be flushed.
*
* @param {Effect} effect
* @param {Boundary} [boundary]
* @param {Fork} [fork]
* @returns {Effect[]}
*/
function process_effects(effect, boundary) {
function process_effects(effect, fork) {
/** @type {Effect[]} */
var effects = [];
@ -822,18 +823,18 @@ function process_effects(effect, boundary) {
var sibling = current_effect.next;
if (!is_skippable_branch && (flags & INERT) === 0) {
if (boundary !== undefined && (flags & (BLOCK_EFFECT | BRANCH_EFFECT)) === 0) {
// Inside a boundary, defer everything except block/branch effects
boundary.add_effect(current_effect);
} else if ((flags & BOUNDARY_EFFECT) !== 0) {
var b = /** @type {Boundary} */ (current_effect.b);
process_effects(current_effect, b);
if (fork !== undefined && (flags & (BLOCK_EFFECT | BRANCH_EFFECT)) === 0) {
if (check_dirtiness(current_effect)) {
// Inside a boundary, defer everything except block/branch effects
fork.add_effect(current_effect);
if (!b.suspended) {
// no more async work to happen
b.commit();
// Mark the effect clean, so that `mark_reactions` has the desired outcome
set_signal_status(current_effect, CLEAN);
}
} else if ((flags & BOUNDARY_EFFECT) !== 0) {
/** @type {Boundary} */ (current_effect.b).fork(changeset, (fork) => {
process_effects(/** @type {Effect} */ (current_effect), fork);
});
} else if ((flags & EFFECT) !== 0) {
effects.push(current_effect);
} else if (is_branch) {
@ -1059,6 +1060,15 @@ export function get(signal) {
recent_async_deriveds.delete(signal);
}
if (active_fork) {
return active_fork.get(signal);
}
var boundary = active_effect?.b;
if (boundary) {
return boundary.get(signal);
}
return signal.v;
}

Loading…
Cancel
Save