Skip to content

Commit

Permalink
change sample updater passed to detector to db-based
Browse files Browse the repository at this point in the history
  • Loading branch information
umputun committed Dec 30, 2024
1 parent 25594ff commit 46f8dfb
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 187 deletions.
66 changes: 0 additions & 66 deletions app/bot/sample_updater.go

This file was deleted.

101 changes: 0 additions & 101 deletions app/bot/sample_updater_test.go

This file was deleted.

31 changes: 14 additions & 17 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,15 @@ func execute(ctx context.Context, opts options) error {
return fmt.Errorf("can't make samples dir, %w", err)
}

// make detector with all sample files loaded
detector := makeDetector(opts)

dataFile := filepath.Join(opts.Files.DynamicDataPath, dataFile)
dataDB, err := storage.NewSqliteDB(dataFile)
dbFile := filepath.Join(opts.Files.DynamicDataPath, dataFile)
dataDB, err := storage.NewSqliteDB(dbFile)
if err != nil {
return fmt.Errorf("can't make data db file %s, %w", dataFile, err)
return fmt.Errorf("can't make data db file %s, %w", dbFile, err)
}
log.Printf("[DEBUG] data db: %s", dataFile)
log.Printf("[DEBUG] data db: %s", dbFile)

// make detector with all sample files loaded
detector := makeDetector(opts)

// make store and load approved users
approvedUsersStore, auErr := storage.NewApprovedUsers(ctx, dataDB)
Expand All @@ -227,7 +227,7 @@ func execute(ctx context.Context, opts options) error {
if err != nil {
return fmt.Errorf("can't load approved users, %w", err)
}
log.Printf("[DEBUG] approved users from: %s, loaded: %d", dataFile, count)
log.Printf("[DEBUG] approved users from: %s, loaded: %d", dbFile, count)

// make spam bot
spamBot, err := makeSpamBot(ctx, opts, dataDB, detector)
Expand Down Expand Up @@ -301,7 +301,7 @@ func execute(ctx context.Context, opts options) error {
}

log.Printf("[DEBUG] telegram listener config: {group: %s, idle: %v, super: %v, admin: %s, testing: %v, no-reply: %v,"+
" suppress: %v, dry: %v, training: %v}",
" suppress: %v, dry: %v, training: %v}",
tgListener.Group, tgListener.IdleDuration, tgListener.SuperUsers, tgListener.AdminGroup,
tgListener.TestingIDs, tgListener.NoSpamReply, tgListener.SuppressJoinMessage, tgListener.Dry,
tgListener.TrainingMode)
Expand Down Expand Up @@ -504,14 +504,6 @@ func makeDetector(opts options) *tgspam.Detector {
}
detector.WithMetaChecks(metaChecks...)

dynSpamFile := filepath.Join(opts.Files.DynamicDataPath, dynamicSpamFile)
detector.WithSpamUpdater(bot.NewSampleUpdater(dynSpamFile))
log.Printf("[DEBUG] dynamic spam file: %s", dynSpamFile)

dynHamFile := filepath.Join(opts.Files.DynamicDataPath, dynamicHamFile)
detector.WithHamUpdater(bot.NewSampleUpdater(dynHamFile))
log.Printf("[DEBUG] dynamic ham file: %s", dynHamFile)

log.Printf("[DEBUG] detector config: %+v", detectorConfig)
return detector
}
Expand Down Expand Up @@ -552,6 +544,11 @@ func makeSpamBot(ctx context.Context, opts options, dataDB *sqlx.DB, detector *t
if err := spamBot.ReloadSamples(); err != nil {
return nil, fmt.Errorf("can't relaod samples, %w", err)
}

// set detector samples updaters
detector.WithSpamUpdater(storage.NewSampleUpdater(samplesStore, storage.SampleTypeSpam, opts.StorageTimeout))
detector.WithHamUpdater(storage.NewSampleUpdater(samplesStore, storage.SampleTypeHam, opts.StorageTimeout))

return spamBot, nil
}

Expand Down
32 changes: 32 additions & 0 deletions app/storage/storage.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package storage

import (
"context"
"io"
"time"

"github.com/jmoiron/sqlx"
_ "modernc.org/sqlite" // sqlite driver loaded here
)
Expand All @@ -27,3 +31,31 @@ func setSqlitePragma(db *sqlx.DB) error {
}
return nil
}

// SampleUpdater is a service to update dynamic (user's) samples for detector, either ham or spam.
// It forwards appends and reads to SamplesService, always using SampleType and the user origin.
type SampleUpdater struct {
// SamplesService is the samples store that Append and Reader delegate to.
SamplesService *Samples
// SampleType is passed to every Add and Reader call made through this updater.
SampleType SampleType
// Timeout bounds a single Append call when greater than zero; it is not applied to Reader.
Timeout time.Duration
}

// NewSampleUpdater builds a SampleUpdater bound to the given samples service,
// sample type and append timeout.
func NewSampleUpdater(samplesService *Samples, sampleType SampleType, timeout time.Duration) *SampleUpdater {
	res := new(SampleUpdater)
	res.SamplesService = samplesService
	res.SampleType = sampleType
	res.Timeout = timeout
	return res
}

// Append stores msg as a sample of the updater's SampleType, always tagged with
// the user origin. When Timeout is positive the underlying Add call runs under
// a context with that timeout; otherwise a plain background context is used.
func (u *SampleUpdater) Append(msg string) error {
	if u.Timeout <= 0 {
		// no timeout configured, nothing to cancel
		return u.SamplesService.Add(context.Background(), u.SampleType, SampleOriginUser, msg)
	}

	ctx, cancel := context.WithTimeout(context.Background(), u.Timeout)
	defer cancel()
	return u.SamplesService.Add(ctx, u.SampleType, SampleOriginUser, msg)
}

// Reader returns a reader over the user-origin samples of the updater's SampleType.
func (u *SampleUpdater) Reader() (io.ReadCloser, error) {
	// deliberately no timeout on this context: the read is an async operation,
	// so Timeout is not applied here
	ctx := context.Background()
	return u.SamplesService.Reader(ctx, u.SampleType, SampleOriginUser)
}
Loading

0 comments on commit 46f8dfb

Please sign in to comment.