-
Notifications
You must be signed in to change notification settings - Fork 0
/
c470ip-tool
executable file
·514 lines (401 loc) · 17 KB
/
c470ip-tool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
#!/usr/bin/python
#
# c470ip-tool - Gigaset C470IP Phone management tool
#
# Copyright (c) 2013 by Yann Rouillard. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 dated June, 1991, or (at your
# option) any later version.
#
import sys
import os.path
import urllib
import time
import re
import argparse
import datetime
import simplejson
import atom.http_core
import gdata.gauth
import gdata.contacts
import gdata.contacts.client
import cStringIO
import mechanize
import xdg.BaseDirectory
class Contact(object):
    """Minimal contact record: a full name and a list of phones.

    Only ``phones`` is initialised by the constructor; ``full_name`` has to
    be assigned by the caller before it is read.
    """

    __slots__ = ('full_name', 'phones')

    def __init__(self):
        # Each instance owns its own, initially empty, list of phones.
        self.phones = list()
class Phone(object):
    """A phone number, stored in sanitized form, together with its label."""

    __slots__ = ('number', 'label')

    def __init__(self, number, label):
        self.label = label
        self.number = Phone.sanitize_number(number)

    @staticmethod
    def sanitize_number(number):
        """Return *number* rewritten in the single format used by the tool.

        Space characters are removed and a leading ``+`` is replaced by
        ``00``.
        """
        compact = ''.join(number.split(' '))
        if compact[:1] == '+':
            return '00' + compact[1:]
        return compact
class GoogleContactsService(gdata.contacts.client.ContactsClient):
    """Sub-class of gdata ContactsClient that adds a more convenient method
    to access the contacts information.
    """

    # Mapping between google contacts information and human-readable labels
    REL2LABEL = {
        gdata.contacts.REL_WORK: 'Work',
        gdata.contacts.REL_HOME: 'Home',
        gdata.contacts.REL_OTHER: 'Other',
        gdata.contacts.PHONE_MOBILE: 'Mobile',
        None: 'Home',
    }

    # Label given to phone types that are not listed in REL2LABEL, so that
    # an unexpected type does not abort the whole iteration with a KeyError
    DEFAULT_LABEL = 'Other'

    OAUTH_SCOPE = 'https://www.google.com/m8/feeds/'

    def get_contacts(self):
        """Return an iterator over the contacts of a Google account.

        Each contact is returned as a simplified Contact object which only
        contains the full name and the list of phone numbers.  Entries
        without a phone number, without a full name, or whose phone entries
        carry no text are skipped.  The feed is followed page by page
        through its "next" link.
        """
        contacts_feed = self.GetContacts()
        while True:
            for google_contact in contacts_feed.entry:
                if not google_contact.phone_number:
                    continue

                # A contact may have phone numbers but no name element; such
                # an entry cannot be given a name on the phone, so skip it
                # instead of failing on a None attribute.
                name = google_contact.name
                if name is None or name.full_name is None:
                    continue
                if not name.full_name.text:
                    continue

                contact = Contact()
                contact.full_name = name.full_name.text
                for phone_info in google_contact.phone_number:
                    if not phone_info.text:
                        continue
                    label = GoogleContactsService.REL2LABEL.get(
                        phone_info.rel, GoogleContactsService.DEFAULT_LABEL)
                    contact.phones.append(Phone(phone_info.text, label))

                if contact.phones:
                    yield contact

            next_feed_uri = contacts_feed.GetNextLink()
            if not next_feed_uri:
                break
            contacts_feed = self.GetContacts(uri=next_feed_uri.href)
