Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: user suppressions adaptations for namespaces #2604

Merged
merged 4 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,7 @@ func (m *mockIdentifier) ID() string {
func (m *mockIdentifier) BasicAuth() (string, string) {
return m.token, ""
}

func (*mockIdentifier) Type() deployment.Type {
return deployment.Type(`mockType`)
}
2 changes: 1 addition & 1 deletion enterprise/suppress-user/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package suppression

type NOOP struct{}

func (*NOOP) IsSuppressedUser(_, _ string) bool {
func (*NOOP) IsSuppressedUser(_, _, _ string) bool {
return false
}
4 changes: 1 addition & 3 deletions enterprise/suppress-user/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ func (m *Factory) Setup(backendConfig backendconfig.BackendConfig) (types.UserSu
loadConfig()
ctx := context.TODO()
backendConfig.WaitForConfig(ctx)
workspaceId := backendConfig.Identity().ID()
suppressUser := &SuppressRegulationHandler{
RegulationBackendURL: configBackendURL,
RegulationsPollInterval: regulationsPollInterval,
WorkspaceID: workspaceId,
ID: backendConfig.Identity(),
pageSize: strconv.Itoa(suppressionApiPageSize),
}
suppressUser.setup(ctx)
Expand Down
82 changes: 51 additions & 31 deletions enterprise/suppress-user/suppressUser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (

"github.com/cenkalti/backoff"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"

"github.com/rudderlabs/rudder-server/utils/logger"

Expand All @@ -26,8 +28,8 @@ type SuppressRegulationHandler struct {
Client *http.Client
RegulationBackendURL string
RegulationsPollInterval time.Duration
WorkspaceID string
userSpecificSuppressedSourceMap map[string]sourceFilter
ID identity.Identifier
userSpecificSuppressedSourceMap map[string]map[string]sourceFilter
regulationsSubscriberLock sync.RWMutex
suppressAPIToken string
pageSize string
Expand All @@ -47,29 +49,41 @@ type apiResponse struct {
}

type sourceRegulation struct {
Canceled bool `json:"canceled"`
UserID string `json:"userId"`
SourceIDs []string `json:"sourceIds"`
Canceled bool `json:"canceled"`
WorkspaceID string `json:"workspaceId"`
UserID string `json:"userId"`
SourceIDs []string `json:"sourceIds"`
}

func (suppressUser *SuppressRegulationHandler) setup(ctx context.Context) {
suppressUser.RegulationBackendURL = configBackendURL
switch suppressUser.ID.Type() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest this approach mainly for consistency. I also find it more idiomatic, but that is up for discussion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, it is considered to be a bad practice requiring from the consumer of an exported type to have to know both about the generic interface type and the concrete implementation types, since it beats the original purpose of the interface, that is abstracting from the underlying implementation(s).

case deployment.DedicatedType:
suppressUser.RegulationBackendURL += fmt.Sprintf("/dataplane/workspaces/%s/regulations/suppressions", suppressUser.ID.ID())
case deployment.MultiTenantType:
suppressUser.RegulationBackendURL += fmt.Sprintf("/dataplane/namespaces/%s/regulations/suppressions", suppressUser.ID.ID())
default:
panic("invalid deployment type")
}
rruntime.Go(func() {
suppressUser.regulationSyncLoop(ctx)
})
}

func (suppressUser *SuppressRegulationHandler) IsSuppressedUser(userID, sourceID string) bool {
func (suppressUser *SuppressRegulationHandler) IsSuppressedUser(workspaceID, userID, sourceID string) bool {
suppressUser.init()
pkgLogger.Debugf("IsSuppressedUser called for %v, %v", sourceID, userID)
pkgLogger.Debugf("IsSuppressedUser called for %v, %v, %v", workspaceID, sourceID, userID)
suppressUser.regulationsSubscriberLock.RLock()
defer suppressUser.regulationsSubscriberLock.RUnlock()
if _, ok := suppressUser.userSpecificSuppressedSourceMap[userID]; ok {
m := suppressUser.userSpecificSuppressedSourceMap[userID]
if m.all {
return true
}
if _, ok := m.specific[sourceID]; ok {
return true
if _, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID]; ok {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userID]; ok {
m := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userID]
if m.all {
return true
}
if _, ok := m.specific[sourceID]; ok {
return true
}
}
}
return false
Expand Down Expand Up @@ -99,38 +113,43 @@ func (suppressUser *SuppressRegulationHandler) regulationSyncLoop(ctx context.Co
suppressUser.regulationsSubscriberLock.Lock()
for _, sourceRegulation := range regulations {
userId := sourceRegulation.UserID
workspaceID := sourceRegulation.WorkspaceID
_, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID]
if !ok {
suppressUser.userSpecificSuppressedSourceMap[workspaceID] = make(map[string]sourceFilter)
}
if len(sourceRegulation.SourceIDs) == 0 {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[userId]; !ok {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId]; !ok {
if !sourceRegulation.Canceled {
m := sourceFilter{
all: true,
specific: map[string]struct{}{},
}
suppressUser.userSpecificSuppressedSourceMap[userId] = m
suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId] = m
continue
}
}
m := suppressUser.userSpecificSuppressedSourceMap[userId]
m := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId]
if sourceRegulation.Canceled {
m.all = false
} else {
m.all = true
}
suppressUser.userSpecificSuppressedSourceMap[userId] = m
suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId] = m
} else {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[userId]; !ok {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId]; !ok {
if !sourceRegulation.Canceled {
m := sourceFilter{
specific: map[string]struct{}{},
}
for _, srcId := range sourceRegulation.SourceIDs {
m.specific[srcId] = struct{}{}
}
suppressUser.userSpecificSuppressedSourceMap[userId] = m
suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId] = m
continue
}
}
m := suppressUser.userSpecificSuppressedSourceMap[userId]
m := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId]
if sourceRegulation.Canceled {
for _, srcId := range sourceRegulation.SourceIDs {
delete(m.specific, srcId) // will be no-op if key is not there in map
Expand All @@ -140,7 +159,7 @@ func (suppressUser *SuppressRegulationHandler) regulationSyncLoop(ctx context.Co
m.specific[srcId] = struct{}{}
}
}
suppressUser.userSpecificSuppressedSourceMap[userId] = m
suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId] = m
}
}
suppressUser.regulationsSubscriberLock.Unlock()
Expand All @@ -152,12 +171,7 @@ func (suppressUser *SuppressRegulationHandler) regulationSyncLoop(ctx context.Co
}

