-
Notifications
You must be signed in to change notification settings - Fork 7
/
core.py
2126 lines (1806 loc) · 135 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#-*-coding=utf-8-*-
from __future__ import with_statement
import sys
import site
site.addsitedir('/opt/ebs/venv/lib/python2.6/site-packages')
site.addsitedir('/opt/ebs/venv/lib/python2.7/site-packages')
import sys
sys.path.insert(0, "modules")
sys.path.append("cmodules")
"""DON'T REMOVE, NEEDED FOR PROPER FREEZING!!!"""
try: import mx.DateTime
except: pass
from encodings import idna, ascii #DONT REMOVE, BLATS!
import IPy
import zlib
import signal
import random
import threading
import dictionary
import ConfigParser
import psycopg2, psycopg2.extras
import time, datetime, os, sys, gc, traceback
import socket
import isdlogger
import utilites
import itertools
import db
from decimal import Decimal
from copy import copy, deepcopy
from threading import Thread, Lock
from collections import defaultdict
from utilites import get_decimals_speeds
from utilites import settlement_period_info, in_period, in_period_info, create_speed
import ctypes
from ctypes.util import find_library
from ctypes import Structure
from hashlib import md5
import commands
from base64 import b64decode
from operator import itemgetter, attrgetter
#===============================================================================
# from importlib import import_module ##DONT REMOVE
# import contextlib
# import UserList
# import celery.utils.text
#
# import celery.task
#
# import kombu.entity
#===============================================================================
import celery.task
sys.path.append("/opt/ebs/data/workers/")
sys.path.append("workers/")
sys.path.append("celery/")
from tasks import PoD, change_speed, cred, update_vpn_speed_state, update_ipn_speed_state, update_pod_state
import tasks
from db import transaction, get_last_checkout, get_acctf_history, radiustraffictransaction
from db import get_last_addon_checkout, addon_history, check_in_suspended, TraftransTableException
from classes.cacheutils import CacheMaster
from classes.core_cache import *
from classes.flags import CoreFlags
from classes.vars import CoreVars
from utilites import renewCaches, savepid, get_connection, check_running, getpid, rempid
from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE, ISOLATION_LEVEL_REPEATABLE_READ
from classes.core_class.RadiusSession import RadiusSession
from classes.core_class.BillSession import BillSession
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
NAME = 'core'
DB_NAME = 'db'
SECONDS_PER_DAY = 86400
ZERO_SUM = 0
SECOND = datetime.timedelta(seconds=1)
PERIOD = 1
ADDON = 2
MEGABYTE=Decimal(1048576)
def comparator(d, s):
for key in s:
if s[key]!='' and s[key]!='Null' and s[key]!='None':
d[key]=s[key]
return d
class check_vpn_access(Thread):
def __init__ (self):
Thread.__init__(self)
def check_access(self):
"""
Раз в 30 секунд происходит выборка всех пользователей
OnLine, делается проверка,
1. не вышли ли они за рамки временного диапазона
2. Не ушли ли в нулевой балланс
если срабатывает одно из двух условий-посылаем команду на отключение пользователя
TO-DO: Переписать! Работает правильно.
nas_id содержит в себе IP адрес. Сделано для уменьшения выборок в модуле core при старте сессии
TO-DO: если NAS не поддерживает POD или в парметрах доступа ТП указан IPN - отсылать команды через SSH
"""
global cacheMaster, suicideCondition, vars
dateAT = datetime.datetime(2000, 1, 1)
self.connection = get_connection(vars.db_dsn)
caches = None
while True:
try:
if suicideCondition[self.__class__.__name__]:
try: self.connection.close()
except: pass
break
a = time.time()
if cacheMaster.date > dateAT:
cacheMaster.lock.acquire()
try:
caches = cacheMaster.cache
dateAT = deepcopy(cacheMaster.date)
except Exception, ex:
logger.error("%s: cache exception: %s", (self.getName, repr(ex)))
finally:
cacheMaster.lock.release()
if not caches:
time.sleep(10)
continue
if 0: assert isinstance(caches, CoreCaches)
cur = self.connection.cursor()
now = dateAT
dublicated_ips = {}
ips = []
cur.connection.commit()
cur.execute("""SELECT rs.id,rs.account_id, rs.subaccount_id, rs.sessionid,rs.framed_ip_address, rs.speed_string,
lower(rs.framed_protocol) AS access_type,rs.nas_int_id, extract('epoch' from now()-rs.interrim_update) as last_update, rs.date_start,rs.ipinuse_id, rs.caller_id, ((SELECT pool_id FROM billservice_ipinuse WHERE id=rs.ipinuse_id)=(SELECT vpn_guest_ippool_id FROM billservice_tariff WHERE id=get_tarif(rs.account_id)))::boolean as guest_pool, rs.nas_port_id,
rs.speed_change_queued, rs.pod_queued
FROM radius_activesession AS rs WHERE rs.date_end IS NULL AND rs.date_start <= %s and session_status='ACTIVE';""", ( dateAT,))
rows=cur.fetchall()
cur.connection.commit()
for row in rows:
try:
rs = RadiusSession(*row)
result=None
nas = caches.nas_cache.by_id.get(rs.nas_id)
acc = caches.account_cache.by_account.get(rs.account_id)
subacc = caches.subaccount_cache.by_id.get(rs.subaccount_id)
if not nas : continue
#Если не найден аккаунт или субаккаунт
if not (acc and subacc): continue
if 0: assert isinstance(nas, NasData); assert isinstance(acc, AccountData)
acstatus = (subacc.allow_vpn_with_null and acc.ballance+acc.credit ==0) or (subacc.allow_vpn_with_minus and acc.ballance+acc.credit<0) or acc.ballance+acc.credit>0\
or \
(subacc.allow_vpn_with_block and (acc.balance_blocked or acc.disabled_by_limit))
acstatus_guest = rs.guest_pool
if not (acc.account_status==1 and acc.tarif_active==True and caches.timeperiodaccess_cache.in_period.get(acc.tarif_id)):
##dont check next if account disabled
acstatus=False
elif acstatus and acstatus_guest:
acstatus=False
elif not acstatus and acstatus_guest:
acstatus=True
#acstatus = acstatus and not (acstatus_guest and not acstatus) #and not (((subacc.allow_vpn_with_null and acc.ballance+acc.credit ==0) or (subacc.allow_vpn_with_minus and acc.ballance+acc.credit<=0) or acc.ballance+acc.credit>0)\
#and \
#(subacc.allow_vpn_with_block or (not subacc.allow_vpn_with_block and not acc.balance_blocked and not acc.disabled_by_limit)))
if rs.framed_ip_address not in dublicated_ips:
dublicated_ips[rs.framed_ip_address]=[]
dublicated_ips[rs.framed_ip_address].append(rs)
if acstatus and not rs.speed_change_queued: # caches.timeperiodaccess_cache.in_period.get(acc.tarif_id) - not need
#chech whether speed has changed
account_limit_speed = caches.speedlimit_cache.by_account_id.get(acc.account_id, [])
accservices = []
addonservicespeed=[]
br = False
if subacc:
accservices = caches.accountaddonservice_cache.by_subaccount.get(subacc.id, [])
for accservice in accservices:
service = caches.addonservice_cache.by_id.get(accservice.service_id)
for pnode in caches.timeperiodnode_cache.by_id.get(service.timeperiod_id, []):
if not accservice.deactivated and service.change_speed and fMem.in_period_(pnode.time_start,pnode.length,pnode.repeat_after, now)[3]:
addonservicespeed = (service.max_tx, service.max_rx, service.burst_tx, service.burst_rx, service.burst_treshold_tx, service.burst_treshold_rx, service.burst_time_tx, service.burst_time_rx, service.min_tx, service.min_rx, service.priority, service.speed_units, service.change_speed_type)
br = True
break
if br: break
br = False
if not addonservicespeed:
accservices = caches.accountaddonservice_cache.by_account.get(acc.account_id, [])
for accservice in accservices:
service = caches.addonservice_cache.by_id.get(accservice.service_id)
for pnode in caches.timeperiodnode_cache.by_id.get(service.timeperiod_id, []):
if not accservice.deactivated and service.change_speed and fMem.in_period_(pnode.time_start,pnode.length,pnode.repeat_after, now)[3]:
addonservicespeed = (service.max_tx, service.max_rx, service.burst_tx, service.burst_rx, service.burst_treshold_tx, service.burst_treshold_rx, service.burst_time_tx, service.burst_time_rx, service.min_tx, service.min_rx, service.priority, service.speed_units, service.change_speed_type)
br = True
break
if br: break
defspeed = caches.defspeed_cache.by_id.get(acc.tarif_id)
speeds = caches.speed_cache.by_id.get(acc.tarif_id, [])
logger.debug("%s: account=%s sessionid=%s defspeed=%s speeds=%s speedlimit=%s addonservicespeed=%s vpn_speed=%s ", (self.getName(), acc.account_id, str(rs.sessionid), repr(defspeed), repr(speeds), repr(account_limit_speed), repr(addonservicespeed), subacc.vpn_speed))
speed = create_speed(defspeed, speeds,account_limit_speed, addonservicespeed, subacc.vpn_speed, dateAT, fMem)
speed = get_decimals_speeds(speed)
logger.debug("%s: account=%s sessionid=%s total_speed=%s", (self.getName(), acc.account_id, str(rs.sessionid), repr(speed) ))
newspeed = ''.join([unicode(spi) for spi in speed])
if rs.speed_string != newspeed:
logger.debug("%s:send request for change speed for: account: %s| nas: %s | sessionid: %s", (self.getName(), acc.account_id, nas.id, str(rs.sessionid)))
cur.execute("""UPDATE radius_activesession SET speed_string=%s, speed_change_queued=now() WHERE id=%s and nas_int_id=%s and nas_port_id=%s;
""" , (newspeed, rs.id, rs.nas_id, rs.nas_port_id))
change_speed.delay(account=acc._asdict(), subacc=subacc._asdict(), nas=nas._asdict(),
access_type=str(rs.access_type),
format_string=str(nas.vpn_speed_action),session_id=str(rs.sessionid), vpn_ip_address=rs.framed_ip_address,
speed=speed, cb=tasks.update_vpn_speed_state.s(nas_id=rs.nas_id, nas_port_id=rs.nas_port_id, session_id=rs.id, newspeed=newspeed))
cur.connection.commit()
logger.debug("%s: speed change over: account: %s| nas: %s | sessionid: %s", (self.getName(), acc.account_id, nas.id, str(rs.sessionid)))
elif not rs.pod_queued and not acstatus:
logger.debug("%s: Send POD: account: %s| subacc: %s| nas: %s | sessionid: %s", (self.getName(), acc, subacc, nas.id, str(rs.sessionid)))
cur.execute("""UPDATE radius_activesession SET pod_queued=now() WHERE id=%s and nas_int_id=%s and nas_port_id=%s;
""" , ( rs.id, rs.nas_id, rs.nas_port_id))
PoD.delay(acc._asdict(), subacc._asdict(), nas._asdict(), access_type=rs.access_type, session_id=str(rs.sessionid), vpn_ip_address=rs.framed_ip_address, nas_port_id=rs.nas_port_id, caller_id=str(rs.caller_id), format_string=str(nas.reset_action), cb=tasks.update_pod_state.s(nas_id=rs.nas_id, nas_port_id=rs.nas_port_id, session_id=rs.id))
cur.connection.commit()
logger.debug("%s: POD sended: account: %s| nas: %s | sessionid: %s", (self.getName(), acc.account_id, nas.id, str(rs.sessionid)))
continue
from_start = (dateAT-rs.date_start).seconds+(dateAT-rs.date_start).days*86400
if (rs.time_from_last_update and rs.time_from_last_update+15>=nas.acct_interim_interval*3+3) or (not rs.time_from_last_update and from_start>=nas.acct_interim_interval*3+3):
cur.execute("""UPDATE radius_activesession SET session_status='ACK' WHERE id=%s;
""", (rs.id,))
cur.connection.commit()
except Exception, ex:
logger.error("%s: row exec exception: %s \n %s", (self.getName(), repr(ex), traceback.format_exc()))
if isinstance(ex, vars.db_errors): raise ex
#cur.execute("UPDATE billservice_ipinuse SET disabled=now() WHERE dynamic=True and disabled is Null and ip::inet not in (SELECT DISTINCT framed_ip_address::inet FROM radius_activesession WHERE ipinuse_id is not NUll and (session_status='ACTIVE'));")
#cur.connection.commit()
#===============================================================
# for key, value in dublicated_ips.iteritems():
# if len(value)<=1: continue
# logger.debug("%s: Dublicated IP detected %s %s", (self.getName(), key, value))
# value = sorted(value, key=attrgetter('date_start'))
# first = True
# for rs in value:
# if first == True:
# first = False
# continue
# logger.debug("%s: Send dublicates remove POD: account: %s| nas: %s | sessionid: %s", (self.getName(), acc.account_id, nas.id, str(rs.sessionid)))
# PoD.delay(acc._asdict(), subacc._asdict(), nas._asdict(), access_type=rs.access_type, session_id=str(rs.sessionid), vpn_ip_address=rs.framed_ip_address, caller_id=str(rs.caller_id), format_string=str(nas.reset_action))
# logger.debug("%s: dublicates remove POD sended: account: %s| nas: %s | sessionid: %s", (self.getName(), acc.account_id, nas.id, str(rs.sessionid)))
#===============================================================
cur.close()
logger.info("VPNALIVE: VPN thread run time: %s", time.time() - a)
except Exception, ex:
logger.error("%s : exception: %s \n %s", (self.getName(), repr(ex), traceback.format_exc()))
self.connection.rollback()
if ex.__class__ in vars.db_errors:
time.sleep(5)
try:
self.connection = get_connection(vars.db_dsn)
except Exception, eex:
logger.info("%s : database reconnection error: %s" , (self.getName(), repr(eex)))
time.sleep(10)
time.sleep(vars.VPN_SLEEP + random.randint(0,5))
def run(self):
#self.remove_sessions()
self.check_access()
class periodical_service_bill(Thread):
"""
Процесс будет производить снятие денег у клиентов, у которых в ТП
указан список периодических услуг.
Нужно учесть что:
1. Снятие может производиться в начале расчётного периода.
Т.к. мы не можем производить проверку каждую секунду - нужно держать список снятий
, чтобы проверять с какого времени мы уже не делали снятий и произвести их.
2. Снятие может производиться в конце расчётного периода.
ситуация аналогичная первой
"""
def __init__ (self):
Thread.__init__(self)
self.PER_DAY = 1
self.PER_DAY_DELTA = 1
self.NOW = datetime.datetime(2000, 1, 1)
#ps_type - 1 for periodocal service, 2 - for periodical addonservice
def iterate_ps(self, cur, acc, ps, dateAT, acctf_id, acctf_datetime, next_date, current, pss_type):
account_ballance = (acc.ballance or 0) + (acc.credit or 0)
susp_per_mlt = 1
if pss_type == PERIOD:
time_start_ps = acctf_datetime if ps.autostart else ps.time_start #Возможно баг. проверить на дату создания услуи и начало тарифного плана
#Если в расчётном периоде указана длина в секундах-использовать её, иначе использовать предопределённые константы
get_last_checkout_ = get_last_checkout
elif pss_type == ADDON:
if ps.temporary_blocked:
susp_per_mlt = 0
time_start_ps = ps.created
get_last_checkout_ = get_last_addon_checkout
else:
return
lc = get_last_checkout_(cur, ps.ps_id, acctf_id)
if lc is None and pss_type == PERIOD:
last_checkout = ps.created if ps.created and ps.created<acctf_datetime else acctf_datetime
first_time = True
logger.debug('%s: Periodical Service: last checkout is None set last checkout=%s for account: %s service:%s type:%s', (self.getName(), last_checkout, acc.account_id, ps.ps_id, pss_type))
elif lc is None and pss_type == ADDON:
last_checkout = ps.created
first_time = True
logger.debug('%s: Addon Service: last checkout is None set last checkout=%s for account: %s service:%s type:%s', (self.getName(), last_checkout, acc.account_id, ps.ps_id, pss_type))
else:
first_time = False
last_checkout = lc
#Расчитываем параметры расчётного периода на момент окончания тарифного плана или сейчас
period_start, period_end, delta = fMem.settlement_period_(time_start_ps, ps.length_in, ps.length, dateAT)
# Проверка на расчётный период без повторения
#if period_end < dateAT: return
if ps.cash_method == "GRADUAL":
logger.debug('%s: Periodical Service: GRADUAL last checkout %s for account: %s service:%s type:%s next date: %s', (self.getName(), last_checkout, acc.account_id, ps.ps_id, pss_type, next_date))
PER_DAY = SECONDS_PER_DAY / (ps.tpd if ps.tpd else vars.TRANSACTIONS_PER_DAY)
PER_DAY_DELTA = datetime.timedelta(seconds=PER_DAY)
if (dateAT - last_checkout).seconds + (dateAT - last_checkout).days*SECONDS_PER_DAY >= PER_DAY:
#Проверяем наступил ли новый период
# Смотрим сколько раз уже должны были снять деньги
delta_from_last_checkout = dateAT - last_checkout
last_checkout_seconds = delta_from_last_checkout.seconds + delta_from_last_checkout.days*SECONDS_PER_DAY
nums,ost = divmod(last_checkout_seconds,PER_DAY)
chk_date = last_checkout + PER_DAY_DELTA
#Добавить проверку на окончание периода
#Смотрим на какую сумму должны были снять денег и снимаем её
while chk_date <= dateAT:
if chk_date >self.NOW:
logger.info('%s: Periodical Service: GRADUAL %s Can not bill future ps account: %s chk_date: %s', (self.getName(), ps.ps_id, acc.account_id, chk_date))
return
delta_coef = Decimal('1.00')
if pss_type == PERIOD and vars.USE_COEFF_FOR_PS==True and next_date and chk_date+PER_DAY_DELTA>next_date:# если следующая проверка будет в новом расчётном периоде - считаем дельту
delta_coef=Decimal(str(float((next_date-chk_date).days*86400+(next_date-chk_date).seconds)/float(PER_DAY)))
logger.debug('%s: Periodical Service: %s Use coeff %s for ps account: %s', (self.getName(), ps.ps_id, delta_coef, acc.account_id))
logger.debug('%s: Periodical Service: GRADUAL account: %s service:%s type:%s check date: %s next date: %s', (self.getName(), acc.account_id, ps.ps_id, pss_type, chk_date, next_date,))
period_start, period_end, delta = fMem.settlement_period_(time_start_ps, ps.length_in, ps.length, chk_date)
mult = 0 if check_in_suspended(cur, acc.account_id, chk_date)==True else 1 #Если на момент списания был в блоке - списать 0
cash_summ = delta_coef*(mult*((PER_DAY * vars.TRANSACTIONS_PER_DAY * ps.cost) / (delta * vars.TRANSACTIONS_PER_DAY)))
if pss_type == PERIOD and (ps.deactivated is None or (ps.deactivated and ps.deactivated > chk_date)):
# Если это подключаемая услуга и дата отключения услуги ещё не наступила
#cur.execute("UPDATE billservice_account SET ballance=ballance-")
cur.execute("""SELECT
periodicaltr_fn(%s,%s,%s, %s::numeric,
%s::character varying,
%s::numeric,
%s::timestamp without time zone,
%s::timestamp without time zone,
%s::timestamp without time zone,
%s,
%s::numeric,
%s::boolean
) as new_summ;""", (ps.ps_id,
acctf_id,
acc.account_id,
acc.credit,
'PS_GRADUAL',
cash_summ,
chk_date,
chk_date,
chk_date+PER_DAY_DELTA,
ps.condition,
ps.condition_summ,
ps.delta_from_ballance
)
)
cash_summ=cur.fetchone()[0]
cur.connection.commit()
#cur.execute("UPDATE billservice_account SET ballance=ballance-%s WHERE id=%s;", (new_summ, acc.account_id,))
logger.debug('%s: Periodical Service: GRADUAL BATCH iter checkout for account: %s service:%s summ %s', (self.getName(), acc.account_id, ps.ps_id, cash_summ))
elif pss_type == ADDON:
cash_summ = Decimal(str(cash_summ)) * susp_per_mlt
addon_history(cur, ps.addon_id, 'periodical', ps.ps_id, acc.acctf_id, acc.account_id, 'ADDONSERVICE_PERIODICAL_GRADUAL', cash_summ, chk_date)
cur.connection.commit()
logger.debug('%s: Addon Service Checkout thread: GRADUAL BATCH iter checkout for account: %s service:%s summ %s', (self.getName(), acc.account_id, ps.ps_id, cash_summ))
else:
return
cur.connection.commit()
chk_date += PER_DAY_DELTA
if pss_type == PERIOD and ((next_date and chk_date>=next_date) or (ps.deactivated and ps.deactivated <= chk_date)):
logger.debug('%s: Periodical Service: GRADUAL last billed is True for account: %s service:%s type:%s', (self.getName(), acc.account_id, ps.ps_id, pss_type))
cur.execute("UPDATE billservice_periodicalservicelog SET last_billed=True WHERE service_id=%s and accounttarif_id=%s", (ps.ps_id, acctf_id))
cur.connection.commit()
return
cur.connection.commit()
if ps.cash_method == "AT_START":
"""
Смотрим когда в последний раз платили по услуге. Если в текущем расчётном периоде
не платили-производим снятие.
"""
"""
Списывать в начале периода только, если последнее списание+период<следующего тарифного плана
"""
summ = 0
if first_time==True:
period_start_ast, period_end_ast, delta_ast = fMem.settlement_period_(time_start_ps, ps.length_in, ps.length, last_checkout)
while first_time==True or last_checkout <= period_start:
if first_time==False:
period_start_ast, last_checkout, delta_ast = fMem.settlement_period_(time_start_ps, ps.length_in, ps.length, last_checkout)
chk_date = last_checkout
if chk_date >self.NOW:
logger.debug('%s: Periodical Service: AT_START %s Can not bill future ps account: %s chk_date: %s', (self.getName(), ps.ps_id, acc.account_id, chk_date))
return
if ps.created and ps.created >= chk_date and not last_checkout == ps.created:
# если указана дата начала перид. услуги и она в будующем - прпускаем её списание
return
logger.debug('%s: Periodical Service: AT_START account: %s service:%s type:%s check date: %s next date: %s', (self.getName(), acc.account_id, ps.ps_id, pss_type, chk_date, next_date,))
#Если следующее списание произойдёт уже на новом тарифе - отмечаем, что тарификация произведена
if pss_type == PERIOD and ((next_date and chk_date>=next_date) or (ps.deactivated and ps.deactivated <= chk_date)):
logger.debug('%s: Periodical Service: AT_START last billed is True for account: %s service:%s type:%s next date: %s', (self.getName(), acc.account_id, ps.ps_id, pss_type, next_date))
cur.execute("UPDATE billservice_periodicalservicelog SET last_billed=True WHERE service_id=%s and accounttarif_id=%s", (ps.ps_id, acctf_id))
cur.connection.commit()
return
mult = 0 if check_in_suspended(cur, acc.account_id, chk_date)==True else 1 #Если на момент списания был в блоке - списать 0
cash_summ = mult*ps.cost # Установить сумму равной нулю, если пользователь в блокировке
#if first_time == False:
# period_start_ast, period_end_ast, delta_ast = fMem.settlement_period_(time_start_ps, ps.length_in, ps.length, chk_date)
# chk_date = chk_date+datetime.timedelta(seconds=delta_ast)
delta_coef=1
if pss_type == PERIOD and vars.USE_COEFF_FOR_PS==True and first_time and ((last_checkout-acctf_datetime).days*86400+(last_checkout-acctf_datetime).seconds)<delta_ast:
logger.warning('%s: Periodical Service: %s Use coeff for ps account: %s', (self.getName(), ps.ps_id, acc.account_id))
delta_coef=float((period_end_ast-acctf_datetime).days*86400+(period_end_ast-acctf_datetime).seconds)/float(delta_ast)
cash_summ=Decimal(str(cash_summ))*Decimal(str(delta_coef))
if pss_type == PERIOD and (ps.deactivated is None or (ps.deactivated and ps.deactivated > chk_date)):
_, ps_end_for_delta, _ = fMem.settlement_period_(time_start_ps, ps.length_in, ps.length, chk_date)
#cur.execute("SELECT periodicaltr_fn(%s,%s,%s, %s::numeric, %s::character varying, %s::numeric, %s::timestamp without time zone, %s, %s::numeric) as new_summ;", (ps.ps_id, acctf_id, acc.account_id, acc.credit, 'PS_AT_START', cash_summ, chk_date, ps.condition, ps.condition_summ))
cur.execute("""SELECT
periodicaltr_fn(%s,%s,%s, %s::numeric,
%s::character varying,
%s::numeric,
%s::timestamp without time zone,
%s::timestamp without time zone,
%s::timestamp without time zone,
%s,
%s::numeric,
%s::boolean
) as new_summ;""", (ps.ps_id,
acctf_id,
acc.account_id,
acc.credit,
'PS_AT_START',
cash_summ,
chk_date,
chk_date,
ps_end_for_delta,
ps.condition,
ps.condition_summ,
ps.delta_from_ballance
)
)
cash_summ=cur.fetchone()[0]
#cur.execute("UPDATE billservice_account SET ballance=ballance-%s WHERE id=%s;", (new_summ, acc.account_id,))
logger.debug('%s: Periodical Service: AT START iter checkout for account: %s service:%s summ %s', (self.getName(), acc.account_id, ps.ps_id, cash_summ))
pass
elif pss_type == ADDON:
cash_summ = Decimal(str(cash_summ)) * susp_per_mlt
addon_history(cur, ps.addon_id, 'periodical', ps.ps_id, acc.acctf_id, acc.account_id, 'ADDONSERVICE_PERIODICAL_AT_START', cash_summ, chk_date)
logger.debug('%s: Addon Service Checkout thread: AT START checkout for account: %s service:%s summ %s', (self.getName(), acc.account_id, ps.ps_id, cash_summ))
cur.connection.commit()
first_time=False
cur.connection.commit()
if ps.cash_method=="AT_END":
"""
Смотрим завершился ли хотя бы один расчётный период.
Если завершился - считаем сколько уже их завершилось.
для остальных со статусом False
"""
# Здесь нужно проверить сколько раз прошёл расчётный период
# Если с начала текущего периода не было снятий-смотрим сколько их уже не было
# Для последней проводки ставим статус Approved=True
# для всех остальных False
# Если дата начала периода больше последнего снятия или снятий не было и наступил новый период - делаем проводки
#second_ = datetime.timedelta(seconds=1)
cash_summ = 0
if first_time or period_start > last_checkout:
cash_summ = ps.cost
chk_date = last_checkout
while True:
period_start_ast, period_end_ast, delta_ast = fMem.settlement_period_(time_start_ps, ps.length_in, ps.length, chk_date)
prev_period_start_ast, _, _ = fMem.settlement_period_(time_start_ps,
ps.length_in,
ps.length,
chk_date-datetime.timedelta(seconds=1)
)
if first_time==False:
chk_date = period_end_ast
logger.debug('%s: Periodical Service: AT_END account: %s service:%s type:%s check date: %s next date: %s', (self.getName(), acc.account_id, ps.ps_id, pss_type, chk_date, next_date,))
mult = 0 if check_in_suspended(cur, acc.account_id, chk_date)==True else 1 #Если на момент списания был в блоке - списать 0
cash_summ = mult*ps.cost
if chk_date >self.NOW:
logger.error('%s: Periodical Service: AT_END %s Can not bill future ps account: %s chk_date: %s new period start: %s', (self.getName(), ps.ps_id, acc.account_id, chk_date, period_start_ast))
break
if period_start_ast>period_start: break
s_delta_ast = datetime.timedelta(seconds=delta_ast)
if pss_type == PERIOD and vars.USE_COEFF_FOR_PS==True and first_time and ((chk_date-acctf_datetime).days*86400+(chk_date-acctf_datetime).seconds)<delta_ast:
logger.debug('%s: Periodical Service: %s Use coeff for ps account: %s', (self.getName(), ps.ps_id, acc.account_id))
delta_coef=float((chk_date-acctf_datetime).days*86400+(chk_date-acctf_datetime).seconds)/float(delta_ast)
cash_summ=Decimal(str(cash_summ))*Decimal(str(delta_coef))
if first_time:
first_time = False
chk_date = last_checkout
tr_date = period_start_ast
if pss_type == PERIOD:
cash_summ = 0
#ps_history(cur, ps.ps_id, acc.acctf_id, acc.account_id, 'PS_AT_END', ZERO_SUM, tr_date)
cur.execute("SELECT periodicaltr_fn(%s,%s,%s, %s::numeric, %s::character varying, %s::numeric, %s::timestamp without time zone, %s, %s::numeric) as new_summ;", (ps.ps_id, acctf_id, acc.account_id, acc.credit, 'PS_AT_END', cash_summ, tr_date, ps.condition, ps.condition_summ))
logger.debug('%s: Periodical Service: AT END First time checkout for account: %s service:%s summ %s', (self.getName(), acc.account_id, ps.ps_id, cash_summ))
# cur.execute("SELECT periodicaltr_fn(%s,%s,%s, %s::character varying, %s::decimal, %s::timestamp without time zone, %s);", (ps.ps_id, acc.acctf_id, acc.account_id, 'PS_GRADUAL', cash_summ, chk_date, ps.condition))
elif pss_type == ADDON:
addon_history(cur, ps.addon_id, 'periodical', ps.ps_id, acc.acctf_id, acc.account_id, 'ADDONSERVICE_PERIODICAL_AT_END', ZERO_SUM, tr_date)
logger.debug('%s: Addon Service Checkout: AT END First time checkout for account: %s service:%s summ %s', (self.getName(), acc.account_id, ps.ps_id, cash_summ))
cur.connection.commit()
return
else:
if ps.created and ps.created >= chk_date and not last_checkout == ps.created:
# если указана дата начала перид. услуги и она в будующем - прпускаем её списание
return
if pss_type == PERIOD and (ps.deactivated is None or (ps.deactivated and ps.deactivated > chk_date)):
tr_date = chk_date
if (next_date and chk_date>=next_date) or (ps.deactivated and ps.deactivated <= chk_date):
logger.debug('%s: Periodical Service: AT_END last billed is True for account: %s service:%s type:%s', (self.getName(), acc.account_id, ps.ps_id, pss_type))
cur.execute("UPDATE billservice_periodicalservicelog SET last_billed=True WHERE service_id=%s and accounttarif_id=%s", (ps.ps_id, acctf_id))
cur.connection.commit()
return
#cur.execute("SELECT periodicaltr_fn(%s,%s,%s, %s::numeric, %s::character varying, %s::numeric, %s::timestamp without time zone, %s, %s::numeric) as new_summ;", (ps.ps_id, acctf_id, acc.account_id, acc.credit, 'PS_AT_END', cash_summ, tr_date, ps.condition, ps.condition_summ))
cur.execute("""SELECT
periodicaltr_fn(%s,%s,%s, %s::numeric,
%s::character varying,
%s::numeric,
%s::timestamp without time zone,
%s::timestamp without time zone,
%s::timestamp without time zone,
%s,
%s::numeric,
%s::boolean
) as new_summ;""", (ps.ps_id,
acctf_id,
acc.account_id,
acc.credit,
'PS_AT_END',
cash_summ,
tr_date,
prev_period_start_ast,
tr_date,
ps.condition,
ps.condition_summ,
ps.delta_from_ballance
)
)
cash_summ=cur.fetchone()[0]
#cur.execute("UPDATE billservice_account SET ballance=ballance-%s WHERE id=%s;", (new_summ, acc.account_id,))
logger.debug('%s: Periodical Service: AT END iter checkout for account: %s service:%s summ %s', (self.getName(), acc.account_id, ps.ps_id, cash_summ))
elif pss_type == ADDON:
cash_summ = Decimal(str(cash_summ)) * susp_per_mlt
tr_date = chk_date
if ps.deactivated and ps.deactivated < chk_date:
#сделать расчёт остатка - сейчас эта штука компенсируется штрафами за досрочное отключение
cash_summ = 0
tr_date = ps.deactivated
addon_history(cur, ps.addon_id, 'periodical', ps.ps_id, acc.acctf_id, acc.account_id, 'ADDONSERVICE_PERIODICAL_AT_END', cash_summ, tr_date)
logger.debug('%s: Addon Service Checkout thread: AT END checkout for account: %s service:%s summ %s', (self.getName(), acc.account_id, ps.ps_id, cash_summ))
else:
return
cur.connection.commit()
chk_date = period_end_ast
if chk_date-SECOND > period_start: break
#cur.connection.commit()
if pss_type == ADDON and ps.deactivated and dateAT >= ps.deactivated:
cur.execute("UPDATE billservice_accountaddonservice SET last_checkout = deactivated WHERE id=%s", (ps.ps_id,))
cur.connection.commit()
#if pss_type == ZERO_SUM and ps.deactivated and dateAT >= ps.deactivated:
# cur.execute("UPDATE billservice_periodicalservice SET deleted = True WHERE id=%s;", (ps.ps_id,))
#cur.connection.commit()
def run(self):
global cacheMaster, fMem, suicideCondition, transaction_number, vars
self.connection = get_connection(vars.db_dsn)
self.connection.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)
dateAT = datetime.datetime(2000, 1, 1)
caches = None
while True:
a_ = time.time()
try:
if suicideCondition[self.__class__.__name__]: break
a = time.time()
if cacheMaster.date <= dateAT:
time.sleep(10); continue
else:
cacheMaster.lock.acquire()
try:
caches = cacheMaster.cache
dateAT = deepcopy(cacheMaster.date)
except Exception, ex:
logger.error("%s: cache exception: %s", (self.getName, repr(ex)))
finally:
cacheMaster.lock.release()
if 0: assert isinstance(caches, CoreCaches)
cur = self.connection.cursor()
#transactions per day
#n = SECONDS_PER_DAY / vars.TRANSACTIONS_PER_DAY
#n_delta = datetime.timedelta(seconds=n)
#now = dateAT
self.NOW = dateAT
#get a list of tarifs with periodical services & loop
for acc_id in caches.account_cache.by_account:
acc = caches.account_cache.by_account.get(acc_id)
if acc.account_status in [3, 4, 5, '3', '4', '5']: continue
if acc_id == 6050:
pass
if 0: assert isinstance(acc, AccountData)
acctf_raw_history = get_acctf_history(cur, acc.account_id)
by_id = {}
#Получаем историю смены субаккаунтов по которым не производились списания период. услуг
#Начальная индексация
for acctf_id, acctf_datetime, next_acctf_id, next_date, acc_tarif_id in acctf_raw_history:
for ps in caches.periodicalsettlement_cache.by_id.get(acc_tarif_id,[]):
try:
current = True if next_acctf_id is None else False
dt = next_date if next_date else self.NOW
#logger.info("%s : preiter: acctf=%s now=%s dateat=%s current=%s next_date=%s", (self.getName(), acctf_id, self.NOW, dateAT, current, next_date))
self.iterate_ps(cur, acc, ps, dt, acctf_id, acctf_datetime, next_date, current, PERIOD)
except Exception, ex:
logger.error("%s : exception: %s \n %s", (self.getName(), repr(ex), traceback.format_exc()))
if ex.__class__ in vars.db_errors: raise ex
cur.connection.commit()
cur.connection.commit()
for addon_ps in caches.addonperiodical_cache.data:
if 0: assert isinstance(addon_ps, AddonPeriodicalData)
if not (addon_ps.account_id or addon_ps.subaccount_id):
logger.error("Accountaddonservice %s has no account and subaccount", (addon_ps.ps_id))
continue
subacc = caches.subaccount_cache.by_id.get(addon_ps.subaccount_id)
if subacc:
acc = caches.account_cache.by_account.get(subacc.account_id)
else:
acc = caches.account_cache.by_account.get(addon_ps.account_id)
if not acc:
logger.warning('%s: Addon Periodical Service: %s. Incostistent database ERROR. Account not found: %s', (self.getName(), addon_ps.ps_id, addon_ps.account_id))
continue
dt = dateAT if not addon_ps.deactivated else addon_ps.deactivated
try:
#self.iterate_ps(cur, caches, acc, addon_ps, mult, dateAT, ADDON)
#cur, acc, ps, dateAT, acctf_id, acctf_datetime, next_date, current, pss_type
self.iterate_ps(cur, acc, addon_ps, dt, None, addon_ps.created, None, False, ADDON)
except Exception, ex:
logger.error("%s : exception: %s \n %s", (self.getName(), repr(ex), traceback.format_exc()))
if ex.__class__ in vars.db_errors: raise ex
cur.connection.commit()
cur.close()
logger.info("PSALIVE: Period. service thread run time: %s", time.time() - a)
except Exception, ex:
logger.error("%s : exception: %s \n %s", (self.getName(), repr(ex), traceback.format_exc()))
self.connection.rollback()
if ex.__class__ in vars.db_errors:
time.sleep(5)
try:
self.connection = get_connection(vars.db_dsn)
except Exception, eex:
logger.info("%s : database reconnection error: %s" , (self.getName(), repr(eex)))
time.sleep(10)
gc.collect()
time.sleep(abs(vars.PERIODICAL_SLEEP-(time.time()-a_)) + random.randint(0,5))
class RadiusAccessBill(Thread):
"""
Услуга применима только для VPN доступа, когда точно известна дата авторизации
и дата отключения пользователя
"""
def __init__(self):
Thread.__init__(self)
def get_actual_cost(self, date):
pass
def get_actual_prices(self, radtrafficnodes):
for period in radtrafficnodes:
if period.value!=0:
return True
return False
def valued_prices(self, value, radtrafficnodes):
d = {}
for x in radtrafficnodes:
d[value]=x
keys = d.keys()
keys.sort()
keys.append(sys.maxint)
#d=map(adict.get, keys)
res=[]
i=0
l=len(keys)
#перебираем все ноды по объёму. Оставляем только подходящие
for x in keys:
if value/(1024*1024)>=x and value/(1024*1024)<=keys[i+1]:
res.append(d[x])
i+=1
if l==i: break
return res
def run(self):
"""
По каждой записи делаем транзакции для пользователя в соотв с его текущим тарифным планов
"""
#connection = pool.connection()
#connection._con._con.set_client_encoding('UTF8')
global fMem, suicideCondition, cacheMaster, vars
self.connection = get_connection(vars.db_dsn)
self.connection.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)
dateAT = datetime.datetime(2000, 1, 1)
caches = None
accounts_bytes_cache={} # account_id: date_start_date_end, bytes_in, bytes_out
while True:
try:
if suicideCondition[self.__class__.__name__]:
try: self.connection.close()
except: pass
break
a = time.time()
if cacheMaster.date <= dateAT:
time.sleep(10); continue
else:
cacheMaster.lock.acquire()
try:
caches = cacheMaster.cache
dateAT = deepcopy(cacheMaster.date)
except Exception, ex:
logger.error("%s: cache exception: %s", (self.getName(), repr(ex)))
finally:
cacheMaster.lock.release()
if 0: assert isinstance(caches, CoreCaches)
cur = self.connection.cursor()
cur.execute("""SELECT rs.id, rs.account_id, rs.sessionid, rs.session_time, rs.bytes_in, rs.bytes_out, rs.interrim_update, rs.date_start, rs.date_end, acc_t.id, rs.lt_time, rs.lt_bytes_in, rs.lt_bytes_out,rs.nas_port_id,rs.nas_int_id
FROM radius_activesession AS rs
LEFT JOIN billservice_accounttarif AS acc_t ON acc_t.id=(SELECT id FROM billservice_accounttarif WHERE account_id=rs.account_id and datetime<rs.date_start ORDER BY datetime DESC LIMIT 1)
WHERE (rs.need_traffic_co=True or rs.need_time_co=True) and ((rs.lt_time<rs.session_time) or (rs.lt_bytes_in<rs.bytes_in or rs.lt_bytes_out<rs.bytes_out))
ORDER BY rs.interrim_update ASC LIMIT 20000;""")
rows=cur.fetchall()
cur.connection.commit()
acctfs = []
for r in rows:
if r[9]:
acctfs.append(str(r[9]))
data=[]
if acctfs:
cur.execute("""
select acct.id, t.radius_traffic_transmit_service_id, t.time_access_service_id FROM billservice_accounttarif as acct
JOIN billservice_tariff as t ON t.id=acct.tarif_id
WHERE t.radius_traffic_transmit_service_id is not NULL or t.time_access_service_id is not NULL and acct.id in (%s)
;
""" % ','.join(acctfs) )
data = cur.fetchall()
cur.connection.commit()
acctf_cache = {}
for acct_id, radius_traffic_transmit_service_id, time_access_service_id in data:
acctf_cache[acct_id] = (radius_traffic_transmit_service_id, time_access_service_id)
now = dateAT
for row in rows:
rs = BillSession(*row)
radius_traffic_transmit_service_id, time_access_service_id = acctf_cache.get(rs.acctf_id, (None, None))
checkouted=False
#1. Ищем последнюю запись по которой была произведена оплата
#2. Получаем данные из услуги "Доступ по времени" из текущего ТП пользователя
#TODO:2. Проверяем сколько стоил трафик в начале сессии и не было ли смены периода.
#TODO:2.1 Если была смена периода -посчитать сколько времени прошло до смены и после смены,
# рассчитав соотв снятия.
#2.2 Если снятия не было-снять столько, на сколько насидел пользователь
#rs_id, account_id, session_id, session_time, interrim_update, ps_id, tarif_id, accountt_tarif_id = row
logger.debug("RADCOTHREAD: Checking session: %s", repr(rs))
acc = caches.account_cache.by_account.get(rs.account_id)
#if acc.radius_traffic_transmit_service_id:continue
if acc and time_access_service_id:
logger.debug("RADCOTHREAD: Time tarification session: %s", rs.sessionid)
old_time = rs.lt_time or 0
logger.debug("RADCOTHREAD: Old session time: %s %s", (rs.sessionid, old_time))
# old_time = old_time[0] if old_time else 0
total_time = rs.session_time - old_time
logger.debug("RADCOTHREAD: Tarification time for session: %s %s", (rs.sessionid, total_time))
if rs.date_end:
taccs_service = caches.timeaccessservice_cache.by_id.get(time_access_service_id)
logger.debug("RADCOTHREAD: Tarification time of end session : %s", (rs.sessionid, ))
if taccs_service.rounding:
logger.debug("RADCOTHREAD: Rounding session time : %s", (rs.sessionid, ))
if taccs_service.tarification_step>0:
total_time = divmod(total_time, taccs_service.tarification_step)[1]*taccs_service.tarification_step+taccs_service.tarification_step
logger.debug("RADCOTHREAD: Searching for prepaid time for session : %s", (rs.sessionid, ))
cur.execute("""SELECT id, size FROM billservice_accountprepaystime WHERE account_tarif_id=%s and prepaid_time_service_id=%s and current=True""", (rs.acctf_id,time_access_service_id,))
result = cur.fetchone()
cur.connection.commit()
prepaid_id, prepaid = result if result else (0, -1)
if prepaid > 0:
logger.debug("RADCOTHREAD: Prepaid time for session : %s %s", (rs.sessionid, prepaid))
if prepaid >= total_time:
total_time, prepaid = 0, prepaid - total_time
elif total_time >= prepaid:
total_time, prepaid = total_time - prepaid, 0
cur.execute("""UPDATE billservice_accountprepaystime SET size=%s WHERE id=%s""", (prepaid, prepaid_id,))
cur.connection.commit()
#get the list of time periods and their cost
logger.debug("RADCOTHREAD: Searching for time tarification node for session : %s", (rs.sessionid, ))
for period in caches.timeaccessnode_cache.by_id.get(time_access_service_id, []):
if 0: assert isinstance(period, TimeAccessNodeData)
#get period nodes and check them
for pnode in caches.timeperiodnode_cache.by_id.get(period.time_period_id, []):
if 0: assert isinstance(pnode, TimePeriodNodeData)
if fMem.in_period_(pnode.time_start,pnode.length,pnode.repeat_after, dateAT)[3]:
logger.debug("RADCOTHREAD: Time tarification node for session %s was found", (rs.sessionid, ))
summ = (total_time * period.cost) / Decimal("60")
logger.debug("RADCOTHREAD: Summ for checkout for session %s %s", (rs.sessionid, summ))
if summ > 0:
#timetransaction(cur, rs.taccs_id, rs.acctf_id, rs.account_id, rs.id, summ, now)
db.timetransaction_fn(cur, time_access_service_id, rs.acctf_id, rs.account_id, summ, now, unicode(rs.sessionid), rs.interrim_update)
cur.connection.commit()
logger.debug("RADCOTHREAD: Time for session %s was checkouted", (rs.sessionid, ))
break
logger.debug("RADCOTHREAD: Session %s was checkouted (Time)", (rs.sessionid, ))
#
if acc and radius_traffic_transmit_service_id:
logger.debug("RADCOTHREAD: Traffic tarification session: %s", rs.sessionid)
radius_traffic = caches.radius_traffic_transmit_service_cache.by_id.get(radius_traffic_transmit_service_id)
lt_bytes_in = rs.lt_bytes_in or 0
lt_bytes_out = rs.lt_bytes_out or 0
logger.debug("RADCOTHREAD: Last bytes in/out for session: %s (%s/%s)", (rs.sessionid, lt_bytes_in,lt_bytes_out))
# old_time = old_time[0] if old_time else 0
bytes_in = 0
bytes_out = 0
total_bytes_value = 0
#проверяем есть ли ноды с указанным value
"""
Если есть - получаем данные текущего расчётного периода абонента
Получаем значения байт из кэша accounts_bytes_cache для текущего расчётного периода
если данные найдены - обновляем значения в кэше приращёнными значениями.
Если не найдены - делаем запрос в базу данных и помещаем данные в кэш
"""
if self.get_actual_prices(caches.radius_traffic_node_cache.by_id.get(radius_traffic_transmit_service_id, [])) and acc.settlement_period_id:
sp = caches.settlementperiod_cache.by_id.get(acc.settlement_period_id)
if 0: assert isinstance(sp, SettlementPeriodData)
sp_defstart = acc.datetime if sp.autostart else sp.time_start
sp_start, sp_end, delta = fMem.settlement_period_(sp_defstart, sp.length_in, sp.length, dateAT)
date_start, date_end, bytes_in, bytes_out = accounts_bytes_cache.get(acc.acctf_id, (None, None, 0,0))
if date_start==sp_start and date_end==sp_end:
# если в кэше есть данные о трафике для абонента за указанный расчётный период - обновляем кэш свежими значениями
bytes_in = bytes_in if bytes_in else 0
bytes_out = bytes_out if bytes_out else 0
bytes_in, bytes_out = bytes_in+rs.bytes_in-rs.lt_bytes_in, bytes_out+rs.bytes_out-rs.lt_bytes_out
logger.debug("RADCOTHREAD: Append traffic bytes to cached values for account %s for session %s (%s/%s)", (rs.account_id, rs.sessionid, bytes_in,bytes_out))
accounts_bytes_cache[acc.acctf_id] = (date_start, date_end, bytes_in, bytes_out)
#accounts_bytes_cache[acc.acctf_id]['bytes_in']+= rs.bytes_in-rs.lt_bytes_in
#accounts_bytes_cache[acc.acctf_id]['bytes_out']+= rs.bytes_out-rs.lt_bytes_out
else:
cur.execute("""
SELECT sum(bytes_in) as bytes_in, sum(bytes_out) as bytes_out FROM radius_activesession
WHERE account_id=%s and (date_start>=%s and interrim_update<=%s) or (date_start>=%s and date_end<=%s);
""", (acc.account_id, sp_start, now, sp_start, now))
bytes_in, bytes_out = cur.fetchone()
bytes_in = bytes_in if bytes_in else 0
bytes_out = bytes_out if bytes_out else 0
logger.debug("RADCOTHREAD: Selecting bytes info from DB for account %s for session %s (%s/%s)", (rs.account_id, rs.sessionid, bytes_in,bytes_out))
accounts_bytes_cache[acc.acctf_id]=(sp_start, sp_end, bytes_in, bytes_out)
#total_time = rs.session_time - old_time
[(0, u"Входящий"),(1, u"Исходящий"),(2, u"Вх.+Исх."),(3, u"Большее направление")]
if radius_traffic.direction==0:
total_bytes=rs.bytes_in-rs.lt_bytes_in
total_bytes_value = bytes_in
elif radius_traffic.direction==1:
total_bytes=rs.bytes_out-rs.lt_bytes_out
total_bytes_value = bytes_out
elif radius_traffic.direction==2:
total_bytes=(rs.bytes_out-rs.lt_bytes_out)+(rs.bytes_in-rs.lt_bytes_in)
total_bytes_value = bytes_in+bytes_out
elif radius_traffic.direction==3: