Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
add mirror service (#1070)
Browse files Browse the repository at this point in the history
* add mirror service

* remove unnecessary mod change

* review comments

* lint

* spellin

* all ur rebase
  • Loading branch information
mikehelmick authored Oct 13, 2020
1 parent 3fde796 commit 3df5bf7
Show file tree
Hide file tree
Showing 9 changed files with 950 additions and 0 deletions.
72 changes: 72 additions & 0 deletions cmd/mirror/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This binary provides a server that can import export files from a trusted partner.
package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/google/exposure-notifications-server/internal/buildinfo"
"github.com/google/exposure-notifications-server/internal/mirror"
"github.com/google/exposure-notifications-server/internal/setup"
"github.com/google/exposure-notifications-server/pkg/logging"
"github.com/google/exposure-notifications-server/pkg/server"
"github.com/sethvargo/go-signalcontext"
)

func main() {
ctx, done := signalcontext.OnInterrupt()

debug, _ := strconv.ParseBool(os.Getenv("LOG_DEBUG"))
logger := logging.NewLogger(debug)
logger = logger.With("build_id", buildinfo.BuildID)
logger = logger.With("build_tag", buildinfo.BuildTag)

ctx = logging.WithLogger(ctx, logger)

err := realMain(ctx)
done()

if err != nil {
logger.Fatal(err)
}
}

func realMain(ctx context.Context) error {
logger := logging.FromContext(ctx)

var config mirror.Config
env, err := setup.Setup(ctx, &config)
if err != nil {
return fmt.Errorf("setup.Setup: %w", err)
}
defer env.Close(ctx)

rotationServer, err := mirror.NewServer(&config, env)
if err != nil {
return fmt.Errorf("mirror.NewServer: %w", err)
}

srv, err := server.New(config.Port)
if err != nil {
return fmt.Errorf("server.New: %w", err)
}
logger.Info("listening on: ", config.Port)

return srv.ServeHTTPHandler(ctx, rotationServer.Routes(ctx))
}
65 changes: 65 additions & 0 deletions internal/mirror/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mirror

import (
"time"

"github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/setup"
"github.com/google/exposure-notifications-server/internal/storage"
"github.com/google/exposure-notifications-server/pkg/observability"
"github.com/google/exposure-notifications-server/pkg/secrets"
)

var _ setup.DatabaseConfigProvider = (*Config)(nil)
var _ setup.ObservabilityExporterConfigProvider = (*Config)(nil)
var _ setup.BlobstoreConfigProvider = (*Config)(nil)
var _ setup.SecretManagerConfigProvider = (*Config)(nil)

type Config struct {
Database database.Config
ObservabilityExporter observability.Config
SecretManager secrets.Config
Storage storage.Config

Port string `env:"PORT, default=8080"`

// Max file sizes for download. 1mb for index files, 20mb for zip files.
MaxIndexBytes int64 `env:"MAX_INDEX_BYTES, default=1048576"`
MaxZipBytes int64 `env:"MAX_ZIP_BYTES, default=20971520"`

IndexFileDownloadTimeout time.Duration `env:"INDEX_FILE_DOWNLOAD_TIMEOUT, default=30s"`
ExportFileDownloadTimeout time.Duration `env:"EXPORT_FILE_DOWNLOAD_TIMEOUT, default=2m"`

MaxRuntime time.Duration `env:"MAX_RUNTIME, default=14m"`
MirrorLockDuration time.Duration `env:"MIRROR_LOCK_DURATION, default=15m"`
}

func (c *Config) BlobstoreConfig() *storage.Config {
return &c.Storage
}

func (c *Config) DatabaseConfig() *database.Config {
return &c.Database
}

func (c *Config) ObservabilityExporterConfig() *observability.Config {
return &c.ObservabilityExporter
}

func (c *Config) SecretManagerConfig() *secrets.Config {
return &c.SecretManager
}
232 changes: 232 additions & 0 deletions internal/mirror/database/mirror.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package database is a database interface for mirror settings.
package database

import (
"context"
"fmt"

"github.com/google/exposure-notifications-server/internal/database"
"github.com/google/exposure-notifications-server/internal/mirror/model"
"github.com/jackc/pgx/v4"
)

type MirrorDB struct {
db *database.DB
}

func New(db *database.DB) *MirrorDB {
return &MirrorDB{
db: db,
}
}

func (db *MirrorDB) AddMirror(ctx context.Context, m *model.Mirror) error {
return db.db.InTx(ctx, pgx.Serializable, func(tx pgx.Tx) error {
row := tx.QueryRow(ctx, `
INSERT INTO
Mirror (index_file, export_root, cloud_storage_bucket, filename_root)
VALUES
($1, $2, $3, $4)
RETURNING id
`, m.IndexFile, m.ExportRoot, m.CloudStorageBucket, m.FilenameRoot)

if err := row.Scan(&m.ID); err != nil {
return fmt.Errorf("fetching mirror.ID: %w", err)
}
return nil
})
}

