-
Notifications
You must be signed in to change notification settings - Fork 2
/
program.py
387 lines (290 loc) · 13 KB
/
program.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# NOTE: this script was designed using the v1.1
# version of the OMF specification, as outlined here:
# https://omf-docs.osisoft.com/documentation_v11/Whats_New.html
# *************************************************************************************
# ************************************************************************
# Import necessary packages
# ************************************************************************
import enum
import json
import time
import datetime
import gzip
import random
import requests
import traceback
import os
from urllib.parse import urlparse
# ************************************************************************
# Global Variables
# ************************************************************************
# The version of the OMF messages
omf_version = '1.1'
# The number of seconds to sleep before sending another round of messages
sleep_time = 1
# The configurations of the endpoints to send to
endpoints = None
# Holders for data message values
boolean_value_1 = 0
boolean_value_2 = 1
# List of possible endpoint types
class EndpointTypes(enum.Enum):
ADH = 'ADH'
EDS = 'EDS'
PI = 'PI'
# ************************************************************************
# REQUIRED: generates a bearer token for authentication
# ************************************************************************
def get_token(endpoint):
'''Gets the token for the omfendpoint'''
endpoint_type = endpoint["EndpointType"]
# return an empty string if the endpoint is not an ADH type
if endpoint_type != EndpointTypes.ADH:
return ''
if (('expiration' in endpoint) and (endpoint["expiration"] - time.time()) > 5 * 60):
return endpoint["token"]
# we can't short circuit it, so we must go retreive it.
discovery_url = requests.get(
endpoint["Resource"] + '/identity/.well-known/openid-configuration',
headers={'Accept': 'application/json'},
verify=endpoint["VerifySSL"])
if discovery_url.status_code < 200 or discovery_url.status_code >= 300:
discovery_url.close()
raise Exception(f'Failed to get access token endpoint from discovery URL: {discovery_url.status_code}:{discovery_url.text}')
token_endpoint = json.loads(discovery_url.content)["token_endpoint"]
token_url = urlparse(token_endpoint)
# Validate URL
assert token_url.scheme == 'https'
assert token_url.geturl().startswith(endpoint["Resource"])
token_information = requests.post(
token_url.geturl(),
data={'client_id': endpoint["ClientId"],
'client_secret': endpoint["ClientSecret"],
'grant_type': 'client_credentials'},
verify=endpoint["VerifySSL"])
token = json.loads(token_information.content)
if token is None:
raise Exception('Failed to retrieve Token')
__expiration = float(token["expires_in"]) + time.time()
__token = token["access_token"]
# cache the results
endpoint["expiration"] = __expiration
endpoint["token"] = __token
return __token
# ************************************************************************
# REQUIRED: wrapper function for sending an HTTP message
# ************************************************************************
def send_message_to_omf_endpoint(endpoint, message_type, message_omf_json, action='create'):
'''Sends the request out to the preconfigured endpoint'''
# Compress json omf payload, if specified
compression = 'none'
if endpoint["UseCompression"]:
msg_body = gzip.compress(bytes(json.dumps(message_omf_json), 'utf-8'))
compression = 'gzip'
else:
msg_body = json.dumps(message_omf_json)
# Collect the message headers
msg_headers = get_headers(endpoint, compression, message_type, action)
# Send message to OMF endpoint
endpoints_type = endpoint["EndpointType"]
response = {}
# If the endpoint is ADH
if endpoints_type == EndpointTypes.ADH:
response = requests.post(
endpoint["OmfEndpoint"],
headers=msg_headers,
data=msg_body,
verify=endpoint["VerifySSL"],
timeout=endpoint["WebRequestTimeoutSeconds"]
)
# If the endpoint is EDS
elif endpoints_type == EndpointTypes.EDS:
response = requests.post(
endpoint["OmfEndpoint"],
headers=msg_headers,
data=msg_body,
timeout=endpoint["WebRequestTimeoutSeconds"]
)
# If the endpoint is PI
elif endpoints_type == EndpointTypes.PI:
response = requests.post(
endpoint["OmfEndpoint"],
headers=msg_headers,
data=msg_body,
verify=endpoint["VerifySSL"],
timeout=endpoint["WebRequestTimeoutSeconds"],
auth=(endpoint["Username"], endpoint["Password"])
)
# Check for 409, which indicates that a type with the specified ID and version already exists.
if response.status_code == 409:
return
# response code in 200s if the request was successful!
if response.status_code < 200 or response.status_code >= 300:
print(msg_headers)
response.close()
print(
f'Response from relay was bad. {message_type} message: {response.status_code} {response.text}. Message holdings: {message_omf_json}')
print()
raise Exception(f'OMF message was unsuccessful, {message_type}. {response.status_code}:{response.text}')
# ************************************************************************
# REQUIRED: retrieves headers for HTTP request to the specified endpoint
# ************************************************************************
def get_headers(endpoint, compression='', message_type='', action=''):
'''Assemble headers for sending to the endpoint's OMF endpoint'''
endpoint_type = endpoint["EndpointType"]
msg_headers = {
'messagetype': message_type,
'action': action,
'messageformat': 'JSON',
'omfversion': omf_version
}
if(compression == 'gzip'):
msg_headers["compression"] = 'gzip'
# If the endpoint is ADH
if endpoint_type == EndpointTypes.ADH:
msg_headers["Authorization"] = f'Bearer {get_token(endpoint)}'
# If the endpoint is PI
elif endpoint_type == EndpointTypes.PI:
msg_headers["x-requested-with"] = 'xmlhttprequest'
# validate headers to prevent injection attacks
validated_headers = {}
for key in msg_headers:
if key in {'Authorization', 'messagetype', 'action', 'messageformat', 'omfversion', 'x-requested-with', 'compression'}:
validated_headers[key] = msg_headers[key]
return validated_headers
# ************************************************************************
# This function will need to be customized to populate the OMF data
# message passed.
# ************************************************************************
def get_data(data):
''' Get data to be sent to EDS'''
global boolean_value_1, boolean_value_2
if data["containerid"] == 'FirstContainer' or data["containerid"] == 'SecondContainer':
data["values"][0]["Timestamp"] = get_current_time()
data["values"][0]["IntegerProperty"] = int(100*random.random())
elif data["containerid"] == 'ThirdContainer':
boolean_value_2 = (boolean_value_2 + 1) % 2
data["values"][0]["Timestamp"] = get_current_time()
data["values"][0]["NumberProperty1"] = 100*random.random()
data["values"][0]["NumberProperty2"] = 100*random.random()
data["values"][0]["StringEnum"] = str(bool(boolean_value_2))
elif data["containerid"] == 'FourthContainer':
boolean_value_1 = (boolean_value_1 + 1) % 2
data["values"][0]["Timestamp"] = get_current_time()
data["values"][0]["IntegerEnum"] = boolean_value_1
else:
print(f'Container {data["containerid"]} not recognized')
return data
def get_current_time():
''' Returns the current time'''
return datetime.datetime.utcnow().isoformat() + 'Z'
def get_json_file(filename):
''' Get a json file by the path specified relative to the application's path'''
# Try to open the configuration file
try:
with open(
filename,
'r',
) as f:
loaded_json = json.load(f)
except Exception as error:
print(f'Error: {str(error)}')
print(f'Could not open/read file: {filename}')
exit()
return loaded_json
def get_appsettings():
''' Return the appsettings.json as a json object, while also populating base_endpoint, omf_endpoint, and default values'''
# Try to open the configuration file
endpoints = get_json_file('appsettings.json')["Endpoints"]
filtered_endpoints = []
for endpoint in endpoints:
if endpoint["Selected"]:
filtered_endpoints.append(endpoint)
# for each endpoint construct the check base and OMF endpoint and populate default values
for endpoint in filtered_endpoints:
if endpoint["EndpointType"] == 'OCS':
print('OCS endpoint type is deprecated as OSIsoft Cloud Services has now been migrated to AVEVA Data Hub, using ADH type instead.')
endpoint_type = EndpointTypes.ADH
else:
endpoint["EndpointType"] = EndpointTypes(endpoint["EndpointType"])
endpoint_type = endpoint["EndpointType"]
# If the endpoint is ADH
if endpoint_type == EndpointTypes.ADH:
base_endpoint = f'{endpoint["Resource"]}/api/{endpoint["ApiVersion"]}' + \
f'/tenants/{endpoint["TenantId"]}/namespaces/{endpoint["NamespaceId"]}'
# If the endpoint is EDS
elif endpoint_type == EndpointTypes.EDS:
base_endpoint = f'{endpoint["Resource"]}/api/{endpoint["ApiVersion"]}' + \
f'/tenants/default/namespaces/default'
# If the endpoint is PI
elif endpoint_type == EndpointTypes.PI:
base_endpoint = endpoint["Resource"]
else:
raise ValueError('Invalid endpoint type')
omf_endpoint = f'{base_endpoint}/omf'
# add the base_endpoint and omf_endpoint to the endpoint configuration
endpoint["BaseEndpoint"] = base_endpoint
endpoint["OmfEndpoint"] = omf_endpoint
# check for optional/nullable parameters
if 'VerifySSL' not in endpoint or endpoint["VerifySSL"] == None:
endpoint["VerifySSL"] = True
if 'UseCompression' not in endpoint or endpoint["UseCompression"] == None:
endpoint["UseCompression"] = True
if 'WebRequestTimeoutSeconds' not in endpoint or endpoint["WebRequestTimeoutSeconds"] == None:
endpoint["WebRequestTimeoutSeconds"] = 30
return filtered_endpoints
def main(test=False, last_sent_values={}):
# Main program. Seperated out so that we can add a test function and call this easily
global endpoints
success = True
# Step 1 - Read endpoint configurations from appsettings.json
endpoints = get_appsettings()
# Step 2 - Get OMF Types
omf_types = get_json_file('OMF-Types.json')
# Step 3 - Get OMF Containers
omf_containers = get_json_file('OMF-Containers.json')
# Step 4 - Get OMF Data
omf_data = get_json_file('OMF-Data.json')
# Send messages and check for each endpoint in appsettings.json
try:
# Send out the messages that only need to be sent once
for endpoint in endpoints:
if not endpoint["VerifySSL"]:
print('You are not verifying the certificate of the end point. This is not advised for any system as there are security issues with doing this.')
# Step 5 - Send OMF Types
for omf_type in omf_types:
send_message_to_omf_endpoint(endpoint, 'type', [omf_type])
# Step 6 - Send OMF Containers
for omf_container in omf_containers:
send_message_to_omf_endpoint(
endpoint, 'container', [omf_container])
# Step 7 - Send OMF Data
count = 0
# send data to all endpoints forever if this is not a test
while not test or count < 2:
'''This is where custom loop logic should go.
The get_data call should also be customized to populate omf_data with relevant data.'''
for omf_datum in omf_data:
data_to_send = get_data(omf_datum)
for endpoint in endpoints:
# send the data
send_message_to_omf_endpoint(
endpoint, 'data', [data_to_send])
# record the values sent if this is a test
if test and count == 1:
last_sent_values.update(
{omf_datum["containerid"]: data_to_send})
time.sleep(sleep_time)
count = count + 1
except Exception as ex:
print(f'Encountered Error: {ex}')
print
traceback.print_exc()
print
success = False
if test:
raise ex
print('Done')
return success
if __name__ == '__main__':
main()