class OAuth2DeviceToken(gdata.gauth.OAuth2Token):
    """Sub-class of gdata OAuth2Token that can save and restore its state
    from a file.

    :param state_file: path of the file used by load_state() and
                       save_state().
    """

    # List of attributes that we want to save and restore
    STATE_ATTRS = ('device_code', 'user_code', 'interval', 'access_token',
                   'refresh_token', 'verification_url')

    def __init__(self, client_id, client_secret, scope, user_agent,
                 access_token=None, refresh_token=None, device_code=None,
                 state_file=None):
        super(OAuth2DeviceToken, self).__init__(
            client_id, client_secret, scope, user_agent,
            access_token=access_token, refresh_token=refresh_token,
            device_code=device_code)
        self.state_file = state_file

        # Make sure every persisted attribute exists on the instance...
        for attr in OAuth2DeviceToken.STATE_ATTRS:
            setattr(self, attr, None)
        # ...but do not throw away the values explicitly given by the
        # caller (they used to be overwritten with None by the loop above).
        self.access_token = access_token
        self.refresh_token = refresh_token
        self.device_code = device_code

    def load_state(self):
        """Restore the attributes listed in STATE_ATTRS from the state file.

        This is a best-effort operation: when no state file is configured,
        or when the file is missing, unreadable, not valid JSON or does not
        contain every expected attribute, the object is left untouched.
        """
        if not self.state_file:
            return

        try:
            with open(self.state_file, 'r') as f:
                state = simplejson.loads(f.read())
            # Read every value before assigning any of them so that an
            # incomplete file can not leave the token half-restored.
            values = [state[attr] for attr in OAuth2DeviceToken.STATE_ATTRS]
        except (IOError, OSError, ValueError, KeyError, TypeError):
            return

        for attr, value in zip(OAuth2DeviceToken.STATE_ATTRS, values):
            setattr(self, attr, value)

    def save_state(self):
        """Write the attributes listed in STATE_ATTRS to the state file as
        a JSON object.
        """
        state = {attr: getattr(self, attr)
                 for attr in OAuth2DeviceToken.STATE_ATTRS}
        with open(self.state_file, 'w') as f:
            f.write(simplejson.dumps(state))
class C470ipLoginError(Exception):
    """Raised when the login on the phone web interface fails.

    :param error_code: error code of the failure, as an integer or a
                       numeric string, or None when no code is available.

    The code is stored as an integer in the ``error_code`` attribute;
    ``UNKNOWN_ERROR`` is stored when None was given.
    """

    # Stored in error_code when no code was supplied, so that the attribute
    # is always an integer and can be formatted with %i by the callers.
    UNKNOWN_ERROR = -1

    def __init__(self, error_code):
        if error_code is None:
            # int(None) would raise a TypeError and hide the login failure
            error_code = C470ipLoginError.UNKNOWN_ERROR
        self.error_code = int(error_code)
class C470ipSessionAlreadyRunning(C470ipLoginError):
    """Login error raised for the "session already running" error code."""
