From 7aa4a0e9eee3848ed607bfbd4ebc06f8508074ff Mon Sep 17 00:00:00 2001 From: Nicolas Giard Date: Tue, 11 Oct 2022 03:06:16 +0000 Subject: [PATCH] feat: admin scheduler + worker --- server/app/data.yml | 1 + server/core/config.js | 14 +- server/core/db.js | 18 +- server/core/scheduler.js | 139 ++++++++------- server/db/migrations/3.0.0.js | 10 +- server/graph/resolvers/system.js | 26 +-- server/graph/schemas/system.graphql | 3 +- server/tasks/simple/clean-job-history.js | 17 ++ server/tasks/workers/purge-uploads.js | 10 +- server/worker.js | 29 ++- ux/src/i18n/locales/en.json | 11 +- ux/src/pages/AdminScheduler.vue | 215 ++++++++++++++++++++--- 12 files changed, 364 insertions(+), 129 deletions(-) create mode 100644 server/tasks/simple/clean-job-history.js diff --git a/server/app/data.yml b/server/app/data.yml index 403863dc..091e9fb6 100644 --- a/server/app/data.yml +++ b/server/app/data.yml @@ -35,6 +35,7 @@ defaults: scheduledCheck: 300 maxRetries: 5 retryBackoff: 60 + historyExpiration: 90000 # DB defaults api: isEnabled: false diff --git a/server/core/config.js b/server/core/config.js index cd32c511..6108a54a 100644 --- a/server/core/config.js +++ b/server/core/config.js @@ -9,7 +9,7 @@ module.exports = { /** * Load root config from disk */ - init() { + init(silent = false) { let confPaths = { config: path.join(WIKI.ROOTPATH, 'config.yml'), data: path.join(WIKI.SERVERPATH, 'app/data.yml'), @@ -24,7 +24,9 @@ module.exports = { confPaths.config = path.resolve(WIKI.ROOTPATH, process.env.CONFIG_FILE) } - process.stdout.write(chalk.blue(`Loading configuration from ${confPaths.config}... `)) + if (!silent) { + process.stdout.write(chalk.blue(`Loading configuration from ${confPaths.config}... `)) + } let appconfig = {} let appdata = {} @@ -37,7 +39,9 @@ module.exports = { ) appdata = yaml.load(fs.readFileSync(confPaths.data, 'utf8')) appdata.regex = require(confPaths.dataRegex) - console.info(chalk.green.bold(`OK`)) + if (!silent) { + console.info(chalk.green.bold(`OK`)) + } } catch (err) { console.error(chalk.red.bold(`FAILED`)) console.error(err.message) @@ -66,7 +70,9 @@ module.exports = { // Load DB Password from Docker Secret File if (process.env.DB_PASS_FILE) { - console.info(chalk.blue(`DB_PASS_FILE is defined. Will use secret from file.`)) + if (!silent) { + console.info(chalk.blue(`DB_PASS_FILE is defined. Will use secret from file.`)) + } try { appconfig.db.pass = fs.readFileSync(process.env.DB_PASS_FILE, 'utf8').trim() } catch (err) { diff --git a/server/core/db.js b/server/core/db.js index 6b22bd0f..ac45cfb2 100644 --- a/server/core/db.js +++ b/server/core/db.js @@ -4,6 +4,7 @@ const path = require('path') const Knex = require('knex') const fs = require('fs') const Objection = require('objection') +const PGPubSub = require('pg-pubsub') const migrationSource = require('../db/migrator-source') const migrateFromLegacy = require('../db/legacy') @@ -87,7 +88,7 @@ module.exports = { ...WIKI.config.pool, async afterCreate(conn, done) { // -> Set Connection App Name - await conn.query(`set application_name = 'Wiki.js'`) + await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}:MAIN'`) done() } }, @@ -159,9 +160,18 @@ module.exports = { * Subscribe to database LISTEN / NOTIFY for multi-instances events */ async subscribeToNotifications () { - const PGPubSub = require('pg-pubsub') - - this.listener = new PGPubSub(this.knex.client.connectionSettings, { + let connSettings = this.knex.client.connectionSettings + if (typeof connSettings === 'string') { + const encodedName = encodeURIComponent(`Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`) + if (connSettings.indexOf('?') > 0) { + connSettings = `${connSettings}&ApplicationName=${encodedName}` + } else { + connSettings = `${connSettings}?ApplicationName=${encodedName}` + } + } else { + connSettings.application_name = `Wiki.js - ${WIKI.INSTANCE_ID}:PSUB` + } + this.listener = new PGPubSub(connSettings, { log (ev) { WIKI.logger.debug(ev) } diff --git a/server/core/scheduler.js b/server/core/scheduler.js index c0a7fd7e..b26a9f59 100644 --- a/server/core/scheduler.js +++ b/server/core/scheduler.js @@ -13,7 +13,8 @@ module.exports = { scheduledRef: null, tasks: null, async init () { - this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? os.cpus().length : WIKI.config.scheduler.workers + this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? (os.cpus().length - 1) : WIKI.config.scheduler.workers + if (this.maxWorkers < 1) { this.maxWorkers = 1 } WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`) this.workerPool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', { errorHandler: (err) => WIKI.logger.warn(err), @@ -77,80 +78,87 @@ module.exports = { } }, async processJob () { - let jobId = null + let jobIds = [] try { + const availableWorkers = this.maxWorkers - this.activeWorkers + if (availableWorkers < 1) { + WIKI.logger.debug('All workers are busy. Cannot process more jobs at the moment.') + return + } + await WIKI.db.knex.transaction(async trx => { const jobs = await trx('jobs') - .where('id', WIKI.db.knex.raw('(SELECT id FROM jobs WHERE ("waitUntil" IS NULL OR "waitUntil" <= NOW()) ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1)')) + .whereIn('id', WIKI.db.knex.raw(`(SELECT id FROM jobs WHERE ("waitUntil" IS NULL OR "waitUntil" <= NOW()) ORDER BY id FOR UPDATE SKIP LOCKED LIMIT ${availableWorkers})`)) .returning('*') .del() - if (jobs && jobs.length === 1) { - const job = jobs[0] - WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`) - jobId = job.id - // -> Add to Job History - await WIKI.db.knex('jobHistory').insert({ - id: job.id, - task: job.task, - state: 'active', - useWorker: job.useWorker, - wasScheduled: job.isScheduled, - payload: job.payload, - attempt: job.retries + 1, - maxRetries: job.maxRetries, - createdAt: job.createdAt - }).onConflict('id').merge({ - startedAt: new Date() - }) - // -> Start working on it - try { - if (job.useWorker) { - await this.workerPool.execute({ - id: job.id, - name: job.task, - data: job.payload - }) - } else { - await this.tasks[job.task](job.payload) - } - // -> Update job history (success) - await WIKI.db.knex('jobHistory').where({ - id: job.id - }).update({ - state: 'completed', - completedAt: new Date() - }) - WIKI.logger.info(`Completed job ${job.id}: ${job.task} [ SUCCESS ]`) - } catch (err) { - WIKI.logger.warn(`Failed to complete job ${job.id}: ${job.task} [ FAILED ]`) - WIKI.logger.warn(err) - // -> Update job history (fail) - await WIKI.db.knex('jobHistory').where({ - id: job.id - }).update({ - state: 'failed', - lastErrorMessage: err.message + if (jobs && jobs.length > 0) { + for (const job of jobs) { + WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`) + // -> Add to Job History + await WIKI.db.knex('jobHistory').insert({ + id: job.id, + task: job.task, + state: 'active', + useWorker: job.useWorker, + wasScheduled: job.isScheduled, + payload: job.payload, + attempt: job.retries + 1, + maxRetries: job.maxRetries, + executedBy: WIKI.INSTANCE_ID, + createdAt: job.createdAt + }).onConflict('id').merge({ + executedBy: WIKI.INSTANCE_ID, + startedAt: new Date() }) - // -> Reschedule for retry - if (job.retries < job.maxRetries) { - const backoffDelay = (2 ** job.retries) * WIKI.config.scheduler.retryBackoff - await trx('jobs').insert({ - ...job, - retries: job.retries + 1, - waitUntil: DateTime.utc().plus({ seconds: backoffDelay }).toJSDate(), - updatedAt: new Date() + jobIds.push(job.id) + + // -> Start working on it + try { + if (job.useWorker) { + await this.workerPool.execute({ + ...job, + INSTANCE_ID: `${WIKI.INSTANCE_ID}:WKR` + }) + } else { + await this.tasks[job.task](job.payload) + } + // -> Update job history (success) + await WIKI.db.knex('jobHistory').where({ + id: job.id + }).update({ + state: 'completed', + completedAt: new Date() + }) + WIKI.logger.info(`Completed job ${job.id}: ${job.task}`) + } catch (err) { + WIKI.logger.warn(`Failed to complete job ${job.id}: ${job.task} [ FAILED ]`) + WIKI.logger.warn(err) + // -> Update job history (fail) + await WIKI.db.knex('jobHistory').where({ + id: job.id + }).update({ + state: 'failed', + lastErrorMessage: err.message }) - WIKI.logger.warn(`Rescheduling new attempt for job ${job.id}: ${job.task}...`) + // -> Reschedule for retry + if (job.retries < job.maxRetries) { + const backoffDelay = (2 ** job.retries) * WIKI.config.scheduler.retryBackoff + await trx('jobs').insert({ + ...job, + retries: job.retries + 1, + waitUntil: DateTime.utc().plus({ seconds: backoffDelay }).toJSDate(), + updatedAt: new Date() + }) + WIKI.logger.warn(`Rescheduling new attempt for job ${job.id}: ${job.task}...`) + } } } } }) } catch (err) { WIKI.logger.warn(err) - if (jobId) { - WIKI.db.knex('jobHistory').where({ - id: jobId - }).update({ + if (jobIds && jobIds.length > 0) { + WIKI.db.knex('jobHistory').whereIn('id', jobIds).update({ state: 'interrupted', lastErrorMessage: err.message }) @@ -181,6 +189,7 @@ module.exports = { if (scheduledJobs?.length > 0) { // -> Get existing scheduled jobs const existingJobs = await WIKI.db.knex('jobs').where('isScheduled', true) + let totalAdded = 0 for (const job of scheduledJobs) { // -> Get next planned iterations const plannedIterations = cronparser.parseExpression(job.cron, { @@ -205,6 +214,7 @@ module.exports = { notify: false }) addedFutureJobs++ + totalAdded++ } // -> No more iterations for this period or max iterations count reached if (next.done || addedFutureJobs >= 10) { break } @@ -213,6 +223,11 @@ module.exports = { } } } + if (totalAdded > 0) { + WIKI.logger.info(`Scheduled ${totalAdded} new future planned jobs: [ OK ]`) + } else { + WIKI.logger.info(`No new future planned jobs to schedule: [ OK ]`) + } } } }) diff --git a/server/db/migrations/3.0.0.js b/server/db/migrations/3.0.0.js index ae8f4e92..c873b02d 100644 --- a/server/db/migrations/3.0.0.js +++ b/server/db/migrations/3.0.0.js @@ -132,6 +132,7 @@ exports.up = async knex => { table.integer('attempt').notNullable().defaultTo(1) table.integer('maxRetries').notNullable().defaultTo(0) table.text('lastErrorMessage') + table.string('executedBy') table.timestamp('createdAt').notNullable() table.timestamp('startedAt').notNullable().defaultTo(knex.fn.now()) table.timestamp('completedAt') @@ -684,12 +685,17 @@ exports.up = async knex => { await knex('jobSchedule').insert([ { - task: 'updateLocales', + task: 'checkVersion', cron: '0 0 * * *', type: 'system' }, { - task: 'checkVersion', + task: 'cleanJobHistory', + cron: '5 0 * * *', + type: 'system' + }, + { + task: 'updateLocales', cron: '0 0 * * *', type: 'system' } diff --git a/server/graph/resolvers/system.js b/server/graph/resolvers/system.js index bfd8533f..c4642c6c 100644 --- a/server/graph/resolvers/system.js +++ b/server/graph/resolvers/system.js @@ -27,25 +27,13 @@ module.exports = { return WIKI.config.security }, async systemJobs (obj, args) { - switch (args.state) { - case 'ACTIVE': { - // const result = await WIKI.scheduler.boss.fetch('*', 25, { includeMeta: true }) - return [] - } - case 'COMPLETED': { - return [] - } - case 'FAILED': { - return [] - } - case 'INTERRUPTED': { - return [] - } - default: { - WIKI.logger.warn('Invalid Job State requested.') - return [] - } - } + const results = args.states?.length > 0 ? + await WIKI.db.knex('jobHistory').whereIn('state', args.states.map(s => s.toLowerCase())).orderBy('startedAt') : + await WIKI.db.knex('jobHistory').orderBy('startedAt') + return results.map(r => ({ + ...r, + state: r.state.toUpperCase() + })) }, async systemJobsScheduled (obj, args) { return WIKI.db.knex('jobSchedule').orderBy('task') diff --git a/server/graph/schemas/system.graphql b/server/graph/schemas/system.graphql index e9aaed7f..a531fd63 100644 --- a/server/graph/schemas/system.graphql +++ b/server/graph/schemas/system.graphql @@ -8,7 +8,7 @@ extend type Query { systemInfo: SystemInfo systemSecurity: SystemSecurity systemJobs( - state: SystemJobState + states: [SystemJobState] ): [SystemJob] systemJobsScheduled: [SystemJobScheduled] systemJobsUpcoming: [SystemJobUpcoming] @@ -159,6 +159,7 @@ type SystemJob { attempt: Int maxRetries: Int lastErrorMessage: String + executedBy: String createdAt: Date startedAt: Date completedAt: Date diff --git a/server/tasks/simple/clean-job-history.js b/server/tasks/simple/clean-job-history.js new file mode 100644 index 00000000..484642ca --- /dev/null +++ b/server/tasks/simple/clean-job-history.js @@ -0,0 +1,17 @@ +const { DateTime } = require('luxon') + +module.exports = async (payload) => { + WIKI.logger.info('Cleaning scheduler job history...') + + try { + await WIKI.db.knex('jobHistory') + .whereNot('state', 'active') + .andWhere('startedAt', '<=', DateTime.utc().minus({ seconds: WIKI.config.scheduler.historyExpiration }).toISO()) + .del() + + WIKI.logger.info('Cleaned scheduler job history: [ COMPLETED ]') + } catch (err) { + WIKI.logger.error('Cleaning scheduler job history: [ FAILED ]') + WIKI.logger.error(err.message) + } +} diff --git a/server/tasks/workers/purge-uploads.js b/server/tasks/workers/purge-uploads.js index 6b4ce9e2..062e9514 100644 --- a/server/tasks/workers/purge-uploads.js +++ b/server/tasks/workers/purge-uploads.js @@ -2,8 +2,8 @@ const path = require('node:path') const fs = require('fs-extra') const { DateTime } = require('luxon') -module.exports = async (payload, helpers) => { - helpers.logger.info('Purging orphaned upload files...') +module.exports = async ({ payload }) => { + WIKI.logger.info('Purging orphaned upload files...') try { const uplTempPath = path.resolve(WIKI.ROOTPATH, WIKI.config.dataPath, 'uploads') @@ -18,9 +18,9 @@ module.exports = async (payload, helpers) => { } } - helpers.logger.info('Purging orphaned upload files: [ COMPLETED ]') + WIKI.logger.info('Purging orphaned upload files: [ COMPLETED ]') } catch (err) { - helpers.logger.error('Purging orphaned upload files: [ FAILED ]') - helpers.logger.error(err.message) + WIKI.logger.error('Purging orphaned upload files: [ FAILED ]') + WIKI.logger.error(err.message) } } diff --git a/server/worker.js b/server/worker.js index 2309354d..1b8b4e28 100644 --- a/server/worker.js +++ b/server/worker.js @@ -1,6 +1,31 @@ const { ThreadWorker } = require('poolifier') +const { kebabCase } = require('lodash') +const path = require('node:path') + +// ---------------------------------------- +// Init Minimal Core +// ---------------------------------------- + +let WIKI = { + IS_DEBUG: process.env.NODE_ENV === 'development', + ROOTPATH: process.cwd(), + INSTANCE_ID: 'worker', + SERVERPATH: path.join(process.cwd(), 'server'), + Error: require('./helpers/error'), + configSvc: require('./core/config') +} +global.WIKI = WIKI + +WIKI.configSvc.init(true) +WIKI.logger = require('./core/logger').init() + +// ---------------------------------------- +// Execute Task +// ---------------------------------------- module.exports = new ThreadWorker(async (job) => { - // TODO: Call external task file - return { ok: true } + WIKI.INSTANCE_ID = job.INSTANCE_ID + const task = require(`./tasks/workers/${kebabCase(job.task)}.js`) + await task(job) + return true }, { async: true }) diff --git a/ux/src/i18n/locales/en.json b/ux/src/i18n/locales/en.json index c7cf59e8..607ac088 100644 --- a/ux/src/i18n/locales/en.json +++ b/ux/src/i18n/locales/en.json @@ -1523,7 +1523,16 @@ "admin.scheduler.updatedAt": "Last Updated", "common.field.task": "Task", "admin.scheduler.upcomingNone": "There are no upcoming job for the moment.", + "admin.scheduler.failedNone": "There are no recently failed job to display.", "admin.scheduler.waitUntil": "Start", "admin.scheduler.attempt": "Attempt", - "admin.scheduler.useWorker": "Execution Mode" + "admin.scheduler.useWorker": "Execution Mode", + "admin.scheduler.schedule": "Schedule", + "admin.scheduler.state": "State", + "admin.scheduler.startedAt": "Started", + "admin.scheduler.result": "Result", + "admin.scheduler.completedIn": "Completed in {duration}", + "admin.scheduler.pending": "Pending", + "admin.scheduler.error": "Error", + "admin.scheduler.interrupted": "Interrupted" } diff --git a/ux/src/pages/AdminScheduler.vue b/ux/src/pages/AdminScheduler.vue index 34b19df5..1ce37238 100644 --- a/ux/src/pages/AdminScheduler.vue +++ b/ux/src/pages/AdminScheduler.vue @@ -17,8 +17,9 @@ q-page.admin-terminal :text-color='$q.dark.isActive ? `white` : `black`' :color='$q.dark.isActive ? `dark-1` : `white`' :options=`[ - { label: t('admin.scheduler.scheduled'), value: 'scheduled' }, + { label: t('admin.scheduler.schedule'), value: 'scheduled' }, { label: t('admin.scheduler.upcoming'), value: 'upcoming' }, + { label: t('admin.scheduler.active'), value: 'active' }, { label: t('admin.scheduler.completed'), value: 'completed' }, { label: t('admin.scheduler.failed'), value: 'failed' }, ]` @@ -67,14 +68,37 @@ q-page.admin-terminal color='indigo' size='xs' ) - //- q-icon(name='las la-stopwatch', color='primary', size='sm') template(v-slot:body-cell-task='props') q-td(:props='props') strong {{props.value}} div: small.text-grey {{props.row.id}} template(v-slot:body-cell-cron='props') q-td(:props='props') - span {{ props.value }} + q-chip( + square + size='md' + color='blue' + text-color='white' + ) + span.font-robotomono {{ props.value }} + template(v-slot:body-cell-type='props') + q-td(:props='props') + q-chip( + square + size='md' + dense + color='deep-orange' + text-color='white' + ) + small.text-uppercase {{ props.value }} + template(v-slot:body-cell-created='props') + q-td(:props='props') + span {{props.value}} + div: small.text-grey {{humanizeDate(props.row.createdAt)}} + template(v-slot:body-cell-updated='props') + q-td(:props='props') + span {{props.value}} + div: small.text-grey {{humanizeDate(props.row.updatedAt)}} template(v-else-if='state.displayMode === `upcoming`') q-card.rounded-borders( v-if='state.upcomingJobs.length < 1' @@ -97,7 +121,7 @@ q-page.admin-terminal ) template(v-slot:body-cell-id='props') q-td(:props='props') - q-icon(name='las la-chess-knight', color='primary', size='sm') + q-icon(name='las la-clock', color='primary', size='sm') template(v-slot:body-cell-task='props') q-td(:props='props') strong {{props.value}} @@ -133,8 +157,87 @@ q-page.admin-terminal q-card-section.items-center(horizontal) q-card-section.col-auto.q-pr-none q-icon(name='las la-info-circle', size='sm') - q-card-section.text-caption {{ t('admin.scheduler.completedNone') }} - q-card.shadow-1(v-else) --- + q-card-section.text-caption {{ t('admin.scheduler.' + state.displayMode + 'None') }} + q-card.shadow-1(v-else) + q-table( + :rows='state.jobs' + :columns='jobsHeaders' + row-key='name' + flat + hide-bottom + :rows-per-page-options='[0]' + :loading='state.loading > 0' + ) + template(v-slot:body-cell-id='props') + q-td(:props='props') + q-avatar( + v-if='props.row.state === `completed`' + icon='las la-check' + color='positive' + text-color='white' + size='sm' + rounded + ) + q-avatar( + v-else-if='props.row.state === `failed`' + icon='las la-times' + color='negative' + text-color='white' + size='sm' + rounded + ) + q-avatar( + v-else-if='props.row.state === `interrupted`' + icon='las la-square-full' + color='orange' + text-color='white' + size='sm' + rounded + ) + q-circular-progress( + v-else-if='props.row.state === `active`' + indeterminate + size='sm' + :thickness='0.4' + color='blue' + track-color='blue-1' + center-color='blue-2' + ) + template(v-slot:body-cell-task='props') + q-td(:props='props') + strong {{props.value}} + div: small.text-grey {{props.row.id}} + template(v-slot:body-cell-state='props') + q-td(:props='props') + template(v-if='props.value === `completed`') + i18n-t(keypath='admin.scheduler.completedIn', tag='span') + template(#duration) + strong {{humanizeDuration(props.row.startedAt, props.row.completedAt)}} + div: small.text-grey {{ humanizeDate(props.row.completedAt) }} + template(v-else-if='props.value === `active`') + em.text-grey {{ t('admin.scheduler.pending') }} + template(v-else) + strong.text-negative {{ props.value === 'failed' ? t('admin.scheduler.error') : t('admin.scheduler.interrupted') }} + div: small {{ props.row.lastErrorMessage }} + template(v-slot:body-cell-attempt='props') + q-td(:props='props') + span #[strong {{props.value}}] #[span.text-grey / {{props.row.maxRetries}}] + template(v-slot:body-cell-useworker='props') + q-td(:props='props') + template(v-if='props.value') + q-icon(name='las la-microchip', color='brown', size='sm') + small.q-ml-xs.text-brown Worker + template(v-else) + q-icon(name='las la-leaf', color='teal', size='sm') + small.q-ml-xs.text-teal In-Process + template(v-slot:body-cell-date='props') + q-td(:props='props') + span {{props.value}} + div: small.text-grey {{humanizeDate(props.row.startedAt)}} + div + i18n-t.text-grey(keypath='admin.scheduler.createdBy', tag='small') + template(#instance) + strong {{props.row.executedBy}} @@ -143,7 +246,7 @@ import { onMounted, reactive, watch } from 'vue' import { useMeta, useQuasar } from 'quasar' import { useI18n } from 'vue-i18n' import gql from 'graphql-tag' -import { DateTime } from 'luxon' +import { DateTime, Duration, Interval } from 'luxon' import { useSiteStore } from 'src/stores/site' @@ -168,7 +271,7 @@ useMeta({ // DATA const state = reactive({ - displayMode: 'upcoming', + displayMode: 'completed', scheduledJobs: [], upcomingJobs: [], jobs: [], @@ -260,7 +363,7 @@ const upcomingJobsHeaders = [ sortable: true }, { - label: t('admin.scheduler.createdAt'), + label: t('admin.scheduler.scheduled'), align: 'left', field: 'createdAt', name: 'date', @@ -269,6 +372,52 @@ const upcomingJobsHeaders = [ } ] +const jobsHeaders = [ + { + align: 'center', + field: 'id', + name: 'id', + sortable: false, + style: 'width: 15px; padding-right: 0;' + }, + { + label: t('common.field.task'), + align: 'left', + field: 'task', + name: 'task', + sortable: true + }, + { + label: t('admin.scheduler.result'), + align: 'left', + field: 'state', + name: 'state', + sortable: true + }, + { + label: t('admin.scheduler.attempt'), + align: 'left', + field: 'attempt', + name: 'attempt', + sortable: true + }, + { + label: t('admin.scheduler.useWorker'), + align: 'left', + field: 'useWorker', + name: 'useworker', + sortable: true + }, + { + label: t('admin.scheduler.startedAt'), + align: 'left', + field: 'startedAt', + name: 'date', + sortable: true, + format: v => DateTime.fromISO(v).toRelative() + } +] + // WATCHERS watch(() => state.displayMode, (newValue) => { @@ -281,6 +430,17 @@ function humanizeDate (val) { return DateTime.fromISO(val).toFormat('fff') } +function humanizeDuration (start, end) { + const dur = Interval.fromDateTimes(DateTime.fromISO(start), DateTime.fromISO(end)) + .toDuration(['hours', 'minutes', 'seconds', 'milliseconds']) + return Duration.fromObject({ + ...dur.hours > 0 && { hours: dur.hours }, + ...dur.minutes > 0 && { minutes: dur.minutes }, + ...dur.seconds > 0 && { seconds: dur.seconds }, + ...dur.milliseconds > 0 && { milliseconds: dur.milliseconds } + }).toHuman({ unitDisplay: 'narrow', listStyle: 'short' }) +} + async function load () { state.loading++ try { @@ -323,27 +483,36 @@ async function load () { }) state.upcomingJobs = resp?.data?.systemJobsUpcoming } else { + const states = state.displayMode === 'failed' ? ['FAILED', 'INTERRUPTED'] : [state.displayMode.toUpperCase()] const resp = await APOLLO_CLIENT.query({ query: gql` query getSystemJobs ( - $state: SystemJobState! + $states: [SystemJobState] ) { systemJobs ( - state: $state + states: $states ) { - id - name - priority - state + id + task + state + useWorker + wasScheduled + attempt + maxRetries + lastErrorMessage + executedBy + createdAt + startedAt + completedAt } } `, variables: { - state: state.displayMode.toUpperCase() + states }, fetchPolicy: 'network-only' }) - state.jobs = resp?.data?.systemJobs + state.jobs = resp?.data?.systemJobs?.map(j => ({ ...j, state: j.state.toLowerCase() })) } } catch (err) { $q.notify({ @@ -361,15 +530,3 @@ onMounted(() => { load() }) - -