func (suppressUser *SuppressRegulationHandler) getSourceRegulationsFromRegulationService() ([]sourceRegulation, error) {
if config.GetBool("HOSTED_SERVICE", false) {
pkgLogger.Info("[Regulations] Regulations on free tier are not supported at the moment.")
return []sourceRegulation{}, nil
}

urlStr := fmt.Sprintf("%s/dataplane/workspaces/%s/regulations/suppressions", suppressUser.RegulationBackendURL, suppressUser.WorkspaceID)
urlStr := suppressUser.RegulationBackendURL
urlValQuery := url.Values{}
if suppressUser.pageSize != "" {
urlValQuery.Set("pageSize", suppressUser.pageSize)
Expand All @@ -179,8 +193,7 @@ func (suppressUser *SuppressRegulationHandler) getSourceRegulationsFromRegulatio
if err != nil {
return err
}
workspaceToken := config.GetWorkspaceToken()
req.SetBasicAuth(workspaceToken, "")
req.SetBasicAuth(suppressUser.ID.BasicAuth())
req.Header.Set("Content-Type", "application/json")

resp, err = suppressUser.Client.Do(req)
Expand Down Expand Up @@ -228,6 +241,13 @@ func (suppressUser *SuppressRegulationHandler) getSourceRegulationsFromRegulatio
pkgLogger.Error("Error while parsing request: ", err, resp.StatusCode)
return []sourceRegulation{}, err
}
// TODO: remove this once regulation Service is updated
for i := range sourceRegulationsJSON.SourceRegulations {
sourceRegulation := &sourceRegulationsJSON.SourceRegulations[i]
if sourceRegulation.WorkspaceID == "" {
sourceRegulation.WorkspaceID = suppressUser.ID.ID()
}
}

if sourceRegulationsJSON.Token == "" {
pkgLogger.Errorf("[[ Workspace-config ]] No token found in the source regulations response: %v", string(respBody))
Expand All @@ -241,7 +261,7 @@ func (suppressUser *SuppressRegulationHandler) init() {
suppressUser.once.Do(func() {
pkgLogger.Info("init Regulations")
if len(suppressUser.userSpecificSuppressedSourceMap) == 0 {
suppressUser.userSpecificSuppressedSourceMap = map[string]sourceFilter{}
suppressUser.userSpecificSuppressedSourceMap = map[string]map[string]sourceFilter{}
}
if suppressUser.Client == nil {
suppressUser.Client = &http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}
Expand Down
Loading