class C470ipPhone(object):
    """The C470ipPhone object implements the operations that can be
    performed on the Gigaset C470IP phone through the web interface.

    :param url: url of the C470IP phone web interface.
    :param pin: 4-digit pin code required to login on the web
                interface.
    """

    # List of labels that can be used as a suffix when a contact has
    # several numbers
    LABEL_AS_SUFFIX = ('Work', 'Home')

    # Vcard format used to transfer contact information to the phone; using
    # a python vcf module would be overkill here, we just use a simple
    # format string.
    # NOTE(review): the space before "FN:" is kept exactly as it was; a line
    # starting with a space is normally a folded continuation in vCard, so
    # this may be unintended -- confirm against a real phone before changing.
    VCARD_FMT = (
        "BEGIN:VCARD\r\nVERSION:2.1\r\n FN:%(full_name)s\r\n"
        "N:%(full_name)s\r\nTEL;HOME:%(phone_number)s\r\nEND:VCARD\r\n")

    # Full name of contacts can't be more than 16 characters on a C470 IP
    MAX_NAME_LENGTH = 16

    # Values of the 'tdt_function' field of the settings_telephony_tdt.html
    # form, one per operation
    UPLOAD_ADDRESS_BOOK = '2'
    DELETE_ADDRESS_BOOK = '3'

    # List of error codes
    SESSION_RUNNING_ERROR = 2
    LANG_DOWNLOAD_RUNNING = 160
    # Code used when the login failed without any code in the page
    UNKNOWN_ERROR = -1

    def __init__(self, url, pin):
        """Store the connection settings; nothing is sent to the phone
        until an operation is requested.
        """
        # Page names are appended directly to the url, so it has to end
        # with a slash.
        if not url.endswith('/'):
            url += '/'
        self.url = url
        self.pin = pin
        self._logged = False

        # No real web API for the phone so we need to mimic a web browser
        # that does each operation through the C470IP web interface
        self.browser = mechanize.Browser()

    @staticmethod
    def _js_var(body, name):
        """Return the digits assigned to the javascript variable *name* in
        the page *body* ("var <name> = <digits>;"), or None if absent.
        """
        match = re.search(r'var %s = (\d+);' % re.escape(name), body)
        if match is None:
            return None
        return match.group(1)

    @staticmethod
    def _vcard_name(full_name, label, nb_phones):
        """Return the name to store on the phone for one number of a contact.

        When the contact has several numbers and *label* is one of
        LABEL_AS_SUFFIX, the label is appended to the name, which is
        truncated first if needed so that the result fits in
        MAX_NAME_LENGTH.  Otherwise the name is returned unchanged.
        """
        if nb_phones <= 1 or label not in C470ipPhone.LABEL_AS_SUFFIX:
            return full_name
        # Room left for the name once the space and the label are counted
        max_name_len = C470ipPhone.MAX_NAME_LENGTH - (len(label) + 1)
        return full_name[:max_name_len] + ' ' + label

    def login(self):
        """Performs a login operation on the phone web interface.

        :raises C470ipSessionAlreadyRunning: if the phone answers with the
            SESSION_RUNNING_ERROR code.
        :raises C470ipLoginError: for any other error code except
            LANG_DOWNLOAD_RUNNING, or if the answer contains neither an
            error code nor the logout link.
        """
        self.browser.open(self.url)
        self.browser.select_form(name='gigaset')
        self.browser['password'] = self.pin
        body = self.browser.submit().read()

        error = self._js_var(body, 'error')
        if error is not None:
            error_code = int(error)
            if error_code == C470ipPhone.SESSION_RUNNING_ERROR:
                raise C470ipSessionAlreadyRunning(error_code)
            elif error_code != C470ipPhone.LANG_DOWNLOAD_RUNNING:
                raise C470ipLoginError(error_code)
        elif 'logout.html' not in body:
            raise C470ipLoginError(C470ipPhone.UNKNOWN_ERROR)

        # We are here if we got the LANG_DOWNLOAD_RUNNING error code
        # or if we found the logout link on the page.
        # Either way it means the login was successful.
        self._logged = True

    def logout(self):
        """Performs a logout operation on the phone web interface."""
        self.browser.open(self.url + 'logout.html')
        self._logged = False

    def delete_contacts(self, handset=1):
        """Erase the existing contacts list from the handset with the given
        index.

        :param handset: index of the handset whose address book must be
                        updated
        """
        self._execute_tdt_function(C470ipPhone.DELETE_ADDRESS_BOOK, handset)

    def add_contacts(self, contacts_list, handset=1):
        """Updates the address book of a C470IP phone using the given
        contacts list and returns the number of contacts added.

        :param contacts_list: iterator or list of contacts
        :param handset: index of the handset whose address book must be
                        updated
        """
        nb_contacts_added = 0

        # We first convert the list of contacts into
        # a VCF file compatible with the C470IP phone
        vcard_file = cStringIO.StringIO()
        for contact in contacts_list:
            nb_contacts_added += 1
            # The C470IP doesn't seem to support several phone numbers by
            # contact, so we create a unique contact card for each
            # and use the label to make them different
            for phone in contact.phones:
                vcard_info = {
                    'full_name': self._vcard_name(contact.full_name,
                                                  phone.label,
                                                  len(contact.phones)),
                    'phone_number': phone.number,
                }
                vcard_str = C470ipPhone.VCARD_FMT % vcard_info + "\r\n"
                # 'replace' so that one character missing from the charset
                # does not abort the whole transfer
                vcard_file.write(vcard_str.encode('iso8859-15', 'replace'))
        vcard_file.seek(0)

        self._execute_tdt_function(C470ipPhone.UPLOAD_ADDRESS_BOOK, handset,
                                   vcard_file)
        return nb_contacts_added

    def _execute_tdt_function(self, tdt_function, handset, tdt_file=None):
        """Executes a function of the settings_telephony_tdt.html form
        which allows to manage a handset.

        Logs in first if needed, submits the form, then polls status.html
        once per second for as long as the reported status is 0, and
        finally requests stoptdt.html.

        :param tdt_function: value for the 'tdt_function' form field
        :param handset: index of the handset
        :param tdt_file: optional file object uploaded with the form
        """
        if not self._logged:
            self.login()

        self.browser.open(self.url + 'settings_telephony_tdt.html')
        self.browser.select_form(name='gigaset')

        # The web page dynamically adds two fields using javascript
        # so we have to manually create them
        self.browser.form.set_all_readonly(False)
        self.browser.form.new_control('text', 'hs2_0',
                                      {'value': str(handset)})
        self.browser.form.new_control('text', 'hs0_0',
                                      {'value': 'INT %i' % handset})
        self.browser.form.fixup()

        self.browser['tdt_function'] = tdt_function
        if tdt_file is not None:
            self.browser.form.add_file(tdt_file, 'text/vcf', 'tdt.vcf')
        response = self.browser.submit()

        while True:
            # Keep polling only while the page explicitly reports status 0;
            # a page without any status also ends the loop instead of
            # failing on a missing match.
            if self._js_var(response.read(), 'status') != '0':
                break
            time.sleep(1)
            try:
                response = self.browser.open(self.url + 'status.html')
            except Exception:
                # If there is anything wrong here, we still want to try to
                # stop the ongoing operation using the stoptdt.html page
                break

        self.browser.open(self.url + 'stoptdt.html')
