cleanup worker code, solve server hanging issue (only happens in tests)

pull/3386/head
Yuxuan Zhang 2 years ago
parent 8b8fe146dd
commit c3df64af54
No known key found for this signature in database
GPG Key ID: 6910B04F3351EF7D

@ -16,7 +16,7 @@ import { bundle } from './bundle'
import { generateSitemap } from './generateSitemap' import { generateSitemap } from './generateSitemap'
import { renderPage, type RenderPageContext } from './render' import { renderPage, type RenderPageContext } from './render'
import humanizeDuration from 'humanize-duration' import humanizeDuration from 'humanize-duration'
import { launchWorkers, waitWorkers } from '../worker' import { launchWorkers, stopWorkers } from '../worker'
import { registerWorkload, updateContext } from '../worker' import { registerWorkload, updateContext } from '../worker'
import { createMarkdownRenderer } from '../markdown/markdown' import { createMarkdownRenderer } from '../markdown/markdown'
import type { DefaultTheme } from '../shared' import type { DefaultTheme } from '../shared'
@ -210,7 +210,7 @@ export async function build(
await siteConfig.buildEnd?.(siteConfig) await siteConfig.buildEnd?.(siteConfig)
clearCache() clearCache()
if (siteConfig.parallel) await waitWorkers('build finish') if (siteConfig.parallel) await stopWorkers('build finish')
const timeEnd = performance.now() const timeEnd = performance.now()
const duration = humanizeDuration(timeEnd - timeStart, { const duration = humanizeDuration(timeEnd - timeStart, {

@ -1,7 +1,7 @@
import { createServer as createViteServer, type ServerOptions } from 'vite' import { createServer as createViteServer, type ServerOptions } from 'vite'
import { resolveConfig } from './config' import { resolveConfig } from './config'
import { createVitePressPlugin } from './plugin' import { createVitePressPlugin } from './plugin'
import { launchWorkers } from './worker' import { launchWorkers, stopWorkers } from './worker'
export async function createServer( export async function createServer(
root: string = process.cwd(), root: string = process.cwd(),
@ -25,5 +25,9 @@ export async function createServer(
server: serverOptions, server: serverOptions,
customLogger: config.logger, customLogger: config.logger,
configFile: config.vite?.configFile configFile: config.vite?.configFile
}) }).then((server) =>
Object.assign({}, server, {
close: () => server.close().then(() => stopWorkers('server.close()'))
})
)
} }

@ -28,22 +28,10 @@ interface WorkerTask {
// Owned by main thread, will be distributed to workers // Owned by main thread, will be distributed to workers
const taskQueue = new Queue<WorkerTask>() const taskQueue = new Queue<WorkerTask>()
// This function will be exposed to workers via magic proxy
async function getNextTask() {
const task = await taskQueue.dequeue()
if (task !== null) {
debug('[proxy] got task', task.name, '(', debugArgv(...task.argv), ')')
}
debugger
return task
}
function dispatchWork(name: string, ...argv: any[]): Promise<any> { function dispatchWork(name: string, ...argv: any[]): Promise<any> {
if (workerMeta) { if (workerMeta) {
debug('dispatch', name, '(', debugArgv(...argv), ')')
return workerMeta.dispatchWork(name, ...argv) return workerMeta.dispatchWork(name, ...argv)
} else { } else {
debug('dispatch', name, '(', debugArgv(...argv), ')')
return new Promise((resolve, reject) => return new Promise((resolve, reject) =>
taskQueue.enqueue({ name, argv, resolve, reject }) taskQueue.enqueue({ name, argv, resolve, reject })
) )
@ -62,6 +50,7 @@ const workers: Array<WorkerWithHooks> = []
export async function launchWorkers(numWorkers: number, context: Object) { export async function launchWorkers(numWorkers: number, context: Object) {
const allInitialized: Array<Promise<void>> = [] const allInitialized: Array<Promise<void>> = []
const ctx = new RpcContext() const ctx = new RpcContext()
const getNextTask = () => taskQueue.dequeue()
for (let i = 0; i < numWorkers; i++) { for (let i = 0; i < numWorkers; i++) {
const workerId = (i + 1).toString().padStart(2, '0') const workerId = (i + 1).toString().padStart(2, '0')
const { promise, resolve } = deferPromise() const { promise, resolve } = deferPromise()
@ -98,12 +87,24 @@ export function updateContext(context: Object) {
} }
// Wait for workers to drain the taskQueue and exit. // Wait for workers to drain the taskQueue and exit.
export function waitWorkers() { export async function stopWorkers(reason: string = 'exit') {
const allClosed = workers.map( const allClosed = workers.map(
(w) => new Promise((res) => w.once('exit', res)) (w) => new Promise((res) => w.once('exit', res))
) )
taskQueue.close() taskQueue.close()
return Promise.all(allClosed) debug('waiting for workers, exiting because', reason)
const success = await Promise.any([
Promise.all(allClosed).then(() => true),
new Promise<false>((res) => setTimeout(() => res(false), 1000))
])
if (!success) {
console.warn('forcefully terminating workers')
for (const w of workers) {
try {
w.terminate()
} catch (e) {}
}
}
} }
/*============================== Worker Thread ==============================*/ /*============================== Worker Thread ==============================*/
@ -184,7 +185,6 @@ async function workerMainLoop() {
reject(e) reject(e)
} }
} }
ctx.reset() ctx.reset()
} }

Loading…
Cancel
Save