From d86be7ae232f2710efde2622e97432df4923cf92 Mon Sep 17 00:00:00 2001 From: Yuxuan Zhang Date: Wed, 3 Jan 2024 05:23:03 -0500 Subject: [PATCH] render and mini search splitter both working under the new API --- package.json | 2 +- pnpm-lock.yaml | 8 +-- src/node/build/build.ts | 53 ++++++++++++----- src/node/build/render-worker.ts | 4 +- src/node/config.ts | 4 +- src/node/plugins/localSearchPlugin.ts | 6 +- src/node/server.ts | 3 +- src/node/siteConfig.ts | 7 ++- src/node/worker.ts | 85 +++++++++++++++++++-------- 9 files changed, 115 insertions(+), 57 deletions(-) diff --git a/package.json b/package.json index 108c6de0..83a6532b 100644 --- a/package.json +++ b/package.json @@ -193,7 +193,7 @@ "rollup": "^4.9.2", "rollup-plugin-dts": "^6.1.0", "rollup-plugin-esbuild": "^6.1.0", - "rpc-magic-proxy": "0.0.0-beta.3", + "rpc-magic-proxy": "^1.0.2", "semver": "^7.5.4", "simple-git-hooks": "^2.9.0", "sirv": "^2.0.4", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8ab80fe3..34b61d87 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -280,8 +280,8 @@ importers: specifier: ^6.1.0 version: 6.1.0(esbuild@0.19.11)(rollup@4.9.2)(supports-color@9.4.0) rpc-magic-proxy: - specifier: 0.0.0-beta.3 - version: 0.0.0-beta.3 + specifier: ^1.0.2 + version: 1.0.2 semver: specifier: ^7.5.4 version: 7.5.4 @@ -4016,8 +4016,8 @@ packages: '@rollup/rollup-win32-x64-msvc': 4.9.2 fsevents: 2.3.3 - /rpc-magic-proxy@0.0.0-beta.3: - resolution: {integrity: sha512-k1hVDnaX4TBxVWpW6tRoc3qCyuuJ8luOeK4ArqkFJKinBa0yVnZzgYVtvLzC9iWnVzNKIQNNsxkth445nKWL1Q==} + /rpc-magic-proxy@1.0.2: + resolution: {integrity: sha512-JsYJRMVi1rbkcA3ByfuwdkfB2eTXWrmTy4PM6ayQiqTiG6gWaV3hQD4EpHrCFw8CHqmqGNFPMe3iOWvqM071PA==} dev: true /rrweb-cssom@0.6.0: diff --git a/src/node/build/build.ts b/src/node/build/build.ts index 526d4850..53b90f00 100644 --- a/src/node/build/build.ts +++ b/src/node/build/build.ts @@ -9,7 +9,7 @@ import { pathToFileURL } from 'url' import type { BuildOptions, Rollup } from 'vite' import { resolveConfig, type SiteConfig } from '../config' import { clearCache } from '../markdownToVue' -import { slash, type HeadConfig } from '../shared' +import { slash, type HeadConfig, type SSGContext } from '../shared' import { deserializeFunctions, serializeFunctions } from '../utils/fnSerialize' import { task } from '../utils/task' import { bundle } from './bundle' @@ -17,6 +17,23 @@ import { generateSitemap } from './generateSitemap' import { renderPage, type RenderPageContext } from './render' import humanizeDuration from 'humanize-duration' import { launchWorkers, waitWorkers } from '../worker' +import { registerWorkload, updateContext, dispatchWork } from '../worker' + +type RenderFn = (path: string) => Promise + +// Worker proxy (worker thread) +registerWorkload( + 'build::render-page', + function workload( + this: RenderPageContext & { render: RenderFn }, + page: string + ) { + return renderPage(this.render, page, this) + }, + async function init(this: { renderEntry: string; render: RenderFn }) { + this.render = (await import(this.renderEntry)).render as RenderFn + } +) export async function build( root?: string, @@ -29,7 +46,7 @@ export async function build( const unlinkVue = linkVue() if (siteConfig.parallel) - launchWorkers(siteConfig.buildConcurrency, { config: siteConfig }) + launchWorkers(siteConfig.concurrency, { config: siteConfig }) if (buildOptions.base) { siteConfig.site.base = buildOptions.base @@ -57,7 +74,7 @@ export async function build( } await task('rendering pages', async (updateProgress) => { - const entryPath = + const renderEntry = pathToFileURL(path.join(siteConfig.tempDir, 'app.js')).toString() + '?t=' + Date.now() @@ -126,23 +143,27 @@ export async function build( additionalHeadTags } - const pages = ['404.md', ...siteConfig.pages] + let task: (page: string) => Promise if (siteConfig.parallel) { - const { default: cluster } = await import('./render-worker') - await cluster(entryPath, context, pages, updateProgress) + const { config, ...additionalContext } = context + await updateContext({ renderEntry, ...additionalContext }) + console.log('all context updated') + task = (page) => dispatchWork('build::render-page', page) } else { - let count_done = 0 - const { render } = await import(entryPath) - await pMap( - pages, - async (page) => { - await renderPage(render, page, context) - updateProgress(++count_done, pages.length) - }, - { concurrency: siteConfig.buildConcurrency } - ) + const { render } = await import(renderEntry) + task = (page) => renderPage(render, page, context) } + + const pages = ['404.md', ...siteConfig.pages] + let count_done = 0 + await pMap( + pages, + (page) => task(page).then(updateProgress(++count_done, pages.length)), + { + concurrency: siteConfig.concurrency + } + ) }) // emit page hash map for the case where a user session is open diff --git a/src/node/build/render-worker.ts b/src/node/build/render-worker.ts index 48dfad13..ab799093 100644 --- a/src/node/build/render-worker.ts +++ b/src/node/build/render-worker.ts @@ -17,7 +17,7 @@ export default async function cluster( // - Excess worker will cause too much RPC workload for main thread, // therefore harm the overall performance. const concurrency = Math.round( - Math.max((context.config.buildConcurrency - 1) / 1.5, 1) + Math.max((context.config.concurrency - 1) / 1.5, 1) ) const num_tasks = pages.length @@ -76,7 +76,7 @@ async function renderWorker() { await renderPage(render, page, context) } } - const concurrency = Math.max(context.config.buildConcurrency, 1) + const concurrency = Math.max(context.config.concurrency, 1) await Promise.all(Array.from({ length: concurrency }, () => executor())) } catch (e) { console.error(e) diff --git a/src/node/config.ts b/src/node/config.ts index 9408d1fa..7a6aa084 100644 --- a/src/node/config.ts +++ b/src/node/config.ts @@ -144,8 +144,8 @@ export async function resolveConfig( rewrites, userConfig, sitemap: userConfig.sitemap, - buildConcurrency: Math.max( - userConfig.buildConcurrency ?? cpus().length, + concurrency: Math.max( + userConfig.concurrency ?? Math.round(cpus().length / 1.5), 1 // At least one thread required ), parallel: userConfig.parallel ?? true diff --git a/src/node/plugins/localSearchPlugin.ts b/src/node/plugins/localSearchPlugin.ts index 88d0f543..6becb146 100644 --- a/src/node/plugins/localSearchPlugin.ts +++ b/src/node/plugins/localSearchPlugin.ts @@ -163,7 +163,7 @@ export async function localSearchPlugin( } async function indexAll() { - const concurrency = siteConfig.buildConcurrency + const concurrency = siteConfig.concurrency let numIndexed = 0 const updateProgress = () => @@ -174,7 +174,7 @@ export async function localSearchPlugin( ) await pMap( siteConfig.pages, - (page) => indexFile(page, siteConfig.parallel).then(updateProgress), + (page) => indexFile(page, !!siteConfig.parallel).then(updateProgress), { concurrency } ) @@ -209,7 +209,6 @@ export async function localSearchPlugin( async load(id) { if (id === LOCAL_SEARCH_INDEX_REQUEST_PATH) { - console.log('\n🔍️ load', id) scanForLocales() let records: string[] = [] for (const [locale] of indexByLocales) { @@ -221,7 +220,6 @@ export async function localSearchPlugin( } return `export default {${records.join(',')}}` } else if (id.startsWith(LOCAL_SEARCH_INDEX_REQUEST_PATH)) { - console.log('\n🔍️ load', id) const locale = id.slice(LOCAL_SEARCH_INDEX_REQUEST_PATH.length + 1) await (indexAllPromise ??= indexAll()) return `export default ${JSON.stringify( diff --git a/src/node/server.ts b/src/node/server.ts index 8e6f2349..d1792edd 100644 --- a/src/node/server.ts +++ b/src/node/server.ts @@ -10,8 +10,7 @@ export async function createServer( ) { const config = await resolveConfig(root) - if (config.parallel) - launchWorkers(config.buildConcurrency, { config: config }) + if (config.parallel) launchWorkers(config.concurrency, { config: config }) if (serverOptions.base) { config.site.base = serverOptions.base diff --git a/src/node/siteConfig.ts b/src/node/siteConfig.ts index 7881993f..2a861aec 100644 --- a/src/node/siteConfig.ts +++ b/src/node/siteConfig.ts @@ -150,11 +150,12 @@ export interface UserConfig /** * This option allows you to configure the concurrency of the build. * A lower number will reduce the memory usage but will increase the build time. + * When parallel is enabled, this option indicates the number of threads. * * @experimental - * @default "Number of CPU cores available" + * @default "Number of CPU cores available / 150%" */ - buildConcurrency?: number + concurrency?: number /** * This option is the general switch for enabling parallel computing. When @@ -261,6 +262,6 @@ export interface SiteConfig } logger: Logger userConfig: UserConfig - buildConcurrency: number + concurrency: number parallel: boolean } diff --git a/src/node/worker.ts b/src/node/worker.ts index 359af0df..4b7e4e97 100644 --- a/src/node/worker.ts +++ b/src/node/worker.ts @@ -11,6 +11,25 @@ interface WorkerTask { reject: (error?: any) => void } +interface WorkerHooks { + // Update worker's context + context: (ctx: Object | null) => void +} + +function deferPromise(): { + promise: Promise + resolve: (val: T) => void + reject: (error?: any) => void +} { + let resolve: (val: T) => void + let reject: (error?: any) => void + const promise = new Promise((res, rej) => { + resolve = res + reject = rej + }) + return { promise, resolve: resolve!, reject: reject! } +} + /*=============================== Main Thread ===============================*/ // Owned by main thread, will be distributed to workers @@ -27,29 +46,35 @@ export function dispatchWork(name: string, ...argv: any[]): Promise { ) } -const workers: Worker[] = [] +const workers: Array = [] export async function launchWorkers(numWorkers: number, context: Object) { + const allInitialized: Array> = [] const ctx = new RpcContext() - const workerData = await ctx.serialize({ - [WORKER_MAGIC]: '', - getNextTask, - context - }) for (let i = 0; i < numWorkers; i++) { - const worker = new Worker(new URL(import.meta.url), { workerData }) + const { promise, resolve } = deferPromise() + const initWorkerHooks = (hooks: WorkerHooks) => { + worker.hooks = hooks + resolve() + } + const payload = await ctx.serialize({ + initWorkerHooks, + getNextTask, + context + }) + const worker = new Worker(new URL(import.meta.url), { + workerData: { [WORKER_MAGIC]: payload } + }) as Worker & { hooks: WorkerHooks } ctx.bind(worker) workers.push(worker) + allInitialized.push(promise) + worker.on('error', console.error) } + await Promise.all(allInitialized) } export function updateContext(context: Object) { - for (const worker of workers) { - worker.postMessage({ - [WORKER_MAGIC]: 'update/context', - context - }) - } + return Promise.all(workers.map(({ hooks }) => hooks.context(context))) } // Wait for workers to drain the taskQueue and exit. @@ -82,16 +107,20 @@ export function registerWorkload( async function workerMain() { const ctx = new RpcContext(parentPort!) const { + initWorkerHooks, getNextTask, context }: { getNextTask: () => Promise + initWorkerHooks: (hooks: Object) => Promise context: Object - } = ctx.deserialize(workerData) - - parentPort!.on('message', (msg) => { - if (msg?.[WORKER_MAGIC] === 'update/context') { - Object.assign(context, msg.context) + } = ctx.deserialize(workerData[WORKER_MAGIC]) + // Upon worker initialization, report back the hooks that main thread can use + // to reach this worker. + await initWorkerHooks({ + context(ctx: Object | null) { + if (ctx === null) for (const k in context) delete (context as any)[k] + else Object.assign(context, ctx) } }) @@ -100,15 +129,25 @@ async function workerMain() { if (task === null) break const { name, argv, resolve, reject } = task if (!registry.has(name)) throw new Error(`No task "${name}" registered.`) - const { main, init } = registry.get(name)! + const el = registry.get(name)! + const { main, init } = el if (init) { - init.apply(context) - delete registry.get(name)!.init + try { + await init.apply(context) + } catch (e) { + console.error(`worker: failed to init workload "${name}": ${e}`) + } + el.init = undefined + } + try { + resolve(await main.apply(context, argv)) + } catch (e) { + console.error(`worker: task "${name}" error`, e) + reject(e) } - await (main.apply(context, argv) as Promise).then(resolve, reject) } + ctx.reset() - process.exit(0) } if (!isMainThread && WORKER_MAGIC in workerData) workerMain()