From 25ca443e27c60ad375420397e38fd72bb0b45aeb Mon Sep 17 00:00:00 2001
From: Yuxuan Zhang
Date: Tue, 2 Jan 2024 22:34:39 -0500
Subject: [PATCH] infrastructure: worker context and dispatch manager

---
 package.json            |   1 +
 pnpm-lock.yaml          |   7 ++
 src/node/build/build.ts |   6 ++
 src/node/config.ts      |   3 +-
 src/node/server.ts      |   4 ++
 src/node/siteConfig.ts  |  13 ++++
 src/node/utils/queue.ts |  54 ++++++++++++++
 src/node/worker.ts      | 153 ++++++++++++++++++++++++++++++++++++++++
 8 files changed, 240 insertions(+), 1 deletion(-)
 create mode 100644 src/node/utils/queue.ts
 create mode 100644 src/node/worker.ts

diff --git a/package.json b/package.json
index 75c2c130..7bedad70 100644
--- a/package.json
+++ b/package.json
@@ -191,6 +191,7 @@
     "rollup": "^4.9.2",
     "rollup-plugin-dts": "^6.1.0",
     "rollup-plugin-esbuild": "^6.1.0",
+    "rpc-magic-proxy": "0.0.0-beta.3",
     "semver": "^7.5.4",
     "simple-git-hooks": "^2.9.0",
     "sirv": "^2.0.4",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 114185c9..1360b0f3 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -273,6 +273,9 @@ importers:
       rollup-plugin-esbuild:
         specifier: ^6.1.0
         version: 6.1.0(esbuild@0.19.11)(rollup@4.9.2)(supports-color@9.4.0)
+      rpc-magic-proxy:
+        specifier: 0.0.0-beta.3
+        version: 0.0.0-beta.3
       semver:
         specifier: ^7.5.4
         version: 7.5.4
@@ -3890,6 +3893,10 @@ packages:
       '@rollup/rollup-win32-x64-msvc': 4.9.2
       fsevents: 2.3.3
 
+  /rpc-magic-proxy@0.0.0-beta.3:
+    resolution: {integrity: sha512-k1hVDnaX4TBxVWpW6tRoc3qCyuuJ8luOeK4ArqkFJKinBa0yVnZzgYVtvLzC9iWnVzNKIQNNsxkth445nKWL1Q==}
+    dev: true
+
   /run-parallel@1.2.0:
     resolution: {integrity: sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==}
     dependencies:
diff --git a/src/node/build/build.ts b/src/node/build/build.ts
index 8d5cf937..336f106c 100644
--- a/src/node/build/build.ts
+++ b/src/node/build/build.ts
@@ -16,6 +16,7 @@ import { bundle } from './bundle'
 import { generateSitemap } from './generateSitemap'
 import { renderPage } from './render'
 import humanizeDuration from 'humanize-duration'
