refactor: add concurrent promise pooling for render task

fixes #3362
pull/3366/head
Yuxuan Zhang 2 years ago
parent 93122eee20
commit c04a1e9746

@ -10,6 +10,7 @@ import { resolveConfig, type SiteConfig } from '../config'
import { slash, type HeadConfig } from '../shared' import { slash, type HeadConfig } from '../shared'
import { deserializeFunctions, serializeFunctions } from '../utils/fnSerialize' import { deserializeFunctions, serializeFunctions } from '../utils/fnSerialize'
import { task } from '../utils/task' import { task } from '../utils/task'
import Pool from '../utils/pool'
import { bundle } from './bundle' import { bundle } from './bundle'
import { generateSitemap } from './generateSitemap' import { generateSitemap } from './generateSitemap'
import { renderPage } from './render' import { renderPage } from './render'
@ -106,24 +107,38 @@ export async function build(
} }
} }
await Promise.all( /**
['404.md', ...siteConfig.pages] * Since this pool is not a thread pool, we can give it a "thread count"
.map((page) => siteConfig.rewrites.map[page] || page) * larger than the actual number of CPUs available.
.map((page) => * ---
renderPage( * https://github.com/vuejs/vitepress/issues/3362
render, * This fix will defer the memory allocation of render tasks.
siteConfig, * ---
page, * By limiting the max number of dispatched promises, it allows NodeJS to
clientResult, * reclaim memory associated to each promise, avoiding memory allocation
appChunk, * failures when handling large volume of pages.
cssChunk, * ---
assets, * Fixes: #3362
pageToHashMap, */
metadataScript, const renderPageTaskPool = new Pool(64)
additionalHeadTags // Pool all rendering tasks
) for (const page of ['404.md', ...siteConfig.pages])
renderPageTaskPool.add(() =>
renderPage(
render,
siteConfig,
siteConfig.rewrites.map[page] || page,
clientResult,
appChunk,
cssChunk,
assets,
pageToHashMap,
metadataScript,
additionalHeadTags
) )
) )
// Wait for all rendering tasks to finish
await renderPageTaskPool.drain()
}) })
// emit page hash map for the case where a user session is open // emit page hash map for the case where a user session is open

@ -0,0 +1,96 @@
/* ---------------------------------------------------------
* Copyright (c) 2023 Yuxuan Zhang, web-dev@z-yx.cc
* This source code is licensed under the MIT license.
* You may find the full license in project root directory.
* ------------------------------------------------------ */
import os from 'os'
/**
* Create a callback pool, with a maximum number of parallel tasks.
* Queued tasks will be executed directly when the pool is not full.
* Otherwise, they will be executed in a first-in-first-out manner
* when a dispatched task finishes.
* ---
* When used with 'node:child_process', you can control the pooling
* of parallel tasks, and avoid spawning too many child processes.
*/
export default class Pool extends EventTarget {
// Number of dispatched tasks (currently in progress)
private numExecutors: number = 0
// The pending task queue. Promise return type <T> varies by task
private pendingTaskQueue: Array<() => Promise<any>> = []
/**
* This executor will keep executing next available task in queue.
* Therefore, it always owns a running task until the queue drains.
* You can think of it as a "worker thread" in a thread pool.
*/
private executor() {
// Only launch new executor when the pool is not full
if (this.numExecutors >= this.maxThreads) return
// Register new executor
this.numExecutors++
;(async () => {
while (this.pendingTaskQueue.length > 0) {
try {
const nextTaskInQueue = this.pendingTaskQueue.shift()!
await nextTaskInQueue()
} catch (e) {}
}
// Unregister the executor
this.numExecutors--
// Dispatch "drain" event when the last executor finishes
if (this.numExecutors === 0) this.dispatchEvent(new Event('drain'))
})()
}
/**
* @param maxThreads Maximum number of parallel tasks
*/
constructor(private maxThreads: number = os.cpus().length) {
super()
}
/**
* Dispatch a task to the pool. Use it like `new Promise<T>(task)`
* @param task The task call back, just like the one in Promise constructor
* @returns
* The constructed promise object, will resolve to the value
* passed to resolve callback
*/
add<T = any>(task: (...args: any[]) => T, ...args: any[]): Promise<T> {
// Create deferred promise handlers
let resolve: (v: T) => void, reject: (reason?: any) => void
const promise = new Promise<T>(
(...callbacks) => ([resolve, reject] = callbacks)
)
// Push the task to queue for deferred resolution
// Notice the task may NOT be invoked right away
this.pendingTaskQueue.push(async () => {
try {
resolve(await task(...args))
} catch (error: any) {
reject(error)
}
})
// Try to launch a new executor
this.executor()
// Return the promise
return promise
}
/**
* Wait for all dispatched executors to finish.
* Return immediately if no executor is running.
* @returns A promise that resolves when the pool drains
*/
async drain() {
if (this.numExecutors > 0)
await new Promise<void>((resolve) =>
this.addEventListener('drain', () => resolve(), { once: true })
)
return
}
}
Loading…
Cancel
Save