infrastructure: worker context and dispatch manager

pull/3386/head
Yuxuan Zhang 2 years ago
parent 007fa592da
commit 25ca443e27
No known key found for this signature in database
GPG Key ID: 6910B04F3351EF7D

@ -191,6 +191,7 @@
"rollup": "^4.9.2",
"rollup-plugin-dts": "^6.1.0",
"rollup-plugin-esbuild": "^6.1.0",
"rpc-magic-proxy": "0.0.0-beta.3",
"semver": "^7.5.4",
"simple-git-hooks": "^2.9.0",
"sirv": "^2.0.4",

@ -273,6 +273,9 @@ importers:
rollup-plugin-esbuild:
specifier: ^6.1.0
version: 6.1.0(esbuild@0.19.11)(rollup@4.9.2)(supports-color@9.4.0)
rpc-magic-proxy:
specifier: 0.0.0-beta.3
version: 0.0.0-beta.3
semver:
specifier: ^7.5.4
version: 7.5.4
@ -3890,6 +3893,10 @@ packages:
'@rollup/rollup-win32-x64-msvc': 4.9.2
fsevents: 2.3.3
/rpc-magic-proxy@0.0.0-beta.3:
resolution: {integrity: sha512-k1hVDnaX4TBxVWpW6tRoc3qCyuuJ8luOeK4ArqkFJKinBa0yVnZzgYVtvLzC9iWnVzNKIQNNsxkth445nKWL1Q==}
dev: true
/run-parallel@1.2.0:
resolution: {integrity: sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==}
dependencies:

