Merge branch 'perf/local-search' into feat/multithread-render

pull/3386/head
Yuxuan Zhang 2 years ago
commit 046b6530b4
No known key found for this signature in database
GPG Key ID: 6910B04F3351EF7D

@ -98,6 +98,7 @@
"@vueuse/core": "^10.7.1",
"@vueuse/integrations": "^10.7.1",
"focus-trap": "^7.5.4",
"jsdom": "^23.0.1",
"mark.js": "8.11.1",
"minisearch": "^6.3.0",
"shikiji": "^0.9.16",
@ -138,6 +139,7 @@
"@types/escape-html": "^1.0.4",
"@types/fs-extra": "^11.0.4",
"@types/humanize-duration": "^3.27.3",
"@types/jsdom": "^21.1.6",
"@types/lodash.template": "^4.5.3",
"@types/mark.js": "^8.11.12",
"@types/markdown-it-attrs": "^4.1.3",
@ -154,6 +156,7 @@
"conventional-changelog-cli": "^4.1.0",
"cross-spawn": "^7.0.3",
"debug": "^4.3.4",
"dom-traverse": "^0.0.1",
"esbuild": "^0.19.11",
"escape-html": "^1.0.3",
"execa": "^8.0.1",

@ -16,6 +16,7 @@ import { bundle } from './bundle'
import { generateSitemap } from './generateSitemap'
import { renderPage, type RenderPageContext } from './render'
import humanizeDuration from 'humanize-duration'
import { launchWorkers, waitWorkers } from '../worker'
export async function build(
root?: string,
@ -27,6 +28,9 @@ export async function build(
const siteConfig = await resolveConfig(root, 'build', 'production')
const unlinkVue = linkVue()
if (siteConfig.parallel)
launchWorkers(siteConfig.buildConcurrency, { config: siteConfig })
if (buildOptions.base) {
siteConfig.site.base = buildOptions.base
delete buildOptions.base
@ -124,7 +128,7 @@ export async function build(
const pages = ['404.md', ...siteConfig.pages]
if (siteConfig.multithreadRender) {
if (siteConfig.parallel) {
const { default: cluster } = await import('./render-worker')
await cluster(entryPath, context, pages, updateProgress)
} else {
@ -156,6 +160,8 @@ export async function build(
await siteConfig.buildEnd?.(siteConfig)
clearCache()
if (siteConfig.parallel) await waitWorkers()
const timeEnd = performance.now()
const duration = humanizeDuration(timeEnd - timeStart, {
maxDecimalPoints: 2

@ -148,7 +148,7 @@ export async function resolveConfig(
userConfig.buildConcurrency ?? cpus().length,
1 // At least one thread required
),
multithreadRender: userConfig.multithreadRender ?? false
parallel: userConfig.parallel ?? true
}
// to be shared with content loaders

@ -13,6 +13,10 @@ import {
type MarkdownEnv
} from '../shared'
import { processIncludes } from '../utils/processIncludes'
import { updateCurrentTask } from '../utils/task'
import type { PageSplitSection } from '../../../types/local-search'
import { registerWorkload, dispatchWork } from '../worker'
import Queue from '../utils/queue'
const debug = _debug('vitepress:local-search')
@ -122,7 +126,16 @@ export async function localSearchPlugin(
return id
}
async function indexFile(page: string) {
function scanForLocales() {
for (const page of siteConfig.pages) {
const file = path.join(siteConfig.srcDir, page)
const locale = getLocaleForPath(file)
// dry-fetch the index for this locale
getIndexByLocale(locale)
}
}
async function indexFile(page: string, parallel: boolean = false) {
const file = path.join(siteConfig.srcDir, page)
// get file metadata
const fileId = getDocId(file)
@ -133,8 +146,10 @@ export async function localSearchPlugin(
const sections =
// user provided generator
(await options.miniSearch?._splitIntoSections?.(file, html)) ??
// default implementation (parallel)
(parallel ? parallelSplitter(html, fileId) : undefined) ??
// default implementation
splitPageIntoSections(html)
splitPageIntoSections(html, fileId)
// add sections to the locale index
for await (const section of sections) {
if (!section || !(section.text || section.titles)) break
@ -149,14 +164,27 @@ export async function localSearchPlugin(
}
}
async function scanForBuild() {
debug('🔍️ Indexing files for search...')
await pMap(siteConfig.pages, indexFile, {
concurrency: siteConfig.buildConcurrency
})
debug('✅ Indexing finished...')
async function indexAll() {
const concurrency = siteConfig.buildConcurrency
let numIndexed = 0
const updateProgress = () =>
updateCurrentTask(
++numIndexed,
siteConfig.pages.length,
'indexing local search'
)
await pMap(
siteConfig.pages,
(page) => indexFile(page, siteConfig.parallel).then(updateProgress),
{ concurrency }
)
updateCurrentTask()
}
let indexAllPromise: Promise<void> | undefined
return {
name: 'vitepress:local-search',
@ -172,7 +200,6 @@ export async function localSearchPlugin(
async configureServer(_server) {
server = _server
await scanForBuild()
onIndexUpdated()
},
@ -184,25 +211,23 @@ export async function localSearchPlugin(
async load(id) {
if (id === LOCAL_SEARCH_INDEX_REQUEST_PATH) {
if (process.env.NODE_ENV === 'production') {
await scanForBuild()
}
console.log('\n🔍 load', id)
scanForLocales()
let records: string[] = []
for (const [locale] of indexByLocales) {
records.push(
`${JSON.stringify(
locale
)}: () => import('@localSearchIndex${locale}')`
)}: () => import('${LOCAL_SEARCH_INDEX_ID}-${locale}')`
)
}
return `export default {${records.join(',')}}`
} else if (id.startsWith(LOCAL_SEARCH_INDEX_REQUEST_PATH)) {
console.log('\n🔍 load', id)
const locale = id.slice(LOCAL_SEARCH_INDEX_REQUEST_PATH.length + 1)
await (indexAllPromise ??= indexAll())
return `export default ${JSON.stringify(
JSON.stringify(
indexByLocales.get(
id.replace(LOCAL_SEARCH_INDEX_REQUEST_PATH, '')
) ?? {}
)
JSON.stringify(indexByLocales.get(locale) ?? {})
)}`
}
},
@ -217,40 +242,82 @@ export async function localSearchPlugin(
}
}
const headingRegex = /<h(\d*).*?>(.*?<a.*? href="#.*?".*?>.*?<\/a>)<\/h\1>/gi
const headingContentRegex = /(.*?)<a.*? href="#(.*?)".*?>.*?<\/a>/i
/**
* Splits HTML into sections based on headings
*/
function* splitPageIntoSections(html: string) {
const result = html.split(headingRegex)
result.shift()
let parentTitles: string[] = []
for (let i = 0; i < result.length; i += 3) {
const level = parseInt(result[i]) - 1
const heading = result[i + 1]
const headingResult = headingContentRegex.exec(heading)
const title = clearHtmlTags(headingResult?.[1] ?? '').trim()
const anchor = headingResult?.[2] ?? ''
const content = result[i + 2]
if (!title || !content) continue
const titles = parentTitles.slice(0, level)
titles[level] = title
yield { anchor, titles, text: getSearchableText(content) }
if (level === 0) {
parentTitles = [title]
} else {
parentTitles[level] = title
}
}
async function* splitPageIntoSections(html: string, fileId: string) {
const { JSDOM } = await import('jsdom')
const { default: traverse, Node } = await import('dom-traverse')
const dom = JSDOM.fragment(html)
// Stack of title hierarchy for current working section
const titleStack: Array<{ level: number; text: string }> = []
// Set of all used ids (for duplicate id detection)
const existingIdSet = new Set()
// Current working section
let section: PageSplitSection = { text: '', titles: [''] }
function submit() {
section.text = section.text.replace(/\W+/gs, ' ').trim()
return section
}
// Traverse the DOM
for (const [node, skipChildren] of traverse.skippable(dom)) {
if (node.nodeType === Node.ELEMENT_NODE) {
const el = node as Element
if (!/^H\d+$/i.test(el.tagName)) continue
if (!el.hasAttribute('id')) continue
const id = el.getAttribute('id')!
if (existingIdSet.has(id)) {
console.error(`\x1b[2K\r⚠ Duplicate heading id "${id}" in ${fileId}`)
continue
}
existingIdSet.add(id)
// Submit previous section
if (section.text || section.anchor) yield submit()
// Pop adjacent titles depending on level
const level = parseInt(el.tagName.slice(1))
while (titleStack.length > 0) {
if (titleStack.at(-1)!.level >= level) titleStack.pop()
else break
}
titleStack.push({ level, text: el.textContent ?? '' })
// Create new section
section = {
text: '',
anchor: id,
titles: titleStack.map((_) => _.text)
}
skipChildren()
} else if (node.nodeType === Node.TEXT_NODE) {
// Collect text content
section.text += node.textContent
}
}
// Submit last section
yield submit()
}
function getSearchableText(content: string) {
content = clearHtmlTags(content)
return content
// Worker proxy in main thread
function parallelSplitter(html: string, fileId: string) {
const queue = new Queue<PageSplitSection>()
dispatchWork(
'local-search::split',
html,
fileId,
queue.enqueue.bind(queue),
queue.close.bind(queue)
)
return queue.items()
}
function clearHtmlTags(str: string) {
return str.replace(/<[^>]*>/g, '')
// Worker proxy in worker thread
registerWorkload(
'local-search::split',
async (
html: string,
fileId: string,
_yield: (section: PageSplitSection) => Promise<void>,
_end: () => Promise<void>
) => {
for await (const section of splitPageIntoSections(html, fileId)) {
await _yield(section)
}
await _end()
}
)

@ -1,6 +1,7 @@
import { createServer as createViteServer, type ServerOptions } from 'vite'
import { resolveConfig } from './config'
import { createVitePressPlugin } from './plugin'
import { launchWorkers } from './worker'
export async function createServer(
root: string = process.cwd(),
@ -9,6 +10,9 @@ export async function createServer(
) {
const config = await resolveConfig(root)
if (config.parallel)
launchWorkers(config.buildConcurrency, { config: config })
if (serverOptions.base) {
config.site.base = serverOptions.base
delete serverOptions.base

@ -157,11 +157,16 @@ export interface UserConfig<ThemeConfig = any>
buildConcurrency?: number
/**
* This option allows you to enable or disable the multithread render.
* This option is the general switch for enabling parallel computing. When
* enabled, vitepress will create worker threads and distribute workload to
* them. Currently, the following features are supported:
* 1. Parallel SPA Bundling
* 2. Parallel SSR Rendering
* 3. Parallel Local Search Indexing (when using default splitter)
* @experimental
* @default false
* @default true
*/
multithreadRender?: boolean
parallel?: boolean
/**
* @experimental
@ -257,5 +262,5 @@ export interface SiteConfig<ThemeConfig = any>
logger: Logger
userConfig: UserConfig
buildConcurrency: number
multithreadRender: boolean
parallel: boolean
}

@ -0,0 +1,36 @@
// Asynchronous queue with a close method
export default class Queue<T> {
private queue: Array<T> = []
private pending: Array<(data: T | null) => void> = []
#closed: boolean = false
get closed() {
return this.#closed
}
async *items() {
while (true) {
const item = await this.dequeue()
if (item === null) break
yield item
}
}
enqueue(data: T) {
if (this.closed)
throw new Error(`Failed to enqueue ${data}, queue already closed`)
if (this.pending.length) this.pending.shift()!(data)
else this.queue.push(data)
}
async dequeue(): Promise<T | null> {
if (this.closed) return null
if (this.queue.length) return this.queue.shift()!
return new Promise((res) => this.pending.push(res))
}
close() {
this.#closed = true
for (const res of this.pending) res(null)
}
}

@ -0,0 +1,114 @@
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'
import RpcContext from 'rpc-magic-proxy'
import Queue from './utils/queue'
const WORKER_MAGIC = '::vitepress::build-worker::'
interface WorkerTask {
name: string
argv: any[]
resolve: (retVal: any) => void
reject: (error?: any) => void
}
/*=============================== Main Thread ===============================*/
// Owned by main thread, will be distributed to workers
const taskQueue = new Queue<WorkerTask>()
// This function will be exposed to workers via magic proxy
function getNextTask() {
return taskQueue.dequeue()
}
export function dispatchWork(name: string, ...argv: any[]): Promise<any> {
return new Promise((resolve, reject) =>
taskQueue.enqueue({ name, argv, resolve, reject })
)
}
const workers: Worker[] = []
export async function launchWorkers(numWorkers: number, context: Object) {
const ctx = new RpcContext()
const workerData = await ctx.serialize({
[WORKER_MAGIC]: '',
getNextTask,
context
})
for (let i = 0; i < numWorkers; i++) {
const worker = new Worker(new URL(import.meta.url), { workerData })
ctx.bind(worker)
workers.push(worker)
}
}
export function updateContext(context: Object) {
for (const worker of workers) {
worker.postMessage({
[WORKER_MAGIC]: 'update/context',
context
})
}
}
// Wait for workers to drain the taskQueue and exit.
export function waitWorkers() {
const allClosed = workers.map(
(w) => new Promise((res) => w.once('exit', res))
)
taskQueue.close()
return Promise.all(allClosed)
}
/*============================== Worker Thread ==============================*/
const registry: Map<string, { main: Function; init?: Function }> = new Map()
export function registerWorkload(
name: string,
main: (...argv: any[]) => any,
init?: () => void
) {
// Only register workload in worker threads
if (isMainThread) return
if (registry.has(name)) {
throw new Error(`Workload "${name}" already registered.`)
}
registry.set(name, { main, init })
}
// Will keep querying next workload from main thread
async function workerMain() {
const ctx = new RpcContext(parentPort!)
const {
getNextTask,
context
}: {
getNextTask: () => Promise<WorkerTask | null>
context: Object
} = ctx.deserialize(workerData)
parentPort!.on('message', (msg) => {
if (msg?.[WORKER_MAGIC] === 'update/context') {
Object.assign(context, msg.context)
}
})
while (true) {
const task = await getNextTask()
if (task === null) break
const { name, argv, resolve, reject } = task
if (!registry.has(name)) throw new Error(`No task "${name}" registered.`)
const { main, init } = registry.get(name)!
if (init) {
init.apply(context)
delete registry.get(name)!.init
}
await (main.apply(context, argv) as Promise<any>).then(resolve, reject)
}
ctx.reset()
process.exit(0)
}
if (!isMainThread && WORKER_MAGIC in workerData) workerMain()
Loading…
Cancel
Save