diff --git a/client/client.go b/client/client.go index 30d43998..25bb1e6f 100644 --- a/client/client.go +++ b/client/client.go @@ -80,4 +80,13 @@ type OpAMPClient interface { // May be called anytime after Start(), including from OnMessage handler. // nil values are not allowed and will return an error. SetPackageStatuses(statuses *protobufs.PackageStatuses) error + + // RequestConnectionSettings sets a ConnectionSettingsRequest. The ConnectionSettingsRequest + // will be included in the next AgentToServer message sent to the Server. + // Used for client-initiated connection setting acquisition flows. + // It is the responsibility of the caller to ensure that the Server supports + // AcceptsConnectionSettingsRequest capability. + // May be called before or after Start(). + // May be also called from OnMessage handler. + RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error } diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index b66e21d8..97927fc8 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -682,7 +682,7 @@ func TestAgentIdentification(t *testing.T) { }) } -func TestConnectionSettings(t *testing.T) { +func TestServerOfferConnectionSettings(t *testing.T) { testClients(t, func(t *testing.T, client OpAMPClient) { hash := []byte{1, 2, 3} opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"} @@ -765,6 +765,64 @@ func TestConnectionSettings(t *testing.T) { }) } +func TestClientRequestConnectionSettings(t *testing.T) { + testClients( + t, func(t *testing.T, client OpAMPClient) { + opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"} + + var srvReceivedRequest int64 + // Start a Server. + srv := internal.StartMockServer(t) + srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + if msg != nil && msg.ConnectionSettingsRequest != nil { + atomic.AddInt64(&srvReceivedRequest, 1) + return &protobufs.ServerToAgent{ + ConnectionSettings: &protobufs.ConnectionSettingsOffers{ + Opamp: opampSettings, + }, + } + } + return nil + } + + var clientGotOpampSettings int64 + + // Start a client. + settings := types.StartSettings{ + Callbacks: types.CallbacksStruct{ + OnOpampConnectionSettingsFunc: func( + ctx context.Context, settings *protobufs.OpAMPConnectionSettings, + ) error { + assert.True(t, proto.Equal(opampSettings, settings)) + atomic.AddInt64(&clientGotOpampSettings, 1) + return nil + }, + }, + Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings, + } + settings.OpAMPServerURL = "ws://" + srv.Endpoint + prepareClient(t, &settings, client) + + assert.NoError(t, client.Start(context.Background(), settings)) + + client.RequestConnectionSettings(&protobufs.ConnectionSettingsRequest{}) + + // Wait until server receives the request. + eventually(t, func() bool { return atomic.LoadInt64(&srvReceivedRequest) == 1 }) + + // Wait until client receives the server's response. + eventually(t, func() bool { return atomic.LoadInt64(&clientGotOpampSettings) == 1 }) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err := client.Stop(context.Background()) + assert.NoError(t, err) + }, + ) +} + func TestReportAgentDescription(t *testing.T) { testClients(t, func(t *testing.T, client OpAMPClient) { diff --git a/client/httpclient.go b/client/httpclient.go index 993eff8d..6356c517 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -81,6 +81,10 @@ func (c *httpClient) SetAgentDescription(descr *protobufs.AgentDescription) erro return c.common.SetAgentDescription(descr) } +func (c *httpClient) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error { + return c.common.RequestConnectionSettings(request) +} + // SetHealth implements OpAMPClient.SetHealth. func (c *httpClient) SetHealth(health *protobufs.ComponentHealth) error { return c.common.SetHealth(health) diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 535dd1ac..ac040b60 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -243,6 +243,16 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er return nil } +func (c *ClientCommon) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error { + c.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.ConnectionSettingsRequest = request + }, + ) + c.sender.ScheduleSend() + return nil +} + // SetHealth sends a status update to the Server with the new agent health // and remembers the health in the client state so that it can be sent // to the Server when the Server asks for it. diff --git a/client/wsclient.go b/client/wsclient.go index 6dd2dbb0..b40f153a 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -100,6 +100,10 @@ func (c *wsClient) SetAgentDescription(descr *protobufs.AgentDescription) error return c.common.SetAgentDescription(descr) } +func (c *wsClient) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error { + return c.common.RequestConnectionSettings(request) +} + func (c *wsClient) SetHealth(health *protobufs.ComponentHealth) error { return c.common.SetHealth(health) } diff --git a/internal/examples/agent/agent/agent.go b/internal/examples/agent/agent/agent.go index 5e0a496c..3bef4e9d 100644 --- a/internal/examples/agent/agent/agent.go +++ b/internal/examples/agent/agent/agent.go @@ -1,9 +1,13 @@ package agent import ( + "bytes" "context" + cryptorand "crypto/rand" + "crypto/rsa" "crypto/tls" "crypto/x509" + "crypto/x509/pkix" "encoding/pem" "fmt" "math/rand" @@ -63,6 +67,9 @@ type Agent struct { // The TLS certificate used for the OpAMP connection. Can be nil, meaning no client-side // certificate is used. opampClientCert *tls.Certificate + + certRequested bool + clientPrivateKeyPEM []byte } func NewAgent(logger types.Logger, agentType string, agentVersion string) *Agent { @@ -133,6 +140,14 @@ func (agent *Agent) connect() error { return err } + // This sets the request to create a client certificate before the OpAMP client + // is started, before the connection is established. However, this assumes the + // server supports "AcceptsConnectionRequest" capability. + // Alternatively the agent can perform this request after receiving the first + // message from the server (in onMessage), i.e. after the server capabilities + // become known and can be checked. + agent.requestClientCertificate() + agent.logger.Debugf("Starting OpAMP client...") err = agent.opampClient.Start(context.Background(), settings) @@ -341,17 +356,96 @@ func (agent *Agent) Shutdown() { } } +// requestClientCertificate sets a request to be sent to the Server to create +// a client certificate that the Agent can use in subsequent OpAMP connections. +// This is the initiating step of the Client Signing Request (CSR) flow. +func (agent *Agent) requestClientCertificate() { + if agent.certRequested { + // Request only once, for bootstrapping. + // TODO: the Agent may also for example check that the current certificate + // is approaching expiration date and re-requests a new certificate. + return + } + + // Generate a keypair for new client cert. + clientCertKeyPair, err := rsa.GenerateKey(cryptorand.Reader, 4096) + if err != nil { + agent.logger.Errorf("Cannot generate keypair: %v", err) + return + } + + // Encode the private key of the keypair as DER. + privateKeyDER := x509.MarshalPKCS1PrivateKey(clientCertKeyPair) + + // Convert private key from DER to PEM. + privateKeyPEM := new(bytes.Buffer) + pem.Encode( + privateKeyPEM, &pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: privateKeyDER, + }, + ) + // Keep it. We will need it in later steps of the flow. + agent.clientPrivateKeyPEM = privateKeyPEM.Bytes() + + // Create the CSR. + template := x509.CertificateRequest{ + Subject: pkix.Name{ + CommonName: "OpAMP Example Client", + Organization: []string{"OpenTelemetry OpAMP Workgroup"}, + Locality: []string{"Agent-initiated"}, + // Where do we put instance_uid? + }, + SignatureAlgorithm: x509.SHA256WithRSA, + } + + derBytes, err := x509.CreateCertificateRequest(cryptorand.Reader, &template, clientCertKeyPair) + if err != nil { + agent.logger.Errorf("Failed to create certificate request: %s", err) + return + } + + // Convert CSR from DER to PEM format. + csrPEM := new(bytes.Buffer) + pem.Encode( + csrPEM, &pem.Block{ + Type: "CERTIFICATE REQUEST", + Bytes: derBytes, + }, + ) + + // Send the request to the Server (immediately if already connected + // or upon next successful connection). + err = agent.opampClient.RequestConnectionSettings( + &protobufs.ConnectionSettingsRequest{ + Opamp: &protobufs.OpAMPConnectionSettingsRequest{ + CertificateRequest: &protobufs.CertificateRequest{ + Csr: csrPEM.Bytes(), + }, + }, + }, + ) + if err != nil { + agent.logger.Errorf("Failed to send CSR to server: %s", err) + return + } + + agent.certRequested = true +} + func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { configChanged := false if msg.RemoteConfig != nil { var err error configChanged, err = agent.applyRemoteConfig(msg.RemoteConfig) if err != nil { - agent.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ - LastRemoteConfigHash: msg.RemoteConfig.ConfigHash, - Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, - ErrorMessage: err.Error(), - }) + agent.opampClient.SetRemoteConfigStatus( + &protobufs.RemoteConfigStatus{ + LastRemoteConfigHash: msg.RemoteConfig.ConfigHash, + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, + ErrorMessage: err.Error(), + }, + ) } else { agent.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ LastRemoteConfigHash: msg.RemoteConfig.ConfigHash, @@ -378,6 +472,15 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { agent.logger.Errorf(err.Error()) } } + + // TODO: check that the Server has AcceptsConnectionSettingsRequest capability before + // requesting a certificate. + // This is actually a no-op since we already made the request when connecting + // (see connect()). However we keep this call here to demonstrate that requesting it + // in onMessage callback is also an option. This approach should be used if it is + // necessary to check for AcceptsConnectionSettingsRequest (if the Agent is + // not certain that the Server has this capability). + agent.requestClientCertificate() } func (agent *Agent) tryChangeOpAMPCert(cert *tls.Certificate) { @@ -419,13 +522,33 @@ func (agent *Agent) onOpampConnectionSettings(ctx context.Context, settings *pro } func (agent *Agent) getCertFromSettings(certificate *protobufs.TLSCertificate) (*tls.Certificate, error) { - // Parse the key pair to a certificate that can be used for network connections. - cert, err := tls.X509KeyPair( - certificate.PublicKey, - certificate.PrivateKey, - ) + // Parse the key pair to a TLS certificate that can be used for network connections. + + // There are 2 types of certificate creation flows in OpAMP: client-initiated CSR + // and server-initiated. In this example we demonstrate both flows. + // Real-world Agent implementations will probably choose and use only one of these flows. + + var cert tls.Certificate + var err error + if certificate.PrivateKey == nil && agent.clientPrivateKeyPEM != nil { + // Client-initiated CSR flow. This is currently initiated when connecting + // to the Server for the first time (see requestClientCertificate()). + cert, err = tls.X509KeyPair( + certificate.PublicKey, // We received the certificate from the Server. + agent.clientPrivateKeyPEM, // Private key was earlier locally generated. + ) + } else { + // Server-initiated flow. This is currently initiated by user clicking a button in + // the Server UI. + // Both certificate and private key are from the Server. + cert, err = tls.X509KeyPair( + certificate.PublicKey, + certificate.PrivateKey, + ) + } + if err != nil { - agent.logger.Errorf("Received invalid certificate offer: %s\n", err) + agent.logger.Errorf("Received invalid certificate offer: %s\n", err.Error()) return nil, err } diff --git a/internal/examples/server/certman/certman.go b/internal/examples/server/certman/certman.go new file mode 100644 index 00000000..8241b766 --- /dev/null +++ b/internal/examples/server/certman/certman.go @@ -0,0 +1,153 @@ +package certman + +import ( + "bytes" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "io/ioutil" + "log" + "math/big" + "net" + "path" + "sync" + "time" + + "github.com/open-telemetry/opamp-go/protobufs" +) + +var logger = log.New(log.Default().Writer(), "[CertMan] ", log.Default().Flags()|log.Lmsgprefix|log.Lmicroseconds) + +var caCert *x509.Certificate +var caPrivKey *rsa.PrivateKey +var caCertBytes []byte + +var loadCACertOnce sync.Once + +func loadCACert() { + certsDir := "../../certs" + + // Load CA certificate. + var err error + caCertBytes, err = ioutil.ReadFile(path.Join(certsDir, "certs/ca.cert.pem")) + if err != nil { + logger.Fatalf("Cannot read CA cert: %v", err) + } + + caKeyBytes, err := ioutil.ReadFile(path.Join(certsDir, "private/ca.key.pem")) + if err != nil { + logger.Fatalf("Cannot read CA key: %v", err) + } + + // Convert from DER to PEM format. + caCertPB, _ := pem.Decode(caCertBytes) + caKeyPB, _ := pem.Decode(caKeyBytes) + + caCert, err = x509.ParseCertificate(caCertPB.Bytes) + if err != nil { + logger.Fatalf("Cannot parse CA certificate: %v", err) + } + + caPrivKey, err = x509.ParsePKCS1PrivateKey(caKeyPB.Bytes) + if err != nil { + logger.Fatalf("Cannot parse CA key: %v", err) + } +} + +func createClientTLSCertTemplate() *x509.Certificate { + return &x509.Certificate{ + SerialNumber: big.NewInt(1), + IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)}, + NotBefore: time.Now(), + NotAfter: time.Now().Add(time.Hour * 1000), + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature, + } +} + +func CreateClientTLSCertFromCSR(csr *x509.CertificateRequest) (*protobufs.TLSCertificate, error) { + loadCACertOnce.Do(loadCACert) + + template := createClientTLSCertTemplate() + + // Use the Subject from CSR. + template.Subject = csr.Subject + + // Create the client cert and sign it using CA cert. + certBytes, err := x509.CreateCertificate(rand.Reader, template, caCert, csr.PublicKey, caPrivKey) + if err != nil { + err := fmt.Errorf("cannot create certificate: %v", err) + return nil, err + } + + // Convert from DER to PEM format. + certPEM := new(bytes.Buffer) + pem.Encode( + certPEM, &pem.Block{ + Type: "CERTIFICATE", + Bytes: certBytes, + }, + ) + + // We have a client certificate with a public and private key. + certificate := &protobufs.TLSCertificate{ + PublicKey: certPEM.Bytes(), + CaPublicKey: caCertBytes, + } + + return certificate, nil +} + +func CreateClientTLSCert() (*protobufs.TLSCertificate, error) { + loadCACertOnce.Do(loadCACert) + + // Generate a keypair for new client cert. + clientCertKeyPair, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + err := fmt.Errorf("cannot generate keypair: %v", err) + return nil, err + } + + // Prepare certificate template. + template := createClientTLSCertTemplate() + template.Subject = pkix.Name{ + CommonName: "OpAMP Example Client", + Organization: []string{"OpenTelemetry OpAMP Workgroup"}, + Locality: []string{"Server-initiated"}, + } + + // Create the client cert. Sign it using CA cert. + certDER, err := x509.CreateCertificate(rand.Reader, template, caCert, &clientCertKeyPair.PublicKey, caPrivKey) + if err != nil { + err := fmt.Errorf("cannot create certificate: %v", err) + return nil, err + } + + certPEM := new(bytes.Buffer) + pem.Encode( + certPEM, &pem.Block{ + Type: "CERTIFICATE", + Bytes: certDER, + }, + ) + + privateKeyPEM := new(bytes.Buffer) + pem.Encode( + privateKeyPEM, &pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(clientCertKeyPair), + }, + ) + + // We have a client certificate with a public and private key. + certificate := &protobufs.TLSCertificate{ + PublicKey: certPEM.Bytes(), + PrivateKey: privateKeyPEM.Bytes(), + CaPublicKey: caCertBytes, + } + + return certificate, nil +} diff --git a/internal/examples/server/data/agent.go b/internal/examples/server/data/agent.go index b562bd3e..a28295c1 100644 --- a/internal/examples/server/data/agent.go +++ b/internal/examples/server/data/agent.go @@ -6,12 +6,14 @@ import ( "crypto/sha256" "crypto/tls" "crypto/x509" + "encoding/pem" "fmt" "sync" "time" "google.golang.org/protobuf/proto" + "github.com/open-telemetry/opamp-go/internal/examples/server/certman" "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server/types" ) @@ -104,6 +106,10 @@ func (agent *Agent) UpdateStatus( agent.processStatusUpdate(statusMsg, response) + if statusMsg.ConnectionSettingsRequest != nil { + agent.processConnectionSettingsRequest(statusMsg.ConnectionSettingsRequest.Opamp, response) + } + statusUpdateWatchers := agent.statusUpdateWatchers agent.statusUpdateWatchers = nil @@ -423,7 +429,73 @@ func (agent *Agent) SendToAgent(msg *protobufs.ServerToAgent) { } func (agent *Agent) OfferConnectionSettings(offers *protobufs.ConnectionSettingsOffers) { - agent.SendToAgent(&protobufs.ServerToAgent{ - ConnectionSettings: offers, - }) + agent.SendToAgent( + &protobufs.ServerToAgent{ + ConnectionSettings: offers, + }, + ) +} + +func (agent *Agent) addErrorResponse(errMsg string, response *protobufs.ServerToAgent) { + logger.Println(errMsg) + if response.ErrorResponse == nil { + response.ErrorResponse = &protobufs.ServerErrorResponse{ + Type: protobufs.ServerErrorResponseType_ServerErrorResponseType_BadRequest, + ErrorMessage: errMsg, + Details: nil, + } + } else if response.ErrorResponse.Type == protobufs.ServerErrorResponseType_ServerErrorResponseType_BadRequest { + // Append this error message to the existing error message. + response.ErrorResponse.ErrorMessage += errMsg + } else { + // Can't report it since it is a different error type. + // TODO: consider adding support for reporting multiple errors of different type in the response. + } +} + +func (agent *Agent) processConnectionSettingsRequest( + request *protobufs.OpAMPConnectionSettingsRequest, response *protobufs.ServerToAgent, +) { + if request == nil || request.CertificateRequest == nil { + return + } + + csrDer, _ := pem.Decode(request.CertificateRequest.Csr) + if csrDer == nil { + agent.addErrorResponse("Failed to decode PEM certificate request", response) + return + } + + csr, err := x509.ParseCertificateRequest(csrDer.Bytes) + if err != nil { + agent.addErrorResponse("Failed to parse received certificate request: "+err.Error(), response) + return + } + + if csr.CheckSignature() != err { + agent.addErrorResponse("Certificate request signature check failed: "+err.Error(), response) + return + } + + // Verify the CSR's details and decide if we want to honor the request. + // For example verify the CommonName. + if csr.Subject.CommonName != "OpAMP Example Client" { + agent.addErrorResponse("Invalid CommonName in certificate request", response) + return + } + + // Create a new certificate for the agent. + certificate, err := certman.CreateClientTLSCertFromCSR(csr) + if err != nil { + agent.addErrorResponse("Failed to create client certificate from CSR: "+err.Error(), response) + return + } + + // Create an offer for the agent. + if response.ConnectionSettings == nil { + response.ConnectionSettings = &protobufs.ConnectionSettingsOffers{} + } + response.ConnectionSettings.Opamp = &protobufs.OpAMPConnectionSettings{ + Certificate: certificate, + } } diff --git a/internal/examples/server/uisrv/ui.go b/internal/examples/server/uisrv/ui.go index 3e18f481..4e11b7f5 100644 --- a/internal/examples/server/uisrv/ui.go +++ b/internal/examples/server/uisrv/ui.go @@ -121,7 +121,7 @@ func rotateInstanceClientCert(w http.ResponseWriter, r *http.Request) { } // Create a new certificate for the agent. - certificate, err := internal.CreateTLSCert("../../certs/certs/ca.cert.pem", "../../certs/certs/ca.key.pem") + certificate, err := internal.CreateTLSCert("../../certs/certs/ca.cert.pem", "../../certs/private/ca.key.pem") if err != nil { w.WriteHeader(http.StatusInternalServerError) logger.Println(err)