-
Notifications
You must be signed in to change notification settings - Fork 14
/
app.js
141 lines (116 loc) · 4.6 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import { app } from 'mu';
import services from './config/rules';
import normalizeQuad from './config/normalize-quad';
import bodyParser from 'body-parser';
import dns from 'dns';
import { foldChangeSets } from './folding';
import { sendRequest } from './send-request';
import { sendBundledRequest } from './bundle-requests';
// On startup, print the loaded rules as JSON when the
// LOG_SERVER_CONFIGURATION environment variable is set to a non-empty value.
if( process.env.LOG_SERVER_CONFIGURATION ) {
  const serializedRules = JSON.stringify( services );
  console.log( serializedRules );
}
// GET / — simple liveness route: answers 200 with a fixed text message.
app.get( '/', ( req, res ) => {
  res
    .status(200)
    .send("Hello, delta notification is running");
} );
/**
 * POST / — receives a delta message and hands it to the configured watchers.
 *
 * Steps:
 *  1. optionally logs the request body (LOG_REQUESTS),
 *  2. validates that the JSON body carries a `changeSets` array and that the
 *     `mu-call-id-trail` header, when present, is a JSON array (400 otherwise),
 *  3. replaces the insert/delete/effectiveInsert/effectiveDelete lists of
 *     every change set with their normalizeQuad-mapped version (a missing
 *     list becomes an empty array),
 *  4. appends the incoming `mu-call-id` to the call id trail,
 *  5. starts informWatchers and answers 204 without waiting for it.
 */
app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) {
  if( process.env["LOG_REQUESTS"] ) {
    console.log("Logging request body");
    console.log(req.body);
  }

  // Reject malformed payloads explicitly rather than throwing a TypeError
  // further down.
  const changeSets = req.body && req.body.changeSets;
  if( !Array.isArray( changeSets ) ) {
    res.status(400).send("Expected a JSON body containing a changeSets array");
    return;
  }

  let originalMuCallIdTrail = null;
  try {
    originalMuCallIdTrail = JSON.parse( req.get('mu-call-id-trail') || "[]" );
  } catch( e ) {
    console.error("Could not parse mu-call-id-trail header");
    console.error(e);
  }
  if( !Array.isArray( originalMuCallIdTrail ) ) {
    res.status(400).send("Expected the mu-call-id-trail header to contain a JSON array");
    return;
  }

  const originalMuCallId = req.get('mu-call-id');
  const muCallIdTrail = JSON.stringify( [...originalMuCallIdTrail, originalMuCallId] );
  const muSessionId = req.get('mu-session-id');

  // Normalize all quads in place so every watcher sees the same shape.
  changeSets.forEach( (change) => {
    ['insert', 'delete', 'effectiveInsert', 'effectiveDelete']
      .forEach( (key) => {
        change[key] = (change[key] || []).map(normalizeQuad);
      } );
  } );

  // Inform watchers in the background; the sender is not kept waiting.
  // A rejection is logged here so it never becomes an unhandled rejection.
  informWatchers( changeSets, res, muCallIdTrail, muSessionId )
    .catch( (e) => {
      console.error("Something went wrong while informing watchers");
      console.error(e);
    } );

  // Acknowledge receipt of the delta message.
  res.status(204).send();
} );
/**
 * Checks every configured service rule against the received change sets and
 * sends a notification to each rule that matches.
 *
 * For each entry of `services`:
 *  - stores the rule's position on `entry.index`,
 *  - filters the change sets through filterMatchesForOrigin,
 *  - tests the inserts, then the deletes, of the remaining change sets
 *    against `entry.match` using tripleMatchesSpec,
 *  - on a match, calls sendBundledRequest when `entry.options.gracePeriod` is
 *    set, otherwise folds the change sets with foldChangeSets and calls
 *    sendRequest.
 *
 * Rules are processed concurrently. A failure while processing one rule is
 * logged and does not affect the others, so the returned promise does not
 * reject because of a single rule.
 *
 * @param {Array} changeSets - Change sets, each with `insert` and `delete` arrays.
 * @param {*} res - Unused; kept so existing callers keep working.
 * @param {string} muCallIdTrail - Serialized call id trail passed on to the senders.
 * @param {string} muSessionId - Session id passed on to the senders.
 * @returns {Promise<void>} Settles once every rule has been processed.
 */
