Skip to content

Commit

Permalink
[v11] Add Cassandra/Scylla database support (#17207)
Browse files Browse the repository at this point in the history
  • Loading branch information
smallinsky authored Oct 10, 2022
1 parent 7e300d0 commit 85a1cdb
Show file tree
Hide file tree
Showing 43 changed files with 5,978 additions and 831 deletions.
152 changes: 152 additions & 0 deletions api/proto/teleport/legacy/types/events/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2957,6 +2957,11 @@ message OneOf {
events.KubernetesClusterDelete KubernetesClusterDelete = 98;
events.SSMRun SSMRun = 99;
events.ElasticsearchRequest ElasticsearchRequest = 100;
// Cassandra audit events.
events.CassandraBatch CassandraBatch = 101;
events.CassandraPrepare CassandraPrepare = 102;
events.CassandraRegister CassandraRegister = 103;
events.CassandraExecute CassandraExecute = 104;
}
}

Expand Down Expand Up @@ -3855,3 +3860,150 @@ message SSMRun {
// Region is the AWS region the command was ran in.
string Region = 7 [(gogoproto.jsontag) = "region"];
}

// CassandraSession is emitted when a Cassandra client sends
// the prepare a CQL statement.
message CassandraPrepare {
// Metadata is a common event metadata.
Metadata Metadata = 1 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// User is a common user event metadata.
UserMetadata User = 2 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// SessionMetadata is a common event session metadata.
SessionMetadata Session = 3 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// Database contains database related metadata.
DatabaseMetadata Database = 4 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// Query is the CQL statement.
string Query = 5 [(gogoproto.jsontag) = "query,omitempty"];
// Keyspace is the keyspace the statement is in.
string Keyspace = 6 [(gogoproto.jsontag) = "keyspace,omitempty"];
}

// CassandraExecute is emitted when a Cassandra client executes a CQL statement.
message CassandraExecute {
// Metadata is a common event metadata.
Metadata Metadata = 1 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// User is a common user event metadata.
UserMetadata User = 2 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// SessionMetadata is a common event session metadata.
SessionMetadata Session = 3 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// Database contains database related metadata.
DatabaseMetadata Database = 4 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// QueryId is the prepared query id to execute.
string QueryId = 5 [(gogoproto.jsontag) = "query_id,omitempty"];
}

// CassandraBatch is emitted when a Cassandra client executes a batch of CQL statements.
message CassandraBatch {
// Metadata is a common event metadata.
Metadata Metadata = 1 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// User is a common user event metadata.
UserMetadata User = 2 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// SessionMetadata is a common event session metadata.
SessionMetadata Session = 3 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// Database contains database related metadata.
DatabaseMetadata Database = 4 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];

// Consistency is the consistency level to use.
string Consistency = 5 [(gogoproto.jsontag) = "consistency,omitempty"];
// Keyspace is the keyspace the statement is in.
string Keyspace = 6 [(gogoproto.jsontag) = "keyspace,omitempty"];
// BatchType is the type of batch.
string BatchType = 7 [(gogoproto.jsontag) = "batch_type,omitempty"];
// BatchChild represents a single child batch statement.
message BatchChild {
// Value is a single value to bind to the query.
message Value {
// Type is the type of the value.
uint32 Type = 1 [(gogoproto.jsontag) = "type,omitempty"];
// Contents is the value contents.
bytes Contents = 2 [(gogoproto.jsontag) = "contents,omitempty"];
}
// ID is the id of the statement.
string ID = 1 [(gogoproto.jsontag) = "id,omitempty"];
// Query the CQL statement to execute.
string Query = 2 [(gogoproto.jsontag) = "query,omitempty"];
// Values is the values to bind to the query.
repeated Value Values = 3 [(gogoproto.jsontag) = "values,omitempty"];
}
// Children is batch children statements.
repeated BatchChild Children = 8 [(gogoproto.jsontag) = "children,omitempty"];
}

// CassandraRegister is emitted when a Cassandra client
// request to register for the specified event types.
message CassandraRegister {
// Metadata is a common event metadata.
Metadata Metadata = 1 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// User is a common user event metadata.
UserMetadata User = 2 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// SessionMetadata is a common event session metadata.
SessionMetadata Session = 3 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// Database contains database related metadata.
DatabaseMetadata Database = 4 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
// EventTypes is the list of event types to register for.
repeated string EventTypes = 5 [(gogoproto.jsontag) = "event_types,omitempty"];
}
43 changes: 37 additions & 6 deletions api/types/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
"text/template"
"time"

"github.com/gravitational/teleport/api/utils"
awsutils "github.com/gravitational/teleport/api/utils/aws"
azureutils "github.com/gravitational/teleport/api/utils/azure"

"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"

"github.com/gravitational/teleport/api/utils"
awsutils "github.com/gravitational/teleport/api/utils/aws"
azureutils "github.com/gravitational/teleport/api/utils/azure"
)

// Database represents a database proxied by a database server.
Expand Down Expand Up @@ -371,9 +371,14 @@ func (d *DatabaseV3) IsMemoryDB() bool {
return d.GetType() == DatabaseTypeMemoryDB
}

// IsAWSKeyspaces returns true if this is an AWS hosted Cassandra database.
func (d *DatabaseV3) IsAWSKeyspaces() bool {
return d.GetType() == DatabaseTypeAWSKeyspaces
}

