Skip to content

Commit

Permalink
Update filestorage to store bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
berejant committed Sep 27, 2024
1 parent 5603734 commit ecd2d79
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 57 deletions.
15 changes: 8 additions & 7 deletions createdLessonsImporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package main
import (
"context"
"database/sql"
"encoding/binary"
"encoding/json"
"fmt"
dekanatEvents "github.com/kneu-messenger-pigeon/dekanat-events"
"github.com/kneu-messenger-pigeon/events"
"github.com/kneu-messenger-pigeon/fileStorage"
"github.com/segmentio/kafka-go"
"io"
"strconv"
"sync"
"time"
)
Expand Down Expand Up @@ -186,12 +186,10 @@ func (importer *CreatedLessonsImporter) pullCreatedLessons() error {

func (importer *CreatedLessonsImporter) getLessonMaxId() uint {
if importer.lessonMaxId == 0 {
var uint64Value uint64
stringValue, err := importer.storage.Get()
storageValue, err := importer.storage.Get()

if stringValue != "" {
uint64Value, err = strconv.ParseUint(stringValue, 10, 0)
importer.lessonMaxId = uint(uint64Value)
if storageValue != nil && len(storageValue) >= 8 {
importer.lessonMaxId = uint(binary.LittleEndian.Uint64(storageValue))

} else if err == nil {
// storage not exist or empty
Expand Down Expand Up @@ -220,7 +218,10 @@ func (importer *CreatedLessonsImporter) getLessonMaxId() uint {
func (importer *CreatedLessonsImporter) setLessonMaxId(newLastId uint) (err error) {
if importer.lessonMaxId < newLastId {
importer.lessonMaxId = newLastId
err = importer.storage.Set(strconv.FormatUint(uint64(newLastId), 10))

storageValue := make([]byte, 8)
binary.LittleEndian.PutUint64(storageValue, uint64(newLastId))
err = importer.storage.Set(storageValue)
if err != nil {
fmt.Fprintf(importer.out, "[%s] Failed to write LessonMaxId %v \n", t(), err)
}
Expand Down
36 changes: 22 additions & 14 deletions createdLessonsImporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"database/sql"
"encoding/binary"
"encoding/json"
"errors"
"github.com/DATA-DOG/go-sqlmock"
Expand Down Expand Up @@ -52,6 +53,12 @@ func TestExecuteImportCreatedLesson(t *testing.T) {
}
}

uintToBytes := func(i uint) []byte {
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(i))
return b
}

t.Run("New valid lesson", func(t *testing.T) {
lastLessonId := 10

Expand Down Expand Up @@ -98,8 +105,8 @@ func TestExecuteImportCreatedLesson(t *testing.T) {
dbMock.ExpectRollback()

fileStorageMock := fileStorage.NewMockInterface(t)

Check failure on line 107 in createdLessonsImporter_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
fileStorageMock.On("Get").Once().Return("", nil)
fileStorageMock.On("Set", strconv.Itoa(int(expectedEvent.Id))).Once().Return(nil)
fileStorageMock.On("Get").Once().Return([]byte{}, nil)
fileStorageMock.On("Set", uintToBytes(expectedEvent.Id)).Once().Return(nil)

writerMock := eventsMocks.NewWriterInterface(t)
writerMock.On(
Expand Down Expand Up @@ -176,8 +183,8 @@ func TestExecuteImportCreatedLesson(t *testing.T) {
dbMock.ExpectRollback()

fileStorageMock := fileStorage.NewMockInterface(t)

Check failure on line 185 in createdLessonsImporter_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
fileStorageMock.On("Get").Once().Return("", nil)
fileStorageMock.On("Set", strconv.Itoa(int(expectedEvent.Id))).Once().Return(nil)
fileStorageMock.On("Get").Once().Return(nil, nil)
fileStorageMock.On("Set", uintToBytes(expectedEvent.Id)).Once().Return(nil)

writerMock := eventsMocks.NewWriterInterface(t)
writerMock.On("WriteMessages", matchContext, mock.MatchedBy(expectLessonEventMessage(expectedEvent))).
Expand Down Expand Up @@ -262,8 +269,8 @@ func TestExecuteImportCreatedLesson(t *testing.T) {
dbMock.ExpectRollback()

fileStorageMock := fileStorage.NewMockInterface(t)

Check failure on line 271 in createdLessonsImporter_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
fileStorageMock.On("Get").Once().Return("", nil)
fileStorageMock.On("Set", strconv.Itoa(int(expectedEvent.Id))).Once().Return(nil)
fileStorageMock.On("Get").Once().Return(nil, nil)
fileStorageMock.On("Set", uintToBytes(expectedEvent.Id)).Once().Return(nil)

writerMock := eventsMocks.NewWriterInterface(t)
writerMock.On("WriteMessages", matchContext, mock.MatchedBy(expectLessonEventMessage(expectedEvent))).
Expand Down Expand Up @@ -309,7 +316,7 @@ func TestExecuteImportCreatedLesson(t *testing.T) {
out.Reset()
expectedError := errors.New("expected error")

lastLessonId := 10
lastLessonId := uint(10)

expectedEvent := events.LessonEvent{
Id: uint(lastLessonId) + 1,
Expand Down Expand Up @@ -353,8 +360,8 @@ func TestExecuteImportCreatedLesson(t *testing.T) {
dbMock.ExpectRollback()

fileStorageMock := fileStorage.NewMockInterface(t)

Check failure on line 362 in createdLessonsImporter_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
fileStorageMock.On("Get").Once().Return(strconv.Itoa(lastLessonId), nil)
fileStorageMock.On("Set", strconv.Itoa(int(expectedEvent.Id))).Once().Return(nil)
fileStorageMock.On("Get").Once().Return(uintToBytes(lastLessonId), nil)
fileStorageMock.On("Set", uintToBytes(expectedEvent.Id)).Once().Return(nil)

writerMock := eventsMocks.NewWriterInterface(t)
writerMock.On(
Expand Down Expand Up @@ -405,13 +412,13 @@ func TestImportCreatedLesson(t *testing.T) {
out.Reset()
expectedError := errors.New("expected error")

lastLessonId := 10
lastLessonId := uint(10)

db, dbMock, _ := sqlmock.New()
dbMock.ExpectBegin().WillReturnError(expectedError)

fileStorageMock := fileStorage.NewMockInterface(t)

Check failure on line 420 in createdLessonsImporter_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
fileStorageMock.On("Get").Once().Return(strconv.Itoa(lastLessonId), nil)
fileStorageMock.On("Get").Once().Return(uintToBytes(lastLessonId), nil)

writerMock := eventsMocks.NewWriterInterface(t)

Expand Down Expand Up @@ -459,7 +466,7 @@ func TestGetMaxLessonId(t *testing.T) {
expectedError := errors.New("expected error")

fileStorageMock := fileStorage.NewMockInterface(t)

Check failure on line 468 in createdLessonsImporter_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
fileStorageMock.On("Get").Once().Return("", expectedError)
fileStorageMock.On("Get").Once().Return(nil, expectedError)

createdLessonsImporter := &CreatedLessonsImporter{
out: &out,
Expand All @@ -481,7 +488,7 @@ func TestSetMaxLessonId(t *testing.T) {
expectedLessonMaxId := uint(20)

fileStorageMock := fileStorage.NewMockInterface(t)

Check failure on line 490 in createdLessonsImporter_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
fileStorageMock.On("Set", strconv.Itoa(int(expectedLessonMaxId))).Once().Return(expectedError)
fileStorageMock.On("Set", uintToBytes(expectedLessonMaxId)).Once().Return(expectedError)

createdLessonsImporter := &CreatedLessonsImporter{
out: &out,
Expand All @@ -499,7 +506,7 @@ func TestSetMaxLessonId(t *testing.T) {
expectedLessonMaxId := uint(50)

fileStorageMock := fileStorage.NewMockInterface(t)

Check failure on line 508 in createdLessonsImporter_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
fileStorageMock.On("Set", strconv.Itoa(int(expectedLessonMaxId))).Once().Return(nil)
fileStorageMock.On("Set", uintToBytes(expectedLessonMaxId)).Once().Return(nil)

createdLessonsImporter := &CreatedLessonsImporter{
out: &out,
Expand All @@ -516,6 +523,7 @@ func TestSetMaxLessonId(t *testing.T) {
err := createdLessonsImporter.setLessonMaxId(expectedLessonMaxId)
close(lessonMaxIdChan)
runtime.Gosched()
time.Sleep(time.Millisecond * 100)

assert.NoError(t, err)
assert.Equal(t, expectedLessonMaxId, actualLessonMaxId)
Expand Down
24 changes: 14 additions & 10 deletions currentYearWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package main

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"github.com/kneu-messenger-pigeon/events"
"github.com/kneu-messenger-pigeon/fileStorage"
"github.com/segmentio/kafka-go"
"io"
"strconv"
"time"
)

Expand All @@ -25,16 +26,16 @@ type CurrentYearWatcher struct {
out io.Writer
storage fileStorage.Interface
reader events.ReaderInterface
year int
year uint16
}

func (watcher *CurrentYearWatcher) Execute(ctx context.Context) {
yearString, err := watcher.storage.Get()
if err == nil && yearString != "" {
watcher.year, err = strconv.Atoi(yearString)
yearBytes, err := watcher.storage.Get()
if yearBytes != nil {
watcher.year = binary.LittleEndian.Uint16(yearBytes)
}
if watcher.year < 2022 {
watcher.year = (time.Now().Year()*12 + int(time.Now().Month()) - 8) / 12
watcher.year = uint16((time.Now().Year()*12 + int(time.Now().Month()) - 8) / 12)
}

var event events.SecondaryDbLoadedEvent
Expand All @@ -44,20 +45,23 @@ func (watcher *CurrentYearWatcher) Execute(ctx context.Context) {
if err == nil && string(m.Key) == events.SecondaryDbLoadedEventName {
err = json.Unmarshal(m.Value, &event)
if err == nil {
watcher.year = event.Year
err = watcher.storage.Set(strconv.Itoa(event.Year))
watcher.year = uint16(event.Year)

yearBytes = make([]byte, 2)
binary.LittleEndian.PutUint16(yearBytes, watcher.year)
err = watcher.storage.Set(yearBytes)
}
fmt.Fprintf(watcher.out, "[%s] New year received: %d (err: %v)\n", t(), event.Year, err)
}

if err == nil {
err = watcher.reader.CommitMessages(context.Background(), m)
} else if ctx.Err() != err {
} else if !errors.Is(err, ctx.Err()) {
fmt.Fprintf(watcher.out, "[%s] Year watcher error: %s \n", t(), err)
}
}
}

func (watcher *CurrentYearWatcher) GetYear() int {
return watcher.year
return int(watcher.year)
}
19 changes: 12 additions & 7 deletions currentYearWatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"strconv"
"testing"
"time"
)
Expand All @@ -22,7 +21,7 @@ func TestExecuteCurrentYearWatcher(t *testing.T) {

reader := mocks.NewReaderInterface(t)
storage := fileStorage.NewMockInterface(t)

Check failure on line 23 in currentYearWatcher_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
storage.On("Get").Return("", nil)
storage.On("Get").Return(nil, nil)

currentYearWatcher := CurrentYearWatcher{
out: &out,
Expand Down Expand Up @@ -71,8 +70,11 @@ func TestExecuteCurrentYearWatcher(t *testing.T) {
reader.On("CommitMessages", matchContext, message).Return(expectedError)

storage := fileStorage.NewMockInterface(t)

Check failure on line 72 in currentYearWatcher_test.go

View workflow job for this annotation

GitHub Actions / release / test / test

undefined: fileStorage.NewMockInterface
storage.On("Get").Return("2024", nil)
storage.On("Set", strconv.Itoa(expectedYear)).Return(nil)
storage.On("Get").Return(uintToBytes(2024), nil)
storage.On(
"Set",
uintToBytes(uint(expectedYear))[0:2],
).Return(nil)

currentYearWatcher := CurrentYearWatcher{
out: &out,
Expand Down Expand Up @@ -124,7 +126,7 @@ func TestExecuteCurrentYearWatcher(t *testing.T) {
}, expectedError)

storage := fileStorage.NewMockInterface(t)
storage.On("Get").Return("2024", nil)
storage.On("Get").Return(uintToBytes(2024), nil)

currentYearWatcher := CurrentYearWatcher{
out: &out,
Expand Down Expand Up @@ -172,8 +174,11 @@ func TestExecuteCurrentYearWatcher(t *testing.T) {
}, nil)

storage := fileStorage.NewMockInterface(t)
storage.On("Get").Return("2024", nil)
storage.On("Set", strconv.Itoa(expectedYear)).Return(expectedError)
storage.On("Get").Return(uintToBytes(2024), nil)
storage.On(
"Set",
uintToBytes(uint(expectedYear))[0:2],
).Return(expectedError)

currentYearWatcher := CurrentYearWatcher{
out: &out,
Expand Down
4 changes: 2 additions & 2 deletions deletedScoresImporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"time"
)

const DeletedScoreQuery = ScoreSelect + ` WHERE XI_2 IN (?) ` + ScoreSelectOrderBy
const CustomGroupDeletedScoreQuery = ScoreSelect + ` WHERE ID_ZANCG IN (?) ` + ScoreSelectOrderBy
const DeletedScoreQuery = ScoreSelect + ` WHERE XI_2 IN (?) AND ID_T_PD_CMS IS NOT NULL ` + ScoreSelectOrderBy
const CustomGroupDeletedScoreQuery = ScoreSelect + ` WHERE ID_ZANCG IN (?) AND ID_T_PD_CMS IS NOT NULL ` + ScoreSelectOrderBy

type DeletedScoresImporterInterface interface {
Execute(context context.Context)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/joho/godotenv v1.4.0
github.com/kneu-messenger-pigeon/dekanat-events v0.1.10
github.com/kneu-messenger-pigeon/events v0.1.41
github.com/kneu-messenger-pigeon/fileStorage v1.0.1
github.com/kneu-messenger-pigeon/fileStorage v1.1.2
github.com/kneu-messenger-pigeon/victoria-metrics-init v0.1.2
github.com/mattn/go-sqlite3 v1.14.16
github.com/nakagami/firebirdsql v0.9.4
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ github.com/kneu-messenger-pigeon/dekanat-events v0.1.10 h1:GMuL+pWW5MVH+Zl7cY+Nr
github.com/kneu-messenger-pigeon/dekanat-events v0.1.10/go.mod h1:KC0hflA3057dtpg2Mtpbhma9Z3n8Y43vwyygZODzO/Y=
github.com/kneu-messenger-pigeon/events v0.1.41 h1:Yzzp7oyVArzKzzI93HmLX+k3lUn8E3dKUk9uNF3le+8=
github.com/kneu-messenger-pigeon/events v0.1.41/go.mod h1:Q6X8B9gKZpKbby9gisLumvGT1xCWBM7bAFnviYi6MBQ=
github.com/kneu-messenger-pigeon/fileStorage v1.0.1 h1:9ncIZaUXDv5SwB5KtWae/CSYYye1m/z4R7hU1WmBnWQ=
github.com/kneu-messenger-pigeon/fileStorage v1.0.1/go.mod h1:v6IRKfG5C45B6Ls6781bmnLNwAE3e6ZcWuU0N55l66M=
github.com/kneu-messenger-pigeon/fileStorage v1.1.1 h1:+UnSwQ2ADQq41WemAl0LvvGMYiRGnnPPRqlSYpnDeFk=
github.com/kneu-messenger-pigeon/fileStorage v1.1.1/go.mod h1:v6IRKfG5C45B6Ls6781bmnLNwAE3e6ZcWuU0N55l66M=
github.com/kneu-messenger-pigeon/fileStorage v1.1.2 h1:kH8AklG9rTxF1t8+ZdjALizAXOh9QCYV1qEwk8pPTfA=
github.com/kneu-messenger-pigeon/fileStorage v1.1.2/go.mod h1:v6IRKfG5C45B6Ls6781bmnLNwAE3e6ZcWuU0N55l66M=
github.com/kneu-messenger-pigeon/victoria-metrics-init v0.1.2 h1:jtKQpIefoZBJzgniS4+wM/HU+IgAPkhdOZa4B52RZA0=
github.com/kneu-messenger-pigeon/victoria-metrics-init v0.1.2/go.mod h1:HxDZwQUrMLgZNdLFT1llblZ7Zj6EBpBk7+9tcU4NZCk=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
Expand Down
2 changes: 1 addition & 1 deletion timestampCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func idToBytes(id uint, isCustomGroup bool) []byte {
}

func uintToBytes(input uint) []byte {
idBytes := make([]byte, 4)
idBytes := make([]byte, 8)
binary.LittleEndian.PutUint32(idBytes, uint32(input))
return idBytes
}
16 changes: 11 additions & 5 deletions updatedScoresImporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"database/sql"
"encoding/binary"
"encoding/json"
"fmt"
dekanatEvents "github.com/kneu-messenger-pigeon/dekanat-events"
Expand All @@ -16,7 +17,7 @@ import (

const StorageTimeFormat = time.RFC3339

const UpdateScoreQuery = ScoreSelect + ` WHERE REGDATE > ? ` + ScoreSelectOrderBy
const UpdateScoreQuery = ScoreSelect + ` WHERE REGDATE > ? AND ID_T_PD_CMS IS NOT NULL ` + ScoreSelectOrderBy

type UpdatedScoresImporterInterface interface {
Execute(context context.Context)
Expand Down Expand Up @@ -189,12 +190,14 @@ func (importer *UpdatedScoresImporter) pullUpdatedScores() error {

func (importer *UpdatedScoresImporter) getLastRegDate() time.Time {
if importer.lastRegDate.IsZero() {
stringValue, err := importer.storage.Get()
if stringValue == "" && err == nil { // storage not exist or empty. Make initial value
bytesValue, err := importer.storage.Get()
if bytesValue == nil && err == nil { // storage not exist or empty. Make initial value
importer.lastRegDate = time.Now().Add(-time.Minute)

} else if err == nil {
importer.lastRegDate, err = time.ParseInLocation(StorageTimeFormat, stringValue, time.Local)
importer.lastRegDate = time.Unix(
int64(binary.LittleEndian.Uint64(bytesValue)), 0,
)
}

if err != nil {
Expand All @@ -211,7 +214,10 @@ func (importer *UpdatedScoresImporter) setLastRegDate(newLastRegDate time.Time)
if importer.lastRegDate != newLastRegDate {
newLastRegDate.In(time.Local)
importer.lastRegDate = newLastRegDate
err = importer.storage.Set(newLastRegDate.Format(StorageTimeFormat))

value := make([]byte, 8)
binary.LittleEndian.PutUint64(value, uint64(newLastRegDate.Unix()))
err = importer.storage.Set(value)
if err != nil {
fmt.Fprintf(importer.out, "[%s] Failed to write LessonMaxId %v \n", t(), err)
}
Expand Down
Loading

0 comments on commit ecd2d79

Please sign in to comment.