feat: queue handling

pull/621/head
NGPixel 7 years ago
parent 2020e457cf
commit 749766e9bd

@ -42,6 +42,15 @@ redis:
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
# Background Workers # Background Workers
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
# Leave 0 for auto based on CPU cores # Leave 0 for auto based on CPU cores
workers: 0 workers: 0
# ---------------------------------------------------------------------
# High Availability
# ---------------------------------------------------------------------
# Read the docs BEFORE changing these settings!
ha:
nodeuid: primary
readonly: false

@ -21,6 +21,12 @@ defaults:
db: 0 db: 0
password: null password: null
workers: 0 workers: 0
ha:
nodeuid: primary
readonly: false
queues:
- gitSync
- uplClearTemp
authProviders: authProviders:
- local - local
- microsoft - microsoft

@ -48,18 +48,27 @@ if (numWorkers > numCPUs) {
} }
if (cluster.isMaster) { if (cluster.isMaster) {
wiki.logger.info('--------------------------')
wiki.logger.info('Wiki.js is initializing...') wiki.logger.info('Wiki.js is initializing...')
wiki.logger.info('--------------------------')
require('./master') require('./master').then(() => {
// -> Create background workers
for (let i = 0; i < numWorkers; i++) { for (let i = 0; i < numWorkers; i++) {
cluster.fork() cluster.fork()
} }
// -> Queue post-init tasks
wiki.queue.uplClearTemp.add({}, {
repeat: { cron: '*/15 * * * *' }
})
})
cluster.on('exit', (worker, code, signal) => { cluster.on('exit', (worker, code, signal) => {
wiki.logger.info(`Worker #${worker.id} died.`) wiki.logger.info(`Background Worker #${worker.id} was terminated.`)
}) })
} else { } else {
wiki.logger.info(`Background Worker #${cluster.worker.id} is starting...`) wiki.logger.info(`Background Worker #${cluster.worker.id} is initializing...`)
require('./worker') require('./worker')
} }

@ -4,20 +4,23 @@
const Promise = require('bluebird') const Promise = require('bluebird')
wiki.redis = require('./modules/redis').init()
wiki.queue = require('./modules/queue').init()
module.exports = Promise.join( module.exports = Promise.join(
wiki.db.onReady, wiki.db.onReady,
wiki.configSvc.loadFromDb() wiki.configSvc.loadFromDb(),
wiki.queue.clean()
).then(() => { ).then(() => {
// ---------------------------------------- // ----------------------------------------
// Load global modules // Load global modules
// ---------------------------------------- // ----------------------------------------
wiki.disk = require('./modules/disk').init() wiki.disk = require('./modules/disk').init()
wiki.entries = require('./modules/entries').init() wiki.docs = require('./modules/documents').init()
wiki.git = require('./modules/git').init(false) wiki.git = require('./modules/git').init(false)
wiki.lang = require('i18next') wiki.lang = require('i18next')
wiki.mark = require('./modules/markdown') wiki.mark = require('./modules/markdown')
wiki.redis = require('./modules/redis').init()
wiki.search = require('./modules/search').init() wiki.search = require('./modules/search').init()
wiki.upl = require('./modules/uploads').init() wiki.upl = require('./modules/uploads').init()
@ -75,7 +78,7 @@ module.exports = Promise.join(
// Passport Authentication // Passport Authentication
// ---------------------------------------- // ----------------------------------------
require('./modules/auth')(passport) require('./modules/auth').init(passport)
wiki.rights = require('./modules/rights') wiki.rights = require('./modules/rights')
// wiki.rights.init() // wiki.rights.init()

@ -4,7 +4,8 @@
const _ = require('lodash') const _ = require('lodash')
module.exports = (passport) => { module.exports = {
init(passport) {
// Serialization user methods // Serialization user methods
passport.serializeUser(function (user, done) { passport.serializeUser(function (user, done) {
@ -87,4 +88,5 @@ module.exports = (passport) => {
// }) // })
// } else { return true } // } else { return true }
// }) // })
}
} }

@ -13,9 +13,6 @@ module.exports = {
/** /**
* Load root config from disk * Load root config from disk
*
* @param {any} confPaths
* @returns
*/ */
init() { init() {
let confPaths = { let confPaths = {

@ -63,12 +63,32 @@ module.exports = {
require(path.join(dbModelsPath, '_relations.js'))(self) require(path.join(dbModelsPath, '_relations.js'))(self)
// Sync DB // Set init tasks
self.onReady = (wiki.IS_MASTER) ? self.inst.sync({ let initTasks = {
// -> Sync DB Schemas
syncSchemas() {
return self.inst.sync({
force: false, force: false,
logging: false logging: false
}) : Promise.resolve() })
},
// -> Set Connection App Name
setAppName() {
return self.inst.query(`set application_name = 'Wiki.js'`, { raw: true })
}
}
let initTasksQueue = (wiki.IS_MASTER) ? [
initTasks.syncSchemas,
initTasks.setAppName
] : [
initTasks.setAppName
]
// Perform init tasks
self.onReady = Promise.each(initTasksQueue, t => t())
return self return self
} }

@ -10,7 +10,7 @@ const _ = require('lodash')
const entryHelper = require('../helpers/entry') const entryHelper = require('../helpers/entry')
/** /**
* Entries Model * Documents Model
*/ */
module.exports = { module.exports = {

@ -0,0 +1,37 @@
'use strict'
/* global wiki */
const Bull = require('bull')
const Promise = require('bluebird')
module.exports = {
init() {
wiki.data.queues.forEach(queueName => {
this[queueName] = new Bull(queueName, {
prefix: `q-${wiki.config.ha.nodeuid}`,
redis: wiki.config.redis
})
})
return this
},
clean() {
return Promise.each(wiki.data.queues, queueName => {
return new Promise((resolve, reject) => {
let keyStream = wiki.redis.scanStream({
match: `q-${wiki.config.ha.nodeuid}:${queueName}:*`
})
keyStream.on('data', resultKeys => {
if (resultKeys.length > 0) {
wiki.redis.del(resultKeys)
}
})
keyStream.on('end', resolve)
})
}).then(() => {
wiki.logger.info('Purging old queue jobs: OK')
}).catch(err => {
wiki.logger.error(err)
})
}
}

@ -19,7 +19,11 @@ module.exports = {
*/ */
init() { init() {
if (isPlainObject(wiki.config.redis)) { if (isPlainObject(wiki.config.redis)) {
return new Redis(wiki.config.redis) let red = new Redis(wiki.config.redis)
red.on('ready', () => {
wiki.logger.info('Redis connection: OK')
})
return red
} else { } else {
wiki.logger.error('Invalid Redis configuration!') wiki.logger.error('Invalid Redis configuration!')
process.exit(1) process.exit(1)

@ -61,5 +61,8 @@ module.exports = (job, done) => {
}) })
return jobCbStreamDocs return jobCbStreamDocs
}).then(() => {
wiki.logger.info('Git remote repository sync: DONE')
return true
}) })
} }

@ -22,5 +22,8 @@ module.exports = (job, done) => {
} }
}) })
}) })
}).then(() => {
wiki.logger.info('Purging temporary upload files: DONE')
return true
}) })
} }

