Skip to content

Commit

Permalink
Add remote-storage service (#3836)
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Aug 2, 2022
1 parent ea71bd1 commit 4daa4e9
Show file tree
Hide file tree
Showing 10 changed files with 869 additions and 1 deletion.
16 changes: 16 additions & 0 deletions cmd/remote-storage/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
ARG base_image
ARG debug_image

ARG SVC=remote-storage

FROM $base_image AS release
ARG TARGETARCH
COPY $SVC-linux-$TARGETARCH /go/bin/$SVC-linux
EXPOSE 16686/tcp
ENTRYPOINT ["/go/bin/$SVC-linux"]

FROM $debug_image AS debug
ARG TARGETARCH=amd64
COPY $SVC-debug-linux-$TARGETARCH /go/bin/$SVC-linux
EXPOSE 12345/tcp 16686/tcp
ENTRYPOINT ["/go/bin/dlv", "exec", "/go/bin/$SVC-linux", "--headless", "--listen=:12345", "--api-version=2", "--accept-multiclient", "--log", "--"]
64 changes: 64 additions & 0 deletions cmd/remote-storage/app/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"flag"
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
)

const (
flagGRPCHostPort = "grpc.host-port"
)

var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "grpc",
}

// Options holds configuration for remote-storage service.
type Options struct {
// GRPCHostPort is the host:port address for gRPC server
GRPCHostPort string
// TLSGRPC configures secure transport
TLSGRPC tlscfg.Options
// Tenancy configuration
Tenancy tenancy.Options
}

// AddFlags adds flags to flag set.
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(flagGRPCHostPort, ports.PortToHostPort(ports.RemoteStorageGRPC), "The host:port (e.g. 127.0.0.1:17271 or :17271) of the gRPC server")
tlsGRPCFlagsConfig.AddFlags(flagSet)
tenancy.AddFlags(flagSet)
}

// InitFromViper initializes Options with properties from CLI flags.
func (o *Options) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Options, error) {
o.GRPCHostPort = v.GetString(flagGRPCHostPort)
if tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v); err == nil {
o.TLSGRPC = tlsGrpc
} else {
return o, fmt.Errorf("failed to process gRPC TLS options: %w", err)
}
o.Tenancy = tenancy.InitFromViper(v)
return o, nil
}
47 changes: 47 additions & 0 deletions cmd/remote-storage/app/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
)

func TestFlags(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--grpc.host-port=127.0.0.1:8081",
})
qOpts, err := new(Options).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.Equal(t, "127.0.0.1:8081", qOpts.GRPCHostPort)
}

func TestFailedTLSFlags(t *testing.T) {
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
"--grpc.tls.enabled=false",
"--grpc.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = new(Options).InitFromViper(v, zap.NewNop())
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to process gRPC TLS options")
}
151 changes: 151 additions & 0 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"fmt"
"net"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// Server runs a gRPC server
type Server struct {
logger *zap.Logger
opts *Options

grpcConn net.Listener
grpcServer *grpc.Server
unavailableChannel chan healthcheck.Status // used to signal to admin server that gRPC server is unavailable
}

// NewServer creates and initializes Server.
func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.TenancyManager, logger *zap.Logger) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, logger)
if err != nil {
return nil, err
}

grpcServer, err := createGRPCServer(options, tm, handler, logger)
if err != nil {
return nil, err
}

return &Server{
logger: logger,
opts: options,
grpcServer: grpcServer,
unavailableChannel: make(chan healthcheck.Status),
}, nil
}

func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandler, error) {
reader, err := f.CreateSpanReader()
if err != nil {
return nil, err
}
writer, err := f.CreateSpanWriter()
if err != nil {
return nil, err
}
depReader, err := f.CreateDependencyReader()
if err != nil {
return nil, err
}

impl := &shared.GRPCHandlerStorageImpl{
SpanReader: func() spanstore.Reader { return reader },
SpanWriter: func() spanstore.Writer { return writer },
DependencyReader: func() dependencystore.Reader { return depReader },
StreamingSpanWriter: func() spanstore.Writer { return nil },
}

// borrow code from Query service for archive storage
qOpts := &querysvc.QueryServiceOptions{}
// when archive storage not initialized (returns false), the reader/writer will be nil
_ = qOpts.InitArchiveStorage(f, logger)
impl.ArchiveSpanReader = func() spanstore.Reader { return qOpts.ArchiveSpanReader }
impl.ArchiveSpanWriter = func() spanstore.Writer { return qOpts.ArchiveSpanWriter }

handler := shared.NewGRPCHandler(impl)
return handler, nil
}

// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(opts *Options, tm *tenancy.TenancyManager, handler *shared.GRPCHandler, logger *zap.Logger) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if opts.TLSGRPC.Enabled {
tlsCfg, err := opts.TLSGRPC.Config(logger)
if err != nil {
return nil, fmt.Errorf("invalid TLS config: %w", err)
}
creds := credentials.NewTLS(tlsCfg)
grpcOpts = append(grpcOpts, grpc.Creds(creds))
}
if tm.Enabled {
grpcOpts = append(grpcOpts,
grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)),
grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)),
)
}

server := grpc.NewServer(grpcOpts...)
reflection.Register(server)
handler.Register(server)

return server, nil
}

// Start gRPC server concurrently
func (s *Server) Start() error {
listener, err := net.Listen("tcp", s.opts.GRPCHostPort)
if err != nil {
return err
}
s.logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr()))
s.grpcConn = listener
go func() {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("GRPC server exited", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()

return nil
}

// Close stops http, GRPC servers and closes the port listener.
func (s *Server) Close() error {
s.grpcServer.Stop()
s.grpcConn.Close()
s.opts.TLSGRPC.Close()
return nil
}
Loading

0 comments on commit 4daa4e9

Please sign in to comment.