###############################################################################
# Main code
###############################################################################
APP_NAME = 'c470ip-tool'
OAUTH_STATE_FILENAME = 'oauth.state'

# Synchronizing contacts with Google requires a Google OAuth client ID and a
# Google OAuth client secret, which can be obtained here:
# https://cloud.google.com/console/project
#
# Provide them through the CLIENT_ID and CLIENT_SECRET environment variables,
# or write them below in place of the empty fallback strings.
#
OAUTH_CLIENT_ID = os.getenv('CLIENT_ID') or ''
OAUTH_CLIENT_SECRET = os.getenv('CLIENT_SECRET') or ''
def verbose(message, verbose_mode):
    """Write *message* to standard output and flush it, but only when
    *verbose_mode* is true.
    """
    if not verbose_mode:
        return
    out = sys.stdout
    out.write(message)
    out.flush()
## sub-command definition
def delete_contacts(args):
    """delete-contacts command: remove every existing contact from the
    C470 IP address book.

    The phone session is logged out whether or not the deletion succeeds.
    """
    show_progress = not args.quiet
    phone = C470ipPhone(args.c470ip_url, args.c470ip_pin)
    try:
        verbose('Deleting existing contacts...', show_progress)
        phone.delete_contacts()
        verbose("done.\n", show_progress)
    finally:
        phone.logout()
