Skip to content

Commit

Permalink
fixup! Migrate device-provisioning-service to repository
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Aug 9, 2024
1 parent 64ac1bb commit 0d310bf
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
4 changes: 2 additions & 2 deletions coap-gateway/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (s *Service) processCommandTask(req *mux.Message, client *session, span tra
span.SetStatus(otelCodes.Error, err.Error())
}
if resp != nil {
otelcoap.MessageSentEvent(req.Context(), resp)
otelcoap.MessageSentEvent(req.Context(), otelcoap.MakeMessage(resp))
span.SetAttributes(otelcoap.StatusCodeAttr(resp.Code()))
}
client.logRequestResponse(req, resp, err)
Expand All @@ -426,7 +426,7 @@ func (s *Service) makeCommandTask(req *mux.Message, client *session, fnc func(re
ctx, span := otelcoap.Start(req.Context(), path, req.Code().String(), otelcoap.WithTracerProvider(s.tracerProvider), otelcoap.WithSpanOptions(trace.WithSpanKind(trace.SpanKindServer)))
span.SetAttributes(semconv.NetPeerNameKey.String(client.deviceID()))
req.SetContext(ctx)
otelcoap.MessageReceivedEvent(ctx, req.Message)
otelcoap.MessageReceivedEvent(ctx, otelcoap.MakeMessage(req.Message))
otelcoap.SetRequest(ctx, req.Message)

x := struct {
Expand Down
4 changes: 2 additions & 2 deletions coap-gateway/service/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ func (c *session) do(req *pool.Message) (*pool.Message, error) {
defer span.End()
span.SetAttributes(semconv.NetPeerNameKey.String(c.deviceID()))

otelcoap.MessageSentEvent(ctx, req)
otelcoap.MessageSentEvent(ctx, otelcoap.MakeMessage(req))

resp, err := c.coapConn.Do(req)
if err != nil {
span.RecordError(err)
span.SetStatus(otelCodes.Error, err.Error())
return nil, err
}
otelcoap.MessageReceivedEvent(ctx, resp)
otelcoap.MessageReceivedEvent(ctx, otelcoap.MakeMessage(resp))
span.SetAttributes(otelcoap.StatusCodeAttr(resp.Code()))

return resp, nil
Expand Down
20 changes: 11 additions & 9 deletions device-provisioning-service/service/provision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import (
)

const (
DPSCoapGwHost = "localhost:40002"
DPSHost = "localhost:20030"
DPSCoapGwHost = "127.0.0.1:40002"
DPSHost = "127.0.0.1:20030"
)

func TestProvisioning(t *testing.T) {
Expand Down Expand Up @@ -118,7 +118,9 @@ func TestProvisioning(t *testing.T) {
require.NotEmpty(t, got[0].GetDeviceId())
require.NotEmpty(t, got[0].GetEnrollmentGroupId())
require.NotEmpty(t, got[0].GetCreationDate())
require.NotEmpty(t, got[0].GetLocalEndpoints())
if !config.COAP_GATEWAY_UDP_ENABLED {
require.NotEmpty(t, got[0].GetLocalEndpoints())
}
require.NotEmpty(t, got[0].GetAcl().GetAccessControlList())
require.NotEmpty(t, got[0].GetAcl().GetStatus().GetDate())
require.NotEmpty(t, got[0].GetAcl().GetStatus().GetCoapCode())
Expand Down Expand Up @@ -442,10 +444,10 @@ func TestProvisioningFromNewDPSAddress(t *testing.T) {
_, shutdownSim := test.OnboardDpsSim(ctx, t, c, deviceID, dpsCfg.APIs.COAP.Addr, test.TestDevsimResources)
defer shutdownSim()

// change DPS to new address from "localhost:40030" to "localhost:50030" and restart DPS
// change DPS to new address from "127.0.0.1:40030" to "127.0.0.1:50030" and restart DPS
deferedDpsCleanUp = false
dpsShutDown()
dpsCfg.APIs.COAP.Addr = "localhost:50030"
dpsCfg.APIs.COAP.Addr = "127.0.0.1:50030"
h := newTestRequestHandler(t, dpsCfg, defaultTestDpsHandlerConfig())
h.StartDps(service.WithRequestHandler(h))
defer h.StopDps()
Expand Down Expand Up @@ -496,7 +498,7 @@ func TestProvisiongConnectToSecondaryServerByObserver(t *testing.T) {
}}, nil)
}
dpsCfg := test.MakeConfig(t)
dpsCfg.EnrollmentGroups[0].Hubs[0].Gateways = []string{config.ACTIVE_COAP_SCHEME + "://" + "localhost:20999"} // should be unreachable
dpsCfg.EnrollmentGroups[0].Hubs[0].Gateways = []string{config.ACTIVE_COAP_SCHEME + "://" + "127.0.0.1:20999"} // should be unreachable
dpsCfg.EnrollmentGroups[0].Hubs[0].Gateways = append(dpsCfg.EnrollmentGroups[0].Hubs[0].Gateways, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST)
rh := newTestRequestHandler(t, dpsCfg, defaultTestDpsHandlerConfig())
testProvisioningWithDPSHandler(t, rh, time.Minute, test.WithConfigureDevice(setLowCloudObserverCheckCount))
Expand All @@ -517,7 +519,7 @@ func TestProvisiongConnectToSecondaryServerByRetryTimeout(t *testing.T) {
}}, nil)
}
dpsCfg := test.MakeConfig(t)
dpsCfg.EnrollmentGroups[0].Hubs[0].Gateways = []string{config.ACTIVE_COAP_SCHEME + "://" + "localhost:20999"} // should be unreachable
dpsCfg.EnrollmentGroups[0].Hubs[0].Gateways = []string{config.ACTIVE_COAP_SCHEME + "://" + "127.0.0.1:20999"} // should be unreachable
dpsCfg.EnrollmentGroups[0].Hubs[0].Gateways = append(dpsCfg.EnrollmentGroups[0].Hubs[0].Gateways, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST)
rh := newTestRequestHandler(t, dpsCfg, defaultTestDpsHandlerConfig())
testProvisioningWithDPSHandler(t, rh, time.Minute, test.WithConfigureDevice(setShortRetry))
Expand All @@ -538,7 +540,7 @@ func TestProvisiongConnectToSecondaryCloudByObserver(t *testing.T) {
},
}}, nil)
}
hubCfg := test.MakeHubConfig(uuid.NewString(), "localhost:20999") // should be unreachable
hubCfg := test.MakeHubConfig(uuid.NewString(), "127.0.0.1:20999") // should be unreachable
dpsCfg := test.MakeConfig(t)
dpsCfg.EnrollmentGroups[0].Hubs = append([]service.HubConfig{hubCfg}, dpsCfg.EnrollmentGroups[0].Hubs...)
rhCfg := defaultTestDpsHandlerConfig()
Expand All @@ -562,7 +564,7 @@ func TestProvisiongConnectToSecondaryCloudByRetryTimeout(t *testing.T) {
},
}}, nil)
}
hubCfg := test.MakeHubConfig(uuid.NewString(), "localhost:20999") // should be unreachable
hubCfg := test.MakeHubConfig(uuid.NewString(), "127.0.0.1:20999") // should be unreachable
dpsCfg := test.MakeConfig(t)
dpsCfg.EnrollmentGroups[0].Hubs = append([]service.HubConfig{hubCfg}, dpsCfg.EnrollmentGroups[0].Hubs...)
rhCfg := defaultTestDpsHandlerConfig()
Expand Down
5 changes: 3 additions & 2 deletions device-provisioning-service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (server *Service) toInternalHandler(w mux.ResponseWriter, r *mux.Message, h
ctx, span := otelcoap.Start(r.Context(), path, r.Code().String(), otelcoap.WithTracerProvider(server.tracerProvider), otelcoap.WithSpanOptions(trace.WithSpanKind(trace.SpanKindServer)))
defer span.End()
r.SetContext(ctx)
otelcoap.MessageReceivedEvent(ctx, r.Message)
otelcoap.MessageReceivedEvent(ctx, otelcoap.MakeMessage(r.Message))

ctx, cancel := context.WithTimeout(r.Context(), session.server.config.APIs.COAP.InactivityMonitor.Timeout)
defer cancel()
Expand All @@ -289,10 +289,11 @@ func (server *Service) toInternalHandler(w mux.ResponseWriter, r *mux.Message, h
}()
}
if resp != nil {
otelMsg := otelcoap.MakeMessage(resp)
if err := session.WriteMessage(resp); err != nil {
session.Errorf("cannot send error: %w", err)
}
otelcoap.MessageSentEvent(r.Context(), resp)
otelcoap.MessageSentEvent(r.Context(), otelMsg)
span.SetAttributes(otelcoap.StatusCodeAttr(resp.Code()))
}
session.logRequestResponse(ctx, startTime, r, resp, errResp)
Expand Down
25 changes: 16 additions & 9 deletions pkg/opentelemetry/otelcoap/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ var (

type MessageType attribute.KeyValue

// Event adds an event of the messageType to the span associated with the
// passed context with id and size (if message is a proto message).
func (m MessageType) Event(ctx context.Context, msg *pool.Message) {
span := trace.SpanFromContext(ctx)
type Message struct {
Size int
}

func MakeMessage(msg *pool.Message) Message {
tcpMsg := message.Message{
Code: msg.Code(),
Token: msg.Token(),
Expand All @@ -50,12 +51,18 @@ func (m MessageType) Event(ctx context.Context, msg *pool.Message) {
size = 0
}

if bodySize, err := msg.BodySize(); err != nil {
size += int(bodySize)
return Message{
Size: size,
}
}

// Event adds an event of the messageType to the span associated with the
// passed context with id and size (if message is a proto message).
func (m MessageType) Event(ctx context.Context, msg Message) {
span := trace.SpanFromContext(ctx)
span.AddEvent("message", trace.WithAttributes(
attribute.KeyValue(m),
semconv.MessageUncompressedSizeKey.Int(size),
semconv.MessageUncompressedSizeKey.Int(msg.Size),
))
}

Expand Down Expand Up @@ -84,11 +91,11 @@ func StatusCodeAttr(c codes.Code) attribute.KeyValue {
return COAPStatusCodeKey.Int64(int64(c))
}

func MessageReceivedEvent(ctx context.Context, message *pool.Message) {
func MessageReceivedEvent(ctx context.Context, message Message) {
messageReceived.Event(ctx, message)
}

func MessageSentEvent(ctx context.Context, message *pool.Message) {
func MessageSentEvent(ctx context.Context, message Message) {
messageSent.Event(ctx, message)
}

Expand Down

0 comments on commit 0d310bf

Please sign in to comment.