cleanup code

pull/3386/head
Yuxuan Zhang 2 years ago
parent 346e54d57a
commit 77f708a2e5
No known key found for this signature in database
GPG Key ID: 6910B04F3351EF7D

@ -52,7 +52,7 @@ export async function build(
const unlinkVue = linkVue() const unlinkVue = linkVue()
if (shouldUseParallel(siteConfig)) { if (shouldUseParallel(siteConfig)) {
await launchWorkers(siteConfig.concurrency, { launchWorkers(siteConfig.concurrency, {
config: siteConfig, config: siteConfig,
options: buildOptions options: buildOptions
}) })
@ -189,7 +189,7 @@ export async function build(
await generateSitemap(siteConfig) await generateSitemap(siteConfig)
await siteConfig.buildEnd?.(siteConfig) await siteConfig.buildEnd?.(siteConfig)
clearCache() clearCache()
await stopWorkers('build complete') stopWorkers('build complete')
const timeEnd = performance.now() const timeEnd = performance.now()
const duration = humanizeDuration(timeEnd - timeStart, { const duration = humanizeDuration(timeEnd - timeStart, {
maxDecimalPoints: 2 maxDecimalPoints: 2

@ -24,13 +24,12 @@ async function bundleWorkload(
ssr: boolean, ssr: boolean,
plugins: PluginOption[] plugins: PluginOption[]
) { ) {
return build( const config = await resolveViteConfig(ssr, {
await resolveViteConfig(ssr, { config: this.config,
config: this.config, options: this.options,
options: this.options, plugins
plugins })
}) return build(config) as Promise<Rollup.RollupOutput>
) as Promise<Rollup.RollupOutput>
} }
async function bundleMPA( async function bundleMPA(

@ -5,6 +5,7 @@ import c from 'picocolors'
import Queue from './utils/queue' import Queue from './utils/queue'
import _debug from 'debug' import _debug from 'debug'
import type { SiteConfig } from 'siteConfig' import type { SiteConfig } from 'siteConfig'
import humanizeDuration from 'humanize-duration'
export type SupportsParallel = 'bundle' | 'render' | 'local-search' export type SupportsParallel = 'bundle' | 'render' | 'local-search'
@ -23,17 +24,6 @@ export function shouldUseParallel(config: SiteConfig, task?: SupportsParallel) {
let debug = _debug('vitepress:worker:main') let debug = _debug('vitepress:worker:main')
const WORKER_MAGIC = 'vitepress:worker' const WORKER_MAGIC = 'vitepress:worker'
function debugArgv(...argv: any[]) {
if (!debug.enabled) return ''
return argv
.map((v) => {
const t = typeof v
if (v?.length !== undefined) return `${t}[${v.length}]`
else return t
})
.join(', ')
}
/*=============================== Main Thread ===============================*/ /*=============================== Main Thread ===============================*/
interface WorkerTask { interface WorkerTask {
name: string name: string
@ -43,36 +33,41 @@ interface WorkerTask {
} }
// Owned by main thread, will be distributed to workers // Owned by main thread, will be distributed to workers
const taskQueue = new Queue<WorkerTask>() let taskQueue: Queue<WorkerTask> | null = null
function dispatchWork(name: string, ...argv: any[]): Promise<any> { function dispatchWork(name: string, ...argv: any[]): Promise<any> {
if (workerMeta) { if (workerMeta) {
return workerMeta.dispatchWork(name, ...argv) return workerMeta.dispatchWork(name, ...argv)
} else if (taskQueue) {
const { promise, resolve, reject } = deferPromise()
taskQueue.enqueue({ name, argv, resolve, reject })
return promise
} else { } else {
return new Promise((resolve, reject) => throw new Error(`trying to dispatch ${name} before launching workers.`)
taskQueue.enqueue({ name, argv, resolve, reject })
)
} }
} }
type WorkerWithHooks = Worker & { type WorkerInstance = Worker & {
workerId: string
hooks: { hooks: {
// Update worker's context // Update worker's context
updateContext: (ctx: Object | null) => void updateContext: (ctx: Object | null) => void
} }
} }
const workers: Array<WorkerWithHooks> = [] const workers: Array<WorkerInstance> = []
export async function launchWorkers(numWorkers: number, context: Object) { export async function launchWorkers(numWorkers: number, context: Object) {
debug(`launching ${numWorkers} workers`)
taskQueue = new Queue<WorkerTask>()
const allInitialized: Array<Promise<void>> = [] const allInitialized: Array<Promise<void>> = []
const ctx = new RpcContext() const ctx = new RpcContext()
const getNextTask = () => taskQueue.dequeue() const getNextTask = () => taskQueue?.dequeue() ?? null
for (let i = 0; i < numWorkers; i++) { for (let i = 0; i < numWorkers; i++) {
const workerId = (i + 1).toString().padStart(2, '0') const workerId = (i + 1).toString().padStart(2, '0')
const { promise, resolve } = deferPromise() const { promise, resolve } = deferPromise()
const initWorkerHooks = (hooks: WorkerWithHooks['hooks']) => { const initWorkerHooks = (hooks: WorkerInstance['hooks']) => {
worker.hooks = hooks Object.assign(worker, { workerId, hooks })
resolve() resolve()
} }
const debug = _debug(`vitepress:worker:${workerId.padEnd(4)}`) const debug = _debug(`vitepress:worker:${workerId.padEnd(4)}`)
@ -80,6 +75,7 @@ export async function launchWorkers(numWorkers: number, context: Object) {
workerMeta: { workerMeta: {
workerId, workerId,
dispatchWork, dispatchWork,
// Save some RPC overhead when debugger is not active
debug: debug.enabled ? debug : null, debug: debug.enabled ? debug : null,
task, task,
updateCurrentTask updateCurrentTask
@ -90,7 +86,7 @@ export async function launchWorkers(numWorkers: number, context: Object) {
}) })
const worker = new Worker(new URL(import.meta.url), { const worker = new Worker(new URL(import.meta.url), {
workerData: { [WORKER_MAGIC]: payload } workerData: { [WORKER_MAGIC]: payload }
}) as WorkerWithHooks }) as WorkerInstance
ctx.bind(worker) ctx.bind(worker)
workers.push(worker) workers.push(worker)
allInitialized.push(promise) allInitialized.push(promise)
@ -105,14 +101,17 @@ export function updateContext(context: Object) {
// Wait for workers to drain the taskQueue and exit. // Wait for workers to drain the taskQueue and exit.
export async function stopWorkers(reason: string = 'exit') { export async function stopWorkers(reason: string = 'exit') {
const allClosed = workers.map( debug('stopping workers:', reason)
(w) => new Promise((res) => w.once('exit', res)) const allClosed = workers.map((w) =>
new Promise<void>((res) => w.once('exit', () => res())).then(() =>
debug(`worker:${w.workerId} confirmed exit`)
)
) )
taskQueue.close() taskQueue?.close()
debug('waiting for workers, exiting because', reason) taskQueue = null
const success = await Promise.any([ const success = await Promise.any([
Promise.all(allClosed).then(() => true), Promise.all(allClosed).then(() => true),
new Promise<false>((res) => setTimeout(() => res(false), 1000)) new Promise<false>((res) => setTimeout(() => res(false), 2000))
]) ])
if (!success) { if (!success) {
console.warn('forcefully terminating workers') console.warn('forcefully terminating workers')
@ -141,11 +140,10 @@ export function registerWorkload<T extends Object, K extends any[], V>(
main: (this: T, ...args: K) => V, main: (this: T, ...args: K) => V,
init?: (this: T, ...args: void[]) => void init?: (this: T, ...args: void[]) => void
) { ) {
// Only register workload in worker threads
if (!isMainThread) { if (!isMainThread) {
// Only register workload in worker threads if (registry.has(name))
if (registry.has(name)) {
throw new Error(`Workload "${name}" already registered.`) throw new Error(`Workload "${name}" already registered.`)
}
registry.set(name, { main, init }) registry.set(name, { main, init })
} }
return (...args: Parameters<typeof main>) => return (...args: Parameters<typeof main>) =>
@ -170,6 +168,7 @@ async function workerMainLoop() {
workerMeta = _workerMeta! workerMeta = _workerMeta!
if (workerMeta.debug) debug = workerMeta.debug if (workerMeta.debug) debug = workerMeta.debug
else debug = (() => {}) as any as typeof debug else debug = (() => {}) as any as typeof debug
debug(`started`)
// Upon worker initialization, report back the hooks that main thread can use // Upon worker initialization, report back the hooks that main thread can use
// to reach this worker. // to reach this worker.
await initWorkerHooks({ await initWorkerHooks({
@ -179,21 +178,24 @@ async function workerMainLoop() {
} }
}) })
let workTime = 0
while (true) { while (true) {
const task = await getNextTask() const task = await getNextTask()
if (task === null) break if (task === null) break
const { name, argv, resolve, reject } = task const { name, argv, resolve, reject } = task
debug('got task', name, '(', debugArgv(...argv), ')')
if (!registry.has(name)) throw new Error(`No task "${name}" registered.`) if (!registry.has(name)) throw new Error(`No task "${name}" registered.`)
const el = registry.get(name)! const el = registry.get(name)!
const { main, init } = el const { main, init } = el
const timeStart = performance.now()
if (init) { if (init) {
try { try {
await init.apply(context) await init.apply(context)
} catch (e) { } catch (e) {
console.error(c.red(`worker: failed to init workload "${name}":`), e) console.error(c.red(`worker: failed to init workload "${name}":`), e)
reject(e)
} finally {
el.init = undefined
} }
el.init = undefined
} }
try { try {
resolve(await main.apply(context, argv)) resolve(await main.apply(context, argv))
@ -203,9 +205,16 @@ async function workerMainLoop() {
e e
) )
reject(e) reject(e)
} finally {
workTime += performance.now() - timeStart
} }
} }
ctx.reset() ctx.reset()
const duration = humanizeDuration(workTime, {
maxDecimalPoints: 2
})
await debug(`stopped - total workload: ${duration}`)
} }
if (!isMainThread && WORKER_MAGIC in workerData) workerMainLoop() if (!isMainThread && workerData?.[WORKER_MAGIC])
workerMainLoop().then(() => process.exit())

Loading…
Cancel
Save