@ -2,34 +2,34 @@
/* global wiki */ /* global wiki */
const path = require('path') const Promise = require('bluebird')
wiki.REPOPATH = path.resolve(wiki.ROOTPATH, wiki.config.paths.repo)
wiki.DATAPATH = path.resolve(wiki.ROOTPATH, wiki.config.paths.data)
wiki.UPLTEMPPATH = path.join(wiki.DATAPATH, 'temp-upload')
// ---------------------------------------- module.exports = Promise.join(
// Load global modules wiki.db.onReady,
// ---------------------------------------- wiki.configSvc.loadFromDb(['features', 'git', 'logging', 'site', 'uploads'])
).then(() => {
const path = require('path')
// wiki.upl = require('./modules/uploads-agent').init() wiki.REPOPATH = path.resolve(wiki.ROOTPATH, wiki.config.paths.repo)
// wiki.git = require('./modules/git').init() wiki.DATAPATH = path.resolve(wiki.ROOTPATH, wiki.config.paths.data)
// wiki.entries = require('./modules/entries').init() wiki.UPLTEMPPATH = path.join(wiki.DATAPATH, 'temp-upload')
wiki.lang = require('i18next')
wiki.mark = require('./modules/markdown')
// ---------------------------------------- // ----------------------------------------
// Load local modules // Load global modules
// ---------------------------------------- // ----------------------------------------
const Promise = require('bluebird') // wiki.upl = require('./modules/uploads-agent').init()
const i18nBackend = require('i18next-node-fs-backend') // wiki.git = require('./modules/git').init()
// wiki.entries = require('./modules/entries').init()
wiki.lang = require('i18next')
wiki.mark = require('./modules/markdown')
// ---------------------------------------- // ----------------------------------------
// Localization Engine // Localization Engine
// ---------------------------------------- // ----------------------------------------
wiki.lang.use(i18nBackend).init({ const i18nBackend = require('i18next-node-fs-backend')
wiki.lang.use(i18nBackend).init({
load: 'languageOnly', load: 'languageOnly',
ns: ['common', 'admin', 'auth', 'errors', 'git'], ns: ['common', 'admin', 'auth', 'errors', 'git'],
defaultNS: 'common', defaultNS: 'common',
@ -40,31 +40,30 @@ wiki.lang.use(i18nBackend).init({
backend: { backend: {
loadPath: path.join(wiki.SERVERPATH, 'locales/{{lng}}/{{ns}}.json') loadPath: path.join(wiki.SERVERPATH, 'locales/{{lng}}/{{ns}}.json')
} }
}) })
// ---------------------------------------- // ----------------------------------------
// Start Queues // Start Queues
// ---------------------------------------- // ----------------------------------------
const Bull = require('bull') const Bull = require('bull')
const autoload = require('auto-load') const autoload = require('auto-load')
let queues = autoload(path.join(wiki.SERVERPATH, 'queues')) let queues = autoload(path.join(wiki.SERVERPATH, 'queues'))
Promise.join(
wiki.db.onReady
// wiki.upl.initialScan()
).then(() => {
for (let queueName in queues) { for (let queueName in queues) {
new Bull(queueName, { redis: wiki.config.redis }).process(queues[queueName]) new Bull(queueName, {
prefix: `q-${wiki.config.ha.nodeuid}`,
redis: wiki.config.redis
}).process(queues[queueName])
} }
})
// ---------------------------------------- // ----------------------------------------
// Shutdown gracefully // Shutdown gracefully
// ---------------------------------------- // ----------------------------------------
process.on('disconnect', () => { process.on('disconnect', () => {
wiki.logger.warn('Lost connection to Master. Exiting...') wiki.logger.warn('Lost connection to Master. Exiting...')
process.exit() process.exit()
})
}) })

Loading…
Cancel
Save