@ -16,6 +16,7 @@ import { bundle } from './bundle'
import { generateSitemap } from './generateSitemap'
import { renderPage } from './render'
import humanizeDuration from 'humanize-duration'
import { launchWorkers, waitWorkers } from '../worker'
export async function build(
root?: string,
@ -27,6 +28,9 @@ export async function build(
const siteConfig = await resolveConfig(root, 'build', 'production')
const unlinkVue = linkVue()
if (siteConfig.parallel)
launchWorkers(siteConfig.buildConcurrency, { config: siteConfig })
if (buildOptions.base) {
siteConfig.site.base = buildOptions.base
delete buildOptions.base
@ -147,6 +151,8 @@ export async function build(
await siteConfig.buildEnd?.(siteConfig)
clearCache()
if (siteConfig.parallel) await waitWorkers()
const timeEnd = performance.now()
const duration = humanizeDuration(timeEnd - timeStart, {
maxDecimalPoints: 2

@ -143,7 +143,8 @@ export async function resolveConfig(
rewrites,
userConfig,
sitemap: userConfig.sitemap,
buildConcurrency: userConfig.buildConcurrency ?? 64
buildConcurrency: userConfig.buildConcurrency ?? 64,
parallel: userConfig.parallel ?? true
}
// to be shared with content loaders

@ -1,6 +1,7 @@
import { createServer as createViteServer, type ServerOptions } from 'vite'
import { resolveConfig } from './config'
import { createVitePressPlugin } from './plugin'
import { launchWorkers } from './worker'
export async function createServer(
root: string = process.cwd(),
@ -9,6 +10,9 @@ export async function createServer(
) {
const config = await resolveConfig(root)
if (config.parallel)
launchWorkers(config.buildConcurrency, { config: config })
if (serverOptions.base) {
config.site.base = serverOptions.base
delete serverOptions.base

@ -156,6 +156,18 @@ export interface UserConfig<ThemeConfig = any>
*/
buildConcurrency?: number
/**
* This option is the general switch for enabling parallel computing. When
* enabled, vitepress will create worker threads and distribute workload to
* them. Currently, the following features are supported:
* 1. Parallel SPA Bundling
* 2. Parallel SSR Rendering
* 3. Parallel Local Search Indexing (when using default splitter)
* @experimental
* @default true
*/
parallel?: boolean
/**
* @experimental
*
@ -250,4 +262,5 @@ export interface SiteConfig<ThemeConfig = any>
logger: Logger
userConfig: UserConfig
buildConcurrency: number
parallel: boolean
}

@ -0,0 +1,36 @@
// Asynchronous queue with a close method
export default class Queue<T> {
private queue: Array<T> = []
private pending: Array<(data: T | null) => void> = []
#closed: boolean = false
get closed() {
return this.#closed
}
async *items() {
while (true) {
const item = await this.dequeue()
if (item === null) break
yield item
}
}
enqueue(data: T) {
if (this.closed)
throw new Error(`Failed to enqueue ${data}, queue already closed`)
if (this.pending.length) this.pending.shift()!(data)
else this.queue.push(data)
}
async dequeue(): Promise<T | null> {
if (this.closed) return null
if (this.queue.length) return this.queue.shift()!
return new Promise((res) => this.pending.push(res))
}
close() {
this.#closed = true
for (const res of this.pending) res(null)
}
}

@ -0,0 +1,114 @@
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'
import RpcContext from 'rpc-magic-proxy'
import Queue from './utils/queue'
const WORKER_MAGIC = '::vitepress::build-worker::'
interface WorkerTask {
name: string
argv: any[]
resolve: (retVal: any) => void
reject: (error?: any) => void
}
/*=============================== Main Thread ===============================*/
// Owned by main thread, will be distributed to workers
const taskQueue = new Queue<WorkerTask>()
// This function will be exposed to workers via magic proxy
function getNextTask() {
return taskQueue.dequeue()
}
export function dispatchWork(name: string, ...argv: any[]): Promise<any> {
return new Promise((resolve, reject) =>
taskQueue.enqueue({ name, argv, resolve, reject })
)
}
const workers: Worker[] = []
export async function launchWorkers(numWorkers: number, context: Object) {
const ctx = new RpcContext()
const workerData = await ctx.serialize({
[WORKER_MAGIC]: '',
getNextTask,
context
})
for (let i = 0; i < numWorkers; i++) {
const worker = new Worker(new URL(import.meta.url), { workerData })
ctx.bind(worker)
workers.push(worker)
}
}
export function updateContext(context: Object) {
for (const worker of workers) {
worker.postMessage({
[WORKER_MAGIC]: 'update/context',
context
})
}
}
// Wait for workers to drain the taskQueue and exit.
export function waitWorkers() {
const allClosed = workers.map(
(w) => new Promise((res) => w.once('exit', res))
)
taskQueue.close()
return Promise.all(allClosed)
}
/*============================== Worker Thread ==============================*/
const registry: Map<string, { main: Function; init?: Function }> = new Map()
export function registerWorkload(
name: string,
main: (...argv: any[]) => any,
init?: () => void
) {
// Only register workload in worker threads
if (isMainThread) return
if (registry.has(name)) {
throw new Error(`Workload "${name}" already registered.`)
}
registry.set(name, { main, init })
}
// Will keep querying next workload from main thread
async function workerMain() {
const ctx = new RpcContext(parentPort!)
const {
getNextTask,
context
}: {
getNextTask: () => Promise<WorkerTask | null>
context: Object
} = ctx.deserialize(workerData)
parentPort!.on('message', (msg) => {
if (msg?.[WORKER_MAGIC] === 'update/context') {
Object.assign(context, msg.context)
}
})
while (true) {
const task = await getNextTask()
if (task === null) break
const { name, argv, resolve, reject } = task
if (!registry.has(name)) throw new Error(`No task "${name}" registered.`)
const { main, init } = registry.get(name)!
if (init) {
init.apply(context)
delete registry.get(name)!.init
}
await (main.apply(context, argv) as Promise<any>).then(resolve, reject)
}
ctx.reset()
process.exit(0)
}
if (!isMainThread && WORKER_MAGIC in workerData) workerMain()
Loading…
Cancel
Save