", reply.Meta.RecordMeta[1].RevisitReferenceId)
+ assert.Regexp(`warcfile:`+fileNamePattern+`:\d\d\d\d`, reply.Meta.RecordMeta[1].StorageRef)
+
+ dirHasFilesMatching(t, warcdir, "^"+fileNamePattern+".open$", 1)
+ serverAndClient.close()
+ dirHasFilesMatching(t, warcdir, "^"+fileNamePattern+"$", 1)
+}
+
+func dirHasFilesMatching(t *testing.T, dir string, pattern string, count int) bool {
+ files, err := ioutil.ReadDir(dir)
+ if err != nil {
+ panic(err)
+ }
+
+ found := 0
+ p := regexp.MustCompile(pattern)
+ for _, file := range files {
+ if p.MatchString(file.Name()) {
+ found++
+ }
+ }
+ if found != count {
+ f := ""
+ for _, ff := range files {
+ f += "\n " + ff.Name()
+ }
+ return assert.Fail(t, "Wrong number of files in '"+dir+"'", "Expected %d files to match %s, but found %d\nFiles in dir:%s", count, pattern, found, f)
+ }
+ return false
+}
+
+func rmDir(dir string) {
+ files, err := ioutil.ReadDir(dir)
+ if err != nil {
+ return
+ }
+
+ for _, file := range files {
+ fileName := warcdir + "/" + file.Name()
+ err = os.Remove(fileName)
+ if err != nil {
+ panic(err)
+ }
+ }
+ err = os.Remove(dir)
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/server/sessioncontext.go b/server/sessioncontext.go
new file mode 100644
index 0000000..88cf906
--- /dev/null
+++ b/server/sessioncontext.go
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2019 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package server
+
+import (
+ "context"
+ "fmt"
+ "github.com/nlnwa/gowarc"
+ "github.com/nlnwa/veidemann-api/go/config/v1"
+ "github.com/nlnwa/veidemann-api/go/contentwriter/v1"
+ "github.com/nlnwa/veidemann-contentwriter/database"
+ "github.com/nlnwa/veidemann-contentwriter/settings"
+ "github.com/rs/zerolog"
+ "github.com/rs/zerolog/log"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+type writeSessionContext struct {
+ log zerolog.Logger
+ settings settings.Settings
+ configCache database.ConfigCache
+ meta *contentwriter.WriteRequestMeta
+ collectionConfig *config.ConfigObject
+ records map[int32]gowarc.WarcRecord
+ recordBuilders map[int32]gowarc.WarcRecordBuilder
+ payloadStarted map[int32]bool
+ rbMapSync sync.Mutex
+ canceled bool
+}
+
+func newWriteSessionContext(settings settings.Settings, configCache database.ConfigCache) *writeSessionContext {
+ return &writeSessionContext{
+ settings: settings,
+ configCache: configCache,
+ records: make(map[int32]gowarc.WarcRecord),
+ recordBuilders: make(map[int32]gowarc.WarcRecordBuilder),
+ payloadStarted: make(map[int32]bool),
+ log: log.Logger,
+ }
+}
+
+func (s *writeSessionContext) handleErr(code codes.Code, msg string, args ...interface{}) error {
+ m := fmt.Sprintf(msg, args...)
+ s.log.Error().Msg(m)
+ return status.Error(code, m)
+}
+
+func (s *writeSessionContext) setWriteRequestMeta(w *contentwriter.WriteRequestMeta) error {
+ if s.meta == nil {
+ s.log = log.With().Str("eid", w.ExecutionId).Str("uri", w.TargetUri).Logger()
+ }
+ s.meta = w
+
+ if w.CollectionRef == nil {
+ return s.handleErr(codes.InvalidArgument, "No collection id in request")
+ }
+ if w.IpAddress == "" {
+ return s.handleErr(codes.InvalidArgument, "Missing IP-address")
+ }
+
+ collectionConfig, err := s.configCache.GetConfigObject(context.TODO(), w.GetCollectionRef())
+ if err != nil {
+ msg := "Error getting collection config " + w.GetCollectionRef().GetId()
+ s.log.Error().Msg(msg)
+ return status.Error(codes.Unknown, msg)
+ }
+ s.collectionConfig = collectionConfig
+ if collectionConfig == nil || collectionConfig.Meta == nil || collectionConfig.Spec == nil {
+ return s.handleErr(codes.Unknown, "Collection with id '%s' is missing or insufficient: %s", w.CollectionRef.Id, collectionConfig.String())
+ }
+ return nil
+}
+
+func (s *writeSessionContext) writeProtocolHeader(header *contentwriter.Data) error {
+ recordBuilder, err := s.getRecordBuilder(header.RecordNum)
+ if err != nil {
+ s.cancelSession(err.Error())
+ return err
+ }
+ if recordBuilder.Size() != 0 {
+ err := s.handleErr(codes.InvalidArgument, "Header received twice")
+ s.cancelSession(err.Error())
+ return err
+ }
+ if _, err := recordBuilder.Write(header.GetData()); err != nil {
+ s.cancelSession(err.Error())
+ return err
+ }
+ return nil
+}
+
+func (s *writeSessionContext) writePayoad(payload *contentwriter.Data) error {
+ recordBuilder, err := s.getRecordBuilder(payload.RecordNum)
+ if err != nil {
+ s.cancelSession(err.Error())
+ return err
+ }
+ if !s.payloadStarted[payload.RecordNum] {
+ if _, err := recordBuilder.Write([]byte("\r\n")); err != nil {
+ s.cancelSession(err.Error())
+ return err
+ }
+ s.payloadStarted[payload.RecordNum] = true
+ }
+ if _, err := recordBuilder.Write(payload.GetData()); err != nil {
+ s.cancelSession(err.Error())
+ return err
+ }
+ return nil
+}
+
+func (s *writeSessionContext) getRecordBuilder(recordNum int32) (gowarc.WarcRecordBuilder, error) {
+ s.rbMapSync.Lock()
+ defer s.rbMapSync.Unlock()
+
+ if recordBuilder, ok := s.recordBuilders[recordNum]; ok {
+ return recordBuilder, nil
+ }
+
+ rb := gowarc.NewRecordBuilder(0,
+ //gowarc.WithStrictValidation(),
+ gowarc.WithBufferTmpDir(s.settings.WorkDir()),
+ gowarc.WithVersion(s.settings.WarcVersion()))
+ s.recordBuilders[recordNum] = rb
+ return rb, nil
+}
+
+func (s *writeSessionContext) validateSession() error {
+ for k, rb := range s.recordBuilders {
+ recordMeta, ok := s.meta.RecordMeta[k]
+ if !ok {
+ return s.handleErr(codes.InvalidArgument, "Missing metadata for record num: %d", k)
+ }
+
+ rt := ToGowarcRecordType(recordMeta.Type)
+ rb.SetRecordType(rt)
+ rb.AddWarcHeader(gowarc.WarcTargetURI, s.meta.TargetUri)
+ rb.AddWarcHeader(gowarc.WarcIPAddress, s.meta.IpAddress)
+ rb.AddWarcHeaderTime(gowarc.WarcDate, s.meta.FetchTimeStamp.AsTime())
+ rb.AddWarcHeaderInt64(gowarc.ContentLength, recordMeta.Size)
+ rb.AddWarcHeader(gowarc.ContentType, recordMeta.RecordContentType)
+ rb.AddWarcHeader(gowarc.WarcBlockDigest, recordMeta.BlockDigest)
+ if recordMeta.PayloadDigest != "" {
+ rb.AddWarcHeader(gowarc.WarcPayloadDigest, recordMeta.PayloadDigest)
+ }
+ for _, wct := range recordMeta.GetWarcConcurrentTo() {
+ rb.AddWarcHeader(gowarc.WarcConcurrentTo, "<"+wct+">")
+ }
+
+ wr, _, err := rb.Build()
+ if err != nil {
+ return s.handleErr(codes.InvalidArgument, "Error: %s", err)
+ }
+ s.records[k] = wr
+ }
+ return nil
+}
+
+func (s *writeSessionContext) cancelSession(cancelReason string) {
+ s.canceled = true
+ s.log.Debug().Msgf("Request cancelled before WARC record written. Reason %s", cancelReason)
+ for _, rb := range s.recordBuilders {
+ _ = rb.Close()
+ }
+}
diff --git a/server/warcinfogenerator.go b/server/warcinfogenerator.go
new file mode 100644
index 0000000..fbadc06
--- /dev/null
+++ b/server/warcinfogenerator.go
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2021 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package server
+
+import (
+ "fmt"
+ "github.com/nlnwa/gowarc"
+ "github.com/nlnwa/veidemann-api/go/config/v1"
+ "os"
+)
+
+func (ww *warcWriter) warcInfoGenerator(recordBuilder gowarc.WarcRecordBuilder) error {
+ payload := &gowarc.WarcFields{}
+ payload.Set("format", fmt.Sprintf("WARC File Format %d.%d", ww.settings.WarcVersion().Major(), ww.settings.WarcVersion().Minor()))
+ payload.Set("collection", ww.collectionConfig.GetMeta().GetName())
+ payload.Set("description", ww.collectionConfig.GetMeta().GetDescription())
+ if ww.subCollection != config.Collection_UNDEFINED {
+ payload.Set("subCollection", ww.subCollection.String())
+ }
+ payload.Set("isPartOf", ww.CollectionName())
+ h, e := os.Hostname()
+ if e != nil {
+ return e
+ }
+ payload.Set("host", h)
+
+ _, err := recordBuilder.WriteString(payload.String())
+ return err
+}
diff --git a/server/warcwriter.go b/server/warcwriter.go
new file mode 100644
index 0000000..a3686b0
--- /dev/null
+++ b/server/warcwriter.go
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2021 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package server
+
+import (
+ "context"
+ "github.com/nlnwa/gowarc"
+ "github.com/nlnwa/veidemann-api/go/config/v1"
+ "github.com/nlnwa/veidemann-api/go/contentwriter/v1"
+ "github.com/nlnwa/veidemann-contentwriter/database"
+ "github.com/nlnwa/veidemann-contentwriter/settings"
+ "github.com/rs/zerolog/log"
+ "google.golang.org/protobuf/types/known/timestamppb"
+ "strconv"
+ "sync"
+ "time"
+)
+
+// now is a function so that tests can override the clock.
+var now = time.Now
+
+const warcFileScheme = "warcfile"
+
+type warcWriter struct {
+ settings settings.Settings
+ collectionConfig *config.ConfigObject
+ subCollection config.Collection_SubCollectionType
+ filePrefix string
+ fileWriter *gowarc.WarcFileWriter
+ dbAdapter database.DbAdapter
+ timer *time.Timer
+ done chan interface{}
+ lock sync.Mutex
+}
+
+func newWarcWriter(s settings.Settings, db database.DbAdapter, c *config.ConfigObject, recordMeta *contentwriter.WriteRequestMeta_RecordMeta) *warcWriter {
+ collectionConfig := c.GetCollection()
+ ww := &warcWriter{
+ settings: s,
+ dbAdapter: db,
+ collectionConfig: c,
+ subCollection: recordMeta.GetSubCollection(),
+ filePrefix: createFilePrefix(c.GetMeta().GetName(), recordMeta.GetSubCollection(), now(), c.GetCollection().GetCollectionDedupPolicy()),
+ }
+ ww.initFileWriter()
+
+ rotationPolicy := collectionConfig.GetFileRotationPolicy()
+ dedupPolicy := collectionConfig.GetCollectionDedupPolicy()
+ if dedupPolicy != config.Collection_NONE && dedupPolicy < rotationPolicy {
+ rotationPolicy = dedupPolicy
+ }
+ if d, ok := timeToNextRotation(now(), rotationPolicy); ok {
+ ww.timer = time.NewTimer(d)
+ ww.done = make(chan interface{})
+ go func() {
+ for {
+ if !ww.waitForTimer(rotationPolicy) {
+ break
+ }
+ }
+ }()
+ }
+
+ return ww
+}
+
+func (ww *warcWriter) CollectionName() string {
+ return ww.filePrefix[:len(ww.filePrefix)-1]
+}
+
+func (ww *warcWriter) Write(meta *contentwriter.WriteRequestMeta, record ...gowarc.WarcRecord) (*contentwriter.WriteReply, error) {
+ ww.lock.Lock()
+ defer ww.lock.Unlock()
+ revisitKeys := make([]string, len(record))
+ for i, r := range record {
+ r := r
+ record[i], revisitKeys[i] = ww.detectRevisit(int32(i), r, meta)
+ defer func() { _ = r.Close() }()
+ }
+ results := ww.fileWriter.Write(record...)
+ var err error
+
+ reply := &contentwriter.WriteReply{
+ Meta: &contentwriter.WriteResponseMeta{
+ RecordMeta: map[int32]*contentwriter.WriteResponseMeta_RecordMeta{},
+ },
+ }
+
+ for i, res := range results {
+ recNum := int32(i)
+ rec := record[i]
+ revisitKey := revisitKeys[i]
+
+ if res.Err != nil {
+ log.Err(res.Err).Msg("Aha!!!")
+ }
+ // If writing records faild. Set err to the first error
+ if err == nil && res.Err != nil {
+ err = res.Err
+ }
+
+ if res.Err == nil && revisitKey != "" {
+ t, err := time.Parse(time.RFC3339, rec.WarcHeader().Get(gowarc.WarcDate))
+ if err != nil {
+ log.Err(err).Msg("Could not write CrawledContent to DB")
+ }
+ cr := &contentwriter.CrawledContent{
+ Digest: revisitKey,
+ WarcId: rec.WarcHeader().Get(gowarc.WarcRecordID),
+ TargetUri: meta.GetTargetUri(),
+ Date: timestamppb.New(t),
+ }
+ if err := ww.dbAdapter.WriteCrawledContent(context.TODO(), cr); err != nil {
+ log.Err(err).Msg("Could not write CrawledContent to DB")
+ }
+ }
+ storageRef := warcFileScheme + ":" + res.FileName + ":" + strconv.FormatInt(res.FileOffset, 10)
+ collectionFinalName := ww.filePrefix[:len(ww.filePrefix)-1]
+
+ reply.GetMeta().GetRecordMeta()[recNum] = &contentwriter.WriteResponseMeta_RecordMeta{
+ RecordNum: recNum,
+ Type: FromGowarcRecordType(record[i].Type()),
+ WarcId: rec.WarcHeader().Get(gowarc.WarcRecordID),
+ StorageRef: storageRef,
+ BlockDigest: rec.WarcHeader().Get(gowarc.WarcBlockDigest),
+ PayloadDigest: rec.WarcHeader().Get(gowarc.WarcPayloadDigest),
+ RevisitReferenceId: rec.WarcHeader().Get(gowarc.WarcRefersTo),
+ CollectionFinalName: collectionFinalName,
+ }
+ }
+ return reply, err
+}
+
+func (ww *warcWriter) detectRevisit(recordNum int32, record gowarc.WarcRecord, meta *contentwriter.WriteRequestMeta) (gowarc.WarcRecord, string) {
+ if record.Type() == gowarc.Response || record.Type() == gowarc.Resource {
+ digest := record.WarcHeader().Get(gowarc.WarcPayloadDigest)
+ if digest == "" {
+ digest = record.WarcHeader().Get(gowarc.WarcBlockDigest)
+ }
+ revisitKey := digest + ":" + ww.filePrefix[:len(ww.filePrefix)-1]
+ duplicate, err := ww.dbAdapter.HasCrawledContent(context.TODO(), revisitKey)
+ if err != nil {
+ log.Err(err).Msg("Failed checking for revisit, treating as new object")
+ }
+
+ if duplicate != nil {
+ log.Debug().Msgf("Detected %s as a revisit of %s",
+ record.WarcHeader().Get(gowarc.WarcRecordID), duplicate.GetWarcId())
+ ref := &gowarc.RevisitRef{
+ Profile: gowarc.ProfileIdenticalPayloadDigest,
+ TargetRecordId: duplicate.GetWarcId(),
+ TargetUri: duplicate.GetTargetUri(),
+ TargetDate: duplicate.GetDate().AsTime().In(time.UTC).Format(time.RFC3339),
+ }
+ revisit, err := record.ToRevisitRecord(ref)
+ if err != nil {
+ log.Err(err).Msg("Failed checking for revisit, treating as new object")
+ }
+
+ newRecordMeta := meta.GetRecordMeta()[recordNum]
+ newRecordMeta.Type = contentwriter.RecordType_REVISIT
+ newRecordMeta.BlockDigest = revisit.Block().BlockDigest()
+ if r, ok := revisit.Block().(gowarc.PayloadBlock); ok {
+ newRecordMeta.PayloadDigest = r.PayloadDigest()
+ }
+
+ size, err := strconv.ParseInt(revisit.WarcHeader().Get(gowarc.ContentLength), 10, 64)
+ if err != nil {
+ log.Err(err).Msg("Failed checking for revisit, treating as new object")
+ }
+ newRecordMeta.Size = size
+ meta.GetRecordMeta()[recordNum] = newRecordMeta
+ return revisit, ""
+ }
+ return record, revisitKey
+ }
+ return record, ""
+}
+
+func (ww *warcWriter) initFileWriter() {
+ log.Debug().Msgf("Initializing filewriter with dir: '%s' and file prefix: '%s'", ww.settings.WarcDir(), ww.filePrefix)
+ c := ww.collectionConfig.GetCollection()
+ namer := &gowarc.PatternNameGenerator{
+ Directory: ww.settings.WarcDir(),
+ Prefix: ww.filePrefix,
+ }
+
+ opts := []gowarc.WarcFileWriterOption{
+ gowarc.WithCompression(c.GetCompress()),
+ gowarc.WithMaxFileSize(c.GetFileSize()),
+ gowarc.WithFileNameGenerator(namer),
+ gowarc.WithWarcInfoFunc(ww.warcInfoGenerator),
+ gowarc.WithMaxConcurrentWriters(ww.settings.WarcWriterPoolSize()),
+ gowarc.WithAddWarcConcurrentToHeader(true),
+ }
+
+ ww.fileWriter = gowarc.NewWarcFileWriter(opts...)
+}
+
+func (ww *warcWriter) waitForTimer(rotationPolicy config.Collection_RotationPolicy) bool {
+ select {
+ case <-ww.done:
+ case <-ww.timer.C:
+ c := ww.collectionConfig.GetCollection()
+ prefix := createFilePrefix(ww.collectionConfig.GetMeta().GetName(), ww.subCollection, now(), c.GetCollectionDedupPolicy())
+ if prefix != ww.filePrefix {
+ ww.lock.Lock()
+ defer ww.lock.Unlock()
+ ww.filePrefix = prefix
+ if err := ww.fileWriter.Close(); err != nil {
+ log.Err(err).Msg("failed closing file writer")
+ }
+ ww.fileWriter = nil
+ ww.initFileWriter()
+ } else {
+ if err := ww.fileWriter.Rotate(); err != nil {
+ log.Err(err).Msg("failed rotating file")
+ }
+ }
+
+ if d, ok := timeToNextRotation(now(), rotationPolicy); ok {
+ ww.timer.Reset(d)
+ }
+ return true
+ }
+
+ // We still need to check the return value
+ // of Stop, because timer could have fired
+ // between the receive on done and this line.
+ if !ww.timer.Stop() {
+ <-ww.timer.C
+ }
+ return false
+}
+
+func (ww *warcWriter) Shutdown() {
+ if ww.timer != nil {
+ close(ww.done)
+ }
+ if err := ww.fileWriter.Close(); err != nil {
+ log.Err(err).Msg("failed closing file writer")
+ }
+}
+
+func timeToNextRotation(now time.Time, p config.Collection_RotationPolicy) (time.Duration, bool) {
+ var t2 time.Time
+
+ switch p {
+ case config.Collection_HOURLY:
+ t2 = time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 0, 0, 0, now.Location())
+ case config.Collection_DAILY:
+ t2 = time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
+ case config.Collection_MONTHLY:
+ t2 = time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, now.Location())
+ case config.Collection_YEARLY:
+ t2 = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, now.Location())
+ default:
+ return 0, false
+ }
+
+ d := t2.Sub(now)
+ return d, true
+}
+
+func createFileRotationKey(now time.Time, p config.Collection_RotationPolicy) string {
+ switch p {
+ case config.Collection_HOURLY:
+ return now.Format("2006010215")
+ case config.Collection_DAILY:
+ return now.Format("20060102")
+ case config.Collection_MONTHLY:
+ return now.Format("200601")
+ case config.Collection_YEARLY:
+ return now.Format("2006")
+ default:
+ return ""
+ }
+}
+
+func createFilePrefix(collectionName string, subCollection config.Collection_SubCollectionType, ts time.Time, dedupPolicy config.Collection_RotationPolicy) string {
+ if subCollection != config.Collection_UNDEFINED {
+ collectionName += "_" + subCollection.String()
+ }
+
+ dedupRotationKey := createFileRotationKey(ts, dedupPolicy)
+ if dedupRotationKey == "" {
+ return collectionName + "-"
+ } else {
+ return collectionName + "_" + dedupRotationKey + "-"
+ }
+}
diff --git a/server/warcwriterregistry.go b/server/warcwriterregistry.go
new file mode 100644
index 0000000..c194b49
--- /dev/null
+++ b/server/warcwriterregistry.go
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2021 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package server
+
+import (
+ "github.com/nlnwa/veidemann-api/go/config/v1"
+ "github.com/nlnwa/veidemann-api/go/contentwriter/v1"
+ "github.com/nlnwa/veidemann-contentwriter/database"
+ "github.com/nlnwa/veidemann-contentwriter/settings"
+ "sync"
+)
+
+type warcWriterRegistry struct {
+ settings settings.Settings
+ dbAdapter database.DbAdapter
+ warcWriters map[string]*warcWriter
+ lock sync.Mutex
+}
+
+func newWarcWriterRegistry(settings settings.Settings, db database.DbAdapter) *warcWriterRegistry {
+ return &warcWriterRegistry{settings: settings, warcWriters: make(map[string]*warcWriter), dbAdapter: db}
+}
+
+func (w *warcWriterRegistry) GetWarcWriter(collectionConf *config.ConfigObject, recordMeta *contentwriter.WriteRequestMeta_RecordMeta) *warcWriter {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ key := collectionConf.GetMeta().GetName() + "#" + recordMeta.GetSubCollection().String()
+ if ww, ok := w.warcWriters[key]; ok {
+ return ww
+ }
+
+ ww := newWarcWriter(w.settings, w.dbAdapter, collectionConf, recordMeta)
+ w.warcWriters[key] = ww
+ return ww
+}
+
+func (w *warcWriterRegistry) Shutdown() {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ for _, ww := range w.warcWriters {
+ ww.Shutdown()
+ }
+}
diff --git a/server/warcwriterregistry_test.go b/server/warcwriterregistry_test.go
new file mode 100644
index 0000000..528d581
--- /dev/null
+++ b/server/warcwriterregistry_test.go
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2021 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package server
+
+import (
+ "github.com/nlnwa/gowarc"
+ "github.com/nlnwa/veidemann-api/go/config/v1"
+ "github.com/stretchr/testify/assert"
+ "regexp"
+ "testing"
+ "time"
+)
+
+func Test_timeToNextRotation(t *testing.T) {
+ ts1 := time.Date(2021, 1, 1, 0, 0, 0, 0, time.Local)
+ ts2 := time.Date(2021, 11, 20, 11, 29, 59, 0, time.Local)
+ ts3 := time.Date(2021, 12, 31, 23, 59, 59, 0, time.Local)
+ type args struct {
+ now time.Time
+ p config.Collection_RotationPolicy
+ }
+ tests := []struct {
+ name string
+ args args
+ want time.Duration
+ wantOk bool
+ }{
+ {"none", args{ts1, config.Collection_NONE}, 0, false},
+ {"none", args{ts2, config.Collection_NONE}, 0, false},
+ {"none", args{ts3, config.Collection_NONE}, 0, false},
+ {"hourly", args{ts1, config.Collection_HOURLY}, time.Minute * 60, true},
+ {"hourly", args{ts2, config.Collection_HOURLY}, time.Minute*30 + time.Second*1, true},
+ {"hourly", args{ts3, config.Collection_HOURLY}, time.Second * 1, true},
+ {"daily", args{ts1, config.Collection_DAILY}, time.Hour * 24, true},
+ {"daily", args{ts2, config.Collection_DAILY}, time.Hour*12 + time.Minute*30 + time.Second*1, true},
+ {"daily", args{ts3, config.Collection_DAILY}, time.Second * 1, true},
+ {"monthly", args{ts1, config.Collection_MONTHLY}, time.Hour * 24 * 31, true},
+ {"monthly", args{ts2, config.Collection_MONTHLY}, time.Hour*(24*10+12) + time.Minute*30 + time.Second*1, true},
+ {"monthly", args{ts3, config.Collection_MONTHLY}, time.Second * 1, true},
+ {"yearly", args{ts1, config.Collection_YEARLY}, time.Hour * 24 * 365, true},
+ {"yearly", args{ts2, config.Collection_YEARLY}, time.Hour*(24*(10+31)+12) + time.Minute*30 + time.Second*1, true},
+ {"yearly", args{ts3, config.Collection_YEARLY}, time.Second * 1, true},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, got1 := timeToNextRotation(tt.args.now, tt.args.p)
+ if got != tt.want {
+ t.Errorf("timeToNextRotation() got = %v, want %v", got, tt.want)
+ }
+ if got1 != tt.wantOk {
+ t.Errorf("timeToNextRotation() got1 = %v, want %v", got1, tt.wantOk)
+ }
+ })
+ }
+}
+
+func Test_createFileRotationKey(t *testing.T) {
+ ts1 := time.Date(2021, 1, 1, 0, 0, 0, 0, time.Local)
+ ts2 := time.Date(2021, 11, 20, 11, 29, 59, 0, time.Local)
+ ts3 := time.Date(2021, 12, 31, 23, 59, 59, 0, time.Local)
+ type args struct {
+ now time.Time
+ p config.Collection_RotationPolicy
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {"none", args{ts1, config.Collection_NONE}, ""},
+ {"none", args{ts2, config.Collection_NONE}, ""},
+ {"none", args{ts3, config.Collection_NONE}, ""},
+ {"hourly", args{ts1, config.Collection_HOURLY}, "2021010100"},
+ {"hourly", args{ts2, config.Collection_HOURLY}, "2021112011"},
+ {"hourly", args{ts3, config.Collection_HOURLY}, "2021123123"},
+ {"daily", args{ts1, config.Collection_DAILY}, "20210101"},
+ {"daily", args{ts2, config.Collection_DAILY}, "20211120"},
+ {"daily", args{ts3, config.Collection_DAILY}, "20211231"},
+ {"monthly", args{ts1, config.Collection_MONTHLY}, "202101"},
+ {"monthly", args{ts2, config.Collection_MONTHLY}, "202111"},
+ {"monthly", args{ts3, config.Collection_MONTHLY}, "202112"},
+ {"yearly", args{ts1, config.Collection_YEARLY}, "2021"},
+ {"yearly", args{ts2, config.Collection_YEARLY}, "2021"},
+ {"yearly", args{ts3, config.Collection_YEARLY}, "2021"},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := createFileRotationKey(tt.args.now, tt.args.p); got != tt.want {
+ t.Errorf("createFileRotationKey() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_getFilename(t *testing.T) {
+ ng := &gowarc.PatternNameGenerator{
+ Directory: "",
+ Prefix: createFilePrefix("foo", config.Collection_UNDEFINED, time.Now(), config.Collection_NONE),
+ Serial: 0,
+ }
+ d1, f1 := ng.NewWarcfileName()
+ d2, f2 := ng.NewWarcfileName()
+ assert.Regexp(t, regexp.MustCompile(`foo-\d{14}-0001-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.warc`), f1)
+ assert.Equal(t, "", d1)
+ assert.Regexp(t, regexp.MustCompile(`foo-\d{14}-0002-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.warc`), f2)
+ assert.Equal(t, "", d2)
+
+ ng = &gowarc.PatternNameGenerator{
+ Directory: "",
+ Prefix: createFilePrefix("foo", config.Collection_UNDEFINED, time.Now(), config.Collection_YEARLY),
+ Serial: 0,
+ }
+ d1, f1 = ng.NewWarcfileName()
+ d2, f2 = ng.NewWarcfileName()
+ assert.Regexp(t, regexp.MustCompile(`foo_\d{4}-\d{14}-0001-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.warc`), f1)
+ assert.Equal(t, "", d1)
+ assert.Regexp(t, regexp.MustCompile(`foo_\d{4}-\d{14}-0002-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.warc`), f2)
+ assert.Equal(t, "", d2)
+
+ ng = &gowarc.PatternNameGenerator{
+ Directory: "myDir",
+ Prefix: createFilePrefix("foo", config.Collection_DNS, time.Now(), config.Collection_MONTHLY),
+ Serial: 0,
+ }
+ d1, f1 = ng.NewWarcfileName()
+ d2, f2 = ng.NewWarcfileName()
+ assert.Regexp(t, regexp.MustCompile(`foo_DNS_\d{6}-\d{14}-0001-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.warc`), f1)
+ assert.Equal(t, "myDir", d1)
+ assert.Regexp(t, regexp.MustCompile(`foo_DNS_\d{6}-\d{14}-0002-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.warc`), f2)
+ assert.Equal(t, "myDir", d2)
+
+ ng = &gowarc.PatternNameGenerator{
+ Directory: "myDir",
+ Prefix: createFilePrefix("foo", config.Collection_DNS, time.Now(), config.Collection_DAILY),
+ Serial: 0,
+ }
+ d1, f1 = ng.NewWarcfileName()
+ d2, f2 = ng.NewWarcfileName()
+ assert.Regexp(t, regexp.MustCompile(`foo_DNS_\d{8}-\d{14}-0001-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.warc`), f1)
+ assert.Equal(t, "myDir", d1)
+ assert.Regexp(t, regexp.MustCompile(`foo_DNS_\d{8}-\d{14}-0002-\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.warc`), f2)
+ assert.Equal(t, "myDir", d2)
+}
diff --git a/settings/mock.go b/settings/mock.go
new file mode 100644
index 0000000..9a6ad6b
--- /dev/null
+++ b/settings/mock.go
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2021 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package settings
+
+import "github.com/nlnwa/gowarc"
+
+type Mock struct {
+ hostName string
+ warcDir string
+ warcWriterPoolSize int
+ workDir string
+ terminationGracePeriodSeconds int
+}
+
+func NewMock(warcDir string, warcWriterPoolSize int) *Mock {
+ return &Mock{warcDir: warcDir, warcWriterPoolSize: warcWriterPoolSize}
+}
+
+func (m Mock) HostName() string {
+ return m.hostName
+}
+
+func (m Mock) WarcDir() string {
+ return m.warcDir
+}
+
+func (m Mock) WarcWriterPoolSize() int {
+ return m.warcWriterPoolSize
+}
+
+func (m Mock) WorkDir() string {
+ return m.workDir
+}
+
+func (m Mock) TerminationGracePeriodSeconds() int {
+ return m.terminationGracePeriodSeconds
+}
+
+func (m Mock) WarcVersion() *gowarc.WarcVersion {
+ return gowarc.V1_1
+}
diff --git a/settings/settings.go b/settings/settings.go
new file mode 100644
index 0000000..aa6b344
--- /dev/null
+++ b/settings/settings.go
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2021 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package settings
+
+import (
+ "github.com/nlnwa/gowarc"
+ "github.com/spf13/viper"
+)
+
+type Settings interface {
+ HostName() string
+ WarcDir() string
+ WarcWriterPoolSize() int
+ WorkDir() string
+ TerminationGracePeriodSeconds() int
+ WarcVersion() *gowarc.WarcVersion
+}
+
+type ViperSettings struct{}
+
+func (s ViperSettings) HostName() string {
+ return viper.GetString("host-name")
+}
+
+func (s ViperSettings) WarcDir() string {
+ return viper.GetString("warc-dir")
+}
+
+func (s ViperSettings) WarcWriterPoolSize() int {
+ return viper.GetInt("warc-writer-pool-size")
+}
+
+func (s ViperSettings) WorkDir() string {
+ return viper.GetString("work-dir")
+}
+
+func (s ViperSettings) TerminationGracePeriodSeconds() int {
+ return viper.GetInt("termination-grace-period-seconds")
+}
+
+func (s ViperSettings) WarcVersion() *gowarc.WarcVersion {
+ return gowarc.V1_1
+}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/ApiServer.java b/src/main/java/no/nb/nna/veidemann/contentwriter/ApiServer.java
deleted file mode 100644
index 9a578b4..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/ApiServer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package no.nb.nna.veidemann.contentwriter;
-
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.opentracing.contrib.ServerTracingInterceptor;
-import io.opentracing.util.GlobalTracer;
-import no.nb.nna.veidemann.contentwriter.warc.WarcCollectionRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class ApiServer implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(ApiServer.class);
- private final Server server;
- private final ExecutorService threadPool;
- private int shutdownTimeoutSeconds = 60;
-
-
- /**
- * Construct a new REST API server.
- */
- public ApiServer(int port, int shutdownTimeoutSeconds, WarcCollectionRegistry warcCollectionRegistry) {
- this(ServerBuilder.forPort(port), warcCollectionRegistry);
- this.shutdownTimeoutSeconds = shutdownTimeoutSeconds;
- }
-
- public ApiServer(ServerBuilder> serverBuilder, WarcCollectionRegistry warcCollectionRegistry) {
-
- ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor.Builder(GlobalTracer.get())
- .withTracedAttributes(ServerTracingInterceptor.ServerRequestAttribute.CALL_ATTRIBUTES,
- ServerTracingInterceptor.ServerRequestAttribute.METHOD_TYPE)
- .build();
-
- serverBuilder.intercept(tracingInterceptor);
-
- threadPool = Executors.newCachedThreadPool();
- serverBuilder.executor(threadPool);
-
- server = serverBuilder.addService(new ContentWriterService(warcCollectionRegistry)).build();
- }
-
- public ApiServer start() {
- try {
- server.start();
-
- LOG.info("Content Writer api listening on {}", server.getPort());
-
- return this;
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
-
- @Override
- public void close() {
- long startTime = System.currentTimeMillis();
- server.shutdown();
- try {
- server.awaitTermination();
- } catch (InterruptedException e) {
- server.shutdownNow();
- }
- threadPool.shutdown();
- long timeoutSeconds = shutdownTimeoutSeconds - ((System.currentTimeMillis() - startTime) / 1000);
- try {
- threadPool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- threadPool.shutdownNow();
- }
- }
-
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/ContentBuffer.java b/src/main/java/no/nb/nna/veidemann/contentwriter/ContentBuffer.java
deleted file mode 100644
index 743d95a..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/ContentBuffer.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter;
-
-import com.google.protobuf.ByteString;
-import no.nb.nna.veidemann.commons.util.Sha1Digest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static io.netty.handler.codec.http.HttpConstants.CR;
-import static io.netty.handler.codec.http.HttpConstants.LF;
-
-/**
- *
- */
-public class ContentBuffer implements AutoCloseable {
-
- private static final Logger LOG = LoggerFactory.getLogger(ContentBuffer.class);
-
- static final byte[] CRLF = {CR, LF};
-
- private final static String EMPTY_DIGEST_STRING = "sha1:da39a3ee5e6b4b0d3255bfef95601890afd80709";
-
- private final Sha1Digest blockDigest;
- private Sha1Digest payloadDigest;
- private Sha1Digest headerDigest;
-
- private ByteString headerBuf;
- private ByteString payloadBuf;
-
- private final String warcId;
-
- public ContentBuffer() {
- this.blockDigest = new Sha1Digest();
- this.warcId = Util.createIdentifier();
- }
-
- public void setHeader(ByteString header) {
- this.headerBuf = header;
- updateDigest(headerBuf, blockDigest);
-
- // Get the partial result after creating a digest of the headers
- headerDigest = blockDigest.clone();
- }
-
- public void addPayload(ByteString payload) {
- if (payloadBuf == null) {
- payloadBuf = payload;
- if (hasHeader()) {
- // Add the payload separator to the digest
- blockDigest.update(CRLF);
- payloadDigest = new Sha1Digest();
- }
- } else {
- payloadBuf = payloadBuf.concat(payload);
- }
- updateDigest(payload, blockDigest, payloadDigest);
- }
-
- private void updateDigest(ByteString buf, Sha1Digest... digests) {
- for (Sha1Digest d : digests) {
- if (d != null) {
- d.update(buf);
- }
- }
- }
-
- public String getBlockDigest() {
- return blockDigest.getPrefixedDigestString();
- }
-
- public String getPayloadDigest() {
- if (hasHeader()) {
- if (payloadDigest == null) {
- return EMPTY_DIGEST_STRING;
- } else {
- return payloadDigest.getPrefixedDigestString();
- }
- }
- return "";
- }
-
- public String getHeaderDigest() {
- if (headerDigest == null) {
- return EMPTY_DIGEST_STRING;
- }
- return headerDigest.getPrefixedDigestString();
- }
-
- public long getPayloadSize() {
- return payloadBuf == null ? 0 : payloadBuf.size();
- }
-
- public long getHeaderSize() {
- return headerBuf == null ? 0 : headerBuf.size();
- }
-
- public long getTotalSize() {
- return getHeaderSize() + getPayloadSize() + (hasHeader() && hasPayload() ? 2L : 0L);
- }
-
- public ByteString getHeader() {
- return headerBuf;
- }
-
- public ByteString getPayload() {
- return payloadBuf;
- }
-
- public void removeHeader() {
- payloadBuf = null;
- }
-
- public void removePayload() {
- payloadBuf = null;
- }
-
- public boolean hasHeader() {
- return headerBuf != null;
- }
-
- public boolean hasPayload() {
- return payloadBuf != null;
- }
-
- public String getWarcId() {
- return warcId;
- }
-
- public void close() {
- // Clean up resources
- headerBuf = null;
- payloadBuf = null;
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/ContentWriter.java b/src/main/java/no/nb/nna/veidemann/contentwriter/ContentWriter.java
deleted file mode 100644
index 35f6714..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/ContentWriter.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigBeanFactory;
-import com.typesafe.config.ConfigException;
-import com.typesafe.config.ConfigFactory;
-import no.nb.nna.veidemann.commons.db.DbException;
-import no.nb.nna.veidemann.commons.db.DbService;
-import no.nb.nna.veidemann.commons.opentracing.TracerFactory;
-import no.nb.nna.veidemann.contentwriter.settings.Settings;
-import no.nb.nna.veidemann.contentwriter.warc.WarcCollectionRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class for launching the service.
- */
-public class ContentWriter {
-
- private static final Logger LOG = LoggerFactory.getLogger(ContentWriter.class);
-
- private static final Settings SETTINGS;
-
- static {
- Config config = ConfigFactory.load();
- config.checkValid(ConfigFactory.defaultReference());
- SETTINGS = ConfigBeanFactory.create(config, Settings.class);
-
- TracerFactory.init("ContentWriter");
- }
-
- /**
- * Create a new ContentWriter service.
- */
- public ContentWriter() {
- }
-
- /**
- * Start the service.
- *
- *
- * @return this instance
- */
- public ContentWriter start() {
- try (DbService db = DbService.configure(SETTINGS);
- WarcCollectionRegistry warcCollectionRegistry = new WarcCollectionRegistry();
- ApiServer apiServer = new ApiServer(SETTINGS.getApiPort(), SETTINGS.getTerminationGracePeriodSeconds(), warcCollectionRegistry)) {
-
- registerShutdownHook();
-
- apiServer.start();
-
- LOG.info("Veidemann Content Writer (v. {}) started",
- ContentWriter.class.getPackage().getImplementationVersion());
-
- try {
- Thread.currentThread().join();
- } catch (InterruptedException ex) {
- // Interrupted, shut down
- }
- } catch (ConfigException | DbException ex) {
- LOG.error("Configuration error: {}", ex.getLocalizedMessage());
- System.exit(1);
- }
-
- return this;
- }
-
- private void registerShutdownHook() {
- Thread mainThread = Thread.currentThread();
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- System.err.println("*** shutting down since JVM is shutting down");
- mainThread.interrupt();
- try {
- mainThread.join();
- } catch (InterruptedException e) {
- //
- }
- }));
- }
-
- /**
- * Get the settings object.
- *
- *
- * @return the settings
- */
- public static Settings getSettings() {
- return SETTINGS;
- }
-
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/ContentWriterService.java b/src/main/java/no/nb/nna/veidemann/contentwriter/ContentWriterService.java
deleted file mode 100644
index 294c7fa..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/ContentWriterService.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter;
-
-import io.grpc.Status;
-import io.grpc.StatusException;
-import io.grpc.stub.StreamObserver;
-import no.nb.nna.veidemann.api.contentwriter.v1.ContentWriterGrpc;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteReply;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteRequest;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteResponseMeta;
-import no.nb.nna.veidemann.contentwriter.WriteSessionContext.RecordData;
-import no.nb.nna.veidemann.contentwriter.warc.SingleWarcWriter;
-import no.nb.nna.veidemann.contentwriter.warc.WarcCollection;
-import no.nb.nna.veidemann.contentwriter.warc.WarcCollection.Instance;
-import no.nb.nna.veidemann.contentwriter.warc.WarcCollectionRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- *
- */
-public class ContentWriterService extends ContentWriterGrpc.ContentWriterImplBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(ContentWriterService.class);
-
- private final WarcCollectionRegistry warcCollectionRegistry;
-
- public ContentWriterService(WarcCollectionRegistry warcCollectionRegistry) {
- this.warcCollectionRegistry = warcCollectionRegistry;
- }
-
- @Override
- public StreamObserver write(StreamObserver responseObserver) {
- return new StreamObserver<>() {
- private final WriteSessionContext context = new WriteSessionContext();
-
- @Override
- public void onNext(WriteRequest value) {
- try {
- context.initMDC();
-
- ContentBuffer contentBuffer;
- switch (value.getValueCase()) {
- case META:
- try {
- context.setWriteRequestMeta(value.getMeta());
- } catch (StatusException e) {
- responseObserver.onError(e);
- }
- break;
- case PROTOCOL_HEADER:
- contentBuffer = context.getRecordData(value.getProtocolHeader().getRecordNum()).getContentBuffer();
- if (contentBuffer.hasHeader()) {
- LOG.error("Header received twice");
- Status status = Status.INVALID_ARGUMENT.withDescription("Header received twice");
- responseObserver.onError(status.asException());
- break;
- }
- contentBuffer.setHeader(value.getProtocolHeader().getData());
- break;
- case PAYLOAD:
- contentBuffer = context.getRecordData(value.getPayload().getRecordNum()).getContentBuffer();
- contentBuffer.addPayload(value.getPayload().getData());
- break;
- case CANCEL:
- context.cancelSession(value.getCancel());
- break;
- default:
- break;
- }
- } catch (Exception ex) {
- Status status = Status.UNKNOWN.withDescription(ex.toString());
- LOG.error(ex.getMessage(), ex);
- responseObserver.onError(status.asException());
- }
- }
-
- @Override
- public void onError(Throwable t) {
- context.initMDC();
- LOG.error("Error caught: {}", t.getMessage(), t);
- context.cancelSession(t.getMessage());
- }
-
- @Override
- public void onCompleted() {
- context.initMDC();
- if (context.isCanceled()) {
- responseObserver.onNext(WriteReply.getDefaultInstance());
- responseObserver.onCompleted();
- return;
- }
-
- if (!context.hasWriteRequestMeta()) {
- LOG.error("Missing metadata object");
- Status status = Status.INVALID_ARGUMENT.withDescription("Missing metadata object");
- responseObserver.onError(status.asException());
- return;
- }
-
- WriteReply.Builder reply = WriteReply.newBuilder();
- try {
- context.validateSession();
- } catch (StatusException e) {
- responseObserver.onError(e);
- return;
- } catch (Exception ex) {
- Status status = Status.UNKNOWN.withDescription(ex.toString());
- LOG.error(ex.getMessage(), ex);
- responseObserver.onError(status.asException());
- return;
- }
-
- WarcCollection collection = warcCollectionRegistry.getWarcCollection(context.getCollectionConfig());
- try (Instance warcWriters = collection.getWarcWriters()) {
- for (Integer recordNum : context.getRecordNums()) {
- try (RecordData recordData = context.getRecordData(recordNum)) {
- context.detectRevisit(recordNum, collection);
-
- URI ref = warcWriters.getWarcWriter(recordData.getSubCollectionType()).writeRecord(recordData);
-
- WriteResponseMeta.RecordMeta.Builder responseMeta = WriteResponseMeta.RecordMeta.newBuilder()
- .setRecordNum(recordNum)
- .setType(recordData.getRecordType())
- .setWarcId(recordData.getWarcId())
- .setStorageRef(ref.toString())
- .setBlockDigest(recordData.getContentBuffer().getBlockDigest())
- .setPayloadDigest(recordData.getContentBuffer().getPayloadDigest())
- .setCollectionFinalName(collection.getCollectionName(recordData.getSubCollectionType()));
- if (recordData.getRevisitRef() != null) {
- responseMeta.setRevisitReferenceId(recordData.getRevisitRef().getWarcId());
- }
-
- reply.getMetaBuilder().putRecordMeta(responseMeta.getRecordNum(), responseMeta.build());
- } catch (IOException ex) {
- Status status = Status.UNKNOWN.withDescription(ex.toString());
- LOG.error("Failed write: {}", ex.getMessage(), ex);
- responseObserver.onError(status.asException());
- } catch (SingleWarcWriter.SizeMismatchException ex) {
- Status status = Status.OUT_OF_RANGE.withDescription(ex.getMessage());
- LOG.error(status.getDescription());
- throw status.asException();
- } catch (Exception ex) {
- LOG.error("Failed write: {}", ex.getMessage(), ex);
- responseObserver.onError(Status.fromThrowable(ex).asException());
- }
- }
- responseObserver.onNext(reply.build());
- responseObserver.onCompleted();
- } catch (Exception ex) {
- Status status = Status.UNKNOWN.withDescription(ex.toString());
- LOG.error(ex.getMessage(), ex);
- responseObserver.onError(status.asException());
- }
- }
-
- };
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/Main.java b/src/main/java/no/nb/nna/veidemann/contentwriter/Main.java
deleted file mode 100644
index c7b3d4b..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/Main.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter;
-
-/**
- * Main class for launching the service.
- */
-public final class Main {
-
- /**
- * Private constructor to avoid instantiation.
- */
- private Main() {
- }
-
- /**
- * Start the server.
- *
- * @param args the command line arguments
- */
- public static void main(String[] args) {
- // This class intentionally doesn't do anything except for instanciating a ResourceResolverServer.
- // This is necessary to be able to replace the LogManager. The system property must be set before any other
- // logging is even loaded.
- System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager");
-
- new ContentWriter().start();
- }
-
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/Util.java b/src/main/java/no/nb/nna/veidemann/contentwriter/Util.java
deleted file mode 100644
index ec59d1a..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/Util.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package no.nb.nna.veidemann.contentwriter;
-
-import no.nb.nna.veidemann.api.contentwriter.v1.RecordType;
-
-import java.util.UUID;
-
-public class Util {
- private Util() {
- }
-
- public static String createIdentifier() {
- return UUID.randomUUID().toString();
- }
-
- public static String formatIdentifierAsUrn(String id) {
- return "";
- }
-
- public static String getRecordTypeString(RecordType recordType) {
- return recordType.name().toLowerCase();
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/WriteSessionContext.java b/src/main/java/no/nb/nna/veidemann/contentwriter/WriteSessionContext.java
deleted file mode 100644
index a45ebf1..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/WriteSessionContext.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package no.nb.nna.veidemann.contentwriter;
-
-import com.google.protobuf.Timestamp;
-import io.grpc.Status;
-import io.grpc.StatusException;
-import no.nb.nna.veidemann.api.config.v1.Collection.SubCollectionType;
-import no.nb.nna.veidemann.api.config.v1.ConfigObject;
-import no.nb.nna.veidemann.api.contentwriter.v1.CrawledContent;
-import no.nb.nna.veidemann.api.contentwriter.v1.RecordType;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteRequestMeta;
-import no.nb.nna.veidemann.commons.db.ConfigAdapter;
-import no.nb.nna.veidemann.commons.db.ExecutionsAdapter;
-import no.nb.nna.veidemann.commons.db.DbException;
-import no.nb.nna.veidemann.commons.db.DbService;
-import no.nb.nna.veidemann.contentwriter.warc.WarcCollection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-
-public class WriteSessionContext {
- private static final Logger LOG = LoggerFactory.getLogger(WriteSessionContext.class);
- private static final ConfigAdapter config = DbService.getInstance().getConfigAdapter();
- private static final ExecutionsAdapter dbAdapter = DbService.getInstance().getExecutionsAdapter();
-
- // private final Map contentBuffers = new HashMap<>();
- final Map recordDataMap = new HashMap<>();
-
- WriteRequestMeta.Builder writeRequestMeta;
- ConfigObject collectionConfig;
- private boolean canceled = false;
-
- public RecordData getRecordData(Integer recordNum) {
- return recordDataMap.computeIfAbsent(recordNum, RecordData::new);
- }
-
- public void initMDC() {
- if (writeRequestMeta != null) {
- MDC.put("eid", writeRequestMeta.getExecutionId());
- MDC.put("uri", writeRequestMeta.getTargetUri());
- }
- }
-
- public void setWriteRequestMeta(WriteRequestMeta writeRequestMeta) throws StatusException {
- this.writeRequestMeta = writeRequestMeta.toBuilder();
- initMDC();
-
- try {
- if (!writeRequestMeta.hasCollectionRef()) {
- String msg = "No collection id in request";
- LOG.error(msg);
- Status status = Status.INVALID_ARGUMENT.withDescription(msg);
- throw status.asException();
- } else {
- collectionConfig = config.getConfigObject(writeRequestMeta.getCollectionRef());
- if (collectionConfig == null || !collectionConfig.hasMeta() || !collectionConfig.hasCollection()) {
- String msg = "Collection with id '" + writeRequestMeta.getCollectionRef() + "' is missing or insufficient: " + collectionConfig;
- LOG.error(msg);
- Status status = Status.UNKNOWN.withDescription(msg);
- throw status.asException();
- }
- }
- } catch (Exception e) {
- String msg = "Error getting collection config " + writeRequestMeta.getCollectionRef();
- LOG.error(msg, e);
- Status status = Status.UNKNOWN.withDescription(msg);
- throw status.asException();
- }
- }
-
- public boolean hasWriteRequestMeta() {
- return writeRequestMeta != null;
- }
-
- public ConfigObject getCollectionConfig() {
- return collectionConfig;
- }
-
- public void validateSession() throws StatusException {
- for (Entry recordEntry : recordDataMap.entrySet()) {
- ContentBuffer contentBuffer = recordEntry.getValue().getContentBuffer();
- WriteRequestMeta.RecordMeta recordMeta = writeRequestMeta.getRecordMetaOrDefault(recordEntry.getKey(), null);
- if (recordMeta == null) {
- throw Status.INVALID_ARGUMENT.withDescription("Missing metadata for record num: " + recordEntry.getKey()).asException();
- }
-
- if (contentBuffer.getTotalSize() == 0L) {
- LOG.error("Nothing to store");
- throw Status.INVALID_ARGUMENT.withDescription("Nothing to store").asException();
- }
-
- if (contentBuffer.getTotalSize() != recordMeta.getSize()) {
- LOG.error("Size mismatch. Expected {}, but was {}",
- recordMeta.getSize(), contentBuffer.getTotalSize());
- throw Status.INVALID_ARGUMENT.withDescription("Size mismatch").asException();
- }
-
- if (!contentBuffer.getBlockDigest().equals(recordMeta.getBlockDigest())) {
- LOG.error("Block digest mismatch. Expected {}, but was {}",
- recordMeta.getBlockDigest(), contentBuffer.getBlockDigest());
- throw Status.INVALID_ARGUMENT.withDescription("Block digest mismatch").asException();
- }
-
- if (writeRequestMeta.getIpAddress().isEmpty()) {
- LOG.error("Missing IP-address");
- throw Status.INVALID_ARGUMENT.withDescription("Missing IP-address").asException();
- }
- }
- }
-
- public Set getRecordNums() {
- return writeRequestMeta.getRecordMetaMap().keySet();
- }
-
- public void detectRevisit(final Integer recordNum, final WarcCollection collection) {
- RecordData rd = getRecordData(recordNum);
- if (rd.getRecordType() == RecordType.RESPONSE || rd.getRecordType() == RecordType.RESOURCE) {
- Optional isDuplicate = Optional.empty();
- try {
- String digest = rd.getContentBuffer().getPayloadDigest();
- if (digest == null || digest.isEmpty()) {
- digest = rd.getContentBuffer().getBlockDigest();
- }
- CrawledContent cr = CrawledContent.newBuilder()
- .setDigest(digest + ":" + collection.getCollectionName(rd.getSubCollectionType()))
- .setWarcId(rd.getWarcId())
- .setTargetUri(writeRequestMeta.getTargetUri())
- .setDate(writeRequestMeta.getFetchTimeStamp())
- .build();
- isDuplicate = dbAdapter
- .hasCrawledContent(cr);
- } catch (DbException e) {
- LOG.error("Failed checking for revisit, treating as new object", e);
- }
-
- if (isDuplicate.isPresent()) {
- CrawledContent cc = isDuplicate.get();
- LOG.debug("Detected {} as a revisit of {}",
- MDC.get("uri"), cc.getWarcId());
-
- WriteRequestMeta.RecordMeta newRecordMeta = rd.getRecordMeta().toBuilder()
- .setType(RecordType.REVISIT)
- .setBlockDigest(rd.getContentBuffer().getHeaderDigest())
- .setPayloadDigest(rd.getContentBuffer().getPayloadDigest())
- .setSize(rd.getContentBuffer().getHeaderSize())
- .build();
- writeRequestMeta.putRecordMeta(recordNum, newRecordMeta);
-
- rd.getContentBuffer().removePayload();
-
- rd.revisitRef = cc;
- }
- }
-
- if (rd.revisitRef == null) {
- WriteRequestMeta.RecordMeta newRecordMeta = rd.getRecordMeta().toBuilder()
- .setBlockDigest(rd.getContentBuffer().getBlockDigest())
- .setPayloadDigest(rd.getContentBuffer().getPayloadDigest())
- .build();
- writeRequestMeta.putRecordMeta(recordNum, newRecordMeta);
- }
- }
-
- public boolean isCanceled() {
- return canceled;
- }
-
- public void cancelSession(String cancelReason) {
- canceled = true;
- LOG.debug("Request cancelled before WARC record written. Reason {}", cancelReason);
- for (RecordData cb : recordDataMap.values()) {
- cb.close();
- }
- }
-
- public class RecordData implements AutoCloseable {
- private final Integer recordNum;
- private final ContentBuffer contentBuffer = new ContentBuffer();
- private CrawledContent revisitRef;
-
- public RecordData(Integer recordNum) {
- this.recordNum = recordNum;
- }
-
- public ContentBuffer getContentBuffer() {
- return contentBuffer;
- }
-
- public WriteRequestMeta.RecordMeta getRecordMeta() {
- return writeRequestMeta.getRecordMetaOrThrow(recordNum);
- }
-
- public CrawledContent getRevisitRef() {
- return revisitRef;
- }
-
- public String getWarcId() {
- return contentBuffer.getWarcId();
- }
-
- public RecordType getRecordType() {
- return getRecordMeta().getType();
- }
-
- public SubCollectionType getSubCollectionType() {
- return getRecordMeta().getSubCollection();
- }
-
- public String getTargetUri() {
- return writeRequestMeta.getTargetUri();
- }
-
- public Timestamp getFetchTimeStamp() {
- return writeRequestMeta.getFetchTimeStamp();
- }
-
- public String getIpAddress() {
- return writeRequestMeta.getIpAddress();
- }
-
- public List getWarcConcurrentToIds() {
- List ids = recordDataMap.values().stream()
- .map(cb -> cb.getContentBuffer().getWarcId())
- .collect(Collectors.toList());
- ids.addAll(getRecordMeta().getWarcConcurrentToList());
- return ids;
- }
-
- public void close() {
- contentBuffer.close();
- }
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/settings/Settings.java b/src/main/java/no/nb/nna/veidemann/contentwriter/settings/Settings.java
deleted file mode 100644
index 0a9b6ff..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/settings/Settings.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter.settings;
-
-import no.nb.nna.veidemann.commons.settings.CommonSettings;
-
-/**
- * Configuration settings for Veidemann Content Writer.
- */
-public class Settings extends CommonSettings {
-
- private int apiPort;
-
- private String hostName;
-
- private String warcDir;
-
- private int warcWriterPoolSize;
-
- private String workDir;
-
- private int terminationGracePeriodSeconds;
-
- public int getApiPort() {
- return apiPort;
- }
-
- public void setApiPort(int apiPort) {
- this.apiPort = apiPort;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- public String getWarcDir() {
- return warcDir;
- }
-
- public void setWarcDir(String warcDir) {
- this.warcDir = warcDir;
- }
-
- public int getWarcWriterPoolSize() {
- return warcWriterPoolSize;
- }
-
- public void setWarcWriterPoolSize(int warcWriterPoolSize) {
- this.warcWriterPoolSize = warcWriterPoolSize;
- }
-
- public String getWorkDir() {
- return workDir;
- }
-
- public void setWorkDir(String workDir) {
- this.workDir = workDir;
- }
-
- public int getTerminationGracePeriodSeconds() {
- return terminationGracePeriodSeconds;
- }
-
- public void setTerminationGracePeriodSeconds(int terminationGracePeriodSeconds) {
- this.terminationGracePeriodSeconds = terminationGracePeriodSeconds;
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/SingleWarcWriter.java b/src/main/java/no/nb/nna/veidemann/contentwriter/warc/SingleWarcWriter.java
deleted file mode 100644
index c1f86c7..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/SingleWarcWriter.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter.warc;
-
-import no.nb.nna.veidemann.api.config.v1.Collection.SubCollection;
-import no.nb.nna.veidemann.api.config.v1.ConfigObject;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteRequestMeta.RecordMeta;
-import no.nb.nna.veidemann.commons.util.Sha1Digest;
-import no.nb.nna.veidemann.contentwriter.ContentBuffer;
-import no.nb.nna.veidemann.contentwriter.Util;
-import no.nb.nna.veidemann.contentwriter.WriteSessionContext.RecordData;
-import no.nb.nna.veidemann.db.ProtoUtils;
-import org.jwat.warc.WarcFileWriter;
-import org.jwat.warc.WarcFileWriterConfig;
-import org.jwat.warc.WarcRecord;
-import org.jwat.warc.WarcWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-
-import static io.netty.handler.codec.http.HttpConstants.CR;
-import static io.netty.handler.codec.http.HttpConstants.LF;
-import static org.jwat.warc.WarcConstants.*;
-
-/**
- *
- */
-public class SingleWarcWriter implements AutoCloseable {
-
- private static final Logger LOG = LoggerFactory.getLogger(SingleWarcWriter.class);
-
- static final byte[] CRLF = {CR, LF};
- static final String WARC_FILE_SCHEME = "warcfile";
-
- final WarcFileWriter warcFileWriter;
- final VeidemannWarcFileNaming warcFileNaming;
- final ConfigObject config;
- final SubCollection subCollection;
-
- public SingleWarcWriter(ConfigObject config, SubCollection subCollection, String filePrefix, File targetDir, String hostName) {
- this.config = config;
- this.subCollection = subCollection;
- warcFileNaming = new VeidemannWarcFileNaming(filePrefix, hostName);
- WarcFileWriterConfig writerConfig = new WarcFileWriterConfig(targetDir, config.getCollection().getCompress(),
- config.getCollection().getFileSize(), false);
- warcFileWriter = WarcFileWriter.getWarcWriterInstance(warcFileNaming, writerConfig);
- }
-
- public URI writeRecord(final RecordData recordData) throws IOException, SizeMismatchException {
- ContentBuffer contentBuffer = recordData.getContentBuffer();
- long size = 0L;
- boolean newFile;
-
- try {
- newFile = warcFileWriter.nextWriter();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- File currentFile = warcFileWriter.getFile();
- String finalFileName = currentFile.getName().substring(0, currentFile.getName().length() - 5);
-
- if (newFile) {
- writeFileDescriptionRecords(finalFileName);
- }
-
- writeWarcHeader(recordData);
-
- if (contentBuffer.hasHeader()) {
- size += addPayload(contentBuffer.getHeader().newInput());
- }
-
- if (contentBuffer.hasPayload()) {
- // If both headers and payload are present, add separator
- if (contentBuffer.hasHeader()) {
- size += addPayload(CRLF);
- }
- long payloadSize = addPayload(contentBuffer.getPayload().newInput());
-
- LOG.debug("Payload of size {}b written for {}", payloadSize, recordData.getTargetUri());
- size += payloadSize;
- }
-
- try {
- closeRecord();
- } catch (IllegalStateException e) {
- throw new SizeMismatchException(e.getMessage());
- } catch (IOException ex) {
- if (recordData.getRecordMeta().getSize() != size) {
- SizeMismatchException sizeMismatchException = new SizeMismatchException(recordData.getRecordMeta().getSize(), size);
- sizeMismatchException.initCause(ex);
- throw sizeMismatchException;
- } else {
- throw ex;
- }
- }
- try {
- return new URI(WARC_FILE_SCHEME + ":" + finalFileName + ":" + currentFile.length());
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() throws Exception {
- warcFileWriter.close();
- }
-
- void writeFileDescriptionRecords(String finalFileName) throws IOException {
- WarcWriter writer = warcFileWriter.getWriter();
- WarcRecord record = WarcRecord.createRecord(writer);
- record.header.major = 1;
- record.header.minor = 0;
-
- record.header.addHeader(FN_WARC_TYPE, RT_WARCINFO);
- GregorianCalendar cal = new GregorianCalendar();
- cal.setTimeZone(TimeZone.getTimeZone("UTC"));
- cal.setTimeInMillis(System.currentTimeMillis());
- record.header.addHeader(FN_WARC_DATE, cal.getTime(), null);
- record.header.addHeader(FN_WARC_FILENAME, finalFileName);
- record.header.addHeader(FN_WARC_RECORD_ID, "<" + warcFileWriter.warcinfoRecordId + ">");
- record.header.addHeader(FN_CONTENT_TYPE, "application/warc-fields");
-
- Map payload = new HashMap<>();
- payload.put("isPartOf", warcFileNaming.getFilePrefix());
- payload.put("collection", config.getMeta().getName());
- if (subCollection != null) {
- payload.put("subCollection", subCollection.getName());
- }
- payload.put("host", warcFileNaming.getHostName());
- payload.put("format", "WARC File Format 1.0");
- payload.put("description", config.getMeta().getDescription());
-
- Yaml yaml = new Yaml();
-
- byte[] payloadBytes = yaml.dumpAsMap(payload).getBytes();
- Sha1Digest payloadDigest = new Sha1Digest();
- payloadDigest.update(payloadBytes);
-
- record.header.addHeader(FN_CONTENT_LENGTH, payloadBytes.length, null);
- record.header.addHeader(FN_WARC_BLOCK_DIGEST, payloadDigest.getPrefixedDigestString());
- writer.writeHeader(record);
-
- writer.writePayload(payloadBytes);
- writer.closeRecord();
- }
-
- void writeWarcHeader(final RecordData recordData) throws IOException {
- WarcWriter writer = warcFileWriter.getWriter();
- WarcRecord record = WarcRecord.createRecord(writer);
- record.header.major = 1;
- record.header.minor = 0;
-
- record.header.addHeader(FN_WARC_TYPE, Util.getRecordTypeString(recordData.getRecordType()));
- record.header.addHeader(FN_WARC_TARGET_URI, recordData.getTargetUri());
- Date warcDate = Date.from(ProtoUtils.tsToOdt(recordData.getFetchTimeStamp()).toInstant());
- record.header.addHeader(FN_WARC_DATE, warcDate, null);
- record.header.addHeader(FN_WARC_RECORD_ID, Util.formatIdentifierAsUrn(recordData.getWarcId()));
-
- if (recordData.getRevisitRef() != null) {
- record.header.addHeader(FN_WARC_PROFILE, PROFILE_IDENTICAL_PAYLOAD_DIGEST);
- record.header.addHeader(FN_WARC_REFERS_TO, Util.formatIdentifierAsUrn(recordData.getRevisitRef().getWarcId()));
- if (!recordData.getRevisitRef().getTargetUri().isEmpty() && recordData.getRevisitRef().hasDate()) {
- record.header.addHeader(FN_WARC_REFERS_TO_TARGET_URI,
- recordData.getRevisitRef().getTargetUri());
- record.header.addHeader(FN_WARC_REFERS_TO_DATE,
- Date.from(ProtoUtils.tsToOdt(recordData.getRevisitRef().getDate()).toInstant()), null);
- }
- }
-
- record.header.addHeader(FN_WARC_IP_ADDRESS, recordData.getIpAddress());
- record.header.addHeader(FN_WARC_WARCINFO_ID, "<" + warcFileWriter.warcinfoRecordId + ">");
-
- RecordMeta recordMeta = recordData.getRecordMeta();
- record.header.addHeader(FN_WARC_BLOCK_DIGEST, recordMeta.getBlockDigest());
- if (!recordMeta.getPayloadDigest().isEmpty()) {
- record.header.addHeader(FN_WARC_PAYLOAD_DIGEST, recordMeta.getPayloadDigest());
- }
-
- record.header.addHeader(FN_CONTENT_LENGTH, recordMeta.getSize(), null);
-
- if (!recordMeta.getRecordContentType().isEmpty()) {
- record.header.addHeader(FN_CONTENT_TYPE, recordMeta.getRecordContentType());
- }
-
- for (String otherId : recordData.getWarcConcurrentToIds()) {
- if (!otherId.equals(recordData.getWarcId())) {
- record.header.addHeader(FN_WARC_CONCURRENT_TO, Util.formatIdentifierAsUrn(otherId));
- }
- }
-
- writer.writeHeader(record);
- }
-
- long addPayload(byte[] data) throws UncheckedIOException {
- try {
- return warcFileWriter.getWriter().writePayload(data);
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
-
- long addPayload(InputStream data) throws UncheckedIOException {
- try {
- return warcFileWriter.getWriter().streamPayload(data);
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
-
- void closeRecord() throws IOException {
- warcFileWriter.getWriter().closeRecord();
- }
-
- public static class SizeMismatchException extends Exception {
- SizeMismatchException(String message) {
- super(message);
- }
-
- SizeMismatchException(long expectedSize, long actualSize) {
- super("Size doesn't match metadata. Expected " + expectedSize + ", but was " + actualSize);
- }
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/VeidemannWarcFileNaming.java b/src/main/java/no/nb/nna/veidemann/contentwriter/warc/VeidemannWarcFileNaming.java
deleted file mode 100644
index 20b268f..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/VeidemannWarcFileNaming.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package no.nb.nna.veidemann.contentwriter.warc;
-
-import org.jwat.warc.WarcFileNaming;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class VeidemannWarcFileNaming implements WarcFileNaming {
-
- /**
- * DateFormat
to the following format 'yyyyMMddHHmmss'.
- */
- protected final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-
- /**
- * Prefix component.
- */
- protected final String filePrefix;
-
- /**
- * Host name component.
- */
- protected final String hostName;
-
- /**
- * Extension component (including leading ".").
- */
- protected final String extension;
-
- protected final static AtomicInteger sequenceNumber = new AtomicInteger(0);
-
- /**
- * Construct file naming instance.
- *
- * @param filePrefix prefix or null, will default to "Veidemann"
- * @param hostName host name or null, if you want to use default local host name
- */
- public VeidemannWarcFileNaming(String filePrefix, String hostName) {
- this.filePrefix = Objects.requireNonNullElse(filePrefix, "Veidemann");
- this.hostName = hostName;
- extension = ".warc";
- }
-
- @Override
- public boolean supportMultipleFiles() {
- return true;
- }
-
- @Override
- public String getFilename(int sequenceNr, boolean bCompressed) {
- String dateStr = dateFormat.format(new Date());
-
- String filename = filePrefix + "-" + dateStr
- + "-" + hostName.replace("-", "_")
- + "-" + String.format("%05d", sequenceNumber.getAndIncrement()) + extension;
- if (bCompressed) {
- filename += ".gz";
- }
- return filename;
- }
-
- public String getFilePrefix() {
- return filePrefix;
- }
-
- public String getHostName() {
- return hostName;
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollection.java b/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollection.java
deleted file mode 100644
index 9f7e4b5..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollection.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package no.nb.nna.veidemann.contentwriter.warc;
-
-import no.nb.nna.veidemann.api.config.v1.Collection.RotationPolicy;
-import no.nb.nna.veidemann.api.config.v1.Collection.SubCollection;
-import no.nb.nna.veidemann.api.config.v1.Collection.SubCollectionType;
-import no.nb.nna.veidemann.api.config.v1.ConfigObject;
-import no.nb.nna.veidemann.commons.util.Pool.Lease;
-import no.nb.nna.veidemann.contentwriter.ContentWriter;
-import no.nb.nna.veidemann.contentwriter.settings.Settings;
-import no.nb.nna.veidemann.db.ProtoUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.OffsetDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.EnumMap;
-import java.util.Map;
-
-public class WarcCollection implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(WarcCollection.class);
-
- static final DateTimeFormatter HOUR_FORMAT = DateTimeFormatter.ofPattern("YYYYMMddHH");
- static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.ofPattern("YYYYMMdd");
- static final DateTimeFormatter MONTH_FORMAT = DateTimeFormatter.ofPattern("YYYYMM");
- static final DateTimeFormatter YEAR_FORMAT = DateTimeFormatter.ofPattern("YYYY");
-
- final ConfigObject config;
- final WarcWriterPool warcWriterPool;
- final Map subCollections;
- final String filePrefix;
- final String currentFileRotationKey;
- final Settings settings = ContentWriter.getSettings();
-
- public WarcCollection(ConfigObject config) {
-
- this.config = config;
- filePrefix = createFilePrefix(ProtoUtils.getNowOdt());
- currentFileRotationKey = createFileRotationKey(config.getCollection().getFileRotationPolicy(), ProtoUtils.getNowOdt());
-
- this.warcWriterPool = new WarcWriterPool(
- config,
- null,
- filePrefix,
- new File(settings.getWarcDir()),
- config.getCollection().getFileSize(),
- config.getCollection().getCompress(),
- settings.getWarcWriterPoolSize(),
- settings.getHostName());
-
- subCollections = new EnumMap<>(SubCollectionType.class);
- for (SubCollection sub : config.getCollection().getSubCollectionsList()) {
- subCollections.put(sub.getType(), new WarcWriterPool(
- config,
- sub,
- filePrefix + "_" + sub.getName(),
- new File(settings.getWarcDir()),
- config.getCollection().getFileSize(),
- config.getCollection().getCompress(),
- settings.getWarcWriterPoolSize(),
- settings.getHostName()));
- }
- }
-
- public Instance getWarcWriters() {
- return new Instance();
- }
-
- public String getCollectionName(SubCollectionType subType) {
- return subCollections.getOrDefault(subType, warcWriterPool).getName();
- }
-
- public boolean shouldFlushFiles(ConfigObject config, OffsetDateTime timestamp) {
- if (!createFileRotationKey(config.getCollection().getFileRotationPolicy(), timestamp)
- .equals(currentFileRotationKey)) {
- return true;
- }
-
- if (!createFilePrefix(timestamp).equals(filePrefix)) {
- return true;
- }
-
- if (config == this.config) {
- return false;
- } else {
- ConfigObject c = this.config;
- ConfigObject other = config;
- boolean isEqual = true;
- isEqual = isEqual && c.hasMeta() == other.hasMeta();
- isEqual = isEqual && c.getMeta().getName().equals(other.getMeta().getName());
- isEqual = isEqual && c.getMeta().getDescription().equals(other.getMeta().getDescription());
-
- isEqual = isEqual && c.getCollection().getCollectionDedupPolicy() == other.getCollection().getCollectionDedupPolicy();
- isEqual = isEqual && c.getCollection().getFileRotationPolicy() == other.getCollection().getFileRotationPolicy();
- isEqual = isEqual && c.getCollection().getCompress() == other.getCollection().getCompress();
- isEqual = isEqual && c.getCollection().getFileSize() == other.getCollection().getFileSize();
- isEqual = isEqual && c.getCollection().getSubCollectionsList().equals(other.getCollection().getSubCollectionsList());
-
- return !isEqual;
- }
- }
-
- String createFilePrefix(OffsetDateTime timestamp) {
- String name = config.getMeta().getName();
- String dedupRotationKey = createFileRotationKey(config.getCollection().getCollectionDedupPolicy(), timestamp);
- if (dedupRotationKey.isEmpty()) {
- return name;
- } else {
- return name + "_" + dedupRotationKey;
- }
- }
-
- String createFileRotationKey(RotationPolicy fileRotationPolicy, OffsetDateTime timestamp) {
- switch (fileRotationPolicy) {
- case HOURLY:
- return timestamp.format(HOUR_FORMAT);
- case DAILY:
- return timestamp.format(DAY_FORMAT);
- case MONTHLY:
- return timestamp.format(MONTH_FORMAT);
- case YEARLY:
- return timestamp.format(YEAR_FORMAT);
- default:
- return "";
- }
- }
-
- @Override
- public void close() {
- try {
- warcWriterPool.close();
- } catch (InterruptedException e) {
- LOG.error("Failed closing collection " + warcWriterPool.getName(), e);
- }
- for (WarcWriterPool sub : subCollections.values()) {
- try {
- sub.close();
- } catch (InterruptedException e) {
- LOG.error("Failed closing collection " + sub.getName(), e);
- }
- }
- }
-
- public void deleteFiles() throws IOException {
- Path dir = Paths.get(settings.getWarcDir());
- try (DirectoryStream stream = Files.newDirectoryStream(dir, warcWriterPool.getName() + "*.warc*")) {
- for (Path path : stream) {
- LOG.info("Deleting " + path);
- Files.delete(path);
- }
- }
- }
-
- public class Instance implements AutoCloseable {
- Lease warcWriterLease;
- final Map> subCollectionWarcWriterLeases =
- new EnumMap<>(SubCollectionType.class);
-
- public SingleWarcWriter getWarcWriter(SubCollectionType subType) {
- if (subCollections.containsKey(subType)) {
- Lease sub = subCollectionWarcWriterLeases.computeIfAbsent(subType, k -> {
- try {
- return subCollections.get(k).lease();
- } catch (InterruptedException e) {
- LOG.error("Can't get WarcWriter", e);
- throw new RuntimeException(e);
- }
- });
- return sub.getObject();
- } else {
- if (warcWriterLease == null) {
- try {
- warcWriterLease = warcWriterPool.lease();
- } catch (InterruptedException e) {
- LOG.error("Can't get WarcWriter", e);
- throw new RuntimeException(e);
- }
- }
- return warcWriterLease.getObject();
- }
- }
-
- @Override
- public void close() {
- if (warcWriterLease != null) {
- warcWriterLease.close();
- }
- for (Lease sub : subCollectionWarcWriterLeases.values()) {
- sub.close();
- }
- }
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollectionRegistry.java b/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollectionRegistry.java
deleted file mode 100644
index 6b1a2a7..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollectionRegistry.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package no.nb.nna.veidemann.contentwriter.warc;
-
-import no.nb.nna.veidemann.api.config.v1.ConfigObject;
-import no.nb.nna.veidemann.db.ProtoUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class WarcCollectionRegistry implements AutoCloseable {
- private final Map collections = new HashMap<>();
-
- public WarcCollection getWarcCollection(ConfigObject config) {
- WarcCollection c = collections.get(config.getId());
- if (c == null) {
- c = new WarcCollection(config);
- collections.put(config.getId(), c);
- } else if (c.shouldFlushFiles(config, ProtoUtils.getNowOdt())) {
- c.close();
- c = new WarcCollection(config);
- collections.put(config.getId(), c);
- }
- return c;
- }
-
- @Override
- public void close() {
- for (Iterator> it = collections.entrySet().iterator(); it.hasNext(); ) {
- it.next().getValue().close();
- it.remove();
- }
- }
-
- public void deleteFiles(ConfigObject config) throws IOException {
- close();
- WarcCollection c = new WarcCollection(config);
- c.deleteFiles();
- c.close();
- }
-}
diff --git a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcWriterPool.java b/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcWriterPool.java
deleted file mode 100644
index 21d382e..0000000
--- a/src/main/java/no/nb/nna/veidemann/contentwriter/warc/WarcWriterPool.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter.warc;
-
-import no.nb.nna.veidemann.api.config.v1.Collection.SubCollection;
-import no.nb.nna.veidemann.api.config.v1.ConfigObject;
-import no.nb.nna.veidemann.commons.util.Pool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-/**
- *
- */
-public class WarcWriterPool extends Pool {
- private static final Logger LOG = LoggerFactory.getLogger(WarcWriterPool.class);
-
- private final String name;
-
- /**
- * Creates the pool.
- *
- *
- * @param poolSize maximum number of writers residing in the pool
- */
- public WarcWriterPool(final ConfigObject config, SubCollection subCollection, final String name, final File targetDir, final long maxFileSize,
- final boolean compress, final int poolSize, final String hostName) {
-
- super(poolSize,
- () -> new SingleWarcWriter(config, subCollection, name, targetDir, hostName),
- null,
- singleWarcWriter -> {
- try {
- singleWarcWriter.close();
- } catch (Exception e) {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- System.err.println("Failed closing collection " + name + ": " + e.getLocalizedMessage());
- }
- });
-
- this.name = name;
- targetDir.mkdirs();
- }
-
- public String getName() {
- return name;
- }
-}
diff --git a/src/main/jib/app/LICENSE.txt b/src/main/jib/app/LICENSE.txt
deleted file mode 100644
index a3b8182..0000000
--- a/src/main/jib/app/LICENSE.txt
+++ /dev/null
@@ -1,14 +0,0 @@
-
-Copyright 2019 National Library of Norway.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
diff --git a/src/main/jib/app/resources/application.conf b/src/main/jib/app/resources/application.conf
deleted file mode 100644
index 88996cf..0000000
--- a/src/main/jib/app/resources/application.conf
+++ /dev/null
@@ -1,25 +0,0 @@
-logTraffic=false
-
-# The port the server listens to.
-apiPort=8080
-apiPort=${?API_PORT}
-
-# Where to put WARC files
-warcDir=".";
-warcDir=${?WARC_DIR}
-
-warcWriterPoolSize=2
-warcWriterPoolSize=${?WARC_WRITER_POOL_SIZE}
-
-# Where to put temporary files
-workDir="."
-workDir=${?WORK_DIR}
-
-# Regular expression matching url's which are allowed to do cross origin resource requests
-corsAllowedOriginPattern=""
-
-hostName="unknown"
-hostName=${?HOST_NAME}
-
-terminationGracePeriodSeconds=60
-terminationGracePeriodSeconds=${?TERMINATION_GRACE_PERIOD_SECONDS}
diff --git a/src/main/jib/app/resources/log4j2.xml b/src/main/jib/app/resources/log4j2.xml
deleted file mode 100644
index 0526386..0000000
--- a/src/main/jib/app/resources/log4j2.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/test/java/no/nb/nna/veidemann/contentwriter/ContentWriterServiceTestIT.java b/src/test/java/no/nb/nna/veidemann/contentwriter/ContentWriterServiceTestIT.java
deleted file mode 100644
index 0becc77..0000000
--- a/src/test/java/no/nb/nna/veidemann/contentwriter/ContentWriterServiceTestIT.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package no.nb.nna.veidemann.contentwriter;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.util.Timestamps;
-import com.rethinkdb.RethinkDB;
-import io.grpc.StatusException;
-import no.nb.nna.veidemann.api.config.v1.Collection.SubCollectionType;
-import no.nb.nna.veidemann.api.config.v1.ConfigRef;
-import no.nb.nna.veidemann.api.config.v1.Kind;
-import no.nb.nna.veidemann.api.contentwriter.v1.Data;
-import no.nb.nna.veidemann.api.contentwriter.v1.RecordType;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteRequestMeta;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteRequestMeta.RecordMeta;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteResponseMeta;
-import no.nb.nna.veidemann.commons.client.ContentWriterClient;
-import no.nb.nna.veidemann.commons.client.ContentWriterClient.ContentWriterSession;
-import no.nb.nna.veidemann.commons.db.ConfigAdapter;
-import no.nb.nna.veidemann.commons.db.DbConnectionException;
-import no.nb.nna.veidemann.commons.db.DbException;
-import no.nb.nna.veidemann.commons.db.DbService;
-import no.nb.nna.veidemann.commons.settings.CommonSettings;
-import no.nb.nna.veidemann.commons.util.Sha1Digest;
-import org.assertj.core.api.AbstractAssert;
-import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.jwat.warc.WarcRecord;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.text.ParseException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.jwat.warc.WarcConstants.*;
-
-public class ContentWriterServiceTestIT {
- static ContentWriterClient contentWriterClient;
-
- static ConfigAdapter db;
-
- static RethinkDB r = RethinkDB.r;
-
- @BeforeClass
- public static void init() throws DbConnectionException {
- String contentWriterHost = System.getProperty("contentwriter.host");
- int contentWriterPort = Integer.parseInt(System.getProperty("contentwriter.port"));
- String dbHost = System.getProperty("db.host");
- int dbPort = Integer.parseInt(System.getProperty("db.port"));
- System.out.println("Database address: " + dbHost + ":" + dbPort);
-
- contentWriterClient = new ContentWriterClient(contentWriterHost, contentWriterPort);
-
- if (!DbService.isConfigured()) {
- CommonSettings dbSettings = new CommonSettings()
- .withDbHost(dbHost)
- .withDbPort(dbPort)
- .withDbName("veidemann")
- .withDbUser("admin")
- .withDbPassword("");
- DbService.configure(dbSettings);
- }
- db = DbService.getInstance().getConfigAdapter();
- }
-
- @AfterClass
- public static void shutdown() {
- contentWriterClient.close();
- }
-
- @Test
- public void write() throws StatusException, InterruptedException, DbException, ParseException {
- Map responses = new HashMap<>();
- assertThatCode(() -> {
- responses.put("http", writeHttpRecords());
- }).doesNotThrowAnyException();
- WriteResponseMeta httpResponseMeta = responses.get("http");
- String httpResponseWarcId = httpResponseMeta.getRecordMetaOrThrow(1).getWarcId();
-
- assertThatCode(() -> {
- responses.put("screenshot", writeScreenshotRecord(httpResponseWarcId));
- }).doesNotThrowAnyException();
- WriteResponseMeta screenshotResponseMeta = responses.get("screenshot");
-
- writeHttpRecords();
- writeHttpRecords();
- writeDnsRecord();
-
- WarcFileSet wfs = WarcInspector.getWarcFiles();
- wfs.listFiles().forEach(wf -> {
- System.out.println(wf.getName());
- try (Stream stream = wf.getContent()) {
- stream.forEach(r -> {
- System.out.println(" - " + r.header.warcRecordIdStr + " " + r.header.versionStr + " " + r.header.warcTypeStr);
- });
- }
- });
-
- wfs.listFiles().forEach(wf -> {
- assertThat(wf.getContent()).allSatisfy(r -> {
- MyProjectAssertions.assertThat(r)
- .hasVersion(1, 0)
- .hasValidHeaders();
- });
- });
-
- wfs.listFiles().forEach(wf -> {
- try (Stream stream = wf.getContent()) {
- String fileName = wf.getName();
- stream.filter(r -> r.header.warcTypeStr.equals(RT_WARCINFO)).forEach(r -> {
- try (Stream lines = new BufferedReader(new InputStreamReader(r.getPayloadContent())).lines()) {
- assertThat(lines.filter(l -> l.startsWith("host: ")).map(l -> l.replace("host: ", "")))
- .allSatisfy(hostName -> {
- assertThat(fileName.contains(hostName)).isFalse();
- assertThat(fileName).contains(hostName.replace("-", "_"));
- });
- }
- });
- }
- });
- }
-
- private WriteResponseMeta writeHttpRecords() throws ParseException, StatusException, InterruptedException {
- ContentWriterSession session = contentWriterClient.createSession();
-
- Sha1Digest requestBlockDigest = new Sha1Digest();
- Sha1Digest responseBlockDigest = new Sha1Digest();
- Sha1Digest responsePayloadDigest = new Sha1Digest();
-
- ByteString requestHeaderData = ByteString.copyFromUtf8("GET /images/logoc.jpg HTTP/1.0\n" +
- "User-Agent: Mozilla/5.0 (compatible; heritrix/1.10.0)\n" +
- "From: stack@example.org\n" +
- "Connection: close\n" +
- "Referer: http://www.archive.org/\n" +
- "Host: www.archive.org\n" +
- "Cookie: PHPSESSID=009d7bb11022f80605aa87e18224d824\n");
-
- requestBlockDigest.update(requestHeaderData);
-
- ByteString responseHeaderData = ByteString.copyFromUtf8("HTTP/1.1 200 OK\n" +
- "Date: Tue, 19 Sep 2016 17:18:40 GMT\n" +
- "Server: Apache/2.0.54 (Ubuntu)\n" +
- "Last-Modified: Mon, 16 Jun 2013 22:28:51 GMT\n" +
- "ETag: \"3e45-67e-2ed02ec0\"\n" +
- "Accept-Ranges: bytes\n" +
- "Content-Length: 37\n" +
- "Connection: close\n" +
- "Content-Type: text/html\n");
-
- ByteString responsePayloadData = ByteString.copyFromUtf8("test
");
-
- responseBlockDigest.update(responseHeaderData);
- responseBlockDigest.update('\r', '\n');
- responsePayloadDigest.update(responsePayloadData);
- responseBlockDigest.update(responsePayloadData);
-
- RecordMeta requestMeta = RecordMeta.newBuilder()
- .setRecordNum(0)
- .setSize(requestHeaderData.size())
- .setBlockDigest(requestBlockDigest.getPrefixedDigestString())
- .setPayloadDigest("sha1:da39a3ee5e6b4b0d3255bfef95601890afd80709")
- .setType(RecordType.REQUEST)
- .setRecordContentType("application/http; msgtype=request")
- .build();
- RecordMeta responseMeta = RecordMeta.newBuilder()
- .setRecordNum(1)
- .setSize(responseHeaderData.size() + responsePayloadData.size() + 2)
- .setBlockDigest(responseBlockDigest.getPrefixedDigestString())
- .setPayloadDigest(responsePayloadDigest.getPrefixedDigestString())
- .setType(RecordType.RESPONSE)
- .setRecordContentType("application/http; msgtype=response")
- .build();
- WriteRequestMeta meta = WriteRequestMeta.newBuilder()
- .setCollectionRef(ConfigRef.newBuilder().setKind(Kind.collection).setId("2fa23773-d7e1-4748-8ab6-9253e470a3f5"))
- .setIpAddress("127.0.0.1")
- .setTargetUri("http://www.example.com/index.html")
- .setFetchTimeStamp(Timestamps.parse("2016-09-19T17:20:24Z"))
- .putRecordMeta(0, requestMeta)
- .putRecordMeta(1, responseMeta)
- .build();
- session.sendMetadata(meta);
-
- session.sendHeader(Data.newBuilder()
- .setRecordNum(0)
- .setData(requestHeaderData)
- .build());
-
- session.sendHeader(Data.newBuilder()
- .setRecordNum(1)
- .setData(responseHeaderData)
- .build());
-
- session.sendPayload(Data.newBuilder()
- .setRecordNum(1)
- .setData(responsePayloadData)
- .build());
-
- WriteResponseMeta res = null;
- try {
- res = session.finish();
- } catch (Exception e) {
- e.printStackTrace();
- }
- assertThat(session.isOpen()).isFalse();
- return res;
- }
-
- private WriteResponseMeta writeScreenshotRecord(String warcId) throws ParseException, StatusException, InterruptedException {
- ContentWriterSession session = contentWriterClient.createSession();
- assertThat(session.isOpen()).isTrue();
-
- Sha1Digest blockDigest = new Sha1Digest();
- ByteString payloadData = ByteString.copyFromUtf8("binary png");
- blockDigest.update(payloadData);
-
- RecordMeta screenshotMeta = RecordMeta.newBuilder()
- .setRecordNum(0)
- .setSize(payloadData.size())
- .setBlockDigest(blockDigest.getPrefixedDigestString())
- .setType(RecordType.RESOURCE)
- .setRecordContentType("image/png")
- .setSubCollection(SubCollectionType.SCREENSHOT)
- .addWarcConcurrentTo(warcId)
- .build();
- WriteRequestMeta meta = WriteRequestMeta.newBuilder()
- .setCollectionRef(ConfigRef.newBuilder().setKind(Kind.collection).setId("2fa23773-d7e1-4748-8ab6-9253e470a3f5"))
- .setIpAddress("127.0.0.1")
- .setTargetUri("http://www.example.com/index.html")
- .setFetchTimeStamp(Timestamps.parse("2016-09-19T17:20:24Z"))
- .putRecordMeta(0, screenshotMeta)
- .build();
- session.sendMetadata(meta);
-
- Data screenshot = Data.newBuilder()
- .setRecordNum(0)
- .setData(payloadData)
- .build();
- session.sendPayload(screenshot);
-
- WriteResponseMeta res = session.finish();
- assertThat(session.isOpen()).isFalse();
- return res;
- }
-
- private WriteResponseMeta writeDnsRecord() throws ParseException, StatusException, InterruptedException {
- ContentWriterSession session = contentWriterClient.createSession();
- assertThat(session.isOpen()).isTrue();
-
- Sha1Digest blockDigest = new Sha1Digest();
- ByteString payloadData = ByteString.copyFromUtf8("dns record");
- blockDigest.update(payloadData);
-
- RecordMeta dnsMeta = RecordMeta.newBuilder()
- .setRecordNum(0)
- .setType(RecordType.RESOURCE)
- .setRecordContentType("text/dns")
- .setSize(payloadData.size())
- .setBlockDigest(blockDigest.getPrefixedDigestString())
- .setSubCollection(SubCollectionType.DNS)
- .build();
- WriteRequestMeta meta = WriteRequestMeta.newBuilder()
- .setTargetUri("dns:www.example.com")
- .setFetchTimeStamp(Timestamps.parse("2016-09-19T17:20:24Z"))
- .setIpAddress("127.0.0.1")
- .setCollectionRef(ConfigRef.newBuilder().setKind(Kind.collection).setId("2fa23773-d7e1-4748-8ab6-9253e470a3f5"))
- .putRecordMeta(0, dnsMeta)
- .build();
- session.sendMetadata(meta);
-
- Data dns = Data.newBuilder()
- .setRecordNum(0)
- .setData(payloadData)
- .build();
- session.sendPayload(dns);
-
- WriteResponseMeta res = session.finish();
- assertThat(session.isOpen()).isFalse();
- return res;
- }
-
- public static class MyProjectAssertions extends Assertions {
- public static WarcRecordAssert assertThat(WarcRecord actual) {
- return new WarcRecordAssert(actual);
- }
- }
-
- public static class WarcRecordAssert extends AbstractAssert {
- public WarcRecordAssert(WarcRecord actual) {
- super(actual, WarcRecordAssert.class);
- }
-
- public WarcRecordAssert hasVersion(int major, int minor) {
- isNotNull();
- if (actual.header.major != major || actual.header.minor != minor) {
- failWithMessage("Expected WARC version to be <%d.%d> but was <%d.%d>", major, minor, actual.header.major, actual.header.minor);
- }
- return this;
- }
-
- public WarcRecordAssert hasValidHeaders() {
- isNotNull();
- assertThat(actual.header.warcRecordIdStr).as("%s should not be null", FN_WARC_RECORD_ID).isNotEmpty();
- assertThat(actual.header.warcTypeStr).as("%s should not be null", FN_WARC_TYPE).isNotEmpty();
- assertThat(actual.header.warcDateStr).as("%s should not be null", FN_WARC_DATE).isNotEmpty();
- assertThat(actual.header.contentLengthStr).as("%s should not be null", FN_CONTENT_LENGTH).isNotEmpty();
- assertThat(actual.header.contentTypeStr).as("%s should not be null", FN_CONTENT_TYPE).isNotEmpty();
-
- if (!actual.diagnostics.getErrors().isEmpty()) {
- System.out.println("ERRORS: " + actual.diagnostics.getErrors()
- .stream()
- .map(d -> "\n " + d.type.toString() + ":" + d.entity + ":" + Arrays.toString(d.getMessageArgs()))
- .collect(Collectors.joining()));
- actual.getHeaderList().forEach(h -> System.out.print(" W: " + new String(h.raw)));
- }
- if (!actual.diagnostics.getWarnings().isEmpty()) {
- System.out.println("WARNINGS: " + actual.diagnostics.getWarnings()
- .stream()
- .map(d -> "\n " + d.type.toString() + ":" + d.entity + ":" + Arrays.toString(d.getMessageArgs()))
- .collect(Collectors.joining()));
- actual.getHeaderList().forEach(h -> System.out.print(" W: " + new String(h.raw)));
- }
- assertThat(actual.isCompliant()).as("Record of type '%s' is not compliant", actual.header.warcTypeStr).isTrue();
-
- switch (actual.header.warcTypeStr) {
- case RT_CONTINUATION:
- break;
- case RT_CONVERSION:
- break;
- case RT_METADATA:
- break;
- case RT_REQUEST:
- case RT_RESPONSE:
- assertThat(actual.header.warcTargetUriStr)
- .as("%s for record type '%s' should not be null", FN_WARC_TARGET_URI, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcConcurrentToList)
- .as("%s for record type '%s' should not be empty", FN_WARC_CONCURRENT_TO, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcBlockDigestStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_BLOCK_DIGEST, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcPayloadDigestStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_PAYLOAD_DIGEST, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcIpAddress)
- .as("%s for record type '%s' should not be empty", FN_WARC_IP_ADDRESS, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcWarcinfoIdStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_WARCINFO_ID, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.isValidBlockDigest)
- .as("%s for record type '%s' doesn't validate", FN_WARC_BLOCK_DIGEST, actual.header.warcTypeStr)
- .isTrue();
- assertThat(actual.isValidPayloadDigest)
- .as("%s for record type '%s' doesn't validate", FN_WARC_PAYLOAD_DIGEST, actual.header.warcTypeStr)
- .isTrue();
- break;
- case RT_RESOURCE:
- assertThat(actual.header.warcTargetUriStr)
- .as("%s for record type '%s' should not be null", FN_WARC_TARGET_URI, actual.header.warcTypeStr)
- .isNotEmpty();
- if ("text/dns".equals(actual.header.contentTypeStr)) {
- assertThat(actual.header.warcConcurrentToList)
- .as("%s for record type '%s' should be empty", FN_WARC_CONCURRENT_TO, actual.header.warcTypeStr)
- .isEmpty();
- } else {
- assertThat(actual.header.warcConcurrentToList)
- .as("%s for record type '%s' should not be empty", FN_WARC_CONCURRENT_TO, actual.header.warcTypeStr)
- .isNotEmpty();
- }
- assertThat(actual.header.warcBlockDigestStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_BLOCK_DIGEST, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcIpAddress)
- .as("%s for record type '%s' should not be empty", FN_WARC_IP_ADDRESS, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcWarcinfoIdStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_WARCINFO_ID, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.isValidBlockDigest)
- .as("%s for record type '%s' doesn't validate", FN_WARC_BLOCK_DIGEST, actual.header.warcTypeStr)
- .isTrue();
- assertThat(actual.header.warcPayloadDigestStr)
- .as("%s for record type '%s' should be empty", FN_WARC_PAYLOAD_DIGEST, actual.header.warcTypeStr)
- .isNullOrEmpty();
- break;
- case RT_REVISIT:
- assertThat(actual.header.warcTargetUriStr)
- .as("%s for record type '%s' should not be null", FN_WARC_TARGET_URI, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcRefersToStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_REFERS_TO, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcConcurrentToList)
- .as("%s for record type '%s' should not be empty", FN_WARC_CONCURRENT_TO, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcBlockDigestStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_BLOCK_DIGEST, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcPayloadDigestStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_PAYLOAD_DIGEST, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcIpAddress)
- .as("%s for record type '%s' should not be empty", FN_WARC_IP_ADDRESS, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.header.warcWarcinfoIdStr)
- .as("%s for record type '%s' should not be empty", FN_WARC_WARCINFO_ID, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.isValidBlockDigest)
- .as("%s for record type '%s' doesn't validate", FN_WARC_BLOCK_DIGEST, actual.header.warcTypeStr)
- .isTrue();
- break;
- case RT_WARCINFO:
- assertThat(actual.header.warcPayloadDigestStr)
- .as("%s for record type '%s' should be empty", FN_WARC_PAYLOAD_DIGEST, actual.header.warcTypeStr)
- .isNullOrEmpty();
- assertThat(actual.header.warcFilename)
- .as("%s for record type '%s' should not be null", FN_WARC_FILENAME, actual.header.warcTypeStr)
- .isNotEmpty();
- assertThat(actual.isValidBlockDigest)
- .as("%s for record type '%s' doesn't validate", FN_WARC_BLOCK_DIGEST, actual.header.warcTypeStr)
- .isTrue();
- break;
- default:
- failWithMessage("Illegal WARC-Type <%s>", actual.header.warcTypeStr);
- }
- return this;
- }
- }
-}
diff --git a/src/test/java/no/nb/nna/veidemann/contentwriter/ContentwriterServiceTest.java b/src/test/java/no/nb/nna/veidemann/contentwriter/ContentwriterServiceTest.java
deleted file mode 100644
index ebdc0c4..0000000
--- a/src/test/java/no/nb/nna/veidemann/contentwriter/ContentwriterServiceTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter;
-
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusException;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.inprocess.InProcessServerBuilder;
-import no.nb.nna.veidemann.commons.client.ContentWriterClient;
-import no.nb.nna.veidemann.commons.db.DbService;
-import no.nb.nna.veidemann.commons.db.DbServiceSPI;
-import no.nb.nna.veidemann.contentwriter.warc.SingleWarcWriter;
-import no.nb.nna.veidemann.contentwriter.warc.WarcCollectionRegistry;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.net.URISyntaxException;
-
-import static org.mockito.Mockito.mock;
-
-/**
- *
- */
-public class ContentwriterServiceTest {
-
- private final String uniqueServerName = "in-process server for " + getClass();
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testSaveEntity() throws StatusException, InterruptedException, URISyntaxException {
- DbServiceSPI dbProviderMock = mock(DbServiceSPI.class);
- DbService.configure(dbProviderMock);
- WarcCollectionRegistry warcCollectionRegistry = mock(WarcCollectionRegistry.class);
- SingleWarcWriter singleWarcWriterMock = mock(SingleWarcWriter.class);
-
- InProcessServerBuilder inProcessServerBuilder = InProcessServerBuilder.forName(uniqueServerName).directExecutor();
- ManagedChannelBuilder inProcessChannelBuilder = InProcessChannelBuilder.forName(uniqueServerName).directExecutor();
- try (ApiServer inProcessServer = new ApiServer(inProcessServerBuilder, warcCollectionRegistry).start();
- ContentWriterClient client = new ContentWriterClient(inProcessChannelBuilder);) {
-
-// when(warcWriterPoolMock.borrow()).thenReturn(pooledWarcWriterMock);
-// when(pooledWarcWriterMock.getWarcWriter()).thenReturn(singleWarcWriterMock);
-
-
-// when(singleWarcWriterMock.writeWarcHeader(any())).thenReturn(new URI("foo:bar"));
-//
-// ContentWriterSession session1 = client.createSession();
-// ContentWriterSession session2 = client.createSession();
-//
-// session1.sendHeader(ByteString.copyFromUtf8("head1"));
-// session2.sendHeader(ByteString.copyFromUtf8("head2"));
-// session1.sendCrawlLog(CrawlLog.getDefaultInstance());
-// session2.sendCrawlLog(CrawlLog.getDefaultInstance());
-// session1.finish();
-// session2.finish();
- }
- }
-
-}
diff --git a/src/test/java/no/nb/nna/veidemann/contentwriter/WarcFile.java b/src/test/java/no/nb/nna/veidemann/contentwriter/WarcFile.java
deleted file mode 100644
index b71bd75..0000000
--- a/src/test/java/no/nb/nna/veidemann/contentwriter/WarcFile.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter;
-
-import okhttp3.HttpUrl;
-import okhttp3.Request;
-import okhttp3.Response;
-import org.jwat.warc.WarcReader;
-import org.jwat.warc.WarcReaderFactory;
-import org.jwat.warc.WarcRecord;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Spliterators;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- *
- */
-public class WarcFile {
-
- private String name;
-
- private long size;
-
- private String uri;
-
- WarcFile(Object o) {
- if (o instanceof Map) {
- Map m = (Map) o;
- name = (String) m.get("name");
- size = ((Double) m.get("size")).longValue();
- uri = (String) m.get("uri");
- } else {
- throw new IllegalArgumentException("expected java.util.Map, found " + o.getClass());
- }
- }
-
- public String getName() {
- return name;
- }
-
- public long getSize() {
- return size;
- }
-
- public Stream getContent() {
- HttpUrl url = WarcInspector.WARC_SERVER_URL.resolve("warcs/" + name);
- Request request = new Request.Builder().url(url).build();
- try {
- Response response = WarcInspector.CLIENT.newCall(request).execute();
- if (response.isSuccessful()) {
- WarcReader warcReader = WarcReaderFactory.getReader(response.body().byteStream());
- warcReader.setBlockDigestEnabled(true);
- warcReader.setPayloadDigestEnabled(true);
- return StreamSupport.stream(Spliterators.spliteratorUnknownSize(warcReader.iterator(), 0), false)
- .onClose(() -> {
- warcReader.close();
- response.close();
- if (!(warcReader.diagnostics.getErrors().isEmpty() && warcReader.diagnostics.getWarnings().isEmpty())) {
- System.err.println("WARC file '" + getName() + "' is not valid:");
- System.err.println(" Errors: " + warcReader.diagnostics.getErrors());
- System.err.println(" Warnings: " + warcReader.diagnostics.getWarnings());
- throw new RuntimeException("WARC file '" + getName() + "' is not valid");
- }
- });
- } else {
- throw new IOException("Unexpected code " + response);
- }
- } catch (Exception e) {
- System.out.println("---------------");
- e.printStackTrace();
- }
- return null;
- }
-
- @Override
- public String toString() {
- return "WarcFile{" + "name=" + name + ", uri=" + uri + ", size=" + size + '}';
- }
-
-}
diff --git a/src/test/java/no/nb/nna/veidemann/contentwriter/WarcFileSet.java b/src/test/java/no/nb/nna/veidemann/contentwriter/WarcFileSet.java
deleted file mode 100644
index e22c62e..0000000
--- a/src/test/java/no/nb/nna/veidemann/contentwriter/WarcFileSet.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter;
-
-import org.jwat.warc.WarcRecord;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- *
- */
-public class WarcFileSet {
-
- private final List warcFiles;
-
- public WarcFileSet(Stream fileStream) {
- warcFiles = fileStream.collect(Collectors.toList());
- }
-
- public Stream listFiles() {
- return warcFiles.stream();
- }
-
- public Stream getRecordStream() {
- Stream[] streams = listFiles()
- .map(f -> f.getContent()).collect(Collectors.toList()).toArray(new Stream[]{});
- return Stream.of(streams).flatMap(s -> s).onClose(() -> {
- for (Stream s : streams) {
- try {
- s.close();
- } catch (Exception e) {
- // Nothing we can do except ensure that other streams are closed
- // even if one throws an exception.
- }
- }
- });
- }
-
- public Stream getContentRecordStream() {
- return getRecordStream().filter(r -> r.header.warcTargetUriStr != null);
- }
-
- public long getRecordCount() {
- return getContentRecordStream().count();
- }
-
-}
diff --git a/src/test/java/no/nb/nna/veidemann/contentwriter/WarcInspector.java b/src/test/java/no/nb/nna/veidemann/contentwriter/WarcInspector.java
deleted file mode 100644
index c7d789f..0000000
--- a/src/test/java/no/nb/nna/veidemann/contentwriter/WarcInspector.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter;
-
-import com.google.common.net.HttpHeaders;
-import com.google.gson.Gson;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.Response;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.List;
-
-/**
- *
- */
-public class WarcInspector {
-
- final static OkHttpClient CLIENT = new OkHttpClient.Builder().followRedirects(true).build();
-
- final static Gson GSON = new Gson();
-
- final static HttpUrl WARC_SERVER_URL;
-
- static {
- String warcServerHost = System.getProperty("contentexplorer.host");
- int warcServerPort = Integer.parseInt(System.getProperty("contentexplorer.port"));
-
- WARC_SERVER_URL = new HttpUrl.Builder()
- .scheme("http")
- .host(warcServerHost)
- .port(warcServerPort)
- .build();
- }
-
- private WarcInspector() {
- }
-
- public static WarcFileSet getWarcFiles() throws UncheckedIOException {
- HttpUrl url = WARC_SERVER_URL.resolve("warcs");
-
- Request request = new Request.Builder()
- .url(url)
- .header(HttpHeaders.ACCEPT, "application/json")
- .build();
-
- try (Response response = CLIENT.newCall(request).execute();) {
- if (response.isSuccessful()) {
- return new WarcFileSet(GSON.fromJson(response.body().charStream(), List.class)
- .stream().map(m -> new WarcFile(m)));
- } else {
- throw new IOException("Unexpected code " + response);
- }
- } catch(IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
-
- public static void deleteWarcFiles() {
- HttpUrl url = WARC_SERVER_URL.resolve("warcs");
-
- Request request = new Request.Builder()
- .delete()
- .url(url)
- .build();
-
- try (Response response = CLIENT.newCall(request).execute();) {
- if (!response.isSuccessful()) {
- throw new IOException("Unexpected code " + response);
- }
- } catch(IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
-
- public static String getWarcHeadersForStorageRef(String storageRef) {
- HttpUrl url = WARC_SERVER_URL.resolve("storageref/" + storageRef + "/warcheader");
-
- Request request = new Request.Builder()
- .url(url)
- .build();
-
- try (Response response = CLIENT.newCall(request).execute();) {
- if (response.isSuccessful()) {
- return response.body().string();
- } else {
- throw new IOException("Unexpected code " + response);
- }
- } catch(IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
-
- public static byte[] getWarcContentForStorageRef(String storageRef) {
- HttpUrl url = WARC_SERVER_URL.resolve("storageref/" + storageRef);
-
- Request request = new Request.Builder()
- .url(url)
- .build();
-
- try (Response response = CLIENT.newCall(request).execute();) {
- if (response.isSuccessful()) {
- return response.body().bytes();
- } else {
- throw new IOException("Unexpected code " + response);
- }
- } catch(IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
-}
diff --git a/src/test/java/no/nb/nna/veidemann/contentwriter/WriteSessionContextBuilder.java b/src/test/java/no/nb/nna/veidemann/contentwriter/WriteSessionContextBuilder.java
deleted file mode 100644
index a43ea05..0000000
--- a/src/test/java/no/nb/nna/veidemann/contentwriter/WriteSessionContextBuilder.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package no.nb.nna.veidemann.contentwriter;
-
-import no.nb.nna.veidemann.api.config.v1.ConfigObject;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteRequestMeta;
-
-public class WriteSessionContextBuilder {
-
- private WriteRequestMeta.Builder writeRequestMeta;
- private ConfigObject collectionConfig;
-
- public WriteSessionContextBuilder withWriteRequestMeta(WriteRequestMeta.Builder writeRequestMeta) {
- this.writeRequestMeta = writeRequestMeta;
- return this;
- }
-
- public WriteSessionContextBuilder withCollectionConfig(ConfigObject collectionConfig) {
- this.collectionConfig = collectionConfig;
- return this;
- }
-
- public WriteSessionContext build() {
- WriteSessionContext context = new WriteSessionContext();
-
- context.collectionConfig = collectionConfig;
- context.writeRequestMeta = writeRequestMeta;
-
- return context;
- }
-}
diff --git a/src/test/java/no/nb/nna/veidemann/contentwriter/warc/SingleWarcWriterTest.java b/src/test/java/no/nb/nna/veidemann/contentwriter/warc/SingleWarcWriterTest.java
deleted file mode 100644
index f2d8293..0000000
--- a/src/test/java/no/nb/nna/veidemann/contentwriter/warc/SingleWarcWriterTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Copyright 2017 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package no.nb.nna.veidemann.contentwriter.warc;
-
-import com.google.protobuf.ByteString;
-import no.nb.nna.veidemann.api.config.v1.Collection;
-import no.nb.nna.veidemann.api.config.v1.ConfigObject;
-import no.nb.nna.veidemann.api.contentwriter.v1.RecordType;
-import no.nb.nna.veidemann.api.contentwriter.v1.WriteRequestMeta;
-import no.nb.nna.veidemann.commons.db.ExecutionsAdapter;
-import no.nb.nna.veidemann.commons.db.DbService;
-import no.nb.nna.veidemann.commons.db.DbServiceSPI;
-import no.nb.nna.veidemann.contentwriter.ContentBuffer;
-import no.nb.nna.veidemann.contentwriter.WriteSessionContext;
-import no.nb.nna.veidemann.contentwriter.WriteSessionContextBuilder;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- *
- */
-public class SingleWarcWriterTest {
-
- private final static String requestHeader = "Host: elg.no\n" +
- "User-Agent: Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:69.0) Gecko/20100101 Firefox/69.0\n" +
- "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\n" +
- "Accept-Language: nb-NO,nb;q=0.9,no-NO;q=0.8,no;q=0.6,nn-NO;q=0.5,nn;q=0.4,en-US;q=0.3,en;q=0.1\n" +
- "Accept-Encoding: gzip, deflate\n" +
- "Connection: keep-alive\n" +
- "Upgrade-Insecure-Requests: 1\n";
-
- private final static String responseHeader = "HTTP/1.1 200 OK\n" +
- "Date: Thu, 03 Oct 2019 09:17:44 GMT\n" +
- "Content-Type: text/html\n" +
- "Transfer-Encoding: chunked\n" +
- "Connection: keep-alive\n" +
- "Set-Cookie: __cfduid=deafaee9d9fb85ec39631049d132a1e481570094263; expires=Fri, 02-Oct-20 09:17:43 GMT; path=/; domain=.elg.no; HttpOnly\n" +
- "Last-Modified: Wed, 11 Sep 2019 10:56:04 GMT\n" +
- "CF-Cache-Status: DYNAMIC\n" +
- "Server: cloudflare\n" +
- "CF-RAY: 51fdd31d6a85d895-CPH\n" +
- "Content-Encoding: gzip\n";
-
- private final static String responsePayload = "\n" +
- "\n" +
- "\n" +
- "\n" +
- "\n" +
- "\n" +
- "\n" +
- "\n" +
- "www.elg.no
\n" +
- "\n" +
- "Elger er gromme dyr.
\n" +
- "Elgkalvene er mat for bl.a. ulv.
\n" +
- "Last ned Vivaldi på vivaldi.com
\n" +
- "\n" +
- "\n";
-
- private static final String hostName = "test-host";
-
- private static final String filePrefix = "test";
-
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- /**
- * Test of write method, of class SingleWarcWriter.
- */
- @Test
- public void testWrite() throws Exception {
- DbServiceSPI dbProviderMock = mock(DbServiceSPI.class);
- when(dbProviderMock.getExecutionsAdapter()).thenReturn(mock(ExecutionsAdapter.class));
- DbService.configure(dbProviderMock);
-
- final File targetDir = temporaryFolder.getRoot();
- final boolean compress = false;
-
- final Collection collection = Collection.newBuilder()
- .setCompress(compress)
- .setFileSize(2048)
- .build();
-
- final ConfigObject collectionConfig = ConfigObject.newBuilder()
- .setCollection(collection)
- .build();
-
-
- final WriteRequestMeta.Builder writeRequestMeta = WriteRequestMeta.newBuilder()
- .setTargetUri("http://elg.no")
- .setIpAddress("104.24.117.137");
-
- final WriteSessionContext context = new WriteSessionContextBuilder()
- .withWriteRequestMeta(writeRequestMeta)
- .withCollectionConfig(collectionConfig)
- .build();
-
- // Request
- ByteString header0 = ByteString.copyFromUtf8(requestHeader);
-
- WriteSessionContext.RecordData recordData0 = context.getRecordData(0);
- ContentBuffer contentBuffer0 = recordData0.getContentBuffer();
- contentBuffer0.setHeader(header0);
-
- WriteRequestMeta.RecordMeta rm0 = WriteRequestMeta.RecordMeta.newBuilder()
- .setSize(contentBuffer0.getTotalSize())
- .setType(RecordType.REQUEST)
- .build();
- writeRequestMeta.putRecordMeta(0, rm0);
-
- // Response
- ByteString header1 = ByteString.copyFromUtf8(responseHeader);
- ByteString payload1 = ByteString.copyFromUtf8(responsePayload);
-
- WriteSessionContext.RecordData recordData1 = context.getRecordData(1);
- ContentBuffer contentBuffer1 = recordData1.getContentBuffer();
- contentBuffer1.setHeader(header1);
- contentBuffer1.addPayload(payload1);
-
- WriteRequestMeta.RecordMeta rm1 = WriteRequestMeta.RecordMeta.newBuilder()
- .setType(RecordType.RESPONSE)
- .setRecordContentType("text/html")
- .setSize(contentBuffer1.getTotalSize())
- .build();
-
- writeRequestMeta.putRecordMeta(1, rm1);
-
- try (SingleWarcWriter writer = new SingleWarcWriter(collectionConfig, null, filePrefix, targetDir, hostName)) {
- for (Integer recordNum : context.getRecordNums()) {
- try (WriteSessionContext.RecordData recordData = context.getRecordData(recordNum)) {
- writer.writeRecord(recordData);
- } catch (Exception e) {
- e.printStackTrace(System.err);
- throw e;
- }
- }
- }
-
-
- final File[] files = targetDir.listFiles();
- assertThat(files).isNotNull();
- for (final File f : files) {
- assertThat(f).hasExtension(compress ? "gz" : "warc");
- }
- }
-}
diff --git a/src/test/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollectionTest.java b/src/test/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollectionTest.java
deleted file mode 100644
index 2bba3cc..0000000
--- a/src/test/java/no/nb/nna/veidemann/contentwriter/warc/WarcCollectionTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2019 National Library of Norway.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package no.nb.nna.veidemann.contentwriter.warc;
-
-import no.nb.nna.veidemann.api.config.v1.Collection.RotationPolicy;
-import no.nb.nna.veidemann.api.config.v1.ConfigObject;
-import no.nb.nna.veidemann.db.ProtoUtils;
-import org.junit.Test;
-
-import java.time.OffsetDateTime;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class WarcCollectionTest {
-
- @Test
- public void shouldFlushFiles() {
- ConfigObject.Builder config = ConfigObject.newBuilder();
- config.getCollectionBuilder().setFileRotationPolicy(RotationPolicy.NONE);
-
- WarcCollection col = new WarcCollection(config.build());
- OffsetDateTime now = ProtoUtils.getNowOdt();
-
- assertThat(col.shouldFlushFiles(config.build(), now)).isFalse();
- OffsetDateTime tomorrow = now.plusDays(1);
- assertThat(col.shouldFlushFiles(config.build(), tomorrow)).isFalse();
-
- config.getCollectionBuilder().setFileRotationPolicy(RotationPolicy.DAILY);
- assertThat(col.shouldFlushFiles(config.build(), now)).isTrue();
- assertThat(col.shouldFlushFiles(config.build(), tomorrow)).isTrue();
-
- col = new WarcCollection(config.build());
- assertThat(col.shouldFlushFiles(config.build(), now)).isFalse();
- assertThat(col.shouldFlushFiles(config.build(), tomorrow)).isTrue();
- }
-}
\ No newline at end of file
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
deleted file mode 100644
index 88996cf..0000000
--- a/src/test/resources/application.conf
+++ /dev/null
@@ -1,25 +0,0 @@
-logTraffic=false
-
-# The port the server listens to.
-apiPort=8080
-apiPort=${?API_PORT}
-
-# Where to put WARC files
-warcDir=".";
-warcDir=${?WARC_DIR}
-
-warcWriterPoolSize=2
-warcWriterPoolSize=${?WARC_WRITER_POOL_SIZE}
-
-# Where to put temporary files
-workDir="."
-workDir=${?WORK_DIR}
-
-# Regular expression matching url's which are allowed to do cross origin resource requests
-corsAllowedOriginPattern=""
-
-hostName="unknown"
-hostName=${?HOST_NAME}
-
-terminationGracePeriodSeconds=60
-terminationGracePeriodSeconds=${?TERMINATION_GRACE_PERIOD_SECONDS}
diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml
deleted file mode 100644
index 0526386..0000000
--- a/src/test/resources/log4j2.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/telemetry/metrics.go b/telemetry/metrics.go
new file mode 100644
index 0000000..fecbe9a
--- /dev/null
+++ b/telemetry/metrics.go
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2020 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package telemetry
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+ CanonicalizationsTotal = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: metricsNs,
+ Subsystem: metricsSubsystem,
+ Name: "canonicalizations_total",
+ Help: "Total URIs canonicalized",
+ })
+
+ ScopechecksTotal = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: metricsNs,
+ Subsystem: metricsSubsystem,
+ Name: "scopechecks_total",
+ Help: "Total URIs checked for scope inclusion",
+ })
+
+ ScopecheckResponseTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: metricsNs,
+ Subsystem: metricsSubsystem,
+ Name: "scopecheck_response_total",
+ Help: "Total scopecheck responses for each response code",
+ },
+ []string{"code"},
+ )
+
+ CompileScriptSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: metricsNs,
+ Subsystem: metricsSubsystem,
+ Name: "script_compile_seconds",
+ Help: "Time for compiling a script in seconds",
+ Buckets: []float64{.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10, 20, 30, 40, 50, 60, 120, 180, 240},
+ })
+
+ ExecuteScriptSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: metricsNs,
+ Subsystem: metricsSubsystem,
+ Name: "script_execute_seconds",
+ Help: "Time for executing a script in seconds",
+ Buckets: []float64{.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10, 20, 30, 40, 50, 60, 120, 180, 240},
+ })
+)
+
+const (
+ metricsNs = "veidemann"
+ metricsSubsystem = "scopeservice"
+)
diff --git a/telemetry/metrics_server.go b/telemetry/metrics_server.go
new file mode 100644
index 0000000..2a0271f
--- /dev/null
+++ b/telemetry/metrics_server.go
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2020 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package telemetry
+
+import (
+ "context"
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/collectors"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/rs/zerolog/log"
+ "net/http"
+ "sync"
+ "time"
+)
+
+var once sync.Once
+
+// MetricsServer is the Prometheus metrics endpoint for the Browser Controller
+type MetricsServer struct {
+ addr string
+ path string
+ server *http.Server
+}
+
+// NewMetricsServer returns a new instance of MetricsServer listening on the given port
+func NewMetricsServer(listenInterface string, listenPort int, path string) *MetricsServer {
+ a := &MetricsServer{
+ addr: fmt.Sprintf("%s:%d", listenInterface, listenPort),
+ path: path,
+ }
+ once.Do(func() {
+ prometheus.MustRegister(
+ CanonicalizationsTotal,
+ ScopechecksTotal,
+ ScopecheckResponseTotal,
+ CompileScriptSeconds,
+ ExecuteScriptSeconds,
+ collectors.NewBuildInfoCollector(),
+ )
+ })
+
+ return a
+}
+
+func (a *MetricsServer) Start() error {
+ router := http.NewServeMux()
+ router.Handle(a.path, promhttp.Handler())
+
+ a.server = &http.Server{
+ Addr: a.addr,
+ Handler: router,
+ ReadTimeout: 5 * time.Second,
+ WriteTimeout: 5 * time.Second,
+ IdleTimeout: 5 * time.Second,
+ }
+
+ log.Info().Msgf("Metrics server listening on address: %s", a.addr)
+ err := a.server.ListenAndServe()
+ if err != nil && !errors.Is(err, http.ErrServerClosed) {
+ return fmt.Errorf("failed to listen on %s: %w", a.addr, err)
+ }
+ return nil
+}
+
+func (a *MetricsServer) Close() {
+ log.Info().Msgf("Shutting down Metrics server")
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ a.server.SetKeepAlivesEnabled(false)
+ _ = a.server.Shutdown(ctx)
+}
diff --git a/telemetry/tracer.go b/telemetry/tracer.go
new file mode 100644
index 0000000..fe67d24
--- /dev/null
+++ b/telemetry/tracer.go
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2020 National Library of Norway.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package telemetry
+
+import (
+ "github.com/opentracing/opentracing-go"
+ "github.com/uber/jaeger-client-go/config"
+ "github.com/uber/jaeger-client-go/log"
+ "io"
+)
+
+// Init returns an instance of Jaeger Tracer that samples 100% of traces and logs all spans to stdout.
+func InitTracer(service string) (opentracing.Tracer, io.Closer) {
+ cfg, err := config.FromEnv()
+ if err != nil {
+ log.StdLogger.Infof("ERROR: cannot init Jaeger from environment: %v", err)
+ return nil, nil
+ }
+ if cfg.ServiceName == "" {
+ cfg.ServiceName = service
+ }
+
+ tracer, closer, err := cfg.NewTracer(config.Logger(log.StdLogger))
+ if err != nil {
+ log.StdLogger.Infof("ERROR: cannot init Jaeger: %v", err)
+ return nil, nil
+ }
+ return tracer, closer
+}