forked from ria-ee/X-Road-scripts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
xrd_all_methods.py
executable file
·160 lines (135 loc) · 5.33 KB
/
xrd_all_methods.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/python3
from threading import Thread, Event
import argparse
import xrdinfo
import queue
import sys
# Method name passed to xrdinfo when "--allowed" is not given
# (main() switches it to 'allowedMethods' otherwise).
DEFAULT_METHOD = 'listMethods'
# Timeout for HTTP requests, used unless overridden with "-t"
# (presumably seconds -- confirm against xrdinfo).
DEFAULT_TIMEOUT = 5.0
# Number of worker threads started when "--threads" is not given:
# a single worker, i.e. subsystems are queried one at a time.
DEFAULT_THREAD_COUNT = 1
def print_error(content):
    """Write *content* to standard error as one ``ERROR: ...`` line.

    The message is formatted with ``str.format`` so any object with a
    string representation (for example an exception) may be passed.
    """
    sys.stderr.write("ERROR: {}\n".format(content))
def worker(params):
    """Thread body: take subsystems from the work queue and print their methods.

    ``params`` is the shared dictionary built by ``main()``.  The keys read
    here are ``work_queue``, ``shutdown``, ``rest``, ``url``, ``client``,
    ``method``, ``timeout``, ``verify`` and ``cert``.

    For every subsystem taken from the queue the matching xrdinfo listing
    call is made (``xrdinfo.methods_rest`` when ``params['rest']`` is truthy,
    ``xrdinfo.methods`` otherwise) and each returned method identifier is
    written to standard output on its own line.  Any exception raised while
    processing a subsystem is reported through ``print_error`` and the worker
    moves on to the next item.

    The function returns once the queue is empty and the ``shutdown`` event
    has been set.
    """
    tasks = params['work_queue']
    while True:
        # Poll with a short timeout instead of blocking forever, so the
        # shutdown event is noticed soon after the queue runs dry.
        try:
            subsystem = tasks.get(block=True, timeout=0.1)
        except queue.Empty:
            if params['shutdown'].is_set():
                return
            continue

        try:
            # REST and SOAP listings take the same keyword arguments; only
            # the xrdinfo entry point differs.
            if params['rest']:
                list_methods = xrdinfo.methods_rest
            else:
                list_methods = xrdinfo.methods
            found = list_methods(
                addr=params['url'], client=params['client'], producer=subsystem,
                method=params['method'], timeout=params['timeout'],
                verify=params['verify'], cert=params['cert'])
            for method in found:
                # One write() call per complete line rather than print();
                # the original author relies on this being thread safe.
                sys.stdout.write(xrdinfo.identifier(method) + '\n')
        except Exception as err:
            print_error('{}: {}'.format(type(err).__name__, err))
        finally:
            # Always acknowledge the item so that Queue.join() in main()
            # cannot hang on a failed subsystem.
            tasks.task_done()
def main():
    """Command-line entry point: list methods of every registered subsystem.

    Parses the command line, downloads the shared parameters of the Global
    Configuration through the given Security Server, starts the worker
    threads and feeds them every registered subsystem.  Results are printed
    by the workers; this function returns once all queued subsystems have
    been processed and the workers have stopped.

    Raises:
        SystemExit: with status 1 when the client identifier does not have
            3 or 4 parts, or when xrdinfo reports an ``XrdInfoError`` while
            downloading the Global Configuration or listing subsystems;
            with status 2 (raised by argparse) on invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description='X-Road listMethods request to all members.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='By default peer TLS certificate is not validated.'
    )
    parser.add_argument(
        'url', metavar='SERVER_URL',
        help='URL of local Security Server accepting X-Road requests.')
    parser.add_argument(
        'client', metavar='CLIENT',
        help='Client identifier consisting of slash separated Percent-Encoded parts (e.g. '
             '"INSTANCE/MEMBER_CLASS/MEMBER_CODE/SUBSYSTEM_CODE" '
             'or "INSTANCE/MEMBER_CLASS/MEMBER_CODE").')
    parser.add_argument('-t', metavar='TIMEOUT', help='timeout for HTTP query', type=float)
    parser.add_argument('--allowed', help='return only allowed methods', action='store_true')
    parser.add_argument('--rest', help='return REST methods instead of SOAP', action='store_true')
    parser.add_argument('--threads', metavar='THREADS', help='amount of threads to use', type=int)
    parser.add_argument(
        '--verify', metavar='CERT_PATH',
        help='validate peer TLS certificate using CA certificate file.')
    parser.add_argument(
        '--cert', metavar='CERT_PATH', help='use TLS certificate for HTTPS requests.')
    parser.add_argument('--key', metavar='KEY_PATH', help='private key for TLS certificate.')
    parser.add_argument(
        '--instance', metavar='INSTANCE',
        help='use this instance instead of local X-Road instance.')
    args = parser.parse_args()

    # Settings shared (read-only) with every worker thread, plus the queue
    # and the shutdown event used to coordinate them.
    params = {
        'url': args.url,
        'client': xrdinfo.identifier_parts(args.client),
        'method': DEFAULT_METHOD,
        'instance': None,
        'timeout': DEFAULT_TIMEOUT,
        'verify': False,
        'cert': None,
        'rest': args.rest,
        'thread_cnt': DEFAULT_THREAD_COUNT,
        'work_queue': queue.Queue(),
        'shutdown': Event()
    }

    # A client is either a member (3 parts) or a subsystem (4 parts).
    if len(params['client']) not in (3, 4):
        print_error('Client name is incorrect: "{}"'.format(args.client))
        # sys.exit() rather than the builtin exit(): the latter is injected
        # by the "site" module and is not guaranteed to exist.
        sys.exit(1)

    # Override the defaults only for options that were actually supplied.
    if args.allowed:
        params['method'] = 'allowedMethods'

    if args.instance:
        params['instance'] = args.instance

    if args.t:
        params['timeout'] = args.t

    if args.verify:
        params['verify'] = args.verify

    # The client certificate is used only when both halves are given.
    if args.cert and args.key:
        params['cert'] = (args.cert, args.key)

    # Non-positive thread counts are ignored in favour of the default.
    if args.threads and args.threads > 0:
        params['thread_cnt'] = args.threads

    try:
        shared_params = xrdinfo.shared_params_ss(
            addr=args.url, instance=params['instance'], timeout=params['timeout'],
            verify=params['verify'], cert=params['cert'])
    except xrdinfo.XrdInfoError as e:
        print_error('Cannot download Global Configuration: {}'.format(e))
        sys.exit(1)

    # Create and start new threads.  They are daemon threads so that an
    # early sys.exit() below is not blocked by workers still polling.
    threads = []
    for _ in range(params['thread_cnt']):
        t = Thread(target=worker, args=(params,))
        t.daemon = True
        t.start()
        threads.append(t)

    # Populate the queue
    try:
        for subsystem in xrdinfo.registered_subsystems(shared_params):
            params['work_queue'].put(subsystem)
    except xrdinfo.XrdInfoError as e:
        print_error(e)
        sys.exit(1)

    # Block until all tasks in queue are done
    params['work_queue'].join()

    # Set shutdown event and wait until all worker threads finish
    params['shutdown'].set()
    for t in threads:
        t.join()
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()