From 25ca443e27c60ad375420397e38fd72bb0b45aeb Mon Sep 17 00:00:00 2001
From: Yuxuan Zhang
Date: Tue, 2 Jan 2024 22:34:39 -0500
Subject: [PATCH 1/2] infrastructure: worker context and dispatch manager

---
 package.json            |   1 +
 pnpm-lock.yaml          |   7 +++
 src/node/build/build.ts |   6 +++
 src/node/config.ts      |   3 +-
 src/node/server.ts      |   4 ++
 src/node/siteConfig.ts  |  13 +++++
 src/node/utils/queue.ts |  36 +++++++++++++
 src/node/worker.ts      | 114 ++++++++++++++++++++++++++++++++++++++++
 8 files changed, 183 insertions(+), 1 deletion(-)
 create mode 100644 src/node/utils/queue.ts
 create mode 100644 src/node/worker.ts

diff --git a/package.json b/package.json
index 75c2c130..7bedad70 100644
--- a/package.json
+++ b/package.json
@@ -191,6 +191,7 @@
     "rollup": "^4.9.2",
     "rollup-plugin-dts": "^6.1.0",
     "rollup-plugin-esbuild": "^6.1.0",
+    "rpc-magic-proxy": "0.0.0-beta.3",
     "semver": "^7.5.4",
     "simple-git-hooks": "^2.9.0",
     "sirv": "^2.0.4",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 114185c9..1360b0f3 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -273,6 +273,9 @@ importers:
       rollup-plugin-esbuild:
         specifier: ^6.1.0
         version: 6.1.0(esbuild@0.19.11)(rollup@4.9.2)(supports-color@9.4.0)
+      rpc-magic-proxy:
+        specifier: 0.0.0-beta.3
+        version: 0.0.0-beta.3
       semver:
         specifier: ^7.5.4
         version: 7.5.4
@@ -3890,6 +3893,10 @@
       '@rollup/rollup-win32-x64-msvc': 4.9.2
       fsevents: 2.3.3
 
+  /rpc-magic-proxy@0.0.0-beta.3:
+    resolution: {integrity: sha512-k1hVDnaX4TBxVWpW6tRoc3qCyuuJ8luOeK4ArqkFJKinBa0yVnZzgYVtvLzC9iWnVzNKIQNNsxkth445nKWL1Q==}
+    dev: true
+
   /run-parallel@1.2.0:
     resolution: {integrity: sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==}
     dependencies:
diff --git a/src/node/build/build.ts b/src/node/build/build.ts
index 8d5cf937..336f106c 100644
--- a/src/node/build/build.ts
+++ b/src/node/build/build.ts
@@ -16,6 +16,7 @@ import { bundle } from './bundle'
 import { generateSitemap } from './generateSitemap'
 import { renderPage } from './render'
 import humanizeDuration from 'humanize-duration'
+import { launchWorkers, waitWorkers } from '../worker'
 
 export async function build(
   root?: string,
@@ -27,6 +28,9 @@
   const siteConfig = await resolveConfig(root, 'build', 'production')
   const unlinkVue = linkVue()
 
+  if (siteConfig.parallel)
+    launchWorkers(siteConfig.buildConcurrency, { config: siteConfig })
+
   if (buildOptions.base) {
     siteConfig.site.base = buildOptions.base
     delete buildOptions.base
@@ -147,6 +151,8 @@
   await siteConfig.buildEnd?.(siteConfig)
   clearCache()
 
+  if (siteConfig.parallel) await waitWorkers()
+
   const timeEnd = performance.now()
   const duration = humanizeDuration(timeEnd - timeStart, {
     maxDecimalPoints: 2
diff --git a/src/node/config.ts b/src/node/config.ts
index 6d1bbd49..585933de 100644
--- a/src/node/config.ts
+++ b/src/node/config.ts
@@ -143,7 +143,8 @@ export async function resolveConfig(
     rewrites,
     userConfig,
     sitemap: userConfig.sitemap,
-    buildConcurrency: userConfig.buildConcurrency ?? 64
+    buildConcurrency: userConfig.buildConcurrency ?? 64,
+    parallel: userConfig.parallel ?? true
   }
 
   // to be shared with content loaders
diff --git a/src/node/server.ts b/src/node/server.ts
index 4105edfe..8e6f2349 100644
--- a/src/node/server.ts
+++ b/src/node/server.ts
@@ -1,6 +1,7 @@
 import { createServer as createViteServer, type ServerOptions } from 'vite'
 import { resolveConfig } from './config'
 import { createVitePressPlugin } from './plugin'
+import { launchWorkers } from './worker'
 
 export async function createServer(
   root: string = process.cwd(),
 ) {
   const config = await resolveConfig(root)
 
+  if (config.parallel)
+    launchWorkers(config.buildConcurrency, { config: config })
+
   if (serverOptions.base) {
     config.site.base = serverOptions.base
     delete serverOptions.base
diff --git a/src/node/siteConfig.ts b/src/node/siteConfig.ts
index 74174280..3ba79649 100644
--- a/src/node/siteConfig.ts
+++ b/src/node/siteConfig.ts
@@ -156,6 +156,18 @@ export interface UserConfig
    */
  buildConcurrency?: number
 
+  /**
+   * The main switch for enabling parallel processing. When enabled, VitePress
+   * creates worker threads and distributes workloads to them. Currently, the
+   * following features are supported:
+   * 1. Parallel SPA bundling
+   * 2. Parallel SSR rendering
+   * 3. Parallel local search indexing (when using the default splitter)
+   * @experimental
+   * @default true
+   */
+  parallel?: boolean
+
   /**
    * @experimental
    *
@@ -250,4 +262,5 @@ export interface SiteConfig
   logger: Logger
   userConfig: UserConfig
   buildConcurrency: number
+  parallel: boolean
 }
diff --git a/src/node/utils/queue.ts b/src/node/utils/queue.ts
new file mode 100644
index 00000000..26a8123e
--- /dev/null
+++ b/src/node/utils/queue.ts
@@ -0,0 +1,36 @@
+// Asynchronous queue with a close method
+export default class Queue<T> {
+  private queue: Array<T> = []
+  private pending: Array<(data: T | null) => void> = []
+  #closed: boolean = false
+
+  get closed() {
+    return this.#closed
+  }
+
+  async *items() {
+    while (true) {
+      const item = await this.dequeue()
+      if (item === null) break
+      yield item
+    }
+  }
+
+  enqueue(data: T) {
+    if (this.closed)
+      throw new Error(`Failed to enqueue ${data}, queue already closed`)
+    if (this.pending.length) this.pending.shift()!(data)
+    else this.queue.push(data)
+  }
+
+  async dequeue(): Promise<T | null> {
+    if (this.closed) return null
+    if (this.queue.length) return this.queue.shift()!
+    return new Promise((res) => this.pending.push(res))
+  }
+
+  close() {
+    this.#closed = true
+    for (const res of this.pending) res(null)
+  }
+}
diff --git a/src/node/worker.ts b/src/node/worker.ts
new file mode 100644
index 00000000..359af0df
--- /dev/null
+++ b/src/node/worker.ts
@@ -0,0 +1,114 @@
+import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'
+import RpcContext from 'rpc-magic-proxy'
+import Queue from './utils/queue'
+
+const WORKER_MAGIC = '::vitepress::build-worker::'
+
+interface WorkerTask {
+  name: string
+  argv: any[]
+  resolve: (retVal: any) => void
+  reject: (error?: any) => void
+}
+
+/*=============================== Main Thread ===============================*/
+
+// Owned by main thread, will be distributed to workers
+const taskQueue = new Queue<WorkerTask>()
+
+// This function will be exposed to workers via magic proxy
+function getNextTask() {
+  return taskQueue.dequeue()
+}
+
+export function dispatchWork(name: string, ...argv: any[]): Promise<any> {
+  return new Promise((resolve, reject) =>
+    taskQueue.enqueue({ name, argv, resolve, reject })
+  )
+}
+
+const workers: Worker[] = []
+
+export async function launchWorkers(numWorkers: number, context: Object) {
+  const ctx = new RpcContext()
+  const workerData = await ctx.serialize({
+    [WORKER_MAGIC]: '',
+    getNextTask,
+    context
+  })
+  for (let i = 0; i < numWorkers; i++) {
+    const worker = new Worker(new URL(import.meta.url), { workerData })
+    ctx.bind(worker)
+    workers.push(worker)
+  }
+}
+
+export function updateContext(context: Object) {
+  for (const worker of workers) {
+    worker.postMessage({
+      [WORKER_MAGIC]: 'update/context',
+      context
+    })
+  }
+}
+
+// Wait for workers to drain the taskQueue and exit.
+export function waitWorkers() {
+  const allClosed = workers.map(
+    (w) => new Promise((res) => w.once('exit', res))
+  )
+  taskQueue.close()
+  return Promise.all(allClosed)
+}
+
+/*============================== Worker Thread ==============================*/
+
+const registry: Map<string, { main: Function; init?: Function }> = new Map()
+
+export function registerWorkload(
+  name: string,
+  main: (...argv: any[]) => any,
+  init?: () => void
+) {
+  // Only register workload in worker threads
+  if (isMainThread) return
+  if (registry.has(name)) {
+    throw new Error(`Workload "${name}" already registered.`)
+  }
+  registry.set(name, { main, init })
+}
+
+// Will keep querying next workload from main thread
+async function workerMain() {
+  const ctx = new RpcContext(parentPort!)
+  const {
+    getNextTask,
+    context
+  }: {
+    getNextTask: () => Promise<WorkerTask | null>
+    context: Object
+  } = ctx.deserialize(workerData)
+
+  parentPort!.on('message', (msg) => {
+    if (msg?.[WORKER_MAGIC] === 'update/context') {
+      Object.assign(context, msg.context)
+    }
+  })
+
+  while (true) {
+    const task = await getNextTask()
+    if (task === null) break
+    const { name, argv, resolve, reject } = task
+    if (!registry.has(name)) throw new Error(`No task "${name}" registered.`)
+    const { main, init } = registry.get(name)!
+    if (init) {
+      init.apply(context)
+      delete registry.get(name)!.init
+    }
+    await (main.apply(context, argv) as Promise<any>).then(resolve, reject)
+  }
+  ctx.reset()
+  process.exit(0)
+}
+
+if (!isMainThread && WORKER_MAGIC in workerData) workerMain()

From 9d5aedc8017beb3a348fb7c8a1a667026bdceccf Mon Sep 17 00:00:00 2001
From: Yuxuan Zhang
Date: Wed, 3 Jan 2024 00:03:10 -0500
Subject: [PATCH 2/2] perf(localSearch): Use JSDOM for section split, adapt to
 worker API

---
 package.json                          |   3 +
 src/node/plugins/localSearchPlugin.ts | 165 ++++++++++++++++++--------
 2 files changed, 119 insertions(+), 49 deletions(-)

diff --git a/package.json b/package.json
index 7bedad70..6c127ae8 100644
--- a/package.json
+++ b/package.json
@@ -98,6 +98,7 @@
     "@vueuse/core": "^10.7.1",
     "@vueuse/integrations": "^10.7.1",
     "focus-trap": "^7.5.4",
+    "jsdom": "^23.0.1",
     "mark.js": "8.11.1",
     "minisearch": "^6.3.0",
     "mrmime": "^2.0.0",
@@ -139,6 +140,7 @@
     "@types/escape-html": "^1.0.4",
     "@types/fs-extra": "^11.0.4",
     "@types/humanize-duration": "^3.27.3",
+    "@types/jsdom": "^21.1.6",
     "@types/lodash.template": "^4.5.3",
     "@types/mark.js": "^8.11.12",
     "@types/markdown-it-attrs": "^4.1.3",
@@ -155,6 +157,7 @@
     "conventional-changelog-cli": "^4.1.0",
     "cross-spawn": "^7.0.3",
     "debug": "^4.3.4",
+    "dom-traverse": "^0.0.1",
     "esbuild": "^0.19.11",
     "escape-html": "^1.0.3",
     "execa": "^8.0.1",
diff --git a/src/node/plugins/localSearchPlugin.ts b/src/node/plugins/localSearchPlugin.ts
index e94eafb7..b71a9dfd 100644
--- a/src/node/plugins/localSearchPlugin.ts
+++ b/src/node/plugins/localSearchPlugin.ts
@@ -13,6 +13,10 @@
 import {
   type MarkdownEnv
 } from '../shared'
 import { processIncludes } from '../utils/processIncludes'
+import { updateCurrentTask } from '../utils/task'
+import type { PageSplitSection } from '../../../types/local-search'
+import { registerWorkload, dispatchWork } from '../worker'
+import Queue from '../utils/queue'
 
 const debug = _debug('vitepress:local-search')
@@ -122,7 +126,16 @@
     return id
   }
 
-  async function indexFile(page: string) {
+  function scanForLocales() {
+    for (const page of siteConfig.pages) {
+      const file = path.join(siteConfig.srcDir, page)
+      const locale = getLocaleForPath(file)
+      // dry-fetch the index for this locale
+      getIndexByLocale(locale)
+    }
+  }
+
+  async function indexFile(page: string, parallel: boolean = false) {
     const file = path.join(siteConfig.srcDir, page)
     // get file metadata
     const fileId = getDocId(file)
@@ -133,8 +146,10 @@
     const sections =
       // user provided generator
       (await options.miniSearch?._splitIntoSections?.(file, html)) ??
+      // default implementation (parallel)
+      (parallel ? parallelSplitter(html, fileId) : undefined) ??
       // default implementation
-      splitPageIntoSections(html)
+      splitPageIntoSections(html, fileId)
     // add sections to the locale index
     for await (const section of sections) {
       if (!section || !(section.text || section.titles)) break
@@ -149,14 +164,27 @@
     }
   }
 
-  async function scanForBuild() {
-    debug('🔍️ Indexing files for search...')
-    await pMap(siteConfig.pages, indexFile, {
-      concurrency: siteConfig.buildConcurrency
-    })
-    debug('✅ Indexing finished...')
+  async function indexAll() {
+    const concurrency = siteConfig.buildConcurrency
+    let numIndexed = 0
+
+    const updateProgress = () =>
+      updateCurrentTask(
+        ++numIndexed,
+        siteConfig.pages.length,
+        'indexing local search'
+      )
+    await pMap(
+      siteConfig.pages,
+      (page) => indexFile(page, siteConfig.parallel).then(updateProgress),
+      { concurrency }
+    )
+
+    updateCurrentTask()
   }
 
+  let indexAllPromise: Promise<void> | undefined
+
   return {
     name: 'vitepress:local-search',
 
@@ -172,7 +200,6 @@
 
     async configureServer(_server) {
       server = _server
-      await scanForBuild()
       onIndexUpdated()
     },
 
@@ -184,25 +211,23 @@
 
     async load(id) {
       if (id === LOCAL_SEARCH_INDEX_REQUEST_PATH) {
-        if (process.env.NODE_ENV === 'production') {
-          await scanForBuild()
-        }
+        console.log('\n🔍️ load', id)
+        scanForLocales()
         let records: string[] = []
         for (const [locale] of indexByLocales) {
           records.push(
             `${JSON.stringify(
               locale
-            )}: () => import('@localSearchIndex${locale}')`
+            )}: () => import('${LOCAL_SEARCH_INDEX_ID}-${locale}')`
           )
         }
         return `export default {${records.join(',')}}`
       } else if (id.startsWith(LOCAL_SEARCH_INDEX_REQUEST_PATH)) {
+        console.log('\n🔍️ load', id)
+        const locale = id.slice(LOCAL_SEARCH_INDEX_REQUEST_PATH.length + 1)
+        await (indexAllPromise ??= indexAll())
         return `export default ${JSON.stringify(
-          JSON.stringify(
-            indexByLocales.get(
-              id.replace(LOCAL_SEARCH_INDEX_REQUEST_PATH, '')
-            ) ?? {}
-          )
+          JSON.stringify(indexByLocales.get(locale) ?? {})
         )}`
       }
     },
@@ -217,40 +242,82 @@
   }
 }
 
-const headingRegex = /<h(\d*).*?>(.*?<a.*? href="#.*?".*?>.*?<\/a>)<\/h\1>/gi
-const headingContentRegex = /(.*?)<a.*? href="#(.*?)".*?>.*?<\/a>/i
-
-/**
- * Splits HTML into sections based on headings
- */
-function* splitPageIntoSections(html: string) {
-  const result = html.split(headingRegex)
-  result.shift()
-  let parentTitles: string[] = []
-  for (let i = 0; i < result.length; i += 3) {
-    const level = parseInt(result[i]) - 1
-    const heading = result[i + 1]
-    const headingResult = headingContentRegex.exec(heading)
-    const title = clearHtmlTags(headingResult?.[1] ?? '').trim()
-    const anchor = headingResult?.[2] ?? ''
-    const content = result[i + 2]
-    if (!title || !content) continue
-    const titles = parentTitles.slice(0, level)
-    titles[level] = title
-    yield { anchor, titles, text: getSearchableText(content) }
-    if (level === 0) {
-      parentTitles = [title]
-    } else {
-      parentTitles[level] = title
+async function* splitPageIntoSections(html: string, fileId: string) {
+  const { JSDOM } = await import('jsdom')
+  const { default: traverse, Node } = await import('dom-traverse')
+  const dom = JSDOM.fragment(html)
+  // Stack of title hierarchy for current working section
+  const titleStack: Array<{ level: number; text: string }> = []
+  // Set of all used ids (for duplicate id detection)
+  const existingIdSet = new Set()
+  // Current working section
+  let section: PageSplitSection = { text: '', titles: [''] }
+  function submit() {
+    section.text = section.text.replace(/\W+/gs, ' ').trim()
+    return section
+  }
+  // Traverse the DOM
+  for (const [node, skipChildren] of traverse.skippable(dom)) {
+    if (node.nodeType === Node.ELEMENT_NODE) {
+      const el = node as Element
+      if (!/^H\d+$/i.test(el.tagName)) continue
+      if (!el.hasAttribute('id')) continue
+      const id = el.getAttribute('id')!
+      if (existingIdSet.has(id)) {
+        console.error(`\x1b[2K\r⚠️ Duplicate heading id "${id}" in ${fileId}`)
+        continue
+      }
+      existingIdSet.add(id)
+      // Submit previous section
+      if (section.text || section.anchor) yield submit()
+      // Pop adjacent titles depending on level
+      const level = parseInt(el.tagName.slice(1))
+      while (titleStack.length > 0) {
+        if (titleStack.at(-1)!.level >= level) titleStack.pop()
+        else break
+      }
+      titleStack.push({ level, text: el.textContent ?? '' })
+      // Create new section
+      section = {
+        text: '',
+        anchor: id,
+        titles: titleStack.map((_) => _.text)
+      }
+      skipChildren()
+    } else if (node.nodeType === Node.TEXT_NODE) {
+      // Collect text content
+      section.text += node.textContent
     }
   }
+  // Submit last section
+  yield submit()
 }
 
-function getSearchableText(content: string) {
-  content = clearHtmlTags(content)
-  return content
+// Worker proxy in main thread
+function parallelSplitter(html: string, fileId: string) {
+  const queue = new Queue<PageSplitSection>()
+  dispatchWork(
+    'local-search::split',
+    html,
+    fileId,
+    queue.enqueue.bind(queue),
+    queue.close.bind(queue)
+  )
+  return queue.items()
 }
 
-function clearHtmlTags(str: string) {
-  return str.replace(/<[^>]*>/g, '')
-}
+// Worker proxy in worker thread
+registerWorkload(
+  'local-search::split',
+  async (
+    html: string,
+    fileId: string,
+    _yield: (section: PageSplitSection) => Promise<void>,
+    _end: () => Promise<void>
+  ) => {
+    for await (const section of splitPageIntoSections(html, fileId)) {
+      await _yield(section)
+    }
+    await _end()
+  }
)
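
Usage sketch (illustration only, not part of either patch): the infrastructure
from PATCH 1/2 is meant to be used in pairs, as localSearchPlugin.ts does
above -- registerWorkload() runs in worker threads while dispatchWork() hands a
task to an idle worker from the main thread. A minimal hypothetical example;
the workload name 'example::double' and the module name are made up:

  // example-workload.ts (hypothetical module, for illustration)
  import { dispatchWork, registerWorkload } from './worker'

  // Registered when this module is evaluated inside a worker thread;
  // registerWorkload() is a no-op on the main thread.
  registerWorkload('example::double', async (n: number) => n * 2)

  // Called on the main thread: enqueues a task, an idle worker pulls it via
  // getNextTask(), and the returned promise settles with the workload result.
  export function double(n: number): Promise<number> {
    return dispatchWork('example::double', n)
  }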