From 81ff24b9b66fd54863125f6d79e7826ee381767d Mon Sep 17 00:00:00 2001
From: Nick <github@ngpixel.com>
Date: Sat, 23 Mar 2019 18:18:36 -0400
Subject: [PATCH] feat: elasticsearch engine

---
 .../search/elasticsearch/definition.yml       |  20 +-
 server/modules/search/elasticsearch/engine.js | 299 +++++++++++++++++-
 2 files changed, 299 insertions(+), 20 deletions(-)

diff --git a/server/modules/search/elasticsearch/definition.yml b/server/modules/search/elasticsearch/definition.yml
index 928f3b12..b66b04d4 100644
--- a/server/modules/search/elasticsearch/definition.yml
+++ b/server/modules/search/elasticsearch/definition.yml
@@ -4,7 +4,7 @@ description: Elasticsearch is a distributed, RESTful search and analytics engine
 author: requarks.io
 logo: https://static.requarks.io/logo/elasticsearch.svg
 website: https://www.elastic.co/products/elasticsearch
-isAvailable: false
+isAvailable: true
 props:
   apiVersion:
     type: String
@@ -17,29 +17,37 @@ props:
       - '6.4'
       - '6.3'
     default: '6.6'
-  host:
+  hosts:
     type: String
     title: Host(s)
-    hint: Comma-separated list of Elasticsearch hosts to connect to
+    hint: Comma-separated list of Elasticsearch hosts to connect to, including the port.
     order: 2
   user:
     type: String
     title: Username
+    hint: (Optional) Username to use if using the security feature from X-Pack
     order: 3
   pass:
     type: String
     title: Password
+    hint: (Optional) Password to use if using the security feature from X-Pack
     order: 4
-  sniff:
+  indexName:
+    type: String
+    title: Index Name
+    hint: The index name to use during creation
+    default: wiki
+    order: 5
+  sniffOnStart:
     type: Boolean
     title: Sniff on start
     hint: 'Should Wiki.js attempt to detect the rest of the cluster on first connect? (Default: off)'
     default: false
-    order: 5
+    order: 6
   sniffInterval:
     type: Number
     title: Sniff Interval
     hint: '0 = disabled, Interval in seconds to check for updated list of nodes in cluster. (Default: 0)'
-    order: 6
     default: 0
+    order: 7
 
