]> cat aescling's git repositories - mastodon.git/blobdiff - streaming/index.js
Add check for missing tag param in streaming API (#8955)
[mastodon.git] / streaming / index.js
index d7bfa65422ff860641c205962632787971c25a1b..3a01be66a5f43c5a5785246a7230ee7aee52bc0c 100644 (file)
@@ -9,6 +9,7 @@ const log = require('npmlog');
 const url = require('url');
 const WebSocket = require('uws');
 const uuid = require('uuid');
+const fs = require('fs');
 
 const env = process.env.NODE_ENV || 'development';
 
@@ -70,6 +71,9 @@ const redisUrlToClient = (defaultConfig, redisUrl) => {
 const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
 
 const startMaster = () => {
+  if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
+    log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
+  }
   log.info(`Starting streaming API server master with ${numWorkers} workers`);
 };
 
@@ -445,9 +449,20 @@ const startWorker = (workerId) => {
     });
   };
 
+  const httpNotFound = res => {
+    res.writeHead(404, { 'Content-Type': 'application/json' });
+    res.end(JSON.stringify({ error: 'Not found' }));
+  };
+
   app.use(setRequestId);
   app.use(setRemoteAddress);
   app.use(allowCrossDomain);
+
+  app.get('/api/v1/streaming/health', (req, res) => {
+    res.writeHead(200, { 'Content-Type': 'text/plain' });
+    res.end('OK');
+  });
+
   app.use(authenticationMiddleware);
   app.use(errorMiddleware);
 
@@ -475,15 +490,30 @@ const startWorker = (workerId) => {
   });
 
   app.get('/api/v1/streaming/direct', (req, res) => {
-    streamFrom(`timeline:direct:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
+    const channel = `timeline:direct:${req.accountId}`;
+    streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true);
   });
 
   app.get('/api/v1/streaming/hashtag', (req, res) => {
-    streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
+    const { tag } = req.query;
+
+    if (!tag || tag.length === 0) {
+      httpNotFound(res);
+      return;
+    }
+
+    streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
   });
 
   app.get('/api/v1/streaming/hashtag/local', (req, res) => {
-    streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
+    const { tag } = req.query;
+
+    if (!tag || tag.length === 0) {
+      httpNotFound(res);
+      return;
+    }
+
+    streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
   });
 
   app.get('/api/v1/streaming/list', (req, res) => {
@@ -491,8 +521,7 @@ const startWorker = (workerId) => {
 
     authorizeListAccess(listId, req, authorized => {
       if (!authorized) {
-        res.writeHead(404, { 'Content-Type': 'application/json' });
-        res.end(JSON.stringify({ error: 'Not found' }));
+        httpNotFound(res);
         return;
       }
 
@@ -515,9 +544,11 @@ const startWorker = (workerId) => {
       ws.isAlive = true;
     });
 
+    let channel;
+
     switch(location.query.stream) {
     case 'user':
-      const channel = `timeline:${req.accountId}`;
+      channel = `timeline:${req.accountId}`;
       streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
       break;
     case 'user:notification':
@@ -536,12 +567,23 @@ const startWorker = (workerId) => {
       streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
       break;
     case 'direct':
-      streamFrom(`timeline:direct:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
+      channel = `timeline:direct:${req.accountId}`;
+      streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
       break;
     case 'hashtag':
+      if (!location.query.tag || location.query.tag.length === 0) {
+        ws.close();
+        return;
+      }
+
       streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
       break;
     case 'hashtag:local':
+      if (!location.query.tag || location.query.tag.length === 0) {
+        ws.close();
+        return;
+      }
+
       streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
       break;
     case 'list':
@@ -553,7 +595,7 @@ const startWorker = (workerId) => {
           return;
         }
 
-        const channel = `timeline:list:${listId}`;
+        channel = `timeline:list:${listId}`;
         streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
       });
       break;
@@ -574,9 +616,16 @@ const startWorker = (workerId) => {
     });
   }, 30000);
 
-  server.listen(process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
-    log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
-  });
+  if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
+    server.listen(process.env.SOCKET || process.env.PORT, () => {
+      fs.chmodSync(server.address(), 0o666);
+      log.info(`Worker ${workerId} now listening on ${server.address()}`);
+    });
+  } else {
+    server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
+      log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
+    });
+  }
 
   const onExit = () => {
     log.info(`Worker ${workerId} exiting, bye bye`);
This page took 0.037209 seconds and 3 git commands to generate.