]>
cat aescling's git repositories - mastodon.git/blob - streaming/index.js
2 import cluster
from 'cluster' ;
3 import dotenv
from 'dotenv'
4 import express
from 'express'
5 import http
from 'http'
6 import redis
from 'redis'
8 import log
from 'npmlog'
10 import WebSocket
from 'ws'
11 import uuid
from 'uuid'
13 const env
= process
. env
. NODE_ENV
|| 'development'
16 path : env
=== 'production' ? '.env.production' : '.env'
19 if ( cluster
. isMaster
) {
22 const core
= + process
. env
. STREAMING_CLUSTER_NUM
|| ( env
=== 'development' ? 1 : ( os
. cpus (). length
> 1 ? os
. cpus (). length
- 1 : 1 ))
24 const worker
= cluster
. fork ();
25 worker
. on ( 'exit' , ( code
, signal
) => {
26 log
. error ( `Worker died with exit code ${code} , signal ${signal} received.` );
27 setTimeout (() => fork (), 0 );
30 for ( let i
= 0 ; i
< core
; i
++) fork ();
31 log
. info ( `Starting streaming API server master with ${core} workers` )
38 database : 'mastodon_development' ,
39 host : '/var/run/postgresql' ,
44 user : process
. env
. DB_USER
|| 'mastodon' ,
45 password : process
. env
. DB_PASS
|| '' ,
46 database : process
. env
. DB_NAME
|| 'mastodon_production' ,
47 host : process
. env
. DB_HOST
|| 'localhost' ,
48 port : process
. env
. DB_PORT
|| 5432 ,
54 const pgPool
= new pg
. Pool ( pgConfigs
[ env
])
55 const server
= http
. createServer ( app
)
56 const wss
= new WebSocket
. Server ({ server
})
58 const redisClient
= redis
. createClient ({
59 host : process
. env
. REDIS_HOST
|| '127.0.0.1' ,
60 port : process
. env
. REDIS_PORT
|| 6379 ,
61 password : process
. env
. REDIS_PASSWORD
66 redisClient
. on ( 'pmessage' , ( _
, channel
, message
) => {
67 const callbacks
= subs
[ channel
]
69 log
. silly ( `New message on channel ${channel} ` )
75 callbacks
. forEach ( callback
=> callback ( message
))
78 redisClient
. psubscribe ( 'timeline:*' )
80 const subscribe
= ( channel
, callback
) => {
81 log
. silly ( `Adding listener for ${channel} ` )
82 subs
[ channel
] = subs
[ channel
] || []
83 subs
[ channel
]. push ( callback
)
86 const unsubscribe
= ( channel
, callback
) => {
87 log
. silly ( `Removing listener for ${channel} ` )
88 subs
[ channel
] = subs
[ channel
]. filter ( item
=> item
!== callback
)
91 const allowCrossDomain
= ( req
, res
, next
) => {
92 res
. header ( 'Access-Control-Allow-Origin' , '*' )
93 res
. header ( 'Access-Control-Allow-Headers' , 'Authorization, Accept, Cache-Control' )
94 res
. header ( 'Access-Control-Allow-Methods' , 'GET, OPTIONS' )
99 const setRequestId
= ( req
, res
, next
) => {
100 req
. requestId
= uuid
. v4 ()
101 res
. header ( 'X-Request-Id' , req
. requestId
)
106 const accountFromToken
= ( token
, req
, next
) => {
107 pgPool
. connect (( err
, client
, done
) => {
113 client
. query ( 'SELECT oauth_access_tokens.resource_owner_id, users.account_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 LIMIT 1' , [ token
], ( err
, result
) => {
121 if ( result
. rows
. length
=== 0 ) {
122 err
= new Error ( 'Invalid access token' )
129 req
. accountId
= result
. rows
[ 0 ]. account_id
136 const authenticationMiddleware
= ( req
, res
, next
) => {
137 if ( req
. method
=== 'OPTIONS' ) {
142 const authorization
= req
. get ( 'Authorization' )
144 if (! authorization
) {
145 const err
= new Error ( 'Missing access token' )
152 const token
= authorization
. replace ( /^Bearer / , '' )
154 accountFromToken ( token
, req
, next
)
157 const errorMiddleware
= ( err
, req
, res
, next
) => {
158 log
. error ( req
. requestId
, err
)
159 res
. writeHead ( err
. statusCode
|| 500 , { 'Content-Type' : 'application/json' })
160 res
. end ( JSON
. stringify ({ error : err
. statusCode
? ` ${err} ` : 'An unexpected error occurred' }))
163 const placeholders
= ( arr
, shift
= 0 ) => arr
. map (( _
, i
) => `$ ${i + 1 + shift} ` ). join ( ', ' );
165 const streamFrom
= ( id
, req
, output
, attachCloseHandler
, needsFiltering
= false ) => {
166 log
. verbose ( req
. requestId
, `Starting stream from ${id} for ${req.accountId} ` )
168 const listener
= message
=> {
169 const { event
, payload
, queued_at
} = JSON
. parse ( message
)
171 const transmit
= () => {
172 const now
= new Date (). getTime ()
173 const delta
= now
- queued_at
;
175 log
. silly ( req
. requestId
, `Transmitting for ${req.accountId} : ${event} ${payload} Delay: ${delta} ms` )
176 output ( event
, payload
)
179 // Only messages that may require filtering are statuses, since notifications
180 // are already personalized and deletes do not matter
181 if ( needsFiltering
&& event
=== 'update' ) {
182 pgPool
. connect (( err
, client
, done
) => {
188 const unpackedPayload
= JSON
. parse ( payload
)
189 const targetAccountIds
= [ unpackedPayload
. account
. id
]. concat ( unpackedPayload
. mentions
. map ( item
=> item
. id
)). concat ( unpackedPayload
. reblog
? [ unpackedPayload
. reblog
. account
. id
] : [])
191 client
. query ( `SELECT target_account_id FROM blocks WHERE account_id = $1 AND target_account_id IN ( ${placeholders(targetAccountIds, 1)} ) UNION SELECT target_account_id FROM mutes WHERE account_id = $1 AND target_account_id IN ( ${placeholders(targetAccountIds, 1)} )` , [ req
. accountId
]. concat ( targetAccountIds
), ( err
, result
) => {
199 if ( result
. rows
. length
> 0 ) {
211 subscribe ( id
, listener
)
212 attachCloseHandler ( id
, listener
)
215 // Setup stream output to HTTP
216 const streamToHttp
= ( req
, res
) => {
217 res
. setHeader ( 'Content-Type' , 'text/event-stream' )
218 res
. setHeader ( 'Transfer-Encoding' , 'chunked' )
220 const heartbeat
= setInterval (() => res
. write ( ':thump \n ' ), 15000 )
222 req
. on ( 'close' , () => {
223 log
. verbose ( req
. requestId
, `Ending stream for ${req.accountId} ` )
224 clearInterval ( heartbeat
)
227 return ( event
, payload
) => {
228 res
. write ( `event: ${event} \n ` )
229 res
. write ( `data: ${payload} \n\n ` )
233 // Setup stream end for HTTP
234 const streamHttpEnd
= req
=> ( id
, listener
) => {
235 req
. on ( 'close' , () => {
236 unsubscribe ( id
, listener
)
240 // Setup stream output to WebSockets
241 const streamToWs
= ( req
, ws
) => {
242 const heartbeat
= setInterval (() => ws
. ping (), 15000 )
244 ws
. on ( 'close' , () => {
245 log
. verbose ( req
. requestId
, `Ending stream for ${req.accountId} ` )
246 clearInterval ( heartbeat
)
249 return ( event
, payload
) => {
250 if ( ws
. readyState
!== ws
. OPEN
) {
251 log
. error ( req
. requestId
, 'Tried writing to closed socket' )
255 ws
. send ( JSON
. stringify ({ event
, payload
}))
259 // Setup stream end for WebSockets
260 const streamWsEnd
= ws
=> ( id
, listener
) => {
261 ws
. on ( 'close' , () => {
262 unsubscribe ( id
, listener
)
265 ws
. on ( 'error' , e
=> {
266 unsubscribe ( id
, listener
)
270 app
. use ( setRequestId
)
271 app
. use ( allowCrossDomain
)
272 app
. use ( authenticationMiddleware
)
273 app
. use ( errorMiddleware
)
275 app
. get ( '/api/v1/streaming/user' , ( req
, res
) => {
276 streamFrom ( `timeline: ${req.accountId} ` , req
, streamToHttp ( req
, res
), streamHttpEnd ( req
))
279 app
. get ( '/api/v1/streaming/public' , ( req
, res
) => {
280 streamFrom ( 'timeline:public' , req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), true )
283 app
. get ( '/api/v1/streaming/public/local' , ( req
, res
) => {
284 streamFrom ( 'timeline:public:local' , req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), true )
287 app
. get ( '/api/v1/streaming/hashtag' , ( req
, res
) => {
288 streamFrom ( `timeline:hashtag: ${req.query.tag} ` , req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), true )
291 app
. get ( '/api/v1/streaming/hashtag/local' , ( req
, res
) => {
292 streamFrom ( `timeline:hashtag: ${req.query.tag} :local` , req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), true )
295 wss
. on ( 'connection' , ws
=> {
296 const location
= url
. parse ( ws
. upgradeReq
. url
, true )
297 const token
= location
. query
. access_token
298 const req
= { requestId : uuid
. v4 () }
300 accountFromToken ( token
, req
, err
=> {
302 log
. error ( req
. requestId
, err
)
307 switch ( location
. query
. stream
) {
309 streamFrom ( `timeline: ${req.accountId} ` , req
, streamToWs ( req
, ws
), streamWsEnd ( ws
))
312 streamFrom ( 'timeline:public' , req
, streamToWs ( req
, ws
), streamWsEnd ( ws
), true )
315 streamFrom ( 'timeline:public:local' , req
, streamToWs ( req
, ws
), streamWsEnd ( ws
), true )
318 streamFrom ( `timeline:hashtag: ${location.query.tag} ` , req
, streamToWs ( req
, ws
), streamWsEnd ( ws
), true )
320 case 'hashtag:local' :
321 streamFrom ( `timeline:hashtag: ${location.query.tag} :local` , req
, streamToWs ( req
, ws
), streamWsEnd ( ws
), true )
329 server
. listen ( process
. env
. PORT
|| 4000 , () => {
330 log
. level
= process
. env
. LOG_LEVEL
|| 'verbose'
331 log
. info ( `Starting streaming API server worker on ${server.address()} ` )
334 process
. on ( 'SIGINT' , exit
)
335 process
. on ( 'SIGTERM' , exit
)
336 process
. on ( 'exit' , exit
)
This page took 0.140909 seconds and 4 git commands to generate.