const url = require('url');
const WebSocket = require('uws');
const uuid = require('uuid');
+const fs = require('fs');
const env = process.env.NODE_ENV || 'development';
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
const startMaster = () => {
+ // Nudge operators towards SOCKET when SOCKET is unset and PORT carries a
+ // non-numeric value (the older way of passing a socket path).
+ const { SOCKET, PORT } = process.env;
+ const usesPortHack = !SOCKET && PORT && isNaN(+PORT);
+
+ if (usesPortHack) {
+   log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
+ }
log.info(`Starting streaming API server master with ${numWorkers} workers`);
};
});
};
+ // Answers an HTTP request with status 404 and the JSON body {"error":"Not found"}.
+ // `res` is the HTTP response object of the request being rejected.
+ const httpNotFound = res => {
+   const headers = { 'Content-Type': 'application/json' };
+   const body = JSON.stringify({ error: 'Not found' });
+
+   res.writeHead(404, headers);
+   res.end(body);
+ };
+
app.use(setRequestId);
app.use(setRemoteAddress);
app.use(allowCrossDomain);
+
+ // Health-check endpoint: always answers 200 with the plain-text body "OK".
+ // It is registered ahead of authenticationMiddleware.
+ app.get('/api/v1/streaming/health', (req, res) => {
+   const headers = { 'Content-Type': 'text/plain' };
+
+   res.writeHead(200, headers);
+   res.end('OK');
+ });
+
app.use(authenticationMiddleware);
app.use(errorMiddleware);
});
app.get('/api/v1/streaming/direct', (req, res) => {
- streamFrom(`timeline:direct:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
+   // Name the channel once so the stream subscription and the heartbeat passed to
+   // the end handler are built from the same key.
+   const channel = `timeline:direct:${req.accountId}`;
+   const output = streamToHttp(req, res);
+   const heartbeat = subscriptionHeartbeat(channel);
+   const onEnd = streamHttpEnd(req, heartbeat);
+
+   streamFrom(channel, req, output, onEnd, true);
});
app.get('/api/v1/streaming/hashtag', (req, res) => {
- streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
+   const { tag } = req.query;
+
+   // Only a non-empty string names a hashtag stream. A repeated or bracketed
+   // parameter (?tag=a&tag=b, ?tag[]=a) is parsed into an array/object, which
+   // would pass a truthiness check but has no toLowerCase(); answer 404 for
+   // those as well as for a missing or empty tag.
+   if (typeof tag !== 'string' || tag.length === 0) {
+     httpNotFound(res);
+     return;
+   }
+
+   streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
});
app.get('/api/v1/streaming/hashtag/local', (req, res) => {
- streamFrom(`timeline:hashtag:${req.query.tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
+   const { tag } = req.query;
+
+   // Only a non-empty string names a hashtag stream. A repeated or bracketed
+   // parameter (?tag=a&tag=b, ?tag[]=a) is parsed into an array/object, which
+   // would pass a truthiness check but has no toLowerCase(); answer 404 for
+   // those as well as for a missing or empty tag.
+   if (typeof tag !== 'string' || tag.length === 0) {
+     httpNotFound(res);
+     return;
+   }
+
+   streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
});
app.get('/api/v1/streaming/list', (req, res) => {
authorizeListAccess(listId, req, authorized => {
if (!authorized) {
- res.writeHead(404, { 'Content-Type': 'application/json' });
- res.end(JSON.stringify({ error: 'Not found' }));
+ httpNotFound(res);
return;
}
ws.isAlive = true;
});
+ let channel;
+
switch(location.query.stream) {
case 'user':
- const channel = `timeline:${req.accountId}`;
+ channel = `timeline:${req.accountId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
break;
case 'user:notification':
streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'direct':
- streamFrom(`timeline:direct:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
+ channel = `timeline:direct:${req.accountId}`;
+ streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
break;
case 'hashtag':
+ // Close the socket unless `tag` is a non-empty string. NOTE(review): this
+ // presumes location.query comes from a query-string parser that returns an
+ // array for a repeated key (?tag=a&tag=b) - confirm against where `location`
+ // is built. Such a value is truthy with non-zero length but has no
+ // toLowerCase(), so it must be rejected here rather than throw below.
+ if (typeof location.query.tag !== 'string' || location.query.tag.length === 0) {
+   ws.close();
+   return;
+ }
+
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'hashtag:local':
+ // Same validation as the 'hashtag' case: only a non-empty string tag is
+ // accepted; anything else (missing, empty, array) closes the socket.
+ if (typeof location.query.tag !== 'string' || location.query.tag.length === 0) {
+   ws.close();
+   return;
+ }
+
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
break;
case 'list':
return;
}
- const channel = `timeline:list:${listId}`;
+ channel = `timeline:list:${listId}`;
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
});
break;
});
}, 30000);
- server.listen(process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
- log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
- });
+ // Listen on a UNIX domain socket when SOCKET is set, or when PORT holds a
+ // non-numeric value (the older way of passing a socket path); otherwise
+ // listen on TCP using PORT (default 4000) and BIND (default 0.0.0.0).
+ if (process.env.SOCKET || (process.env.PORT && isNaN(+process.env.PORT))) {
+   const socketPath = process.env.SOCKET || process.env.PORT;
+
+   server.listen(socketPath, () => {
+     const boundPath = server.address();
+
+     // Mode 0o666 grants read/write on the socket file to every user.
+     fs.chmodSync(boundPath, 0o666);
+     log.info(`Worker ${workerId} now listening on ${boundPath}`);
+   });
+ } else {
+   const tcpPort = +process.env.PORT || 4000;
+   const bindAddress = process.env.BIND || '0.0.0.0';
+
+   server.listen(tcpPort, bindAddress, () => {
+     const { address, port } = server.address();
+
+     log.info(`Worker ${workerId} now listening on ${address}:${port}`);
+   });
+ }
const onExit = () => {
log.info(`Worker ${workerId} exiting, bye bye`);