|
|
|
@ -11,6 +11,25 @@ interface WorkerTask {
|
|
|
|
|
reject: (error?: any) => void
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
interface WorkerHooks {
|
|
|
|
|
// Update worker's context
|
|
|
|
|
context: (ctx: Object | null) => void
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function deferPromise<T>(): {
|
|
|
|
|
promise: Promise<T>
|
|
|
|
|
resolve: (val: T) => void
|
|
|
|
|
reject: (error?: any) => void
|
|
|
|
|
} {
|
|
|
|
|
let resolve: (val: T) => void
|
|
|
|
|
let reject: (error?: any) => void
|
|
|
|
|
const promise = new Promise<T>((res, rej) => {
|
|
|
|
|
resolve = res
|
|
|
|
|
reject = rej
|
|
|
|
|
})
|
|
|
|
|
return { promise, resolve: resolve!, reject: reject! }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*=============================== Main Thread ===============================*/
|
|
|
|
|
|
|
|
|
|
// Owned by main thread, will be distributed to workers
|
|
|
|
@ -27,29 +46,35 @@ export function dispatchWork(name: string, ...argv: any[]): Promise<any> {
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const workers: Worker[] = []
|
|
|
|
|
const workers: Array<Worker & { hooks: WorkerHooks }> = []
|
|
|
|
|
|
|
|
|
|
export async function launchWorkers(numWorkers: number, context: Object) {
|
|
|
|
|
const allInitialized: Array<Promise<void>> = []
|
|
|
|
|
const ctx = new RpcContext()
|
|
|
|
|
const workerData = await ctx.serialize({
|
|
|
|
|
[WORKER_MAGIC]: '',
|
|
|
|
|
getNextTask,
|
|
|
|
|
context
|
|
|
|
|
})
|
|
|
|
|
for (let i = 0; i < numWorkers; i++) {
|
|
|
|
|
const worker = new Worker(new URL(import.meta.url), { workerData })
|
|
|
|
|
const { promise, resolve } = deferPromise<void>()
|
|
|
|
|
const initWorkerHooks = (hooks: WorkerHooks) => {
|
|
|
|
|
worker.hooks = hooks
|
|
|
|
|
resolve()
|
|
|
|
|
}
|
|
|
|
|
const payload = await ctx.serialize({
|
|
|
|
|
initWorkerHooks,
|
|
|
|
|
getNextTask,
|
|
|
|
|
context
|
|
|
|
|
})
|
|
|
|
|
const worker = new Worker(new URL(import.meta.url), {
|
|
|
|
|
workerData: { [WORKER_MAGIC]: payload }
|
|
|
|
|
}) as Worker & { hooks: WorkerHooks }
|
|
|
|
|
ctx.bind(worker)
|
|
|
|
|
workers.push(worker)
|
|
|
|
|
allInitialized.push(promise)
|
|
|
|
|
worker.on('error', console.error)
|
|
|
|
|
}
|
|
|
|
|
await Promise.all(allInitialized)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
export function updateContext(context: Object) {
|
|
|
|
|
for (const worker of workers) {
|
|
|
|
|
worker.postMessage({
|
|
|
|
|
[WORKER_MAGIC]: 'update/context',
|
|
|
|
|
context
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return Promise.all(workers.map(({ hooks }) => hooks.context(context)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait for workers to drain the taskQueue and exit.
|
|
|
|
@ -82,16 +107,20 @@ export function registerWorkload(
|
|
|
|
|
async function workerMain() {
|
|
|
|
|
const ctx = new RpcContext(parentPort!)
|
|
|
|
|
const {
|
|
|
|
|
initWorkerHooks,
|
|
|
|
|
getNextTask,
|
|
|
|
|
context
|
|
|
|
|
}: {
|
|
|
|
|
getNextTask: () => Promise<WorkerTask | null>
|
|
|
|
|
initWorkerHooks: (hooks: Object) => Promise<void>
|
|
|
|
|
context: Object
|
|
|
|
|
} = ctx.deserialize(workerData)
|
|
|
|
|
|
|
|
|
|
parentPort!.on('message', (msg) => {
|
|
|
|
|
if (msg?.[WORKER_MAGIC] === 'update/context') {
|
|
|
|
|
Object.assign(context, msg.context)
|
|
|
|
|
} = ctx.deserialize(workerData[WORKER_MAGIC])
|
|
|
|
|
// Upon worker initialization, report back the hooks that main thread can use
|
|
|
|
|
// to reach this worker.
|
|
|
|
|
await initWorkerHooks({
|
|
|
|
|
context(ctx: Object | null) {
|
|
|
|
|
if (ctx === null) for (const k in context) delete (context as any)[k]
|
|
|
|
|
else Object.assign(context, ctx)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
@ -100,15 +129,25 @@ async function workerMain() {
|
|
|
|
|
if (task === null) break
|
|
|
|
|
const { name, argv, resolve, reject } = task
|
|
|
|
|
if (!registry.has(name)) throw new Error(`No task "${name}" registered.`)
|
|
|
|
|
const { main, init } = registry.get(name)!
|
|
|
|
|
const el = registry.get(name)!
|
|
|
|
|
const { main, init } = el
|
|
|
|
|
if (init) {
|
|
|
|
|
init.apply(context)
|
|
|
|
|
delete registry.get(name)!.init
|
|
|
|
|
try {
|
|
|
|
|
await init.apply(context)
|
|
|
|
|
} catch (e) {
|
|
|
|
|
console.error(`worker: failed to init workload "${name}": ${e}`)
|
|
|
|
|
}
|
|
|
|
|
el.init = undefined
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
resolve(await main.apply(context, argv))
|
|
|
|
|
} catch (e) {
|
|
|
|
|
console.error(`worker: task "${name}" error`, e)
|
|
|
|
|
reject(e)
|
|
|
|
|
}
|
|
|
|
|
await (main.apply(context, argv) as Promise<any>).then(resolve, reject)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx.reset()
|
|
|
|
|
process.exit(0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!isMainThread && WORKER_MAGIC in workerData) workerMain()
|
|
|
|
|