def google_sync(args):
    """google-sync command: synchronizes Google Contacts with the Gigaset
    C470 IP address book.

    Exits with status 5 when the OAuth client credentials are not
    configured, and with status 0 when the user still has to authorize the
    tool on the Google side (the command must then be launched again).
    """
    if not OAUTH_CLIENT_ID or not OAUTH_CLIENT_SECRET:
        sys.stderr.write("ERROR: CLIENT_ID and CLIENT_SECRET environment "
                         "variables must be defined to be able to use Google "
                         "synchronisation\n")
        sys.exit(5)

    google_contacts = GoogleContactsService(source=APP_NAME)
    c470ip_phone = C470ipPhone(args.c470ip_url, args.c470ip_pin)

    # We use the OAuth 2.0 for Device flow (described here:
    # https://developers.google.com/accounts/docs/OAuth2ForDevices)
    # to authorize the script to access the contacts information
    oauth_state_file = os.path.join(xdg.BaseDirectory.save_data_path(APP_NAME),
                                    OAUTH_STATE_FILENAME)
    oauth = OAuth2DeviceToken(client_id=OAUTH_CLIENT_ID,
                              client_secret=OAUTH_CLIENT_SECRET,
                              scope=GoogleContactsService.OAUTH_SCOPE,
                              user_agent=APP_NAME,
                              state_file=oauth_state_file)
    if not args.force_google_auth:
        oauth.load_state()

    # We perform the oauth2 authorization in two steps.
    # The first time, we retrieve the user and device codes and we ask the
    # user to relaunch the command after having authorized the device on
    # the web.
    if not oauth.user_code:
        oauth.get_user_code()
        oauth.save_state()
        print ("%s needs to access your contacts information to be able"
               " to perform the synchronization.\nPlease go to the url "
               "%s and enter the code %s to authorize %s." % (
                   APP_NAME, oauth.verification_url, oauth.user_code,
                   APP_NAME))
        sys.exit(0)

    # The second time we retrieve the access_token using the device_code
    if not oauth.access_token:
        try:
            oauth.get_access_token()
            oauth.save_state()
        except gdata.gauth.OAuth2AuthorizationPendingError:
            print ("%s is still waiting for your authorization to be able to "
                   "access your contacts information.\nPlease go to the url "
                   "%s and enter the code %s." % (
                       APP_NAME, oauth.verification_url, oauth.user_code))
            sys.exit(0)

    oauth.authorize(google_contacts)

    # get_contacts() is a lazy iterator: fetch everything from Google now,
    # before touching the phone, so that a failure while reading the
    # contacts can not happen after the address book has been erased.
    contacts = list(google_contacts.get_contacts())

    try:
        if args.overwrite:
            verbose('Deleting existing contacts...', not args.quiet)
            c470ip_phone.delete_contacts()
            verbose("done.\n", not args.quiet)

        verbose('Synchronizing contacts...', not args.quiet)
        nb_contacts = c470ip_phone.add_contacts(contacts)
        verbose(" %i contacts transfered.\n" % nb_contacts, not args.quiet)
    finally:
        c470ip_phone.logout()
# Command line definition
parser = argparse.ArgumentParser(
    description='Gigaset C470IP Phone management tool')
parser.add_argument('--quiet', dest='quiet', action="store_true",
                    help='enable silent mode')
subparser = parser.add_subparsers(title='subcommands', dest='commands')

# One sub-parser per command, declared in the order they appear in the help
parser_google_sync = subparser.add_parser(
    'google-sync',
    help='Synchronize Google Contacts with the Gigaset C470 IP address '
         'book')
parser_delete_contacts = subparser.add_parser(
    'delete-contacts',
    help='Remove all the existing contacts from the phone')

# Both commands need to know how to reach the phone
for _command_parser in (parser_google_sync, parser_delete_contacts):
    _command_parser.add_argument(
        'c470ip_url',
        metavar='C470IP_URL',
        help='url of the Gigaset C470 IP web interface')
    _command_parser.add_argument(
        '--pin',
        dest='c470ip_pin',
        default='0000',
        help='Pin code of the C470 IP phone')

# Options specific to the google-sync command
parser_google_sync.add_argument(
    '--force-google-auth',
    dest='force_google_auth',
    action='store_true',
    help='Force to pass again through the Google OAuth autorization '
         'process')
parser_google_sync.add_argument(
    '--overwrite',
    dest='overwrite',
    action='store_true',
    help='if set, the existing phone address book will be erased before '
         'adding the new contacts')

parser_google_sync.set_defaults(func=google_sync)
parser_delete_contacts.set_defaults(func=delete_contacts)

# Run the selected command and turn login failures into an error message
# and a dedicated exit status
args = parser.parse_args()
try:
    args.func(args)
except C470ipLoginError as e:
    if isinstance(e, C470ipSessionAlreadyRunning):
        sys.stderr.write("\nERROR: A session is already opened on the C470 IP web "
                         "interface.\n Please logout the existing session "
                         "or reset the phone.\n")
        sys.exit(3)
    sys.stderr.write("\nERROR: Failed to login in the C470 IP Web interface."
                     " Error code: %i\n" % e.error_code)
    sys.exit(4)