]>
cat aescling's git repositories - mastodon.git/blob - streaming/index.js
1 const os
= require ( 'os' );
2 const throng
= require ( 'throng' );
3 const dotenv
= require ( 'dotenv' );
4 const express
= require ( 'express' );
5 const http
= require ( 'http' );
6 const redis
= require ( 'redis' );
7 const pg
= require ( 'pg' );
8 const log
= require ( 'npmlog' );
9 const url
= require ( 'url' );
10 const { WebSocketServer
} = require ( '@clusterws/cws' );
11 const uuid
= require ( 'uuid' );
12 const fs
= require ( 'fs' );
14 const env
= process
. env
. NODE_ENV
|| 'development' ;
17 path : env
=== 'production' ? '.env.production' : '.env' ,
20 log
. level
= process
. env
. LOG_LEVEL
|| 'verbose' ;
22 const dbUrlToConfig
= ( dbUrl
) => {
27 const params
= url
. parse ( dbUrl
, true );
31 [ config
. user
, config
. password
] = params
. auth
. split ( ':' );
34 if ( params
. hostname
) {
35 config
. host
= params
. hostname
;
39 config
. port
= params
. port
;
42 if ( params
. pathname
) {
43 config
. database
= params
. pathname
. split ( '/' )[ 1 ];
46 const ssl
= params
. query
&& params
. query
. ssl
;
48 if ( ssl
&& ssl
=== 'true' || ssl
=== '1' ) {
55 const redisUrlToClient
= ( defaultConfig
, redisUrl
) => {
56 const config
= defaultConfig
;
59 return redis
. createClient ( config
);
62 if ( redisUrl
. startsWith ( 'unix://' )) {
63 return redis
. createClient ( redisUrl
. slice ( 7 ), config
);
66 return redis
. createClient ( Object
. assign ( config
, {
71 const numWorkers
= + process
. env
. STREAMING_CLUSTER_NUM
|| ( env
=== 'development' ? 1 : Math
. max ( os
. cpus (). length
- 1 , 1 ));
73 const startMaster
= () => {
74 if (! process
. env
. SOCKET
&& process
. env
. PORT
&& isNaN (+ process
. env
. PORT
)) {
75 log
. warn ( 'UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.' );
78 log
. info ( `Starting streaming API server master with ${numWorkers} workers` );
81 const startWorker
= ( workerId
) => {
82 log
. info ( `Starting worker ${workerId} ` );
86 user : process
. env
. DB_USER
|| pg
. defaults
. user
,
87 password : process
. env
. DB_PASS
|| pg
. defaults
. password
,
88 database : process
. env
. DB_NAME
|| 'mastodon_development' ,
89 host : process
. env
. DB_HOST
|| pg
. defaults
. host
,
90 port : process
. env
. DB_PORT
|| pg
. defaults
. port
,
95 user : process
. env
. DB_USER
|| 'mastodon' ,
96 password : process
. env
. DB_PASS
|| '' ,
97 database : process
. env
. DB_NAME
|| 'mastodon_production' ,
98 host : process
. env
. DB_HOST
|| 'localhost' ,
99 port : process
. env
. DB_PORT
|| 5432 ,
104 if (!! process
. env
. DB_SSLMODE
&& process
. env
. DB_SSLMODE
!== 'disable' ) {
105 pgConfigs
. development
. ssl
= true ;
106 pgConfigs
. production
. ssl
= true ;
109 const app
= express ();
110 app
. set ( 'trusted proxy' , process
. env
. TRUSTED_PROXY_IP
|| 'loopback,uniquelocal' );
112 const pgPool
= new pg
. Pool ( Object
. assign ( pgConfigs
[ env
], dbUrlToConfig ( process
. env
. DATABASE_URL
)));
113 const server
= http
. createServer ( app
);
114 const redisNamespace
= process
. env
. REDIS_NAMESPACE
|| null ;
116 const redisParams
= {
117 host : process
. env
. REDIS_HOST
|| '127.0.0.1' ,
118 port : process
. env
. REDIS_PORT
|| 6379 ,
119 db : process
. env
. REDIS_DB
|| 0 ,
120 password : process
. env
. REDIS_PASSWORD
,
123 if ( redisNamespace
) {
124 redisParams
. namespace = redisNamespace
;
127 const redisPrefix
= redisNamespace
? ` ${redisNamespace} :` : '' ;
129 const redisSubscribeClient
= redisUrlToClient ( redisParams
, process
. env
. REDIS_URL
);
130 const redisClient
= redisUrlToClient ( redisParams
, process
. env
. REDIS_URL
);
134 redisSubscribeClient
. on ( 'message' , ( channel
, message
) => {
135 const callbacks
= subs
[ channel
];
137 log
. silly ( `New message on channel ${channel} ` );
143 callbacks
. forEach ( callback
=> callback ( message
));
146 const subscriptionHeartbeat
= ( channel
) => {
147 const interval
= 6 * 60 ;
148 const tellSubscribed
= () => {
149 redisClient
. set ( ` ${redisPrefix} subscribed: ${channel} ` , '1' , 'EX' , interval
* 3 );
152 const heartbeat
= setInterval ( tellSubscribed
, interval
* 1000 );
154 clearInterval ( heartbeat
);
158 const subscribe
= ( channel
, callback
) => {
159 log
. silly ( `Adding listener for ${channel} ` );
160 subs
[ channel
] = subs
[ channel
] || [];
161 if ( subs
[ channel
]. length
=== 0 ) {
162 log
. verbose ( `Subscribe ${channel} ` );
163 redisSubscribeClient
. subscribe ( channel
);
165 subs
[ channel
]. push ( callback
);
168 const unsubscribe
= ( channel
, callback
) => {
169 log
. silly ( `Removing listener for ${channel} ` );
170 subs
[ channel
] = subs
[ channel
]. filter ( item
=> item
!== callback
);
171 if ( subs
[ channel
]. length
=== 0 ) {
172 log
. verbose ( `Unsubscribe ${channel} ` );
173 redisSubscribeClient
. unsubscribe ( channel
);
177 const allowCrossDomain
= ( req
, res
, next
) => {
178 res
. header ( 'Access-Control-Allow-Origin' , '*' );
179 res
. header ( 'Access-Control-Allow-Headers' , 'Authorization, Accept, Cache-Control' );
180 res
. header ( 'Access-Control-Allow-Methods' , 'GET, OPTIONS' );
185 const setRequestId
= ( req
, res
, next
) => {
186 req
. requestId
= uuid
. v4 ();
187 res
. header ( 'X-Request-Id' , req
. requestId
);
192 const setRemoteAddress
= ( req
, res
, next
) => {
193 req
. remoteAddress
= req
. connection
. remoteAddress
;
198 const accountFromToken
= ( token
, allowedScopes
, req
, next
) => {
199 pgPool
. connect (( err
, client
, done
) => {
205 client
. query ( 'SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1' , [ token
], ( err
, result
) => {
213 if ( result
. rows
. length
=== 0 ) {
214 err
= new Error ( 'Invalid access token' );
215 err
. statusCode
= 401 ;
221 const scopes
= result
. rows
[ 0 ]. scopes
. split ( ' ' );
223 if ( allowedScopes
. size
> 0 && ! scopes
. some ( scope
=> allowedScopes
. includes ( scope
))) {
224 err
= new Error ( 'Access token does not cover required scopes' );
225 err
. statusCode
= 401 ;
231 req
. accountId
= result
. rows
[ 0 ]. account_id
;
232 req
. chosenLanguages
= result
. rows
[ 0 ]. chosen_languages
;
233 req
. allowNotifications
= scopes
. some ( scope
=> [ 'read' , 'read:notifications' ]. includes ( scope
));
240 const accountFromRequest
= ( req
, next
, required
= true , allowedScopes
= [ 'read' ]) => {
241 const authorization
= req
. headers
. authorization
;
242 const location
= url
. parse ( req
. url
, true );
243 const accessToken
= location
. query
. access_token
|| req
. headers
[ 'sec-websocket-protocol' ];
245 if (! authorization
&& ! accessToken
) {
247 const err
= new Error ( 'Missing access token' );
248 err
. statusCode
= 401 ;
258 const token
= authorization
? authorization
. replace ( /^Bearer / , '' ) : accessToken
;
260 accountFromToken ( token
, allowedScopes
, req
, next
);
263 const PUBLIC_STREAMS
= [
267 'public:local:media' ,
272 const wsVerifyClient
= ( info
, cb
) => {
273 const location
= url
. parse ( info
. req
. url
, true );
274 const authRequired
= ! PUBLIC_STREAMS
. some ( stream
=> stream
=== location
. query
. stream
);
275 const allowedScopes
= [];
278 allowedScopes
. push ( 'read' );
279 if ( location
. query
. stream
=== 'user:notification' ) {
280 allowedScopes
. push ( 'read:notifications' );
282 allowedScopes
. push ( 'read:statuses' );
286 accountFromRequest ( info
. req
, err
=> {
288 cb ( true , undefined , undefined );
290 log
. error ( info
. req
. requestId
, err
. toString ());
291 cb ( false , 401 , 'Unauthorized' );
293 }, authRequired
, allowedScopes
);
296 const PUBLIC_ENDPOINTS
= [
297 '/api/v1/streaming/public' ,
298 '/api/v1/streaming/public/local' ,
299 '/api/v1/streaming/hashtag' ,
300 '/api/v1/streaming/hashtag/local' ,
303 const authenticationMiddleware
= ( req
, res
, next
) => {
304 if ( req
. method
=== 'OPTIONS' ) {
309 const authRequired
= ! PUBLIC_ENDPOINTS
. some ( endpoint
=> endpoint
=== req
. path
);
310 const allowedScopes
= [];
313 allowedScopes
. push ( 'read' );
314 if ( req
. path
=== '/api/v1/streaming/user/notification' ) {
315 allowedScopes
. push ( 'read:notifications' );
317 allowedScopes
. push ( 'read:statuses' );
321 accountFromRequest ( req
, next
, authRequired
, allowedScopes
);
324 const errorMiddleware
= ( err
, req
, res
, {}) => {
325 log
. error ( req
. requestId
, err
. toString ());
326 res
. writeHead ( err
. statusCode
|| 500 , { 'Content-Type' : 'application/json' });
327 res
. end ( JSON
. stringify ({ error : err
. statusCode
? err
. toString () : 'An unexpected error occurred' }));
330 const placeholders
= ( arr
, shift
= 0 ) => arr
. map (( _
, i
) => `$ ${i + 1 + shift} ` ). join ( ', ' );
332 const authorizeListAccess
= ( id
, req
, next
) => {
333 pgPool
. connect (( err
, client
, done
) => {
339 client
. query ( 'SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1' , [ id
], ( err
, result
) => {
342 if ( err
|| result
. rows
. length
=== 0 || result
. rows
[ 0 ]. account_id
!== req
. accountId
) {
352 const streamFrom
= ( id
, req
, output
, attachCloseHandler
, needsFiltering
= false , notificationOnly
= false ) => {
353 const accountId
= req
. accountId
|| req
. remoteAddress
;
355 const streamType
= notificationOnly
? ' (notification)' : '' ;
356 log
. verbose ( req
. requestId
, `Starting stream from ${id} for ${accountId}${streamType} ` );
358 const listener
= message
=> {
359 const { event
, payload
, queued_at
} = JSON
. parse ( message
);
361 const transmit
= () => {
362 const now
= new Date (). getTime ();
363 const delta
= now
- queued_at
;
364 const encodedPayload
= typeof payload
=== 'object' ? JSON
. stringify ( payload
) : payload
;
366 log
. silly ( req
. requestId
, `Transmitting for ${accountId} : ${event} ${encodedPayload} Delay: ${delta} ms` );
367 output ( event
, encodedPayload
);
370 if ( notificationOnly
&& event
!== 'notification' ) {
374 if ( event
=== 'notification' && ! req
. allowNotifications
) {
378 // Only messages that may require filtering are statuses, since notifications
379 // are already personalized and deletes do not matter
380 if (! needsFiltering
|| event
!== 'update' ) {
385 const unpackedPayload
= payload
;
386 const targetAccountIds
= [ unpackedPayload
. account
. id
]. concat ( unpackedPayload
. mentions
. map ( item
=> item
. id
));
387 const accountDomain
= unpackedPayload
. account
. acct
. split ( '@' )[ 1 ];
389 if ( Array
. isArray ( req
. chosenLanguages
) && unpackedPayload
. language
!== null && req
. chosenLanguages
. indexOf ( unpackedPayload
. language
) === - 1 ) {
390 log
. silly ( req
. requestId
, `Message ${unpackedPayload.id} filtered by language ( ${unpackedPayload.language} )` );
394 // When the account is not logged in, it is not necessary to confirm the block or mute
395 if (! req
. accountId
) {
400 pgPool
. connect (( err
, client
, done
) => {
407 client
. query ( `SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN ( ${placeholders(targetAccountIds, 2)} )) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN ( ${placeholders(targetAccountIds, 2)} )` , [ req
. accountId
, unpackedPayload
. account
. id
]. concat ( targetAccountIds
)),
411 queries
. push ( client
. query ( 'SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2' , [ req
. accountId
, accountDomain
]));
414 Promise
. all ( queries
). then ( values
=> {
417 if ( values
[ 0 ]. rows
. length
> 0 || ( values
. length
> 1 && values
[ 1 ]. rows
. length
> 0 )) {
429 subscribe ( ` ${redisPrefix}${id} ` , listener
);
430 attachCloseHandler ( ` ${redisPrefix}${id} ` , listener
);
433 // Setup stream output to HTTP
434 const streamToHttp
= ( req
, res
) => {
435 const accountId
= req
. accountId
|| req
. remoteAddress
;
437 res
. setHeader ( 'Content-Type' , 'text/event-stream' );
438 res
. setHeader ( 'Transfer-Encoding' , 'chunked' );
440 const heartbeat
= setInterval (() => res
. write ( ':thump \n ' ), 15000 );
442 req
. on ( 'close' , () => {
443 log
. verbose ( req
. requestId
, `Ending stream for ${accountId} ` );
444 clearInterval ( heartbeat
);
447 return ( event
, payload
) => {
448 res
. write ( `event: ${event} \n ` );
449 res
. write ( `data: ${payload} \n\n ` );
453 // Setup stream end for HTTP
454 const streamHttpEnd
= ( req
, closeHandler
= false ) => ( id
, listener
) => {
455 req
. on ( 'close' , () => {
456 unsubscribe ( id
, listener
);
463 // Setup stream output to WebSockets
464 const streamToWs
= ( req
, ws
) => ( event
, payload
) => {
465 if ( ws
. readyState
!== ws
. OPEN
) {
466 log
. error ( req
. requestId
, 'Tried writing to closed socket' );
470 ws
. send ( JSON
. stringify ({ event
, payload
}));
473 // Setup stream end for WebSockets
474 const streamWsEnd
= ( req
, ws
, closeHandler
= false ) => ( id
, listener
) => {
475 const accountId
= req
. accountId
|| req
. remoteAddress
;
477 ws
. on ( 'close' , () => {
478 log
. verbose ( req
. requestId
, `Ending stream for ${accountId} ` );
479 unsubscribe ( id
, listener
);
485 ws
. on ( 'error' , () => {
486 log
. verbose ( req
. requestId
, `Ending stream for ${accountId} ` );
487 unsubscribe ( id
, listener
);
494 const httpNotFound
= res
=> {
495 res
. writeHead ( 404 , { 'Content-Type' : 'application/json' });
496 res
. end ( JSON
. stringify ({ error : 'Not found' }));
499 app
. use ( setRequestId
);
500 app
. use ( setRemoteAddress
);
501 app
. use ( allowCrossDomain
);
503 app
. get ( '/api/v1/streaming/health' , ( req
, res
) => {
504 res
. writeHead ( 200 , { 'Content-Type' : 'text/plain' });
508 app
. use ( authenticationMiddleware
);
509 app
. use ( errorMiddleware
);
511 app
. get ( '/api/v1/streaming/user' , ( req
, res
) => {
512 const channel
= `timeline: ${req.accountId} ` ;
513 streamFrom ( channel
, req
, streamToHttp ( req
, res
), streamHttpEnd ( req
, subscriptionHeartbeat ( channel
)));
516 app
. get ( '/api/v1/streaming/user/notification' , ( req
, res
) => {
517 streamFrom ( `timeline: ${req.accountId} ` , req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), false , true );
520 app
. get ( '/api/v1/streaming/public' , ( req
, res
) => {
521 const onlyMedia
= req
. query
. only_media
=== '1' || req
. query
. only_media
=== 'true' ;
522 const channel
= onlyMedia
? 'timeline:public:media' : 'timeline:public' ;
524 streamFrom ( channel
, req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), true );
527 app
. get ( '/api/v1/streaming/public/local' , ( req
, res
) => {
528 const onlyMedia
= req
. query
. only_media
=== '1' || req
. query
. only_media
=== 'true' ;
529 const channel
= onlyMedia
? 'timeline:public:local:media' : 'timeline:public:local' ;
531 streamFrom ( channel
, req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), true );
534 app
. get ( '/api/v1/streaming/direct' , ( req
, res
) => {
535 const channel
= `timeline:direct: ${req.accountId} ` ;
536 streamFrom ( channel
, req
, streamToHttp ( req
, res
), streamHttpEnd ( req
, subscriptionHeartbeat ( channel
)), true );
539 app
. get ( '/api/v1/streaming/hashtag' , ( req
, res
) => {
540 const { tag
} = req
. query
;
542 if (! tag
|| tag
. length
=== 0 ) {
547 streamFrom ( `timeline:hashtag: ${tag.toLowerCase()} ` , req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), true );
550 app
. get ( '/api/v1/streaming/hashtag/local' , ( req
, res
) => {
551 const { tag
} = req
. query
;
553 if (! tag
|| tag
. length
=== 0 ) {
558 streamFrom ( `timeline:hashtag: ${tag.toLowerCase()} :local` , req
, streamToHttp ( req
, res
), streamHttpEnd ( req
), true );
561 app
. get ( '/api/v1/streaming/list' , ( req
, res
) => {
562 const listId
= req
. query
. list
;
564 authorizeListAccess ( listId
, req
, authorized
=> {
570 const channel
= `timeline:list: ${listId} ` ;
571 streamFrom ( channel
, req
, streamToHttp ( req
, res
), streamHttpEnd ( req
, subscriptionHeartbeat ( channel
)));
575 const wss
= new WebSocketServer ({ server
, verifyClient : wsVerifyClient
});
577 wss
. on ( 'connection' , ( ws
, req
) => {
578 const location
= url
. parse ( req
. url
, true );
579 req
. requestId
= uuid
. v4 ();
580 req
. remoteAddress
= ws
. _socket
. remoteAddress
;
584 switch ( location
. query
. stream
) {
586 channel
= `timeline: ${req.accountId} ` ;
587 streamFrom ( channel
, req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
, subscriptionHeartbeat ( channel
)));
589 case 'user:notification' :
590 streamFrom ( `timeline: ${req.accountId} ` , req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
), false , true );
593 streamFrom ( 'timeline:public' , req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
), true );
596 streamFrom ( 'timeline:public:local' , req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
), true );
599 streamFrom ( 'timeline:public:media' , req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
), true );
601 case 'public:local:media' :
602 streamFrom ( 'timeline:public:local:media' , req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
), true );
605 channel
= `timeline:direct: ${req.accountId} ` ;
606 streamFrom ( channel
, req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
, subscriptionHeartbeat ( channel
)), true );
609 if (! location
. query
. tag
|| location
. query
. tag
. length
=== 0 ) {
614 streamFrom ( `timeline:hashtag: ${location.query.tag.toLowerCase()} ` , req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
), true );
616 case 'hashtag:local' :
617 if (! location
. query
. tag
|| location
. query
. tag
. length
=== 0 ) {
622 streamFrom ( `timeline:hashtag: ${location.query.tag.toLowerCase()} :local` , req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
), true );
625 const listId
= location
. query
. list
;
627 authorizeListAccess ( listId
, req
, authorized
=> {
633 channel
= `timeline:list: ${listId} ` ;
634 streamFrom ( channel
, req
, streamToWs ( req
, ws
), streamWsEnd ( req
, ws
, subscriptionHeartbeat ( channel
)));
642 wss
. startAutoPing ( 30000 );
644 attachServerWithConfig ( server
, address
=> {
645 log
. info ( `Worker ${workerId} now listening on ${address} ` );
648 const onExit
= () => {
649 log
. info ( `Worker ${workerId} exiting, bye bye` );
654 const onError
= ( err
) => {
660 process
. on ( 'SIGINT' , onExit
);
661 process
. on ( 'SIGTERM' , onExit
);
662 process
. on ( 'exit' , onExit
);
663 process
. on ( 'uncaughtException' , onError
);
666 const attachServerWithConfig
= ( server
, onSuccess
) => {
667 if ( process
. env
. SOCKET
|| process
. env
. PORT
&& isNaN (+ process
. env
. PORT
)) {
668 server
. listen ( process
. env
. SOCKET
|| process
. env
. PORT
, () => {
670 fs
. chmodSync ( server
. address (), 0o666 );
671 onSuccess ( server
. address ());
675 server
. listen (+ process
. env
. PORT
|| 4000 , process
. env
. BIND
|| '0.0.0.0' , () => {
677 onSuccess ( ` ${server.address().address} : ${server.address().port} ` );
683 const onPortAvailable
= onSuccess
=> {
684 const testServer
= http
. createServer ();
686 testServer
. once ( 'error' , err
=> {
690 testServer
. once ( 'listening' , () => {
691 testServer
. once ( 'close' , () => onSuccess ());
695 attachServerWithConfig ( testServer
);
698 onPortAvailable ( err
=> {
700 log
. error ( 'Could not start server, the port or socket is in use' );
This page took 0.263325 seconds and 4 git commands to generate.