diff --git a/server/modules/search/elasticsearch/engine.js b/server/modules/search/elasticsearch/engine.js
index e7369ccd..19fa8963 100644
--- a/server/modules/search/elasticsearch/engine.js
+++ b/server/modules/search/elasticsearch/engine.js
@@ -1,26 +1,297 @@
-module.exports = {
-  activate() {
+const _ = require('lodash')
+const elasticsearch = require('elasticsearch')
+const { pipeline, Transform } = require('stream')
 
-  },
-  deactivate() {
+/* global WIKI */
 
+module.exports = {
+  async activate() {
+    // not used - intentionally a no-op
   },
-  query() {
-
+  async deactivate() {
+    // not used - intentionally a no-op
   },
-  created() {
+  /**
+   * INIT - create the Elasticsearch client from the module config and make sure the index exists
+   */
+  async init() {
+    WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
+    this.client = new elasticsearch.Client({
+      apiVersion: this.config.apiVersion,
+      hosts: _.compact(_.split(this.config.hosts, ',').map(_.trim)), // ignore blank entries (e.g. trailing comma)
+      httpAuth: _.isEmpty(this.config.user) ? null : `${this.config.user}:${this.config.pass}`, // username is optional and may be unset
+      sniffOnStart: this.config.sniffOnStart,
+      sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval * 1000 : false // configured in seconds, client expects milliseconds
+    })
 
-  },
-  updated() {
+    // -> Create Search Index
+    await this.createIndex()
 
+    WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
   },
-  deleted() {
-
+  /**
+   * Create Index
+   *
+   * Creates the configured index with the mappings for the page fields,
+   * unless an index of that name already exists.
+   */
+  async createIndex() {
+    const index = this.config.indexName
+    if (await this.client.indices.exists({ index })) {
+      return
+    }
+    // -> Field mappings of the _doc type
+    const properties = {
+      suggest: { type: 'completion' },
+      title: { type: 'text', boost: 4.0 },
+      description: { type: 'text', boost: 3.0 },
+      content: { type: 'text', boost: 1.0 },
+      locale: { type: 'keyword' },
+      path: { type: 'text' }
+    }
+    WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
+    await this.client.indices.create({
+      index,
+      body: { mappings: { _doc: { properties } } }
+    })
   },
-  renamed() {
-
+  /**
+   * QUERY
+   *
+   * Runs a simple_query_string search (first 50 hits) together with a fuzzy
+   * completion suggester for the same text. Errors are logged as warnings and
+   * the method then resolves without a value.
+   *
+   * @param {String} q Query
+   * @param {Object} opts Additional options (not read by this engine)
+   * @returns {Promise<Object|undefined>} { results, suggestions, totalHits }
+   */
+  async query(q, opts) {
+    // -> Completion suggester fed with the same text as the main query
+    const suggestions = {
+      text: q,
+      completion: {
+        field: 'suggest',
+        size: 5,
+        skip_duplicates: true,
+        fuzzy: true
+      }
+    }
+    const body = {
+      query: { simple_query_string: { query: q } },
+      from: 0,
+      size: 50,
+      _source: ['title', 'description', 'path', 'locale'],
+      suggest: { suggestions }
+    }
+    // -> Flatten a raw hit into the fields exposed to the caller
+    const toResult = ({ _id, _source }) => ({
+      id: _id,
+      locale: _source.locale,
+      path: _source.path,
+      title: _source.title,
+      description: _source.description
+    })
+    try {
+      const response = await this.client.search({ index: this.config.indexName, body })
+      const results = _.get(response, 'hits.hits', []).map(toResult)
+      // -> Top option of every suggestion entry; entries without options are dropped
+      const topOptions = _.get(response, 'suggest.suggestions', []).map(entry => _.get(entry, 'options[0].text', false))
+      return { results, suggestions: _.compact(topOptions), totalHits: response.hits.total }
+    } catch (err) {
+      WIKI.logger.warn('Search Engine Error:')
+      WIKI.logger.warn(err)
+    }
+  },
+  /**
+   * Build suggest field
+   *
+   * @param {Object} page Page providing title, description and content
+   * @returns {Array} Unique, non-empty words as { input, weight } (title 4, description 3, content 1)
+   */
+  buildSuggest(page) {
+    const seen = new Set()
+    // -> Split on any whitespace; skip blanks and words already emitted with a higher weight
+    const toInputs = (text, weight) => _.toString(text).split(/\s+/).reduce((inputs, word) => {
+      if (word && !seen.has(word)) {
+        seen.add(word)
+        inputs.push({ input: word, weight })
+      }
+      return inputs
+    }, [])
+    // -> Highest weight first so the first occurrence of a word wins
+    return [...toInputs(page.title, 4), ...toInputs(page.description, 3), ...toInputs(page.content, 1)]
+  },
+  /**
+   * CREATE
+   *
+   * Indexes the page as a _doc document whose id is the page hash, together with
+   * the suggest entries built from it. The request is sent with refresh enabled.
+   *
+   * @param {Object} page Page to create
+   * @returns {Promise<void>}
+   */
+  async created(page) {
+    const { hash: id, localeCode: locale, path, title, description, content } = page
+    // -> Suggest inputs are derived from the page by buildSuggest
+    const suggest = this.buildSuggest(page)
+    await this.client.index({
+      index: this.config.indexName,
+      type: '_doc',
+      id,
+      body: { suggest, locale, path, title, description, content },
+      refresh: true
+    })
+  },
+  /**
+   * UPDATE
+   *
+   * Sends the same kind of index request as a creation: the page is written
+   * under its hash with freshly built suggest entries and refresh enabled.
+   *
+   * @param {Object} page Page to update
+   * @returns {Promise<void>}
+   */
+  async updated(page) {
+    const body = {
+      suggest: this.buildSuggest(page),
+      locale: page.localeCode,
+      path: page.path,
+      title: page.title,
+      description: page.description,
+      content: page.content
+    }
+    const request = { index: this.config.indexName, type: '_doc', id: page.hash, body, refresh: true }
+    await this.client.index(request)
   },
-  rebuild() {
+  /**
+   * DELETE
+   *
+   * Removes the document stored under the page hash; sent with refresh enabled.
+   *
+   * @param {Object} page Page to delete
+   * @returns {Promise<void>}
+   */
+  async deleted(page) {
+    const { indexName: index } = this.config
+    const { hash: id } = page
+    await this.client.delete({ index, type: '_doc', id, refresh: true })
+  },
+  /**
+   * RENAME
+   *
+   * Deletes the document stored under the source hash, then indexes the page
+   * under its destination hash and destination path. Both requests are sent
+   * with refresh enabled, one after the other.
+   *
+   * @param {Object} page Page to rename
+   * @returns {Promise<void>}
+   */
+  async renamed(page) {
+    const index = this.config.indexName
+    const type = '_doc'
+    // -> Drop the entry stored under the old hash
+    await this.client.delete({ index, type, id: page.sourceHash, refresh: true })
+
+    // -> Index the page again under the new hash and path
+    const body = {
+      suggest: this.buildSuggest(page),
+      locale: page.localeCode,
+      path: page.destinationPath,
+      title: page.title,
+      description: page.description,
+      content: page.content
+    }
+    await this.client.index({ index, type, id: page.destinationHash, body, refresh: true })
+  },
+  /**
+   * REBUILD INDEX
+   *
+   * Drops and recreates the index, then streams all published, non-private pages and
+   * sends them in bulk batches, flushed on a document-count or JSON-size limit.
+   * A batch that fails to send is logged and skipped; stream errors reject the promise.
+   */
+  async rebuild() {
+    WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
+    await this.client.indices.delete({ index: this.config.indexName })
+    await this.createIndex()
+
+    const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
+    const MAX_INDEXING_COUNT = 1000
+    const COMMA_BYTES = Buffer.from(',').byteLength
+    const chunks = []
+    let bytes = 0
+
+    // -> Buffer one row (no doc = end of stream), flushing when a limit is hit; cb signals the stream
+    const processDocument = async (cb, doc) => {
+      try {
+        if (doc) {
+          const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
+          // -> Current batch exceeds size limit, flush
+          if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
+            await flushBuffer()
+          }
+          if (chunks.length > 0) {
+            bytes += COMMA_BYTES
+          }
+          bytes += docBytes
+          chunks.push(doc)
+          // -> Current batch exceeds count limit, flush
+          if (chunks.length >= MAX_INDEXING_COUNT) {
+            await flushBuffer()
+          }
+        } else {
+          // -> End of stream, flush
+          await flushBuffer()
+        }
+        cb()
+      } catch (err) {
+        cb(err)
+      }
+    }
+
+    // -> Send the buffered rows as one bulk request; skipped when nothing is buffered so no empty request goes out
+    const flushBuffer = async () => {
+      if (chunks.length === 0) {
+        return
+      }
+      WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
+      try {
+        await this.client.bulk({
+          index: this.config.indexName,
+          body: _.flatMap(chunks, doc => [
+            { index: { _index: this.config.indexName, _type: '_doc', _id: doc.id } },
+            {
+              suggest: this.buildSuggest(doc),
+              locale: doc.locale,
+              path: doc.path,
+              title: doc.title,
+              description: doc.description,
+              content: doc.content
+            }
+          ]),
+          refresh: true
+        })
+      } catch (err) {
+        WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
+      }
+      chunks.length = 0
+      bytes = 0
+    }
 
+    // -> stream.pipeline reports completion through a callback, not a promise: wrap it so it is really awaited
+    await new Promise((resolve, reject) => pipeline(
+      WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'content').select().from('pages').where({
+        isPublished: true,
+        isPrivate: false
+      }).stream(),
+      new Transform({
+        objectMode: true,
+        transform: async (chunk, enc, cb) => processDocument(cb, chunk),
+        flush: async (cb) => processDocument(cb)
+      }),
+      err => err ? reject(err) : resolve()
+    ))
+    WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
   }
 }