func (db *MirrorDB) DeleteMirror(ctx context.Context, m *model.Mirror) error {
return db.db.InTx(ctx, pgx.Serializable, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, `
DELETE FROM
MirrorFile
WHERE
mirror_id = $1
`, m.ID)
if err != nil {
return fmt.Errorf("failed to delete mirror files: %w", err)
}

_, err = tx.Exec(ctx, `
DELETE FROM
Mirror
WHERE
id = $1
`, m.ID)
if err != nil {
return fmt.Errorf("failed to delete mirror config: %w", err)
}

return nil
})
}

func (db *MirrorDB) Mirrors(ctx context.Context) ([]*model.Mirror, error) {
var mirrors []*model.Mirror

if err := db.db.InTx(ctx, pgx.ReadCommitted, func(tx pgx.Tx) error {
rows, err := tx.Query(ctx, `
SELECT
id, index_file, export_root, cloud_storage_bucket, filename_root
FROM
mirror
ORDER BY id
`)
if err != nil {
return fmt.Errorf("failed to list: %w", err)
}
defer rows.Close()

for rows.Next() {
if err := rows.Err(); err != nil {
return fmt.Errorf("faield to iterate: %w", err)
}

var m model.Mirror
if err := rows.Scan(&m.ID, &m.IndexFile, &m.ExportRoot, &m.CloudStorageBucket, &m.FilenameRoot); err != nil {
return fmt.Errorf("reading row: %w", err)
}
mirrors = append(mirrors, &m)
}
return nil
}); err != nil {
return nil, fmt.Errorf("listing mirror configs: %w", err)
}

return mirrors, nil
}

// SaveFiles makes the list of filenames passed in the only files that are saved on that mirrorID.
func (db *MirrorDB) SaveFiles(ctx context.Context, mirrorID int64, filenames []string) error {
const deleteName = "delete mirror file"
const insertName = "insert mirror file"

wantFiles := make(map[string]struct{}, len(filenames))
for _, fname := range filenames {
wantFiles[fname] = struct{}{}
}

return db.db.InTx(ctx, pgx.ReadCommitted, func(tx pgx.Tx) error {
// Read files selects all of the existing known files FOR UPDATE.
knownFiles, err := readFiles(ctx, tx, mirrorID)
if err != nil {
return err
}

toDelete := make([]*model.MirrorFile, 0)
// if any filenames were read that aren't in the 'filenames' list, add them to the toDelete
for _, mirrorFile := range knownFiles {
if _, ok := wantFiles[mirrorFile.Filename]; ok {
delete(wantFiles, mirrorFile.Filename)
} else {
toDelete = append(toDelete, mirrorFile)
}
}

// if any filenames need removing, delete them.
// toDelete contains items in 'knownFiles' that weren't in 'filenames'
if len(toDelete) > 0 {
if _, err := tx.Prepare(ctx, deleteName, `
DELETE FROM
MirrorFile
WHERE
mirror_id = $1 AND filename = $2
`); err != nil {
return fmt.Errorf("failed to prepare delete DB statement: %w", err)
}

for _, mf := range toDelete {
if result, err := tx.Exec(ctx, deleteName, mf.MirrorID, mf.Filename); err != nil {
return fmt.Errorf("failed to delete mirrofile: %w", err)
} else if result.RowsAffected() != 1 {
return fmt.Errorf("delete of locked row failed")
}
}
}

// Create files if they still need to be created.
// wantFiles contains items from 'filenames' that weren't in 'knownFiles'
if len(wantFiles) > 0 {
if _, err := tx.Prepare(ctx, insertName, `
INSERT INTO
MirrorFile (mirror_id, filename)
VALUES
($1, $2)
ON CONFLICT (mirror_id, filename) DO NOTHING
`); err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}

for fName := range wantFiles {
if _, err := tx.Exec(ctx, insertName, mirrorID, fName); err != nil {
return fmt.Errorf("failed to insert mirrorfile: %w", err)
}
}
}

return nil
})
}

func (db *MirrorDB) ListFiles(ctx context.Context, mirrorID int64) ([]*model.MirrorFile, error) {
var mirrorFiles []*model.MirrorFile

if err := db.db.InTx(ctx, pgx.ReadCommitted, func(tx pgx.Tx) error {
var err error
mirrorFiles, err = readFiles(ctx, tx, mirrorID)
if err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return mirrorFiles, nil
}

func readFiles(ctx context.Context, tx pgx.Tx, mirrorID int64) ([]*model.MirrorFile, error) {
var mirrorFiles []*model.MirrorFile
rows, err := tx.Query(ctx, `
SELECT
mirror_id, filename
FROM
MirrorFile
WHERE
mirror_id = $1
FOR UPDATE
`, mirrorID)
if err != nil {
return nil, fmt.Errorf("failed to list: %w", err)
}
defer rows.Close()

for rows.Next() {
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate: %w", err)
}

var f model.MirrorFile
if err := rows.Scan(&f.MirrorID, &f.Filename); err != nil {
return nil, fmt.Errorf("reading row: %w", err)
}
mirrorFiles = append(mirrorFiles, &f)
}

return mirrorFiles, nil
}
Loading

0 comments on commit 3df5bf7

Please sign in to comment.