+import { launchWorkers, waitWorkers } from '../worker'
 
 export async function build(
   root?: string,
@@ -27,6 +28,9 @@ export async function build(
   const siteConfig = await resolveConfig(root, 'build', 'production')
   const unlinkVue = linkVue()
 
+  if (siteConfig.parallel)
+    await launchWorkers(siteConfig.buildConcurrency, { config: siteConfig })
+
   if (buildOptions.base) {
     siteConfig.site.base = buildOptions.base
     delete buildOptions.base
@@ -147,6 +151,8 @@ export async function build(
   await siteConfig.buildEnd?.(siteConfig)
   clearCache()
 
+  if (siteConfig.parallel) await waitWorkers()
+
   const timeEnd = performance.now()
   const duration = humanizeDuration(timeEnd - timeStart, {
     maxDecimalPoints: 2
diff --git a/src/node/config.ts b/src/node/config.ts
index 6d1bbd49..585933de 100644
--- a/src/node/config.ts
+++ b/src/node/config.ts
@@ -143,7 +143,8 @@ export async function resolveConfig(
     rewrites,
     userConfig,
     sitemap: userConfig.sitemap,
-    buildConcurrency: userConfig.buildConcurrency ?? 64
+    buildConcurrency: userConfig.buildConcurrency ?? 64,
+    parallel: userConfig.parallel ?? true
   }
 
   // to be shared with content loaders
diff --git a/src/node/server.ts b/src/node/server.ts
index 4105edfe..8e6f2349 100644
--- a/src/node/server.ts
+++ b/src/node/server.ts
@@ -1,6 +1,7 @@
 import { createServer as createViteServer, type ServerOptions } from 'vite'
 import { resolveConfig } from './config'
 import { createVitePressPlugin } from './plugin'
+import { launchWorkers } from './worker'
 
 export async function createServer(
   root: string = process.cwd(),
@@ -9,6 +10,9 @@ export async function createServer(
 ) {
   const config = await resolveConfig(root)
 
+  if (config.parallel)
+    await launchWorkers(config.buildConcurrency, { config: config })
+
   if (serverOptions.base) {
     config.site.base = serverOptions.base
     delete serverOptions.base
diff --git a/src/node/siteConfig.ts b/src/node/siteConfig.ts
index 74174280..3ba79649 100644
--- a/src/node/siteConfig.ts
+++ b/src/node/siteConfig.ts
@@ -156,6 +156,18 @@ export interface UserConfig<ThemeConfig = any>
    */
   buildConcurrency?: number
 
+  /**
+   * This option is the general switch for enabling parallel computing. When
+   * enabled, vitepress will create worker threads and distribute workload to
+   * them. Currently, the following features are supported:
+   * 1. Parallel SPA Bundling
+   * 2. Parallel SSR Rendering
+   * 3. Parallel Local Search Indexing (when using default splitter)
+   * @experimental
+   * @default true
+   */
+  parallel?: boolean
+
   /**
    * @experimental
    *
@@ -250,4 +262,5 @@ export interface SiteConfig<ThemeConfig = any>
   logger: Logger
   userConfig: UserConfig
   buildConcurrency: number
+  parallel: boolean
 }
diff --git a/src/node/utils/queue.ts b/src/node/utils/queue.ts
new file mode 100644
index 00000000..26a8123e
--- /dev/null
+++ b/src/node/utils/queue.ts
@@ -0,0 +1,54 @@
+/**
+ * Asynchronous FIFO queue with a close method.
+ *
+ * Consumers `await dequeue()`; `null` is the end-of-queue sentinel, so `T`
+ * should not itself include `null`.
+ */
+export default class Queue<T> {
+  private queue: Array<T> = []
+  private pending: Array<(data: T | null) => void> = []
+  #closed: boolean = false
+
+  /** Whether `close()` has been called. */
+  get closed() {
+    return this.#closed
+  }
+
+  /** Iterates over items until the queue is closed and drained. */
+  async *items() {
+    while (true) {
+      const item = await this.dequeue()
+      if (item === null) break
+      yield item
+    }
+  }
+
+  /**
+   * Hands `data` to the oldest waiting consumer, or buffers it.
+   * @throws Error if the queue has already been closed.
+   */
+  enqueue(data: T) {
+    if (this.closed)
+      throw new Error(`Failed to enqueue ${data}, queue already closed`)
+    if (this.pending.length) this.pending.shift()!(data)
+    else this.queue.push(data)
+  }
+
+  /**
+   * Resolves with the next item, or with `null` once the queue is closed
+   * and every buffered item has been handed out.
+   */
+  async dequeue(): Promise<T | null> {
+    // Drain buffered items first so that close() never drops queued work.
+    if (this.queue.length) return this.queue.shift()!
+    if (this.closed) return null
+    return new Promise((res) => this.pending.push(res))
+  }
+
+  /** Closes the queue and releases every waiting consumer with `null`. */
+  close() {
+    this.#closed = true
+    // Empty the list while resolving so no resolver is retained or re-run.
+    for (const res of this.pending.splice(0)) res(null)
+  }
+}
diff --git a/src/node/worker.ts b/src/node/worker.ts
new file mode 100644
index 00000000..359af0df
--- /dev/null
+++ b/src/node/worker.ts
@@ -0,0 +1,153 @@
+import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'
+import RpcContext from 'rpc-magic-proxy'
+import Queue from './utils/queue'
+
+// Key that tags workerData and messages owned by this module
+const WORKER_MAGIC = '::vitepress::build-worker::'
+
+// A unit of work queued on the main thread and executed by a worker
+interface WorkerTask {
+  name: string
+  argv: any[]
+  resolve: (retVal: any) => void
+  reject: (error?: any) => void
+}
+
+/*=============================== Main Thread ===============================*/
+
+// Owned by main thread, will be distributed to workers
+const taskQueue = new Queue<WorkerTask>()
+
+// This function will be exposed to workers via magic proxy
+function getNextTask() {
+  return taskQueue.dequeue()
+}
+
+/**
+ * Queues the workload registered as `name` with arguments `argv`.
+ * The returned promise settles once a worker settles the task; it rejects
+ * right away if the task queue has already been closed by `waitWorkers()`.
+ */
+export function dispatchWork(name: string, ...argv: any[]): Promise<any> {
+  return new Promise((resolve, reject) =>
+    taskQueue.enqueue({ name, argv, resolve, reject })
+  )
+}
+
+const workers: Worker[] = []
+
+/**
+ * Spawns `numWorkers` worker threads running this module. Each one receives
+ * the serialized `context` together with a proxy to `getNextTask`.
+ */
+export async function launchWorkers(numWorkers: number, context: Object) {
+  const ctx = new RpcContext()
+  const workerData = await ctx.serialize({
+    [WORKER_MAGIC]: '',
+    getNextTask,
+    context
+  })
+  for (let i = 0; i < numWorkers; i++) {
+    const worker = new Worker(new URL(import.meta.url), { workerData })
+    ctx.bind(worker)
+    workers.push(worker)
+  }
+}
+
+/** Sends `context` to every worker, which merges it into its own copy. */
+export function updateContext(context: Object) {
+  for (const worker of workers) {
+    worker.postMessage({
+      [WORKER_MAGIC]: 'update/context',
+      context
+    })
+  }
+}
+
+/**
+ * Closes the task queue and waits for every worker to exit. Tasks that are
+ * already queued are still handed out before workers see the end of queue.
+ */
+export function waitWorkers() {
+  // Detach the list first so updateContext() skips workers that are exiting
+  const exiting = workers.splice(0)
+  const allClosed = exiting.map(
+    (w) => new Promise((res) => w.once('exit', res))
+  )
+  taskQueue.close()
+  return Promise.all(allClosed)
+}
+
+/*============================== Worker Thread ==============================*/
+
+const registry: Map<
+  string,
+  { main: (...argv: any[]) => any; init?: () => void }
+> = new Map()
+
+/**
+ * Registers the workload `main` under `name`. `init`, when given, is called
+ * once on a worker before the first task of that name runs there. Both are
+ * invoked with the shared context as `this`. No-op on the main thread.
+ * @throws Error if `name` is already registered.
+ */
+export function registerWorkload(
+  name: string,
+  main: (...argv: any[]) => any,
+  init?: () => void
+) {
+  // Only register workload in worker threads
+  if (isMainThread) return
+  if (registry.has(name)) {
+    throw new Error(`Workload "${name}" already registered.`)
+  }
+  registry.set(name, { main, init })
+}
+
+// Will keep querying next workload from main thread
+async function workerMain() {
+  const ctx = new RpcContext(parentPort!)
+  const {
+    getNextTask,
+    context
+  }: {
+    getNextTask: () => Promise<WorkerTask | null>
+    context: Object
+  } = ctx.deserialize(workerData)
+
+  parentPort!.on('message', (msg) => {
+    if (msg?.[WORKER_MAGIC] === 'update/context') {
+      Object.assign(context, msg.context)
+    }
+  })
+
+  // Runs one task; any failure becomes a rejection of the returned promise
+  async function runTask(name: string, argv: any[]) {
+    const workload = registry.get(name)
+    if (!workload) throw new Error(`No task "${name}" registered.`)
+    if (workload.init) {
+      workload.init.apply(context)
+      delete workload.init
+    }
+    return workload.main.apply(context, argv)
+  }
+
+  while (true) {
+    const task = await getNextTask()
+    if (task === null) break
+    const { name, argv, resolve, reject } = task
+    // Report every outcome to the dispatcher instead of crashing the worker
+    await runTask(name, argv).then(resolve, reject)
+  }
+  ctx.reset()
+  process.exit(0)
+}
+
+// `in` throws on null/primitives, e.g. in a worker not started by us
+if (
+  !isMainThread &&
+  typeof workerData === 'object' &&
+  workerData !== null &&
+  WORKER_MAGIC in workerData
+)
+  workerMain()