Skip to content

Commit

Permalink
Merge pull request #23 from nlnwa/fix-uuid
Browse files: browse the repository at this point in the history
Fixes, features and refactors
  • Loading branch information
andrbo authored Oct 14, 2021
2 parents dd690e1 + 76d6060 commit 36ae4ea
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 125 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number | Diff line number | Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/HdrHistogram/hdrhistogram-go v1.1.0 // indirect
github.com/google/uuid v1.2.0
github.com/nlnwa/gowarc v1.0.0-alpha.14
github.com/nlnwa/veidemann-api/go v0.0.0-20210414094839-b36ce92632fe
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -29,6 +29,7 @@ func main() {
pflag.Bool("flush-record", false, "if true, flush WARC-file to disk after each record.")
pflag.String("work-dir", "", "")
pflag.Int("termination-grace-period-seconds", 0, "")
pflag.Bool("strict-validation", false, "if true, use strict record validation")

pflag.String("db-host", "rethinkdb-proxy", "DB host")
pflag.Int("db-port", 28015, "DB port")
Expand Down
81 changes: 39 additions & 42 deletions server/server.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,19 +18,20 @@ package server

import (
"fmt"
"github.com/nlnwa/gowarc"
"github.com/nlnwa/veidemann-contentwriter/database"
"github.com/nlnwa/veidemann-contentwriter/settings"
"google.golang.org/grpc/codes"
"io"
"net"

"github.com/nlnwa/gowarc"
"github.com/nlnwa/veidemann-api/go/contentwriter/v1"
"github.com/nlnwa/veidemann-contentwriter/database"
"github.com/nlnwa/veidemann-contentwriter/settings"
"github.com/nlnwa/veidemann-contentwriter/telemetry"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"net"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type GrpcServer struct {
Expand All @@ -43,15 +44,22 @@ type GrpcServer struct {
}

func New(host string, port int, settings settings.Settings, configCache database.ConfigCache) *GrpcServer {
recordOpts := []gowarc.WarcRecordOption{
gowarc.WithBufferTmpDir(settings.WorkDir()),
gowarc.WithVersion(settings.WarcVersion()),
}
if settings.UseStrictValidation() {
recordOpts = append(recordOpts, gowarc.WithStrictValidation())
}
s := &GrpcServer{
listenHost: host,
listenPort: port,
settings: settings,
configCache: configCache,
service: &ContentWriterService{
settings: settings,
warcWriterRegistry: newWarcWriterRegistry(settings, configCache),
configCache: configCache,
recordOptions: recordOpts,
},
}
return s
Expand Down Expand Up @@ -83,78 +91,67 @@ func (s *GrpcServer) Shutdown() {

type ContentWriterService struct {
contentwriter.UnimplementedContentWriterServer
settings settings.Settings
configCache database.ConfigCache
warcWriterRegistry *warcWriterRegistry
recordOptions []gowarc.WarcRecordOption
}

func (s *ContentWriterService) Write(stream contentwriter.ContentWriter_WriteServer) error {
func (s *ContentWriterService) Write(stream contentwriter.ContentWriter_WriteServer) (err error) {
telemetry.ScopechecksTotal.Inc()
//telemetry.ScopecheckResponseTotal.With(prometheus.Labels{"code": strconv.Itoa(int(result.ExcludeReason))}).Inc()
ctx := newWriteSessionContext(s.settings, s.configCache)
ctx := newWriteSessionContext(s.configCache, s.recordOptions)
defer ctx.cancelSession()
defer func() {
if err != nil {
log.Error().Err(err).Str("code", status.Code(err).String()).Msg("")
}
}()

for {
request, err := stream.Recv()
if err == io.EOF {
return s.onCompleted(ctx, stream)
break
}
if err != nil {
log.Err(err).Msgf("Error caught: %s", err.Error())
ctx.cancelSession(err.Error())
return err
}

switch v := request.Value.(type) {
case *contentwriter.WriteRequest_Meta:
log.Trace().Msgf("Got API request %T for %d records", v, len(v.Meta.RecordMeta))
if err := ctx.setWriteRequestMeta(v.Meta); err != nil {
ctx.cancelSession(err.Error())
return err
}
ctx.setWriteRequestMeta(v.Meta)
case *contentwriter.WriteRequest_ProtocolHeader:
log.Trace().Msgf("Got API request %T for record #%d. Size: %d", v, v.ProtocolHeader.RecordNum, len(v.ProtocolHeader.GetData()))
if err := ctx.writeProtocolHeader(v.ProtocolHeader); err != nil {
return err
return status.Errorf(codes.Unknown, "failed to write protocol header: %v", err)
}
case *contentwriter.WriteRequest_Payload:
log.Trace().Msgf("Got API request %T for record #%d. Size: %d", v, v.Payload.RecordNum, len(v.Payload.GetData()))
if err := ctx.writePayoad(v.Payload); err != nil {
return err
if err := ctx.writePayload(v.Payload); err != nil {
return status.Errorf(codes.Unknown, "failed to write payload: %v", err)
}
case *contentwriter.WriteRequest_Cancel:
log.Trace().Msgf("Got API request %T", v)
ctx.cancelSession(v.Cancel)
log.Debug().Str("reason", v.Cancel).Msg("Write request cancelled")
return stream.SendAndClose(new(contentwriter.WriteReply))
default:
return fmt.Errorf("Invalid request %s", v)
return status.Errorf(codes.InvalidArgument, "invalid write request: %v", v)
}
}
}

func (s *ContentWriterService) onCompleted(context *writeSessionContext, stream contentwriter.ContentWriter_WriteServer) error {
if context.canceled {
return context.handleErr(codes.Canceled, "Session canceled")
//return stream.SendAndClose(&contentwriter.WriteReply{})
}

if context.meta == nil {
return context.handleErr(codes.InvalidArgument, "Missing metadata object")
}

if err := context.validateSession(); err != nil {
context.cancelSession("Validation failed: " + err.Error())
return err
if err := ctx.validateSession(); err != nil {
return status.Errorf(codes.Unknown, "validation failed: %v", err)
}

records := make([]gowarc.WarcRecord, len(context.records))
records := make([]gowarc.WarcRecord, len(ctx.records))
for i := 0; i < len(records); i++ {
records[i] = context.records[int32(i)]
records[i] = ctx.records[int32(i)]
}
writer := s.warcWriterRegistry.GetWarcWriter(context.collectionConfig, context.meta.RecordMeta[0])
writeResponseMeta, err := writer.Write(context.meta, records...)
writer := s.warcWriterRegistry.GetWarcWriter(ctx.collectionConfig, ctx.meta.RecordMeta[0])
writeReply, err := writer.Write(ctx.meta, records...)
if err != nil {
context.cancelSession("Failed writing record: " + err.Error())
return err
return status.Errorf(codes.Unknown, "failed writing record(s): %v", err)
}

return stream.SendAndClose(writeResponseMeta)
return stream.SendAndClose(writeReply)
}
12 changes: 6 additions & 6 deletions server/server_test.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestContentWriterService_Write(t *testing.T) {

assert.Equal(int32(0), reply.Meta.RecordMeta[0].RecordNum)
assert.Equal(contentwriter.RecordType_REQUEST, reply.Meta.RecordMeta[0].Type)
assert.Regexp("<urn:uuid:.{8}-.{4}-.{4}-.{4}-.{12}>", reply.Meta.RecordMeta[0].WarcId)
assert.Regexp(".{8}-.{4}-.{4}-.{4}-.{12}", reply.Meta.RecordMeta[0].WarcId)
assert.Equal("sha1:AD6944346BF47CEACBE14E387EB031FCBDB59227", reply.Meta.RecordMeta[0].BlockDigest)
assert.Equal("sha1:DA39A3EE5E6B4B0D3255BFEF95601890AFD80709", reply.Meta.RecordMeta[0].PayloadDigest)
assert.Equal("c1_2000101002", reply.Meta.RecordMeta[0].CollectionFinalName)
Expand All @@ -263,7 +263,7 @@ func TestContentWriterService_Write(t *testing.T) {

assert.Equal(int32(1), reply.Meta.RecordMeta[1].RecordNum)
assert.Equal(contentwriter.RecordType_RESPONSE, reply.Meta.RecordMeta[1].Type)
assert.Regexp("<urn:uuid:.{8}-.{4}-.{4}-.{4}-.{12}>", reply.Meta.RecordMeta[1].WarcId)
assert.Regexp(".{8}-.{4}-.{4}-.{4}-.{12}", reply.Meta.RecordMeta[1].WarcId)
assert.Equal("sha1:4126C2DC27F113BEEC37A46276514CD4300DA10D", reply.Meta.RecordMeta[1].BlockDigest)
assert.Equal("sha1:C37FFB221569C553A2476C22C7DAD429F3492977", reply.Meta.RecordMeta[1].PayloadDigest)
assert.Equal("c1_2000101002", reply.Meta.RecordMeta[1].CollectionFinalName)
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestContentWriterService_Write_Compressed(t *testing.T) {

assert.Equal(int32(0), reply.Meta.RecordMeta[0].RecordNum)
assert.Equal(contentwriter.RecordType_REQUEST, reply.Meta.RecordMeta[0].Type)
assert.Regexp("<urn:uuid:.{8}-.{4}-.{4}-.{4}-.{12}>", reply.Meta.RecordMeta[0].WarcId)
assert.Regexp(".{8}-.{4}-.{4}-.{4}-.{12}", reply.Meta.RecordMeta[0].WarcId)
assert.Equal("sha1:AD6944346BF47CEACBE14E387EB031FCBDB59227", reply.Meta.RecordMeta[0].BlockDigest)
assert.Equal("sha1:DA39A3EE5E6B4B0D3255BFEF95601890AFD80709", reply.Meta.RecordMeta[0].PayloadDigest)
assert.Equal("c2_2000101002", reply.Meta.RecordMeta[0].CollectionFinalName)
Expand All @@ -323,7 +323,7 @@ func TestContentWriterService_Write_Compressed(t *testing.T) {

assert.Equal(int32(1), reply.Meta.RecordMeta[1].RecordNum)
assert.Equal(contentwriter.RecordType_RESPONSE, reply.Meta.RecordMeta[1].Type)
assert.Regexp("<urn:uuid:.{8}-.{4}-.{4}-.{4}-.{12}>", reply.Meta.RecordMeta[1].WarcId)
assert.Regexp(".{8}-.{4}-.{4}-.{4}-.{12}", reply.Meta.RecordMeta[1].WarcId)
assert.Equal("sha1:4126C2DC27F113BEEC37A46276514CD4300DA10D", reply.Meta.RecordMeta[1].BlockDigest)
assert.Equal("sha1:C37FFB221569C553A2476C22C7DAD429F3492977", reply.Meta.RecordMeta[1].PayloadDigest)
assert.Equal("c2_2000101002", reply.Meta.RecordMeta[1].CollectionFinalName)
Expand Down Expand Up @@ -369,7 +369,7 @@ func TestContentWriterService_WriteRevisit(t *testing.T) {

assert.Equal(int32(0), reply.Meta.RecordMeta[0].RecordNum)
assert.Equal(contentwriter.RecordType_REQUEST, reply.Meta.RecordMeta[0].Type)
assert.Regexp("<urn:uuid:.{8}-.{4}-.{4}-.{4}-.{12}>", reply.Meta.RecordMeta[0].WarcId)
assert.Regexp(".{8}-.{4}-.{4}-.{4}-.{12}", reply.Meta.RecordMeta[0].WarcId)
assert.Equal("sha1:AD6944346BF47CEACBE14E387EB031FCBDB59227", reply.Meta.RecordMeta[0].BlockDigest)
assert.Equal("sha1:DA39A3EE5E6B4B0D3255BFEF95601890AFD80709", reply.Meta.RecordMeta[0].PayloadDigest)
assert.Equal("c1_2000101002", reply.Meta.RecordMeta[0].CollectionFinalName)
Expand All @@ -378,7 +378,7 @@ func TestContentWriterService_WriteRevisit(t *testing.T) {

assert.Equal(int32(1), reply.Meta.RecordMeta[1].RecordNum)
assert.Equal(contentwriter.RecordType_REVISIT, reply.Meta.RecordMeta[1].Type)
assert.Regexp("<urn:uuid:.{8}-.{4}-.{4}-.{4}-.{12}>", reply.Meta.RecordMeta[1].WarcId)
assert.Regexp(".{8}-.{4}-.{4}-.{4}-.{12}", reply.Meta.RecordMeta[1].WarcId)
assert.Equal("sha1:C3BAD90968CC446FF64FED82D030AAB5A0B5884A", reply.Meta.RecordMeta[1].BlockDigest)
assert.Equal("sha1:C37FFB221569C553A2476C22C7DAD429F3492977", reply.Meta.RecordMeta[1].PayloadDigest)
assert.Equal("c1_2000101002", reply.Meta.RecordMeta[1].CollectionFinalName)
Expand Down
Loading

0 comments on commit 36ae4ea

Please sign in to comment.