diff --git a/.github/workflows/mile-build.yml b/.github/workflows/mile-build.yml new file mode 100644 index 00000000000..e8056028888 --- /dev/null +++ b/.github/workflows/mile-build.yml @@ -0,0 +1,70 @@ +name: Mile Build +run-name: Mile Build triggered by ${{ github.actor }} + +on: + pull_request: + branches: [ "main" ] + + workflow_call: + inputs: + # deployTarget: + # required: true + # default: staging + # type: string + gitTag: + required: true + type: string + + workflow_dispatch: +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout the code + uses: actions/checkout@v3 + + # - name: Install GoLang + # uses: actions/setup-go@v3 + # with: + # go-version: '>=1.22.0' + + # - name: Deployment target + # env: + # DEPLOY_TARGET: staging + # run: | + # echo "Received input: ${{ github.event.inputs.deployTarget }}" + # + # if [ ! -z "${{ github.event.inputs.deployTarget }}" ]; then + # DEPLOY_TARGET=${{ github.event.inputs.deployTarget }} + # fi + # echo "Using environment...${DEPLOY_TARGET}" + # echo "DEPLOY_TARGET=${DEPLOY_TARGET}" >> $GITHUB_ENV + # + # shell: bash + + # - name: Set Git Tag + # run: | + # echo "GIT_TAG=${{ github.event.inputs.gitTag }}" >> $GITHUB_ENV + + - name: Install doctl + uses: digitalocean/action-doctl@v2 + with: + token: ${{ secrets.DIGITALOCEAN_ACCESS_TOKEN }} + + - name: Log in to DO Container Registry + run: doctl registry login --expiry-seconds 1200 + + # - name: Save DigitalOcean kubeconfig + # run: | + # if [ ${DEPLOY_TARGET} == 'prod' ]; then + # doctl kubernetes cluster kubeconfig save dhi-prod + # elif [ ${DEPLOY_TARGET} == 'staging' ]; then + # doctl kubernetes cluster kubeconfig save atd-dev + # else + # echo "is neither MAIN nor staging branch" + # fi + # shell: bash + + - name: Build with Make + run: make milebuild +# cd ./prebid-setup && git clone https://github.com/prebid/prebid-server.git -b $GIT_TAG && \ No newline at end of file diff --git a/Makefile b/Makefile index cf4ac52b515..7aacf86c0c0 100644 --- a/Makefile +++ b/Makefile @@ -37,3 +37,15 @@ format: # formatcheck runs format for diagnostics, without modifying the code formatcheck: ./scripts/format.sh -f false + + +VERSION:=$(shell date +%Y%m%d%H%M%S) + +# Use the GIT_TAG passed from the GitHub Actions workflow +PREBID_TAG=registry.digitalocean.com/automatad/amp/prebid-server:$(VERSION) + +# .PHONY: build + +buildmile: + docker build -t $(PREBID_TAG) . + docker push $(PREBID_TAG) diff --git a/analytics/build/build.go b/analytics/build/build.go index 4cba9a3f1a6..4a293cf179b 100644 --- a/analytics/build/build.go +++ b/analytics/build/build.go @@ -2,6 +2,8 @@ package build import ( "encoding/json" + "fmt" + "github.com/prebid/prebid-server/v2/analytics/mile" "github.com/benbjohnson/clock" "github.com/golang/glog" @@ -56,6 +58,32 @@ func New(analytics *config.Analytics) analytics.Runner { } } + if analytics.Mile.Enabled { + mileConfig := mile.BuildConfig(analytics.Mile.Scope, + analytics.Mile.Endpoint, "auction") + + fmt.Println(mileConfig) + + mileModule, err := mile.NewModuleWithConfig( + clients.GetDefaultHttpInstance(), + analytics.Mile.Scope, + analytics.Mile.Endpoint, + mileConfig, + 1, + "100", + "5s", + clock.New(), + ) + fmt.Println(mileModule) + + if err == nil { + modules["mile"] = mileModule + } else { + glog.Errorf("Could not initialize MileModule: %v", err) + + } + } + return modules } diff --git a/analytics/mile/config.go b/analytics/mile/config.go new file mode 100644 index 00000000000..4d29363ac2f --- /dev/null +++ b/analytics/mile/config.go @@ -0,0 +1,91 @@ +package mile + +import ( + "encoding/json" + "net/http" + "net/url" + "time" + + "github.com/docker/go-units" +) + +func BuildConfig(scope, endpoint string, features ...string) *Configuration { + + config := Configuration{ + ScopeID: scope, + Endpoint: endpoint, + Features: map[string]bool{ + auction: false, + video: false, + amp: false, + cookieSync: false, + setUID: false, + }, + } + for _, feature := range features { + config.Features[feature] = true + } + return &config +} + +func fetchConfig(client *http.Client, endpoint *url.URL) (*Configuration, error) { + res, err := client.Get(endpoint.String()) + if err != nil { + return nil, err + } + + defer res.Body.Close() + c := Configuration{} + err = json.NewDecoder(res.Body).Decode(&c) + if err != nil { + return nil, err + } + return &c, nil +} + +func newBufferConfig(count int, size, duration string) (*bufferConfig, error) { + pDuration, err := time.ParseDuration(duration) + if err != nil { + return nil, err + } + pSize, err := units.FromHumanSize(size) + if err != nil { + return nil, err + } + return &bufferConfig{ + pDuration, + int64(count), + pSize, + }, nil +} + +func (a *Configuration) isSameAs(b *Configuration) bool { + sameEndpoint := a.Endpoint == b.Endpoint + sameScopeID := a.ScopeID == b.ScopeID + sameFeature := len(a.Features) == len(b.Features) + for key := range a.Features { + sameFeature = sameFeature && a.Features[key] == b.Features[key] + } + return sameFeature && sameEndpoint && sameScopeID +} + +func (a *Configuration) clone() *Configuration { + c := &Configuration{ + ScopeID: a.ScopeID, + Endpoint: a.Endpoint, + Features: make(map[string]bool, len(a.Features)), + } + + for k, v := range a.Features { + c.Features[k] = v + } + + return c +} + +func (a *Configuration) disableAllFeatures() *Configuration { + for k := range a.Features { + a.Features[k] = false + } + return a +} diff --git a/analytics/mile/configupdate.go b/analytics/mile/configupdate.go new file mode 100644 index 00000000000..8cf4fd4b7bd --- /dev/null +++ b/analytics/mile/configupdate.go @@ -0,0 +1,61 @@ +package mile + +import ( + "fmt" + "net/http" + "net/url" + "time" + + "github.com/prebid/prebid-server/v2/util/task" +) + +// ConfigUpdateTask publishes configurations until the stop channel is signaled. +type ConfigUpdateTask interface { + Start(stop <-chan struct{}) <-chan *Configuration +} + +// ConfigUpdateHttpTask polls an HTTP endpoint on a specified interval and publishes configurations until +// the stop channel is signaled. +type ConfigUpdateHttpTask struct { + task *task.TickerTask + configChan chan *Configuration +} + +func NewConfigUpdateHttpTask(httpClient *http.Client, scope, endpoint, refreshInterval string) (*ConfigUpdateHttpTask, error) { + refreshDuration, err := time.ParseDuration(refreshInterval) + if err != nil { + return nil, fmt.Errorf("fail to parse the module args, arg=analytics.pubstack.configuration_refresh_delay: %v", err) + } + + endpointUrl, err := url.Parse(endpoint + "/bootstrap?scopeId=" + scope) + if err != nil { + return nil, err + } + + configChan := make(chan *Configuration) + + tr := task.NewTickerTaskFromFunc(refreshDuration, func() error { + config, err := fetchConfig(httpClient, endpointUrl) + if err != nil { + return fmt.Errorf("[pubstack] Fail to fetch remote configuration: %v", err) + } + configChan <- config + return nil + }) + + return &ConfigUpdateHttpTask{ + task: tr, + configChan: configChan, + }, nil +} + +func (t *ConfigUpdateHttpTask) Start(stop <-chan struct{}) <-chan *Configuration { + go t.task.Start() + + go func() { + <-stop + t.task.Stop() + }() + + return t.configChan +} diff --git a/analytics/mile/eventchannel/eventchannel.go b/analytics/mile/eventchannel/eventchannel.go new file mode 100644 index 00000000000..7216fd16940 --- /dev/null +++ b/analytics/mile/eventchannel/eventchannel.go @@ -0,0 +1,147 @@ +package eventchannel + +import ( + "fmt" + "github.com/prebid/prebid-server/v2/analytics/mile/helpers" + "sync" + "time" + + "github.com/benbjohnson/clock" +) + +type Metrics struct { + bufferSize int64 + eventCount int64 +} + +type Limit struct { + maxByteSize int64 + maxEventCount int64 + maxTime time.Duration +} + +type EventChannel struct { + //gz *gzip.Writer + buff []*helpers.MileAnalyticsEvent + + ch chan *helpers.MileAnalyticsEvent + endCh chan int + metrics Metrics + muxGzBuffer sync.RWMutex + send Sender + limit Limit + clock clock.Clock +} + +func NewEventChannel(sender Sender, clock clock.Clock, maxByteSize, maxEventCount int64, maxTime time.Duration) *EventChannel { + b := []*helpers.MileAnalyticsEvent{} + //gzw := gzip.NewWriter(b) + + c := EventChannel{ + //gz: gzw, + buff: b, + ch: make(chan *helpers.MileAnalyticsEvent), + endCh: make(chan int), + metrics: Metrics{}, + send: sender, + limit: Limit{maxByteSize, maxEventCount, maxTime}, + clock: clock, + } + go c.start() + return &c +} + +func (c *EventChannel) Push(event *helpers.MileAnalyticsEvent) { + c.ch <- event +} + +func (c *EventChannel) Close() { + c.endCh <- 1 +} + +func (c *EventChannel) buffer(event *helpers.MileAnalyticsEvent) { + c.muxGzBuffer.Lock() + defer c.muxGzBuffer.Unlock() + + //_, err := c.gz.Write(event) + //if err != nil { + // glog.Warning("[pubstack] fail to compress, skip the event") + // return + c.buff = append(c.buff, event) + + c.metrics.eventCount++ + //c.metrics.bufferSize += int64(len(event)) +} + +func (c *EventChannel) isBufferFull() bool { + c.muxGzBuffer.RLock() + defer c.muxGzBuffer.RUnlock() + return c.metrics.eventCount >= c.limit.maxEventCount || c.metrics.bufferSize >= c.limit.maxByteSize +} + +func (c *EventChannel) reset() { + // reset buffer + //c.gz.Reset(c.buff) + //c.buff.Reset() + + c.buff = []*helpers.MileAnalyticsEvent{} + + // reset metrics + c.metrics.eventCount = 0 + c.metrics.bufferSize = 0 +} + +func (c *EventChannel) flush() { + c.muxGzBuffer.Lock() + defer c.muxGzBuffer.Unlock() + fmt.Println(c.metrics.eventCount, "evec") + + if c.metrics.eventCount == 0 { //|| c.metrics.bufferSize == 0 { + return + } + + // reset buffers and writers + defer c.reset() + + // finish writing gzip header + //err := c.gz.Close() + //if err != nil { + // glog.Warning("[mile] fail to close gzipped buffer") + // return + //} + + // copy the current buffer to send the payload in a new thread + //payload := make([]byte, c.buff.Len()) + //_, err = c.buff.Read(payload) + //if err != nil { + // glog.Warning("[mile] fail to copy the buffer") + // return + //} + + // send events (async) + err := c.send(c.buff) + fmt.Println(err) +} + +func (c *EventChannel) start() { + ticker := c.clock.Ticker(c.limit.maxTime) + + for { + select { + case <-c.endCh: + c.flush() + return + + // event is received + case event := <-c.ch: + c.buffer(event) + if c.isBufferFull() { + c.flush() + } + + // time between 2 flushes has passed + case <-ticker.C: + c.flush() + } + } +} diff --git a/analytics/mile/eventchannel/eventchannel_test.go b/analytics/mile/eventchannel/eventchannel_test.go new file mode 100644 index 00000000000..343eb64c73e --- /dev/null +++ b/analytics/mile/eventchannel/eventchannel_test.go @@ -0,0 +1,3 @@ +package eventchannel + +// diff --git a/analytics/mile/eventchannel/sender.go b/analytics/mile/eventchannel/sender.go new file mode 100644 index 00000000000..0dbcd49966e --- /dev/null +++ b/analytics/mile/eventchannel/sender.go @@ -0,0 +1,57 @@ +package eventchannel + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/golang/glog" + "github.com/prebid/prebid-server/v2/analytics/mile/helpers" + "net/http" + "net/url" + "path" +) + +type Sender = func(payload []*helpers.MileAnalyticsEvent) error + +func NewHttpSender(client *http.Client, endpoint string) Sender { + return func(payload []*helpers.MileAnalyticsEvent) error { + + data, err := json.Marshal(payload) + if err != nil { + glog.Error(err) + return err + } + + req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(data)) + if err != nil { + glog.Error(err) + return err + } + + req.Header.Set("Content-Type", "application/json") + //req.Header.Set("Content-Encoding", "gzip") + + resp, err := client.Do(req) + if err != nil { + return err + } + resp.Body.Close() + + if resp.StatusCode != http.StatusNoContent { + glog.Errorf("[mile] Wrong code received %d instead of %d", resp.StatusCode, http.StatusOK) + return fmt.Errorf("wrong code received %d instead of %d", resp.StatusCode, http.StatusOK) + } + return nil + } +} + +func BuildEndpointSender(client *http.Client, baseUrl string, module string) Sender { + fmt.Println(baseUrl) + endpoint, err := url.Parse(baseUrl) + if err != nil { + glog.Error(err) + } + endpoint.Path = path.Join(endpoint.Path, "pageview-event", "json") + + return NewHttpSender(client, endpoint.String()) +} diff --git a/analytics/mile/helpers/json.go b/analytics/mile/helpers/json.go new file mode 100644 index 00000000000..3ea0b668613 --- /dev/null +++ b/analytics/mile/helpers/json.go @@ -0,0 +1,146 @@ +package helpers + +import ( + "fmt" + "github.com/prebid/prebid-server/v2/analytics" +) + +func JsonifyAuctionObject(ao *analytics.AuctionObject, scope string) ([]MileAnalyticsEvent, error) { + //var logEntry *MileAnalyticsEvent + var events []MileAnalyticsEvent + if ao != nil { + + var bidBiders []string + //var configuredBidders []string + if ao.Response != nil { + + for _, i := range ao.Response.SeatBid { + bidBiders = append(bidBiders, i.Seat) + } + //} + } + + if ao.RequestWrapper != nil { + + for range ao.RequestWrapper.Imp { + + fmt.Println(ao.RequestWrapper.Device.Geo) + fmt.Println(ao.RequestWrapper.Site.Publisher.ID) + + logEntry := MileAnalyticsEvent{ + //SessionID: ao.RequestWrapper + Ip: ao.RequestWrapper.Device.IP, + //ClientVersion: ao.RequestWrapper.Ext. + Ua: ao.RequestWrapper.Device.UA, + ArbitraryData: "", + Device: ao.RequestWrapper.Device.Model, + Publisher: ao.RequestWrapper.Site.Publisher.Domain, + Site: ao.RequestWrapper.Site.Domain, + ReferrerURL: ao.RequestWrapper.Site.Ref, + AdvertiserName: "", + //AuctionID: ao.RequestWrapper.ID, + //Page: ao.RequestWrapper.Site.Page, + //YetiSiteID: ao.RequestWrapper.Site.ID, + //YetiPublisherID: ao.RequestWrapper.Site.Publisher.ID, + SessionID: "", + EventType: "", + Section: "", + BidBidders: bidBiders, + ConfiguredBidders: []string{}, + //Viewability: ao.RequestWrapper. + //WinningSize: ao.Response.SeatBi + + } + + events = append(events, logEntry) + } + } + + } + + //events = append(events, logEntry) + return events, nil + +} + +func JsonifyVideoObject(vo *analytics.VideoObject, scope string) (*MileAnalyticsEvent, error) { + var logEntry *MileAnalyticsEvent + if vo != nil { + //var request *openrtb2.BidRequest + //if ao.RequestWrapper != nil { + // request = ao.RequestWrapper.BidRequest + //} + logEntry = &MileAnalyticsEvent{ + //Status: ao.Status, + //Errors: ao.Errors, + //Request: request, + //Response: ao.Response, + //Account: ao.Account, + //StartTime: ao.StartTime, + //HookExecutionOutcome: ao.HookExecutionOutcome, + } + } + return logEntry, nil + +} + +func JsonifyAmpObject(ao *analytics.AmpObject, scope string) (*MileAnalyticsEvent, error) { + var logEntry *MileAnalyticsEvent + if ao != nil { + //var request *openrtb2.BidRequest + //if ao.RequestWrapper != nil { + // request = ao.RequestWrapper.BidRequest + //} + logEntry = &MileAnalyticsEvent{ + //Status: ao.Status, + //Errors: ao.Errors, + //Request: request, + //Response: ao.Response, + //Account: ao.Account, + //StartTime: ao.StartTime, + //HookExecutionOutcome: ao.HookExecutionOutcome, + } + } + return logEntry, nil + +} +func JsonifyCookieSync(cso *analytics.CookieSyncObject, scope string) (*MileAnalyticsEvent, error) { + var logEntry *MileAnalyticsEvent + if cso != nil { + //var request *openrtb2.BidRequest + //if ao.RequestWrapper != nil { + // request = ao.RequestWrapper.BidRequest + //} + logEntry = &MileAnalyticsEvent{ + //Status: ao.Status, + //Errors: ao.Errors, + //Request: request, + //Response: ao.Response, + //Account: ao.Account, + //StartTime: ao.StartTime, + //HookExecutionOutcome: ao.HookExecutionOutcome, + } + } + return logEntry, nil + +} +func JsonifySetUIDObject(so *analytics.SetUIDObject, scope string) (*MileAnalyticsEvent, error) { + var logEntry *MileAnalyticsEvent + if so != nil { + //var request *openrtb2.BidRequest + //if ao.RequestWrapper != nil { + // request = ao.RequestWrapper.BidRequest + //} + logEntry = &MileAnalyticsEvent{ + //Status: ao.Status, + //Errors: ao.Errors, + //Request: request, + //Response: ao.Response, + //Account: ao.Account, + //StartTime: ao.StartTime, + //HookExecutionOutcome: ao.HookExecutionOutcome, + } + } + return logEntry, nil + +} diff --git a/analytics/mile/helpers/model.go b/analytics/mile/helpers/model.go new file mode 100644 index 00000000000..0fd1bb8dcfc --- /dev/null +++ b/analytics/mile/helpers/model.go @@ -0,0 +1,177 @@ +package helpers + +type MileAnalyticsEvent struct { + Ip string `json:"ip"` + + ClientVersion string `json:"clientVersion"` + + Ua string `json:"ua"` + + CityName string `json:"cityName"` + + StateName string `json:"stateName"` + + CountryName string `json:"countryName"` + + ArbitraryData string `json:"arbitraryData"` + + Device string `json:"device"` + + Publisher string `json:"publisher"` + + Site string `json:"site"` + + ReferrerURL string `json:"referrerURL"` + + AdvertiserName string `json:"advertiserName"` + + AuctionID string `json:"auctionID"` + + Page string `json:"page"` + + YetiSiteID string `json:"yetiSiteID"` + + YetiPublisherID string `json:"yetiPublisherID"` + + SessionID string `json:"sessionID"` + + EventType string `json:"eventType"` + + Section string `json:"section"` + + Cls float64 `json:"cls"` + + Fcp int64 `json:"fcp"` + + Fid int64 `json:"fid"` + + Ttfb int64 `json:"ttfb"` + + BidBidders []string `json:"bidBidders"` + + ConfiguredBidders []string `json:"configuredBidders"` + + IABCategories map[string]map[string]string `json:"IABCategories"` + + SizePrice map[string]map[string]float64 `json:"sizePrice"` + + StatisticalQuantities map[string]map[string]float64 `json:"statisticalQuantities"` + + UserID string `json:"userID"` + + PageViewID string `json:"pageViewID"` + + Lcp int64 `json:"lcp"` + + Cpm float64 `json:"cpm"` + + GamAdvertiserID int64 `json:"gamAdvertiserID"` + + GptAdUnit string `json:"gptAdUnit"` + + HasAdServerWonAuction bool `json:"hasAdServerWonAuction"` + + IsInfiniteScroll bool `json:"isInfiniteScroll"` + + HasPrebidWon bool `json:"hasPrebidWon"` + + IsGAMBackFill bool `json:"isGAMBackFill"` + + NoBidBidders []string `json:"noBidBidders"` + + PseudoAdUnitCode string `json:"pseudoAdUnitCode"` + + Viewability bool `json:"viewability"` + + WinningSize string `json:"winningSize"` + + WinningBidder string `json:"winningBidder"` + + TimedOutBidder []string `json:"timedOutBidder"` + + ConfiguredTimeout int64 `json:"configuredTimeout"` + + AdUnitCode string `json:"adUnitCode"` + + IsAXT bool `json:"isAXT"` + + IsMultiSizedUnit bool `json:"isMultiSizedUnit"` + + SizesRequested []string `json:"sizesRequested"` + + Revenue float64 `json:"revenue"` + + WinningRatio float64 `json:"winningRatio"` + + Impressions int64 `json:"impressions"` + + SessionPageViewCount int64 `json:"sessionPageViewCount"` + + Utm map[string]string `json:"utm"` + + Params map[string]map[string]string `json:"params"` + + Timestamp int64 `json:"timestamp"` + + ServerTimestamp int64 `json:"serverTimestamp"` + + InsertedAt int64 `json:"insertedAt"` + + Browser string `json:"browser"` + + ResponseTimes map[string]int64 `json:"responseTimes"` + + GamRecordedCPM float64 `json:"gamRecordedCPM"` + + SspAdvertiserDomain string `json:"sspAdvertiserDomain"` + + SiteUID string `json:"siteUID"` + + FloorMeta map[string]string `json:"floorMeta"` + + RejectedSizePrice map[string]map[string]float64 `json:"rejectedSizePrice"` + + RejectedBidders []string `json:"rejectedBidders"` + + SizeFloors map[string]map[string]string `json:"sizeFloors"` + + IsNewUser bool `json:"isNewUser"` + + DerivedBrowser string `json:"derivedBrowser"` + + ExprTags map[string]string `json:"exprTags"` + + AdType string `json:"adType"` + + RefreshBucket string `json:"refreshBucket"` + + ReferrerType string `json:"referrerType"` + + HasBid bool `json:"hasBid"` + + FloorPrice float64 `json:"floorPrice"` + + Bidder string `json:"bidder"` + + PageLayout string `json:"pageLayout"` + + UnfilledCPM float64 `json:"unfilledCPM"` + + Uuid string `json:"uuid"` + + FloorMech string `json:"floorMech"` + + Brif float64 `json:"brif"` + + AfihbsVersion string `json:"afihbsVersion"` + + InitPageLayout string `json:"initPageLayout"` + + DealIDsByBidder map[string]map[string]string `json:"dealIDsByBidder"` + + UserIDVendorsByBidder map[string]string `json:"userIDVendorsByBidder"` + + DealID string `json:"dealID"` + + UserIDVendors string `json:"userIDVendors"` +} diff --git a/analytics/mile/mile_module.go b/analytics/mile/mile_module.go new file mode 100644 index 00000000000..93f88747f8e --- /dev/null +++ b/analytics/mile/mile_module.go @@ -0,0 +1,279 @@ +package mile + +import ( + "fmt" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/benbjohnson/clock" + "github.com/golang/glog" + + "github.com/prebid/prebid-server/v2/analytics" + "github.com/prebid/prebid-server/v2/analytics/mile/eventchannel" + "github.com/prebid/prebid-server/v2/analytics/mile/helpers" +) + +type Configuration struct { + ScopeID string `json:"scopeId"` + Endpoint string `json:"endpoint"` + Features map[string]bool `json:"features"` +} + +// routes for events +const ( + auction = "auction" + cookieSync = "cookiesync" + amp = "amp" + setUID = "setuid" + video = "video" +) + +type bufferConfig struct { + timeout time.Duration + count int64 + size int64 +} + +type MileModule struct { + eventChannels map[string]*eventchannel.EventChannel + httpClient *http.Client + sigTermCh chan os.Signal + stopCh chan struct{} + scope string + cfg *Configuration + buffsCfg *bufferConfig + muxConfig sync.RWMutex + clock clock.Clock +} + +func NewModule(client *http.Client, scope, endpoint, configRefreshDelay string, maxEventCount int, maxByteSize, maxTime string, clock clock.Clock) (analytics.Module, error) { + configUpdateTask, err := NewConfigUpdateHttpTask( + client, + scope, + endpoint, + configRefreshDelay) + if err != nil { + return nil, err + } + + return NewModuleWithConfigTask(client, scope, endpoint, maxEventCount, maxByteSize, maxTime, configUpdateTask, clock) +} + +func NewModuleWithConfig(client *http.Client, scope, endpoint string, config *Configuration, maxEventCount int, maxByteSize, maxTime string, clock clock.Clock) (analytics.Module, error) { + + bufferCfg, err := newBufferConfig(maxEventCount, maxByteSize, maxTime) + if err != nil { + return nil, fmt.Errorf("fail to parse the module args, arg=analytics.pubstack.buffers, :%v", err) + } + mm := MileModule{ + scope: scope, + httpClient: client, + cfg: config, + buffsCfg: bufferCfg, + sigTermCh: make(chan os.Signal), + stopCh: make(chan struct{}), + eventChannels: make(map[string]*eventchannel.EventChannel), + muxConfig: sync.RWMutex{}, + clock: clock, + } + + mm.updateConfig(config) + + return &mm, nil +} + +func NewModuleWithConfigTask(client *http.Client, scope, endpoint string, maxEventCount int, maxByteSize, maxTime string, configTask ConfigUpdateTask, clock clock.Clock) (analytics.Module, error) { + glog.Infof("[mile] Initializing module scope=%s endpoint=%s\n", scope, endpoint) + + // parse args + bufferCfg, err := newBufferConfig(maxEventCount, maxByteSize, maxTime) + if err != nil { + return nil, fmt.Errorf("fail to parse the module args, arg=analytics.pubstack.buffers, :%v", err) + } + + defaultFeatures := map[string]bool{ + auction: false, + video: false, + amp: false, + cookieSync: false, + setUID: false, + } + + defaultConfig := &Configuration{ + ScopeID: scope, + Endpoint: endpoint, + Features: defaultFeatures, + } + + mm := MileModule{ + scope: scope, + httpClient: client, + cfg: defaultConfig, + buffsCfg: bufferCfg, + sigTermCh: make(chan os.Signal), + stopCh: make(chan struct{}), + eventChannels: make(map[string]*eventchannel.EventChannel), + muxConfig: sync.RWMutex{}, + clock: clock, + } + + signal.Notify(mm.sigTermCh, os.Interrupt, syscall.SIGTERM) + + configChannel := configTask.Start(mm.stopCh) + go mm.start(configChannel) + + glog.Info("[mile] Mile analytics configured and ready") + return &mm, nil +} + +func (m *MileModule) LogAuctionObject(ao *analytics.AuctionObject) { + m.muxConfig.RLock() + defer m.muxConfig.RUnlock() + + if !m.isFeatureEnable(auction) { + return + } + + // serialize event + events, err := helpers.JsonifyAuctionObject(ao, m.scope) + + if err != nil { + glog.Warning("[mile] Cannot serialize auction") + return + } + for _, event := range events { + m.eventChannels[auction].Push(&event) + } +} + +func (m *MileModule) LogNotificationEventObject(ne *analytics.NotificationEvent) { +} + +func (m *MileModule) LogVideoObject(vo *analytics.VideoObject) { + m.muxConfig.RLock() + defer m.muxConfig.RUnlock() + + if !m.isFeatureEnable(video) { + return + } + + // serialize event + payload, err := helpers.JsonifyVideoObject(vo, m.scope) + if err != nil { + glog.Warning("[mile] Cannot serialize video") + return + } + + m.eventChannels[video].Push(payload) +} + +func (m *MileModule) LogSetUIDObject(so *analytics.SetUIDObject) { + m.muxConfig.RLock() + defer m.muxConfig.RUnlock() + + if !m.isFeatureEnable(setUID) { + return + } + + // serialize event + payload, err := helpers.JsonifySetUIDObject(so, m.scope) + if err != nil { + glog.Warning("[mile] Cannot serialize video") + return + } + + m.eventChannels[setUID].Push(payload) +} + +func (m *MileModule) LogCookieSyncObject(cso *analytics.CookieSyncObject) { + m.muxConfig.RLock() + defer m.muxConfig.RUnlock() + + if !m.isFeatureEnable(cookieSync) { + return + } + + // serialize event + payload, err := helpers.JsonifyCookieSync(cso, m.scope) + if err != nil { + glog.Warning("[mile] Cannot serialize video") + return + } + + m.eventChannels[cookieSync].Push(payload) +} + +func (m *MileModule) LogAmpObject(ao *analytics.AmpObject) { + m.muxConfig.RLock() + defer m.muxConfig.RUnlock() + + if !m.isFeatureEnable(amp) { + return + } + + // serialize event + payload, err := helpers.JsonifyAmpObject(ao, m.scope) + if err != nil { + glog.Warning("[mile] Cannot serialize video") + return + } + + m.eventChannels[amp].Push(payload) +} + +func (m *MileModule) start(c <-chan *Configuration) { + for { + select { + case <-m.sigTermCh: + close(m.stopCh) + cfg := m.cfg.clone().disableAllFeatures() + m.updateConfig(cfg) + return + case config := <-c: + m.updateConfig(config) + glog.Infof("[mile] Updating config: %v", m.cfg) + } + } +} + +func (m *MileModule) updateConfig(config *Configuration) { + m.muxConfig.Lock() + defer m.muxConfig.Unlock() + + //if m.cfg.isSameAs(config) { + // return + //} + + m.cfg = config + m.closeAllEventChannels() + + m.registerChannel(amp) + m.registerChannel(auction) + m.registerChannel(cookieSync) + m.registerChannel(video) + m.registerChannel(setUID) + +} + +func (m *MileModule) isFeatureEnable(feature string) bool { + val, ok := m.cfg.Features[feature] + return ok && val +} + +func (m *MileModule) registerChannel(feature string) { + if m.isFeatureEnable(feature) { + sender := eventchannel.BuildEndpointSender(m.httpClient, m.cfg.Endpoint, feature) + m.eventChannels[feature] = eventchannel.NewEventChannel(sender, m.clock, m.buffsCfg.size, m.buffsCfg.count, m.buffsCfg.timeout) + } +} + +func (m *MileModule) closeAllEventChannels() { + for key, ch := range m.eventChannels { + ch.Close() + delete(m.eventChannels, key) + } +} diff --git a/analytics/mile/mile_module_test.go b/analytics/mile/mile_module_test.go new file mode 100644 index 00000000000..71d0b7983aa --- /dev/null +++ b/analytics/mile/mile_module_test.go @@ -0,0 +1,47 @@ +package mile + +import ( + "fmt" + "github.com/benbjohnson/clock" + "github.com/prebid/prebid-server/v2/analytics" + "github.com/prebid/prebid-server/v2/analytics/clients" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestEventSend(t *testing.T) { + config := Configuration{ + ScopeID: "test", + Endpoint: "http://localhost:8000", + Features: map[string]bool{ + auction: true, + video: true, + amp: true, + cookieSync: true, + setUID: true, + }, + } + + module, err := NewModuleWithConfig(clients.GetDefaultHttpInstance(), + "test", + "http://localhost:8000/pageview-event/json", + &config, + 1, + "100", + "5s", + clock.New(), + ) + assert.NoError(t, err) + + fmt.Println(module) + //fmt.Println + + analyticsEvent := analytics.AuctionObject{} + module.LogAuctionObject(&analyticsEvent) + module.LogAuctionObject(&analyticsEvent) + + time.Sleep(10 * time.Second) + + //assert.ElementsMatch(t, []byte{'1', '2', '3'}, []byte(readGz(data))) +} diff --git a/config/config.go b/config/config.go index bc1e17fbc29..404e5b8d9f3 100644 --- a/config/config.go +++ b/config/config.go @@ -454,6 +454,7 @@ type Analytics struct { File FileLogs `mapstructure:"file"` Agma AgmaAnalytics `mapstructure:"agma"` Pubstack Pubstack `mapstructure:"pubstack"` + Mile Mile `mapstructure:"mile"` } type CurrencyConverter struct { @@ -507,6 +508,12 @@ type Pubstack struct { ConfRefresh string `mapstructure:"configuration_refresh_delay"` } +type Mile struct { + Enabled bool `mapstructure:"enabled"` + Scope string `mapstructure:"scopeid"` + Endpoint string `mapstructure:"endpoint"` +} + type PubstackBuffer struct { BufferSize string `mapstructure:"size"` EventCount int `mapstructure:"count"`