// IsAWSHosted returns true if database is hosted by AWS.
func (d *DatabaseV3) IsAWSHosted() bool {
return d.IsRDS() || d.IsRedshift() || d.IsElastiCache() || d.IsMemoryDB()
return d.IsRDS() || d.IsRedshift() || d.IsElastiCache() || d.IsMemoryDB() || d.IsAWSKeyspaces()
}

// IsCloudHosted returns true if database is hosted in the cloud (AWS, Azure or
Expand All @@ -384,6 +389,9 @@ func (d *DatabaseV3) IsCloudHosted() bool {

// GetType returns the database type.
func (d *DatabaseV3) GetType() string {
if d.GetAWS().AccountID != "" && d.Spec.Protocol == DatabaseTypeCassandra {
return DatabaseTypeAWSKeyspaces
}
if d.GetAWS().Redshift.ClusterID != "" {
return DatabaseTypeRedshift
}
Expand Down Expand Up @@ -453,11 +461,19 @@ func (d *DatabaseV3) CheckAndSetDefaults() error {
return trace.BadParameter("database %q protocol is empty", d.GetName())
}
if d.Spec.URI == "" {
return trace.BadParameter("database %q URI is empty", d.GetName())
switch {
case d.IsAWSKeyspaces() && d.GetAWS().Region != "":
// In case of AWS Hosted Cassandra allow to omit URI.
// The URL will be constructed from the database resource based on the region and account ID.
d.Spec.URI = awsutils.CassandraEndpointURLForRegion(d.Spec.AWS.Region)
default:
return trace.BadParameter("database %q URI is empty", d.GetName())
}
}
if d.Spec.MySQL.ServerVersion != "" && d.Spec.Protocol != "mysql" {
return trace.BadParameter("MySQL ServerVersion can be only set for MySQL database")
}

// In case of RDS, Aurora or Redshift, AWS information such as region or
// cluster ID can be extracted from the endpoint if not provided.
switch {
Expand Down Expand Up @@ -521,6 +537,17 @@ func (d *DatabaseV3) CheckAndSetDefaults() error {
if d.Spec.Azure.Name == "" {
d.Spec.Azure.Name = name
}
case strings.Contains(d.Spec.URI, awsutils.AWSEndpointSuffix) || strings.Contains(d.Spec.URI, awsutils.AWSCNEndpointSuffix):
if d.Spec.AWS.AccountID == "" {
return trace.BadParameter("database %q AWS account ID is empty", d.GetName())
}
if d.Spec.AWS.Region == "" {
region, err := awsutils.CassandraEndpointRegion(d.Spec.URI)
if err != nil {
return trace.Wrap(err)
}
d.Spec.AWS.Region = region
}
case azureutils.IsCacheForRedisEndpoint(d.Spec.URI):
// ResourceID is required for fetching Redis tokens.
if d.Spec.Azure.ResourceID == "" {
Expand Down Expand Up @@ -673,6 +700,10 @@ const (
DatabaseTypeElastiCache = "elasticache"
// DatabaseTypeMemoryDB is AWS-hosted MemoryDB database.
DatabaseTypeMemoryDB = "memorydb"
// DatabaseTypeAWSKeyspaces is AWS-hosted Keyspaces database (Cassandra).
DatabaseTypeAWSKeyspaces = "keyspace"
// DatabaseTypeCassandra is AWS-hosted Keyspace database.
DatabaseTypeCassandra = "cassandra"
)

// GetServerName returns the GCP database project and instance as "<project-id>:<instance-id>".
Expand Down
46 changes: 46 additions & 0 deletions api/types/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,49 @@ func TestMySQLServerVersion(t *testing.T) {
database.SetMySQLServerVersion("8.0.1")
require.Equal(t, "8.0.1", database.GetMySQLServerVersion())
}

func TestCassandraAWSEndpoint(t *testing.T) {
t.Parallel()

t.Run("aws cassandra url from region", func(t *testing.T) {
database, err := NewDatabaseV3(Metadata{
Name: "test",
}, DatabaseSpecV3{
Protocol: "cassandra",
AWS: AWS{
Region: "us-west-1",
AccountID: "12345",
},
})
require.NoError(t, err)
require.Equal(t, "cassandra.us-west-1.amazonaws.com:9142", database.GetURI())
})

t.Run("aws cassandra custom uri", func(t *testing.T) {
database, err := NewDatabaseV3(Metadata{
Name: "test",
}, DatabaseSpecV3{
Protocol: "cassandra",
URI: "cassandra.us-west-1.amazonaws.com:9142",
AWS: AWS{
AccountID: "12345",
},
})
require.NoError(t, err)
require.Equal(t, "cassandra.us-west-1.amazonaws.com:9142", database.GetURI())
require.Equal(t, "us-west-1", database.GetAWS().Region)
})

t.Run("aws cassandra missing AccountID", func(t *testing.T) {
_, err := NewDatabaseV3(Metadata{
Name: "test",
}, DatabaseSpecV3{
Protocol: "cassandra",
URI: "cassandra.us-west-1.amazonaws.com:9142",
AWS: AWS{
AccountID: "",
},
})
require.Error(t, err)
})
}
Loading

0 comments on commit 85a1cdb

Please sign in to comment.