diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 5ffefe1201..c5df420b02 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -470,6 +470,7 @@ type RouteSJsonCfg struct { type LoaderJsonDataType struct { Type *string File_name *string + Flags *[]string Fields *[]*FcTemplateJsonCfg } diff --git a/config/loaderscfg.go b/config/loaderscfg.go index 4ed55b9ada..c97d8f16d5 100644 --- a/config/loaderscfg.go +++ b/config/loaderscfg.go @@ -71,6 +71,7 @@ func NewDfltLoaderDataTypeConfig() *LoaderDataType { type LoaderDataType struct { //rename to LoaderDataType Type string Filename string + Flags utils.FlagsWithParams Fields []*FCTemplate } @@ -84,6 +85,11 @@ func (self *LoaderDataType) loadFromJsonCfg(jsnCfg *LoaderJsonDataType, separato if jsnCfg.File_name != nil { self.Filename = *jsnCfg.File_name } + if jsnCfg.Flags != nil { + if self.Flags, err = utils.FlagsWithParamsFromSlice(*jsnCfg.Flags); err != nil { + return + } + } if jsnCfg.Fields != nil { if self.Fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Fields, separator); err != nil { return diff --git a/data/ansible/deb_packages/main.yaml b/data/ansible/deb_packages/main.yaml index 6017a15dd4..59bd2f3319 100644 --- a/data/ansible/deb_packages/main.yaml +++ b/data/ansible/deb_packages/main.yaml @@ -229,7 +229,7 @@ # Move the file to PKG server - name: Copy the file to PKG server become: yes - shell: 'scp /var/packages/debian/incoming/{{ item }} {{ {{ pkgAddr }} }}:/tmp/' + shell: 'scp /var/packages/debian/incoming/{{ item }} {{ pkgAddr }}:/tmp/' args: chdir: /var/packages/debian/incoming/ with_items: '{{ debFileName.stdout_lines }}' diff --git a/data/ansible/rpm_packages/main.yaml b/data/ansible/rpm_packages/main.yaml index b6a0b2f4b0..5fabccc845 100644 --- a/data/ansible/rpm_packages/main.yaml +++ b/data/ansible/rpm_packages/main.yaml @@ -269,7 +269,7 @@ # Move the file to PKG server - name: Copy the file to PKG server - shell: 'scp cgr_build/RPMS/x86_64/{{ item }} {{ pkgAddr }}:/tmp/' + shell: 'scp cgr_build/RPMS/x86_64/{{ item }} {{ pkgAddr }}:/tmp/' with_items: '{{ rmpFileName.stdout_lines }}' - name: Sign with rpm --addsign the .rpm file diff --git a/loaders/loader.go b/loaders/loader.go index 195ec7df44..7fa47d77dc 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -51,6 +51,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg, lockFilename: cfg.LockFileName, fieldSep: cfg.FieldSeparator, dataTpls: make(map[string][]*config.FCTemplate), + flagsTpls: make(map[string]utils.FlagsWithParams), rdrs: make(map[string]map[string]*openedCSVFile), bufLoaderData: make(map[string][]LoaderData), dm: dm, @@ -61,6 +62,7 @@ func NewLoader(dm *engine.DataManager, cfg *config.LoaderSCfg, } for _, ldrData := range cfg.Data { ldr.dataTpls[ldrData.Type] = ldrData.Fields + ldr.flagsTpls[ldrData.Type] = ldrData.Flags ldr.rdrs[ldrData.Type] = make(map[string]*openedCSVFile) if ldrData.Filename != "" { ldr.rdrs[ldrData.Type][ldrData.Filename] = nil @@ -87,6 +89,7 @@ type Loader struct { lockFilename string fieldSep string dataTpls map[string][]*config.FCTemplate // map[loaderType]*config.FCTemplate + flagsTpls map[string]utils.FlagsWithParams //map[loaderType]utils.FlagsWithParams rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read procRows int // keep here the last processed row in the file/-s bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID @@ -572,11 +575,17 @@ func (ldr *Loader) storeLoadedData(loaderType string, } // get IDs so we can reload in cache ids = append(ids, rpl.TenantID()) - if err := ldr.dm.SetRateProfile(rpl, true); err != nil { - return err + if ldr.flagsTpls[loaderType].GetBool(utils.MetaPartial) { + if err := ldr.dm.SetRateProfileRates(rpl, true); err != nil { + return err + } + } else { + if err := ldr.dm.SetRateProfile(rpl, true); err != nil { + return err + } } cacheArgs.RateProfileIDs = ids - cachePartition = utils.CacheDispatcherProfiles + cachePartition = utils.CacheRateProfiles } } } @@ -666,7 +675,8 @@ func (ldr *Loader) removeContent(loaderType, caching string) (err error) { for prevTntID = range ldr.bufLoaderData { break // have stolen the existing key in buffer } - if err = ldr.removeLoadedData(loaderType, prevTntID, caching); err != nil { + if err = ldr.removeLoadedData(loaderType, + map[string][]LoaderData{prevTntID: ldr.bufLoaderData[prevTntID]}, caching); err != nil { return } delete(ldr.bufLoaderData, prevTntID) @@ -678,7 +688,8 @@ func (ldr *Loader) removeContent(loaderType, caching string) (err error) { for tntID = range ldr.bufLoaderData { break // get the first tenantID } - if err = ldr.removeLoadedData(loaderType, tntID, caching); err != nil { + if err = ldr.removeLoadedData(loaderType, + map[string][]LoaderData{tntID: ldr.bufLoaderData[tntID]}, caching); err != nil { return } delete(ldr.bufLoaderData, tntID) @@ -687,183 +698,204 @@ func (ldr *Loader) removeContent(loaderType, caching string) (err error) { //removeLoadedData will remove the data from database //since we remove we don't need to compose the struct we only need the Tenant and the ID of the profile -func (ldr *Loader) removeLoadedData(loaderType, tntID, caching string) (err error) { +func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderData, caching string) (err error) { var ids []string var cacheArgs utils.ArgsCache var cachePartition string switch loaderType { case utils.MetaAttributes: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfileID: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveAttributeProfile(tntIDStruct.Tenant, tntIDStruct.ID, - utils.NonTransactional, true); err != nil { - return err + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: AttributeProfileID: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveAttributeProfile(tntIDStruct.Tenant, tntIDStruct.ID, + utils.NonTransactional, true); err != nil { + return err + } + cacheArgs.AttributeProfileIDs = ids + cachePartition = utils.CacheAttributeProfiles } - cacheArgs.AttributeProfileIDs = ids - cachePartition = utils.CacheAttributeProfiles } + case utils.MetaResources: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfileID: %s", - utils.LoaderS, ldr.ldrID, tntID)) + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: ResourceProfileID: %s", + utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveResourceProfile(tntIDStruct.Tenant, - tntIDStruct.ID, utils.NonTransactional, true); err != nil { - return err - } - if err := ldr.dm.RemoveResource(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { - return err + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveResourceProfile(tntIDStruct.Tenant, + tntIDStruct.ID, utils.NonTransactional, true); err != nil { + return err + } + if err := ldr.dm.RemoveResource(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { + return err + } + cacheArgs.ResourceProfileIDs = ids + cacheArgs.ResourceIDs = ids + cachePartition = utils.CacheResourceProfiles } - cacheArgs.ResourceProfileIDs = ids - cacheArgs.ResourceIDs = ids - cachePartition = utils.CacheResourceProfiles } case utils.MetaFilters: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveFilter(tntIDStruct.Tenant, tntIDStruct.ID, - utils.NonTransactional, true); err != nil { - return err - } - cacheArgs.FilterIDs = ids - cachePartition = utils.CacheFilters + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: Filter: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveFilter(tntIDStruct.Tenant, tntIDStruct.ID, + utils.NonTransactional, true); err != nil { + return err + } + cacheArgs.FilterIDs = ids + cachePartition = utils.CacheFilters + } } case utils.MetaStats: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfileID: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveStatQueueProfile(tntIDStruct.Tenant, - tntIDStruct.ID, utils.NonTransactional, true); err != nil { - return err - } - if err := ldr.dm.RemoveStatQueue(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { - return err - } - cacheArgs.StatsQueueProfileIDs = ids - cacheArgs.StatsQueueIDs = ids - cachePartition = utils.CacheStatQueueProfiles + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: StatsQueueProfileID: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveStatQueueProfile(tntIDStruct.Tenant, + tntIDStruct.ID, utils.NonTransactional, true); err != nil { + return err + } + if err := ldr.dm.RemoveStatQueue(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { + return err + } + cacheArgs.StatsQueueProfileIDs = ids + cacheArgs.StatsQueueIDs = ids + cachePartition = utils.CacheStatQueueProfiles + } } case utils.MetaThresholds: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfileID: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveThresholdProfile(tntIDStruct.Tenant, - tntIDStruct.ID, utils.NonTransactional, true); err != nil { - return err - } - if err := ldr.dm.RemoveThreshold(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { - return err - } - cacheArgs.ThresholdProfileIDs = ids - cacheArgs.ThresholdIDs = ids - cachePartition = utils.CacheThresholdProfiles + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: ThresholdProfileID: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveThresholdProfile(tntIDStruct.Tenant, + tntIDStruct.ID, utils.NonTransactional, true); err != nil { + return err + } + if err := ldr.dm.RemoveThreshold(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { + return err + } + cacheArgs.ThresholdProfileIDs = ids + cacheArgs.ThresholdIDs = ids + cachePartition = utils.CacheThresholdProfiles + } } case utils.MetaRoutes: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: RouteProfileID: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveRouteProfile(tntIDStruct.Tenant, - tntIDStruct.ID, utils.NonTransactional, true); err != nil { - return err - } - cacheArgs.RouteProfileIDs = ids - cachePartition = utils.CacheRouteProfiles + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: RouteProfileID: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveRouteProfile(tntIDStruct.Tenant, + tntIDStruct.ID, utils.NonTransactional, true); err != nil { + return err + } + cacheArgs.RouteProfileIDs = ids + cachePartition = utils.CacheRouteProfiles + } } case utils.MetaChargers: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfileID: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveChargerProfile(tntIDStruct.Tenant, - tntIDStruct.ID, utils.NonTransactional, true); err != nil { - return err - } - cacheArgs.ChargerProfileIDs = ids - cachePartition = utils.CacheChargerProfiles + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: ChargerProfileID: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveChargerProfile(tntIDStruct.Tenant, + tntIDStruct.ID, utils.NonTransactional, true); err != nil { + return err + } + cacheArgs.ChargerProfileIDs = ids + cachePartition = utils.CacheChargerProfiles + } } case utils.MetaDispatchers: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfileID: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveDispatcherProfile(tntIDStruct.Tenant, - tntIDStruct.ID, utils.NonTransactional, true); err != nil { - return err - } - cacheArgs.DispatcherProfileIDs = ids - cachePartition = utils.CacheDispatcherProfiles + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherProfileID: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveDispatcherProfile(tntIDStruct.Tenant, + tntIDStruct.ID, utils.NonTransactional, true); err != nil { + return err + } + cacheArgs.DispatcherProfileIDs = ids + cachePartition = utils.CacheDispatcherProfiles + } } case utils.MetaDispatcherHosts: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHostID: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveDispatcherHost(tntIDStruct.Tenant, - tntIDStruct.ID, utils.NonTransactional); err != nil { - return err - } - cacheArgs.DispatcherHostIDs = ids - cachePartition = utils.CacheDispatcherHosts + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: DispatcherHostID: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveDispatcherHost(tntIDStruct.Tenant, + tntIDStruct.ID, utils.NonTransactional); err != nil { + return err + } + cacheArgs.DispatcherHostIDs = ids + cachePartition = utils.CacheDispatcherHosts + } } case utils.MetaRateProfiles: - if ldr.dryRun { - utils.Logger.Info( - fmt.Sprintf("<%s-%s> DRY_RUN: RateProfileIDs: %s", - utils.LoaderS, ldr.ldrID, tntID)) - } else { - tntIDStruct := utils.NewTenantID(tntID) - // get IDs so we can reload in cache - ids = append(ids, tntID) - if err := ldr.dm.RemoveRateProfile(tntIDStruct.Tenant, - tntIDStruct.ID, utils.NonTransactional, true); err != nil { - return err - } - cacheArgs.RateProfileIDs = ids - cachePartition = utils.CacheRateProfiles + for tntID, _ := range lds { + if ldr.dryRun { + utils.Logger.Info( + fmt.Sprintf("<%s-%s> DRY_RUN: RateProfileIDs: %s", + utils.LoaderS, ldr.ldrID, tntID)) + } else { + tntIDStruct := utils.NewTenantID(tntID) + // get IDs so we can reload in cache + ids = append(ids, tntID) + if err := ldr.dm.RemoveRateProfile(tntIDStruct.Tenant, + tntIDStruct.ID, utils.NonTransactional, true); err != nil { + return err + } + cacheArgs.RateProfileIDs = ids + cachePartition = utils.CacheRateProfiles + } } } @@ -885,14 +917,16 @@ func (ldr *Loader) removeLoadedData(loaderType, tntID, caching string) (err erro return } case utils.MetaRemove: - if err = ldr.connMgr.Call(ldr.cacheConns, nil, - utils.CacheSv1RemoveItem, &utils.ArgsGetCacheItemWithArgDispatcher{ - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: cachePartition, - ItemID: tntID, - }, - }, &reply); err != nil { - return + for tntID, _ := range lds { + if err = ldr.connMgr.Call(ldr.cacheConns, nil, + utils.CacheSv1RemoveItem, &utils.ArgsGetCacheItemWithArgDispatcher{ + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: cachePartition, + ItemID: tntID, + }, + }, &reply); err != nil { + return + } } case utils.MetaClear: if err = ldr.connMgr.Call(ldr.cacheConns, nil, diff --git a/utils/consts.go b/utils/consts.go index bcdf2ada67..ee31101b98 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -766,6 +766,7 @@ const ( MetaWeekly = "*weekly" RateS = "RateS" Underline = "_" + MetaPartial = "*partial" ) // Migrator Action @@ -791,6 +792,7 @@ const ( MetaIndexes = "*indexes" MetaDispatcherProfiles = "*dispatcher_profiles" MetaRateProfiles = "*rate_profiles" + MetaRateProfileRates = "*rate_profile_rates" MetaChargerProfiles = "*charger_profiles" MetaSharedGroups = "*shared_groups" MetaThresholds = "*thresholds"