async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){
  await Promise.all( services.map( async (entry, index) => {
    try {
      entry.index = index;

      if( process.env["DEBUG_DELTA_MATCH"] )
        console.log(`Checking if we want to send to ${entry.callback.url}`);

      const matchSpec = entry.match;
      const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry );

      // entry.options is optional, so guard it before reading ignoreFromSelf.
      if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options && entry.options.ignoreFromSelf )
        console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);

      // Test all inserts first, then all deletes, stopping at the first hit.
      // This avoids copying every triple into one large intermediate array.
      const matchesSpec = (triple) => tripleMatchesSpec( triple, matchSpec );
      const someTripleMatchedSpec =
        originFilteredChangeSets.some( (change) => change.insert.some( matchesSpec ) )
        || originFilteredChangeSets.some( (change) => change.delete.some( matchesSpec ) );

      if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
        console.log(`Triple matches spec? ${someTripleMatchedSpec}`);

      if( !someTripleMatchedSpec )
        return;

      // inform matching entities
      if( process.env["DEBUG_DELTA_SEND"] )
        console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);

      if( entry.options && entry.options.gracePeriod ) {
        sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId);
      } else {
        const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets );
        sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId );
      }
    } catch( e ) {
      // Keep one faulty rule from turning into an unhandled rejection.
      const target = entry && entry.callback && entry.callback.url;
      console.error(`Could not process delta for rule ${index} (${target})`);
      console.error(e);
    }
  } ) );
}
/**
 * Tells whether a triple satisfies a match specification.
 *
 * Both arguments share the same shape: an object keyed by position, each
 * holding a term object such as {type: "url", value: "http..."}. Every
 * property listed for a position in the spec must be strictly equal to the
 * same property of the triple's term at that position. Positions whose spec
 * is falsy are not constrained.
 *
 * @param {Object} triple - The triple to test.
 * @param {Object} matchSpec - The pattern to test against.
 * @returns {boolean} true when no constraint of the spec is violated.
 */
function tripleMatchesSpec( triple, matchSpec ) {
  if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
    console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`);

  for( const position in matchSpec ) {
    const expectedTerm = matchSpec[position];

    // nothing is required for this position
    if( !expectedTerm )
      continue;

    // something is required, but the triple has nothing at this position
    const actualTerm = triple[position];
    if( !actualTerm )
      return false;

    // compare each required property of the term, e.g. type and value
    for( const property in expectedTerm ) {
      const isSame = expectedTerm[property] === actualTerm[property];
      if( !isSame )
        return false;
    }
  }

  // no constraint was violated
  return true;
}
/**
 * Drops the change sets that originate from the service described by `entry`
 * when that rule has `options.ignoreFromSelf` enabled.
 *
 * The service's address is obtained through getServiceIp and compared with
 * each change set's `origin`. When the option is off, or when anything goes
 * wrong while looking up or filtering, the change sets are returned
 * unfiltered (the error is logged).
 *
 * @param {Array} changeSets - Change sets carrying an `origin` property.
 * @param {Object} entry - Rule with optional `options` and a `callback.url`.
 * @returns {Promise<Array>} The (possibly filtered) change sets.
 */
async function filterMatchesForOrigin( changeSets, entry ) {
  const wantsFiltering = entry.options && entry.options.ignoreFromSelf;
  if( !wantsFiltering )
    return changeSets;

  try {
    const serviceIp = await getServiceIp( entry );
    const comesFromElsewhere = ( { origin } ) => origin != serviceIp;
    return changeSets.filter( comesFromElsewhere );
  } catch( lookupError ) {
    console.error(`Could not filter changeset because an error was returned while looking up ip for ${entry.callback.url}`);
    console.error(lookupError);
    return changeSets;
  }
}
/**
 * Extracts the host name from a rule's callback URL.
 *
 * @param {Object} entry - Rule with a `callback.url` string.
 * @returns {string} The `hostname` component of the parsed URL.
 */
function hostnameForEntry( entry ) {
  const { hostname } = new URL( entry.callback.url );
  return hostname;
}
/**
 * Resolves the IPv4 address of the host a rule's callback URL points to.
 *
 * Wraps the callback-style dns.lookup in a promise.
 *
 * @param {Object} entry - Rule with a `callback.url` string.
 * @returns {Promise<string>} The address reported by dns.lookup; rejects
 *   with the lookup error when the lookup fails.
 */
async function getServiceIp(entry) {
  const host = hostnameForEntry( entry );
  const lookupOptions = { family: 4 };

  return new Promise( (resolve, reject) => {
    const onLookupDone = ( lookupError, ipAddress ) => {
      if( lookupError ) {
        reject( lookupError );
        return;
      }
      resolve( ipAddress );
    };
    dns.lookup( host, lookupOptions, onLookupDone );
  } );
}