From d0e9953def284d3bbbfb8b5c45d4cd4d3d0703f9 Mon Sep 17 00:00:00 2001
From: Xiao Gui <xgui3783@gmail.com>
Date: Mon, 11 Nov 2019 15:24:08 +0100
Subject: [PATCH] bugfix: broken pipe on stream error feat: config retries

---
 deploy/datasets/index.js         |  7 ++++---
 deploy/datasets/util.js          | 23 ++++++++++++++++++-----
 deploy/nehubaConfig/index.js     |  3 ++-
 deploy/nehubaConfig/query.js     |  1 +
 deploy/preview/index.js          |  4 ++--
 deploy/templates/index.js        |  3 ++-
 deploy/util/streamHandleError.js |  4 ++++
 7 files changed, 33 insertions(+), 12 deletions(-)
 create mode 100644 deploy/util/streamHandleError.js

diff --git a/deploy/datasets/index.js b/deploy/datasets/index.js
index 3f58252f7..ecfa7aa78 100644
--- a/deploy/datasets/index.js
+++ b/deploy/datasets/index.js
@@ -6,6 +6,7 @@ const { init, getDatasets, getPreview, getDatasetFromId, getDatasetFileAsZip, ge
 const { retry } = require('./util')
 const url = require('url')
 const qs = require('querystring')
+const { getHandleErrorFn } = require('../util/streamHandleError')
 
 const bodyParser = require('body-parser')
 
@@ -144,7 +145,7 @@ datasetsRouter.get('/previewFile', cacheMaxAge24Hr, (req, res) => {
   res.removeHeader('Content-Encoding')
   
   if (filePath) {
-    fs.createReadStream(filePath).pipe(res)
+    fs.createReadStream(filePath).on('error', getHandleErrorFn(req, res)).pipe(res).on('error', getHandleErrorFn(req, res))
   } else {
     res.status(404).send()
   }
@@ -168,7 +169,7 @@ datasetsRouter.get('/kgInfo', checkKgQuery, cacheMaxAge24Hr, async (req, res) =>
   const { kgId } = req.query
   const { user } = req
   const stream = await getDatasetFromId({ user, kgId, returnAsStream: true })
-  stream.pipe(res)
+  stream.on('error', getHandleErrorFn(req, res)).pipe(res).on('error', getHandleErrorFn(req, res))
 })
 
 datasetsRouter.get('/downloadKgFiles', checkKgQuery, async (req, res) => {
@@ -178,7 +179,7 @@ datasetsRouter.get('/downloadKgFiles', checkKgQuery, async (req, res) => {
     const stream = await getDatasetFileAsZip({ user, kgId })
     res.setHeader('Content-Type', 'application/zip')
     res.setHeader('Content-Disposition', `attachment; filename="${kgId}.zip"`)
-    stream.pipe(res)
+    stream.on('error', getHandleErrorFn(req, res)).pipe(res).on('error', getHandleErrorFn(req, res))
   } catch (e) {
     console.warn('datasets/index#downloadKgFiles', e)
     res.status(400).send(e.toString())
diff --git a/deploy/datasets/util.js b/deploy/datasets/util.js
index 8b1469a8c..f708fb28b 100644
--- a/deploy/datasets/util.js
+++ b/deploy/datasets/util.js
@@ -35,17 +35,41 @@ const init = async () => {
   getPublicAccessToken = getPublic
 }
 
-const retry = (fn) => {
-  let retryId
+const defaultConfig = {
+  retries: 3,
+  timeout: 5000
+}
+
+/**
+ * Calls `fn` every `timeout` milliseconds until it resolves or until
+ * `retries` attempts have been started. The first call happens after one
+ * full interval, not immediately.
+ *
+ * @param {Function} fn called with no arguments; must return a promise
+ * @param {Object} [config]
+ * @param {number} [config.retries] max attempts; falsy falls back to defaultConfig.retries
+ * @param {number} [config.timeout] interval in ms; falsy falls back to defaultConfig.timeout
+ */
+const retry = (fn, config = {}) => {
+  let retryId, retriesAttempted = 0
+  const timeout = config.timeout || defaultConfig.timeout
+  const retries = config.retries || defaultConfig.retries
   retryId = setInterval(() => {
+    retriesAttempted += 1
+    // keep this attempt's own number; the shared counter may advance while fn is pending
+    const attempt = retriesAttempted
+    // stop the timer once the last allowed attempt has started, so a slow fn cannot overrun the limit
+    if (attempt >= retries) clearInterval(retryId)
     fn()
       .then(() => {
         console.log(`retry succeeded, clearing retryId`)
-        clearTimeout(retryId)
+        clearInterval(retryId)
       }).catch(e => {
-        console.warn(`retry failed, retrying in 5sec`)
+        console.warn(`retry ${attempt}/${retries} failed.`)
+        if (attempt >= retries) console.warn(`maximum retries exceeded, terminating`)
+        else console.warn(`retrying in ${timeout}ms`)
       })
-  }, 5000)
+  }, timeout)
 }
 
 module.exports = {
diff --git a/deploy/nehubaConfig/index.js b/deploy/nehubaConfig/index.js
index fab68127a..d6f9a44b3 100644
--- a/deploy/nehubaConfig/index.js
+++ b/deploy/nehubaConfig/index.js
@@ -1,6 +1,7 @@
 const express = require('express')
 const { getTemplateNehubaConfig } = require('./query')
 const { detEncoding } = require('nomiseco')
+const { getHandleErrorFn } = require('../util/streamHandleError')
 
 const nehubaConfigRouter = express.Router()
 
@@ -12,7 +13,7 @@ nehubaConfigRouter.get('/:configId', (req, res, next) => {
   const { configId } = req.params
   if (acceptedEncoding) res.set('Content-Encoding', acceptedEncoding)
 
-  getTemplateNehubaConfig({ configId, acceptedEncoding, returnAsStream:true}).pipe(res)
+  getTemplateNehubaConfig({ configId, acceptedEncoding, returnAsStream:true}).on('error', getHandleErrorFn(req, res)).pipe(res).on('error', getHandleErrorFn(req, res))
 })
 
 module.exports = nehubaConfigRouter
\ No newline at end of file
diff --git a/deploy/nehubaConfig/query.js b/deploy/nehubaConfig/query.js
index 33017b63a..5bbcd012a 100644
--- a/deploy/nehubaConfig/query.js
+++ b/deploy/nehubaConfig/query.js
@@ -33,6 +33,7 @@ exports.getTemplateNehubaConfig = ({configId, acceptedEncoding, returnAsStream})
     else return getFileAsPromise(`${filepath}.gz`)
   }
 
+  // NOTE: pipe() does not forward 'error' events, so errors from this file read stream are not seen by handlers attached downstream
   if (returnAsStream) return fs.createReadStream(filepath).pipe(
     through2.obj(function(file, enc, cb){
       cb(null, reconfigureUrl(file.toString()))
diff --git a/deploy/preview/index.js b/deploy/preview/index.js
index 508cae3bb..426d04d82 100644
--- a/deploy/preview/index.js
+++ b/deploy/preview/index.js
@@ -2,6 +2,7 @@ const router = require('express').Router()
 const request = require('request')
 const url = require('url')
 const stream = require('stream')
+const { getHandleErrorFn } = require('../util/streamHandleError')
 
 let PROXY_HOSTNAME_WHITELIST
 
@@ -19,8 +20,7 @@ const whiteList = new Set([
 router.get('/file', (req, res) => {
   const { fileUrl } = req.query
   const f = url.parse(fileUrl)
-  if(f && f.hostname && whiteList.has(f.hostname))
-    return request(fileUrl).pipe(res)
+  if(f && f.hostname && whiteList.has(f.hostname)) return request(fileUrl).on('error', getHandleErrorFn(req, res)).pipe(res).on('error', getHandleErrorFn(req, res))
   else res.status(400).send()
 })
 
diff --git a/deploy/templates/index.js b/deploy/templates/index.js
index 77ac71014..3378428b1 100644
--- a/deploy/templates/index.js
+++ b/deploy/templates/index.js
@@ -3,6 +3,7 @@ const query = require('./query')
 const path = require('path')
 const { detEncoding } = require('nomiseco')
 const url = require('url')
+const { getHandleErrorFn } = require('../util/streamHandleError')
 
 /**
  * root path fetches all templates
@@ -38,7 +39,7 @@ router.get('/:template', (req, res, next) => {
         })
 
       if (acceptedEncoding) res.set('Content-Encoding', acceptedEncoding)
-      query.getTemplate({ template, acceptedEncoding, returnAsStream:true }).pipe(res)
+      query.getTemplate({ template, acceptedEncoding, returnAsStream:true }).on('error', getHandleErrorFn(req, res)).pipe(res).on('error', getHandleErrorFn(req, res))
     })
     .catch(error => next({
       code: 500,
diff --git a/deploy/util/streamHandleError.js b/deploy/util/streamHandleError.js
new file mode 100644
index 000000000..bd24374e7
--- /dev/null
+++ b/deploy/util/streamHandleError.js
@@ -0,0 +1,14 @@
+/**
+ * Returns an 'error' event listener that logs the error and answers the
+ * request with an error response.
+ *
+ * @param {Object} req request object (currently unused)
+ * @param {Object} res response object that receives the error reply
+ * @returns {Function} listener taking the emitted error
+ */
+exports.getHandleErrorFn = (req, res) => err => {
+  console.error('getHandleErrorFn', err)
+  // status and body cannot be changed after headers were sent; end the response instead of throwing
+  if (res.headersSent) return res.end()
+  res.status(500).send(err.toString())
+}
\ No newline at end of file
-- 
GitLab