diff --git a/src/node/build/build.ts b/src/node/build/build.ts index c8ede08e..f6467fb4 100644 --- a/src/node/build/build.ts +++ b/src/node/build/build.ts @@ -52,7 +52,7 @@ export async function build( const unlinkVue = linkVue() if (shouldUseParallel(siteConfig)) { - await launchWorkers(siteConfig.concurrency, { + launchWorkers(siteConfig.concurrency, { config: siteConfig, options: buildOptions }) @@ -189,7 +189,7 @@ export async function build( await generateSitemap(siteConfig) await siteConfig.buildEnd?.(siteConfig) clearCache() - await stopWorkers('build complete') + stopWorkers('build complete') const timeEnd = performance.now() const duration = humanizeDuration(timeEnd - timeStart, { maxDecimalPoints: 2 diff --git a/src/node/build/bundle.ts b/src/node/build/bundle.ts index db4c28b1..f381785f 100644 --- a/src/node/build/bundle.ts +++ b/src/node/build/bundle.ts @@ -24,13 +24,12 @@ async function bundleWorkload( ssr: boolean, plugins: PluginOption[] ) { - return build( - await resolveViteConfig(ssr, { - config: this.config, - options: this.options, - plugins - }) - ) as Promise + const config = await resolveViteConfig(ssr, { + config: this.config, + options: this.options, + plugins + }) + return build(config) as Promise } async function bundleMPA( diff --git a/src/node/worker.ts b/src/node/worker.ts index 32c846d4..97045ea6 100644 --- a/src/node/worker.ts +++ b/src/node/worker.ts @@ -5,6 +5,7 @@ import c from 'picocolors' import Queue from './utils/queue' import _debug from 'debug' import type { SiteConfig } from 'siteConfig' +import humanizeDuration from 'humanize-duration' export type SupportsParallel = 'bundle' | 'render' | 'local-search' @@ -23,17 +24,6 @@ export function shouldUseParallel(config: SiteConfig, task?: SupportsParallel) { let debug = _debug('vitepress:worker:main') const WORKER_MAGIC = 'vitepress:worker' - -function debugArgv(...argv: any[]) { - if (!debug.enabled) return '' - return argv - .map((v) => { - const t = typeof v - if (v?.length !== undefined) return `${t}[${v.length}]` - else return t - }) - .join(', ') -} /*=============================== Main Thread ===============================*/ interface WorkerTask { name: string @@ -43,36 +33,41 @@ interface WorkerTask { } // Owned by main thread, will be distributed to workers -const taskQueue = new Queue() +let taskQueue: Queue | null = null function dispatchWork(name: string, ...argv: any[]): Promise { if (workerMeta) { return workerMeta.dispatchWork(name, ...argv) + } else if (taskQueue) { + const { promise, resolve, reject } = deferPromise() + taskQueue.enqueue({ name, argv, resolve, reject }) + return promise } else { - return new Promise((resolve, reject) => - taskQueue.enqueue({ name, argv, resolve, reject }) - ) + throw new Error(`trying to dispatch ${name} before launching workers.`) } } -type WorkerWithHooks = Worker & { +type WorkerInstance = Worker & { + workerId: string hooks: { // Update worker's context updateContext: (ctx: Object | null) => void } } -const workers: Array = [] +const workers: Array = [] export async function launchWorkers(numWorkers: number, context: Object) { + debug(`launching ${numWorkers} workers`) + taskQueue = new Queue() const allInitialized: Array> = [] const ctx = new RpcContext() - const getNextTask = () => taskQueue.dequeue() + const getNextTask = () => taskQueue?.dequeue() ?? null for (let i = 0; i < numWorkers; i++) { const workerId = (i + 1).toString().padStart(2, '0') const { promise, resolve } = deferPromise() - const initWorkerHooks = (hooks: WorkerWithHooks['hooks']) => { - worker.hooks = hooks + const initWorkerHooks = (hooks: WorkerInstance['hooks']) => { + Object.assign(worker, { workerId, hooks }) resolve() } const debug = _debug(`vitepress:worker:${workerId.padEnd(4)}`) @@ -80,6 +75,7 @@ export async function launchWorkers(numWorkers: number, context: Object) { workerMeta: { workerId, dispatchWork, + // Save some RPC overhead when debugger is not active debug: debug.enabled ? debug : null, task, updateCurrentTask @@ -90,7 +86,7 @@ export async function launchWorkers(numWorkers: number, context: Object) { }) const worker = new Worker(new URL(import.meta.url), { workerData: { [WORKER_MAGIC]: payload } - }) as WorkerWithHooks + }) as WorkerInstance ctx.bind(worker) workers.push(worker) allInitialized.push(promise) @@ -105,14 +101,17 @@ export function updateContext(context: Object) { // Wait for workers to drain the taskQueue and exit. export async function stopWorkers(reason: string = 'exit') { - const allClosed = workers.map( - (w) => new Promise((res) => w.once('exit', res)) + debug('stopping workers:', reason) + const allClosed = workers.map((w) => + new Promise((res) => w.once('exit', () => res())).then(() => + debug(`worker:${w.workerId} confirmed exit`) + ) ) - taskQueue.close() - debug('waiting for workers, exiting because', reason) + taskQueue?.close() + taskQueue = null const success = await Promise.any([ Promise.all(allClosed).then(() => true), - new Promise((res) => setTimeout(() => res(false), 1000)) + new Promise((res) => setTimeout(() => res(false), 2000)) ]) if (!success) { console.warn('forcefully terminating workers') @@ -141,11 +140,10 @@ export function registerWorkload( main: (this: T, ...args: K) => V, init?: (this: T, ...args: void[]) => void ) { + // Only register workload in worker threads if (!isMainThread) { - // Only register workload in worker threads - if (registry.has(name)) { + if (registry.has(name)) throw new Error(`Workload "${name}" already registered.`) - } registry.set(name, { main, init }) } return (...args: Parameters) => @@ -170,6 +168,7 @@ async function workerMainLoop() { workerMeta = _workerMeta! if (workerMeta.debug) debug = workerMeta.debug else debug = (() => {}) as any as typeof debug + debug(`started`) // Upon worker initialization, report back the hooks that main thread can use // to reach this worker. await initWorkerHooks({ @@ -179,21 +178,24 @@ async function workerMainLoop() { } }) + let workTime = 0 while (true) { const task = await getNextTask() if (task === null) break const { name, argv, resolve, reject } = task - debug('got task', name, '(', debugArgv(...argv), ')') if (!registry.has(name)) throw new Error(`No task "${name}" registered.`) const el = registry.get(name)! const { main, init } = el + const timeStart = performance.now() if (init) { try { await init.apply(context) } catch (e) { console.error(c.red(`worker: failed to init workload "${name}":`), e) + reject(e) + } finally { + el.init = undefined } - el.init = undefined } try { resolve(await main.apply(context, argv)) @@ -203,9 +205,16 @@ async function workerMainLoop() { e ) reject(e) + } finally { + workTime += performance.now() - timeStart } } ctx.reset() + const duration = humanizeDuration(workTime, { + maxDecimalPoints: 2 + }) + await debug(`stopped - total workload: ${duration}`) } -if (!isMainThread && WORKER_MAGIC in workerData) workerMainLoop() +if (!isMainThread && workerData?.[WORKER_MAGIC]) + workerMainLoop().then(() => process.exit())