Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
export-import improvements (#1076)
Browse files Browse the repository at this point in the history
* export import improvements

* remove debug stmt

* Update internal/exportimport/database/exportimport.go

Co-authored-by: Weston Haught <[email protected]>

* Update internal/exportimport/scheduler.go

Co-authored-by: Weston Haught <[email protected]>

Co-authored-by: Weston Haught <[email protected]>
  • Loading branch information
mikehelmick and whaught authored Oct 13, 2020
1 parent c4e021a commit f569bac
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 66 deletions.
18 changes: 14 additions & 4 deletions internal/exportimport/database/exportimport.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/exportimport/model"
"github.com/google/exposure-notifications-server/pkg/logging"
"github.com/jackc/pgx/v4"
)

Expand Down Expand Up @@ -218,12 +219,14 @@ func prepareInsertImportFile(ctx context.Context, tx pgx.Tx) (string, error) {
(export_import_id, zip_filename, discovered_at)
VALUES
($1, $2, $3)
RETURNING id
ON CONFLICT DO NOTHING
RETURNING id
`)
return stmtName, err
}

func (db *ExportImportDB) CreateFiles(ctx context.Context, ei *model.ExportImport, filenames []string) (int, error) {
logger := logging.FromContext(ctx)
insertedFiles := 0

now := time.Now().UTC()
Expand Down Expand Up @@ -258,13 +261,20 @@ func (db *ExportImportDB) CreateFiles(ctx context.Context, ei *model.ExportImpor
}

for _, fname := range filenames {
if _, ok := existing[fname]; ok {
// we've already scheduled this file previously, skip.
continue
}

result, err := tx.Exec(ctx, insertStmt, ei.ID, fname, now)
if err != nil {
return fmt.Errorf("error inserting filename: %v, %w", fname, err)
}
if result.RowsAffected() != 1 {
return fmt.Errorf("filename isnert failed: %v", fname)
logger.Warnw("attempted to insert duplicate file", "exportImportID", ei.ID)
continue
}
logger.Debugw("scheduled new export file for importing", "exportImportID", ei.ID, "filename", fname)
insertedFiles++
}
return nil
Expand All @@ -275,7 +285,7 @@ func (db *ExportImportDB) CreateFiles(ctx context.Context, ei *model.ExportImpor
return insertedFiles, nil
}

func (db *ExportImportDB) CompleteImportFile(ctx context.Context, ef *model.ImportFile) error {
func (db *ExportImportDB) CompleteImportFile(ctx context.Context, ef *model.ImportFile, status string) error {
now := time.Now().UTC()
return db.db.InTx(ctx, pgx.ReadCommitted, func(tx pgx.Tx) error {
rows, err := tx.Query(ctx, `
Expand Down Expand Up @@ -306,7 +316,7 @@ func (db *ExportImportDB) CompleteImportFile(ctx context.Context, ef *model.Impo
}
rows.Close()

ef.Status = model.ImportFileComplete
ef.Status = status
ef.ProcessedAt = &now
result, err := tx.Exec(ctx, `
UPDATE
Expand Down
2 changes: 1 addition & 1 deletion internal/exportimport/database/exportimport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestLeaseAndCompleteImportFile(t *testing.T) {
t.Fatalf("unable to lock file where lock has expired: %v", err)
}

if err := exportImportDB.CompleteImportFile(ctx, testFile); err != nil {
if err := exportImportDB.CompleteImportFile(ctx, testFile, model.ImportFileComplete); err != nil {
t.Fatalf("unable to complete import file: %v", err)
}

Expand Down
22 changes: 22 additions & 0 deletions internal/exportimport/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package exportimport
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -32,6 +33,10 @@ import (
"go.uber.org/zap"
)

var (
ErrArchiveNotFound = errors.New("archive file not found")
)

type ImportRequest struct {
config *Config
exportImport *model.ExportImport
Expand All @@ -51,6 +56,16 @@ type SignatureAndKey struct {
}

func (s *Server) ImportExportFile(ctx context.Context, ir *ImportRequest) (*ImportResposne, error) {
// Special case - previous versions may have inserted the filename root as a file.
// If we find that, skip attempted processing and just mark as successful.
if ir.exportImport.ExportRoot == ir.file.ZipFilename {
return &ImportResposne{
insertedKeys: 0,
revisedKeys: 0,
droppedKeys: 0,
}, nil
}

logger := logging.FromContext(ctx)
// Download zip file.
client := &http.Client{
Expand All @@ -61,6 +76,13 @@ func (s *Server) ImportExportFile(ctx context.Context, ir *ImportRequest) (*Impo
return nil, fmt.Errorf("error downloading export file: %w", err)
}

if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return nil, ErrArchiveNotFound
}
return nil, fmt.Errorf("unable to download file, code: %d", resp.StatusCode)
}

defer resp.Body.Close()
bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down
137 changes: 78 additions & 59 deletions internal/exportimport/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/exportimport/model"
"github.com/google/exposure-notifications-server/pkg/logging"
"go.opencensus.io/trace"
)
Expand All @@ -42,6 +43,7 @@ func (s *Server) handleImport(ctx context.Context) http.HandlerFunc {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}()
ctx = logging.WithLogger(ctx, logger)

configs, err := s.exportImportDB.ActiveConfigs(ctx)
if err != nil {
Expand All @@ -55,71 +57,88 @@ func (s *Server) handleImport(ctx context.Context) http.HandlerFunc {
return
}

// Obtain a lock to work on this import config.
unlock, err := s.db.Lock(ctx, fmt.Sprintf("%s%d", lockPrefix, config.ID), s.config.MaxRuntime)
if err != nil {
if errors.Is(err, database.ErrAlreadyLocked) {
logger.Warnw("import already locked", "config", config)
}
logger.Errorw("error locking import config", "config", config, "error", err)
continue
}
defer func() {
if err := unlock(); err != nil {
logger.Errorw("failed to unlock", "error", err)
}
}()

// Get the list of files that needs to be processed.
openFiles, err := s.exportImportDB.GetOpenImportFiles(ctx, s.config.ImportLockTime, config)
if err != nil {
logger.Errorw("unable to read open export files", "config", config, "error", err)
}
if len(openFiles) == 0 {
logger.Infow("no work to do", "config", config)
continue
if err := s.runImport(ctx, config); err != nil {
logger.Errorw("error running export-import", "config", config, "error", err)
}
}

// Read in public keys.
keys, err := s.exportImportDB.AllowedKeys(ctx, config)
if err != nil {
logger.Errorw("unable to read public keys", "config", config, "error", err)
continue
}
logger.Debugw("allowed public keys for file", "publicKeys", keys)

for _, file := range openFiles {
// Check how we're doing on max runtime.
if deadlinePassed(ctx) {
logger.Warnw("deadline passed, still work to do", "config", config)
return
}

if err := s.exportImportDB.LeaseImportFile(ctx, s.config.ImportLockTime, file); err != nil {
logger.Warnw("unexpected race condition, file already locked", "file", file, "error", err)
continue
}

// import the file.
result, err := s.ImportExportFile(ctx, &ImportRequest{
config: s.config,
exportImport: config,
keys: keys,
file: file,
})
if err != nil {
logger.Errorw("error processing import file", "error", err)
continue
}
logger.Infow("completed file import", "inserted", result.insertedKeys, "revised", result.revisedKeys, "dropped", result.droppedKeys)

if err := s.exportImportDB.CompleteImportFile(ctx, file); err != nil {
logger.Errorw("failed to mark file completed", "file", file, "error", err)
}
w.WriteHeader(http.StatusOK)
}
}

// runImport processes all currently-open import files for a single
// export-import configuration. It serializes work per-config with an
// advisory database lock, reads the allowed signing keys, downloads and
// imports each open export file, and marks each file complete (or failed
// when the archive no longer exists at the source).
//
// A nil return means the work finished or the config was skipped because
// another worker holds the lock; a non-nil error aborts this config only.
func (s *Server) runImport(ctx context.Context, config *model.ExportImport) error {
	logger := logging.FromContext(ctx)

	// Obtain a lock to work on this import config. Another worker already
	// holding the lock is expected under concurrent invocation and is not
	// an error condition (previously this was logged at both warn and
	// error level before returning).
	unlock, err := s.db.Lock(ctx, fmt.Sprintf("%s%d", lockPrefix, config.ID), s.config.MaxRuntime)
	if err != nil {
		if errors.Is(err, database.ErrAlreadyLocked) {
			logger.Warnw("import already locked", "config", config)
			return nil
		}
		return fmt.Errorf("locking import config: %w", err)
	}
	defer func() {
		if err := unlock(); err != nil {
			logger.Errorw("failed to unlock", "error", err)
		}
	}()

	// Get the list of files that needs to be processed.
	openFiles, err := s.exportImportDB.GetOpenImportFiles(ctx, s.config.ImportLockTime, config)
	if err != nil {
		// Surface the failure instead of silently falling through and
		// reporting "no work to do" on a nil slice.
		return fmt.Errorf("reading open export files: %w", err)
	}
	if len(openFiles) == 0 {
		logger.Infow("no work to do", "config", config)
		return nil
	}

	// Read in public keys allowed to sign exports for this config.
	keys, err := s.exportImportDB.AllowedKeys(ctx, config)
	if err != nil {
		return fmt.Errorf("unable to read public keys: %w", err)
	}
	logger.Debugw("allowed public keys for file", "publicKeys", keys)

	for _, file := range openFiles {
		// Check how we're doing on max runtime.
		if deadlinePassed(ctx) {
			return fmt.Errorf("deadline exceeded, work not done")
		}

		if err := s.exportImportDB.LeaseImportFile(ctx, s.config.ImportLockTime, file); err != nil {
			// Losing the lease race on one file should not abandon the
			// remaining files in this batch.
			logger.Warnw("unexpected race condition, file already locked", "file", file, "error", err)
			continue
		}

		// Import the file. A missing archive (404) is terminal for this
		// file: mark it failed rather than retrying forever.
		status := model.ImportFileComplete
		result, err := s.ImportExportFile(ctx, &ImportRequest{
			config:       s.config,
			exportImport: config,
			keys:         keys,
			file:         file,
		})
		if err != nil {
			if errors.Is(err, ErrArchiveNotFound) {
				logger.Errorw("export file not found, marking failed", "exportImportID", config.ID, "filename", file.ZipFilename)
				status = model.ImportFileFailed
			} else {
				return fmt.Errorf("error processing export file: %w", err)
			}
		}
		// result is nil on the not-found path above.
		if result != nil {
			logger.Infow("completed file import", "inserted", result.insertedKeys, "revised", result.revisedKeys, "dropped", result.droppedKeys)
		}

		// Fixed: was logger.Errorf with structured key/value arguments,
		// which treats "file" as a printf format string; Errorw is the
		// structured (key-value) logging form.
		if err := s.exportImportDB.CompleteImportFile(ctx, file, status); err != nil {
			logger.Errorw("failed to mark file completed", "file", file, "error", err)
		}
	}
	return nil
}

func deadlinePassed(ctx context.Context) bool {
Expand Down
15 changes: 13 additions & 2 deletions internal/exportimport/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package exportimport

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/pkg/logging"
"go.opencensus.io/trace"
)
Expand All @@ -37,6 +39,10 @@ func (s *Server) handleSchedule(ctx context.Context) http.HandlerFunc {

unlock, err := s.db.Lock(ctx, schedulerLockID, s.config.MaxRuntime)
if err != nil {
if errors.Is(err, database.ErrAlreadyLocked) {
w.WriteHeader(http.StatusOK)
return
}
logger.Warn(err)
w.WriteHeader(http.StatusInternalServerError)
return
Expand Down Expand Up @@ -82,8 +88,11 @@ func (s *Server) handleSchedule(ctx context.Context) http.HandlerFunc {
zipNames := strings.Split(string(bytes), "\n")
currentFiles := make([]string, 0, len(zipNames))
for _, zipFile := range zipNames {
if len(strings.TrimSpace(zipFile)) == 0 {
// drop blank lines.
continue
}
fullZipFile := fmt.Sprintf("%s%s", config.ExportRoot, strings.TrimSpace(zipFile))
logger.Debugw("found new export file", "zipFile", fullZipFile)
currentFiles = append(currentFiles, fullZipFile)
}

Expand All @@ -93,7 +102,9 @@ func (s *Server) handleSchedule(ctx context.Context) http.HandlerFunc {
logger.Errorw("unable to write new files", "error", err)
}
if n != 0 {
logger.Infow("found new files for export import config", "file", config.IndexFile, "newCount", n)
logger.Infow("found new files for export import config", "exportImportID", config.ID, "file", config.IndexFile, "newCount", n)
} else {
logger.Infow("no new export files", "exportImportID", config.ID, "file", config.IndexFile)
}
}

Expand Down
19 changes: 19 additions & 0 deletions migrations/000045_AddExportImportIndex.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Copyright 2020 Google LLC
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

BEGIN;

-- Revert migration 000045: drop the unique index that prevents the same
-- zip filename from being scheduled twice for a given export-import config.
DROP INDEX export_import_filename_unique;

END;
19 changes: 19 additions & 0 deletions migrations/000045_AddExportImportIndex.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Copyright 2020 Google LLC
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

BEGIN;

-- Ensure a given export file can only be scheduled once per export-import
-- config; the importer's INSERT relies on ON CONFLICT DO NOTHING against
-- this index to skip duplicates.
CREATE UNIQUE INDEX export_import_filename_unique ON ImportFile (export_import_id, zip_filename);

END;

0 comments on commit f569bac

Please sign in to comment.