]> cat aescling's git repositories - mastodon.git/blob - streaming/index.js
Allow to listen Unix socket (#2085)
[mastodon.git] / streaming / index.js
1 import os from 'os';
2 import cluster from 'cluster';
3 import dotenv from 'dotenv'
4 import express from 'express'
5 import http from 'http'
6 import redis from 'redis'
7 import pg from 'pg'
8 import log from 'npmlog'
9 import url from 'url'
10 import WebSocket from 'ws'
11 import uuid from 'uuid'
12
// Resolve the runtime environment and load the matching dotenv file.
const env = process.env.NODE_ENV || 'development';

const dotenvPath = env === 'production' ? '.env.production' : '.env';

dotenv.config({ path: dotenvPath });
18
19 if (cluster.isMaster) {
20 // cluster master
21
22 const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : (os.cpus().length > 1 ? os.cpus().length - 1 : 1))
23 const fork = () => {
24 const worker = cluster.fork();
25 worker.on('exit', (code, signal) => {
26 log.error(`Worker died with exit code ${code}, signal ${signal} received.`);
27 setTimeout(() => fork(), 0);
28 });
29 };
30 for (let i = 0; i < core; i++) fork();
31 log.info(`Starting streaming API server master with ${core} workers`)
32
33 } else {
34 // cluster worker
35
36 const pgConfigs = {
37 development: {
38 database: 'mastodon_development',
39 host: '/var/run/postgresql',
40 max: 10
41 },
42
43 production: {
44 user: process.env.DB_USER || 'mastodon',
45 password: process.env.DB_PASS || '',
46 database: process.env.DB_NAME || 'mastodon_production',
47 host: process.env.DB_HOST || 'localhost',
48 port: process.env.DB_PORT || 5432,
49 max: 10
50 }
51 }
52
53 const app = express()
54 const pgPool = new pg.Pool(pgConfigs[env])
55 const server = http.createServer(app)
56 const wss = new WebSocket.Server({ server })
57
58 const redisClient = redis.createClient({
59 host: process.env.REDIS_HOST || '127.0.0.1',
60 port: process.env.REDIS_PORT || 6379,
61 password: process.env.REDIS_PASSWORD
62 })
63
64 const subs = {}
65
66 redisClient.on('pmessage', (_, channel, message) => {
67 const callbacks = subs[channel]
68
69 log.silly(`New message on channel ${channel}`)
70
71 if (!callbacks) {
72 return
73 }
74
75 callbacks.forEach(callback => callback(message))
76 })
77
78 redisClient.psubscribe('timeline:*')
79
80 const subscribe = (channel, callback) => {
81 log.silly(`Adding listener for ${channel}`)
82 subs[channel] = subs[channel] || []
83 subs[channel].push(callback)
84 }
85
86 const unsubscribe = (channel, callback) => {
87 log.silly(`Removing listener for ${channel}`)
88 subs[channel] = subs[channel].filter(item => item !== callback)
89 }
90
91 const allowCrossDomain = (req, res, next) => {
92 res.header('Access-Control-Allow-Origin', '*')
93 res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control')
94 res.header('Access-Control-Allow-Methods', 'GET, OPTIONS')
95
96 next()
97 }
98
99 const setRequestId = (req, res, next) => {
100 req.requestId = uuid.v4()
101 res.header('X-Request-Id', req.requestId)
102
103 next()
104 }
105
106 const accountFromToken = (token, req, next) => {
107 pgPool.connect((err, client, done) => {
108 if (err) {
109 next(err)
110 return
111 }
112
113 client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 LIMIT 1', [token], (err, result) => {
114 done()
115
116 if (err) {
117 next(err)
118 return
119 }
120
121 if (result.rows.length === 0) {
122 err = new Error('Invalid access token')
123 err.statusCode = 401
124
125 next(err)
126 return
127 }
128
129 req.accountId = result.rows[0].account_id
130
131 next()
132 })
133 })
134 }
135
136 const authenticationMiddleware = (req, res, next) => {
137 if (req.method === 'OPTIONS') {
138 next()
139 return
140 }
141
142 const authorization = req.get('Authorization')
143
144 if (!authorization) {
145 const err = new Error('Missing access token')
146 err.statusCode = 401
147
148 next(err)
149 return
150 }
151
152 const token = authorization.replace(/^Bearer /, '')
153
154 accountFromToken(token, req, next)
155 }
156
157 const errorMiddleware = (err, req, res, next) => {
158 log.error(req.requestId, err)
159 res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' })
160 res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' }))
161 }
162
163 const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
164
165 const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => {
166 log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`)
167
168 const listener = message => {
169 const { event, payload, queued_at } = JSON.parse(message)
170
171 const transmit = () => {
172 const now = new Date().getTime()
173 const delta = now - queued_at;
174
175 log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
176 output(event, payload)
177 }
178
179 // Only messages that may require filtering are statuses, since notifications
180 // are already personalized and deletes do not matter
181 if (needsFiltering && event === 'update') {
182 pgPool.connect((err, client, done) => {
183 if (err) {
184 log.error(err)
185 return
186 }
187
188 const unpackedPayload = JSON.parse(payload)
189 const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)).concat(unpackedPayload.reblog ? [unpackedPayload.reblog.account.id] : [])
190
191 client.query(`SELECT target_account_id FROM blocks WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 1)}) UNION SELECT target_account_id FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 1)})`, [req.accountId].concat(targetAccountIds), (err, result) => {
192 done()
193
194 if (err) {
195 log.error(err)
196 return
197 }
198
199 if (result.rows.length > 0) {
200 return
201 }
202
203 transmit()
204 })
205 })
206 } else {
207 transmit()
208 }
209 }
210
211 subscribe(id, listener)
212 attachCloseHandler(id, listener)
213 }
214
215 // Setup stream output to HTTP
216 const streamToHttp = (req, res) => {
217 res.setHeader('Content-Type', 'text/event-stream')
218 res.setHeader('Transfer-Encoding', 'chunked')
219
220 const heartbeat = setInterval(() => res.write(':thump\n'), 15000)
221
222 req.on('close', () => {
223 log.verbose(req.requestId, `Ending stream for ${req.accountId}`)
224 clearInterval(heartbeat)
225 })
226
227 return (event, payload) => {
228 res.write(`event: ${event}\n`)
229 res.write(`data: ${payload}\n\n`)
230 }
231 }
232
233 // Setup stream end for HTTP
234 const streamHttpEnd = req => (id, listener) => {
235 req.on('close', () => {
236 unsubscribe(id, listener)
237 })
238 }
239
240 // Setup stream output to WebSockets
241 const streamToWs = (req, ws) => {
242 const heartbeat = setInterval(() => ws.ping(), 15000)
243
244 ws.on('close', () => {
245 log.verbose(req.requestId, `Ending stream for ${req.accountId}`)
246 clearInterval(heartbeat)
247 })
248
249 return (event, payload) => {
250 if (ws.readyState !== ws.OPEN) {
251 log.error(req.requestId, 'Tried writing to closed socket')
252 return
253 }
254
255 ws.send(JSON.stringify({ event, payload }))
256 }
257 }
258
259 // Setup stream end for WebSockets
260 const streamWsEnd = ws => (id, listener) => {
261 ws.on('close', () => {
262 unsubscribe(id, listener)
263 })
264
265 ws.on('error', e => {
266 unsubscribe(id, listener)
267 })
268 }
269
270 app.use(setRequestId)
271 app.use(allowCrossDomain)
272 app.use(authenticationMiddleware)
273 app.use(errorMiddleware)
274
275 app.get('/api/v1/streaming/user', (req, res) => {
276 streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req))
277 })
278
279 app.get('/api/v1/streaming/public', (req, res) => {
280 streamFrom('timeline:public', req, streamToHttp(req, res), streamHttpEnd(req), true)
281 })
282
283 app.get('/api/v1/streaming/public/local', (req, res) => {
284 streamFrom('timeline:public:local', req, streamToHttp(req, res), streamHttpEnd(req), true)
285 })
286
287 app.get('/api/v1/streaming/hashtag', (req, res) => {
288 streamFrom(`timeline:hashtag:${req.query.tag}`, req, streamToHttp(req, res), streamHttpEnd(req), true)
289 })
290
291 app.get('/api/v1/streaming/hashtag/local', (req, res) => {
292 streamFrom(`timeline:hashtag:${req.query.tag}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true)
293 })
294
295 wss.on('connection', ws => {
296 const location = url.parse(ws.upgradeReq.url, true)
297 const token = location.query.access_token
298 const req = { requestId: uuid.v4() }
299
300 accountFromToken(token, req, err => {
301 if (err) {
302 log.error(req.requestId, err)
303 ws.close()
304 return
305 }
306
307 switch(location.query.stream) {
308 case 'user':
309 streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(ws))
310 break;
311 case 'public':
312 streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(ws), true)
313 break;
314 case 'public:local':
315 streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(ws), true)
316 break;
317 case 'hashtag':
318 streamFrom(`timeline:hashtag:${location.query.tag}`, req, streamToWs(req, ws), streamWsEnd(ws), true)
319 break;
320 case 'hashtag:local':
321 streamFrom(`timeline:hashtag:${location.query.tag}:local`, req, streamToWs(req, ws), streamWsEnd(ws), true)
322 break;
323 default:
324 ws.close()
325 }
326 })
327 })
328
329 server.listen(process.env.PORT || 4000, () => {
330 log.level = process.env.LOG_LEVEL || 'verbose'
331 log.info(`Starting streaming API server worker on ${server.address()}`)
332 })
333
334 process.on('SIGINT', exit)
335 process.on('SIGTERM', exit)
336 process.on('exit', exit)
337
338 function exit() {
339 server.close()
340 }
341 }
This page took 0.140909 seconds and 4 git commands to generate.