From c9c1cea5b861969d0b5b10afeab466156d253653 Mon Sep 17 00:00:00 2001
From: Chris Morse <120681681+christhemorse@users.noreply.github.com>
Date: Thu, 17 Oct 2024 15:23:42 -0400
Subject: [PATCH] Add database support for Kafka (#337)

* Initial support for Managed Kafka changes and endpoints

* Add support for Managed Kafka advanced config options

* Fix database test after data type changes

* Make adjustment to quota URL param
---
 database.go      | 434 ++++++++++++++++++++++++++++++++++++++++--------
 database_test.go |  14 +-
 2 files changed, 366 insertions(+), 82 deletions(-)

diff --git a/database.go b/database.go
index 8952949..8e822eb 100644
--- a/database.go
+++ b/database.go
@@ -35,6 +35,17 @@ type DatabaseService interface {
 	GetDB(ctx context.Context, databaseID string, dbname string) (*DatabaseDB, *http.Response, error)
 	DeleteDB(ctx context.Context, databaseID string, dbname string) error
 
+	ListTopics(ctx context.Context, databaseID string) ([]DatabaseTopic, *Meta, *http.Response, error)
+	CreateTopic(ctx context.Context, databaseID string, databaseTopicReq *DatabaseTopicCreateReq) (*DatabaseTopic, *http.Response, error)
+	GetTopic(ctx context.Context, databaseID string, topicName string) (*DatabaseTopic, *http.Response, error)
+	UpdateTopic(ctx context.Context, databaseID string, topicName string, databaseTopicReq *DatabaseTopicUpdateReq) (*DatabaseTopic, *http.Response, error) //nolint:lll
+	DeleteTopic(ctx context.Context, databaseID string, topicName string) error
+
+	ListQuotas(ctx context.Context, databaseID string) ([]DatabaseQuota, *Meta, *http.Response, error)
+	CreateQuota(ctx context.Context, databaseID string, databaseQuotaReq *DatabaseQuotaCreateReq) (*DatabaseQuota, *http.Response, error)
+	GetQuota(ctx context.Context, databaseID string, clientID, username string) (*DatabaseQuota, *http.Response, error)
+	DeleteQuota(ctx context.Context, databaseID string, clientID, username string) error
+
 	ListMaintenanceUpdates(ctx context.Context, databaseID string) ([]string, *http.Response, error)
 
 	StartMaintenance(ctx context.Context, databaseID string) (string, *http.Response, error)
@@ -124,7 +135,8 @@ type Database struct {
 	PlanDisk              int    `json:"plan_disk"`
 	PlanRAM               int    `json:"plan_ram"`
 	PlanVCPUs             int    `json:"plan_vcpus"`
-	PlanReplicas          int    `json:"plan_replicas"`
+	PlanReplicas          *int   `json:"plan_replicas,omitempty"`
+	PlanBrokers           int    `json:"plan_brokers,omitempty"`
 	Region                string `json:"region"`
 	DatabaseEngine        string `json:"database_engine"`
 	DatabaseEngineVersion string `json:"database_engine_version"`
@@ -136,9 +148,12 @@ type Database struct {
 	FerretDBCredentials *FerretDBCredentials `json:"ferretdb_credentials,omitempty"`
 	Host                string               `json:"host"`
 	PublicHost          string               `json:"public_host,omitempty"`
+	Port                string               `json:"port"`
+	SASLPort            string               `json:"sasl_port,omitempty"`
 	User                string               `json:"user"`
 	Password            string               `json:"password"`
-	Port                string               `json:"port"`
+	AccessKey           string               `json:"access_key,omitempty"`
+	AccessCert          string               `json:"access_cert,omitempty"`
 	MaintenanceDOW      string               `json:"maintenance_dow"`
 	MaintenanceTime     string               `json:"maintenance_time"`
 	LatestBackup        string               `json:"latest_backup"`
@@ -254,6 +269,9 @@ type DatabaseUser struct {
 	Password      string           `json:"password"`
 	Encryption    string           `json:"encryption,omitempty"`
 	AccessControl *DatabaseUserACL `json:"access_control,omitempty"`
+	Permission    string           `json:"permission,omitempty"`
+	AccessKey     string           `json:"access_key,omitempty"`
+	AccessCert    string           `json:"access_cert,omitempty"`
 }
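The new topic and quota endpoints hang off the existing Database service on the client. As a rough usage sketch (assuming a client configured as in the library's README; the database ID is a placeholder):

	package main

	import (
		"context"
		"fmt"
		"os"

		"github.com/vultr/govultr/v3"
		"golang.org/x/oauth2"
	)

	func main() {
		ctx := context.Background()
		cfg := &oauth2.Config{}
		ts := cfg.TokenSource(ctx, &oauth2.Token{AccessToken: os.Getenv("VULTR_API_KEY")})
		client := govultr.NewClient(oauth2.NewClient(ctx, ts))

		// "example-db-id" stands in for the ID of an existing Kafka Managed Database.
		topics, meta, _, err := client.Database.ListTopics(ctx, "example-db-id")
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("%d topics total, first page: %+v\n", meta.Total, topics)
	}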
 
 // DatabaseUserACL represents an access control configuration for a user within a Redis Managed Database cluster
@@ -264,12 +282,13 @@ type DatabaseUserACL struct {
 	RedisACLKeys []string `json:"redis_acl_keys"`
 }
 
-// DatabaseUserACLReq represents input for updating a user's access control within a Redis Managed Database cluster
+// DatabaseUserACLReq represents input for updating a user's access control within a Managed Database cluster
 type DatabaseUserACLReq struct {
 	RedisACLCategories *[]string `json:"redis_acl_categories,omitempty"`
 	RedisACLChannels   *[]string `json:"redis_acl_channels,omitempty"`
 	RedisACLCommands   *[]string `json:"redis_acl_commands,omitempty"`
 	RedisACLKeys       *[]string `json:"redis_acl_keys,omitempty"`
+	Permission         string    `json:"permission,omitempty"`
 }
 
 // databaseUserBase holds the API response for retrieving a single database user within a Managed Database
@@ -288,6 +307,7 @@ type DatabaseUserCreateReq struct {
 	Username   string `json:"username"`
 	Password   string `json:"password,omitempty"`
 	Encryption string `json:"encryption,omitempty"`
+	Permission string `json:"permission,omitempty"`
 }
 
 // DatabaseUserUpdateReq struct used to update a user within a Managed Database.
@@ -316,6 +336,72 @@ type DatabaseDBCreateReq struct {
 	Name string `json:"name"`
 }
 
+// DatabaseTopic represents a Kafka topic within a Managed Database cluster
+type DatabaseTopic struct {
+	Name           string `json:"name"`
+	Partitions     int    `json:"partitions"`
+	Replication    int    `json:"replication"`
+	RetentionHours int    `json:"retention_hours"`
+	RetentionBytes int    `json:"retention_bytes"`
+}
+
+// databaseTopicBase holds the API response for retrieving a single Kafka topic within a Managed Database
+type databaseTopicBase struct {
+	DatabaseTopic *DatabaseTopic `json:"topic"`
+}
+
+// databaseTopicsBase holds the API response for retrieving a list of Kafka topics within a Managed Database
+type databaseTopicsBase struct {
+	DatabaseTopics []DatabaseTopic `json:"topics"`
+	Meta           *Meta           `json:"meta"`
+}
+
+// DatabaseTopicCreateReq struct used to create a Kafka topic within a Managed Database.
+type DatabaseTopicCreateReq struct {
+	Name           string `json:"name"`
+	Partitions     int    `json:"partitions"`
+	Replication    int    `json:"replication"`
+	RetentionHours int    `json:"retention_hours"`
+	RetentionBytes int    `json:"retention_bytes"`
+}
+
+// DatabaseTopicUpdateReq struct used to update a Kafka topic within a Managed Database.
+type DatabaseTopicUpdateReq struct {
+	Partitions     int `json:"partitions"`
+	Replication    int `json:"replication"`
+	RetentionHours int `json:"retention_hours"`
+	RetentionBytes int `json:"retention_bytes"`
+}
+
+// DatabaseQuota represents a Kafka quota within a Managed Database cluster
+type DatabaseQuota struct {
+	ClientID          string `json:"client_id"`
+	ConsumerByteRate  int    `json:"consumer_byte_rate"`
+	ProducerByteRate  int    `json:"producer_byte_rate"`
+	RequestPercentage int    `json:"request_percentage"`
+	User              string `json:"user"`
+}
+
+// databaseQuotaBase holds the API response for retrieving a single Kafka quota within a Managed Database
+type databaseQuotaBase struct {
+	DatabaseQuota *DatabaseQuota `json:"quota"`
+}
+
+// databaseQuotasBase holds the API response for retrieving a list of Kafka quotas within a Managed Database
+type databaseQuotasBase struct {
+	DatabaseQuotas []DatabaseQuota `json:"quotas"`
+	Meta           *Meta           `json:"meta"`
+}
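Given the request/response types above, creating a topic might look like the following sketch (the helper name and sizing values are illustrative, not part of this patch):

	package example

	import (
		"context"

		"github.com/vultr/govultr/v3"
	)

	// createOrdersTopic is a hypothetical helper showing the shape of a create request.
	func createOrdersTopic(ctx context.Context, client *govultr.Client, databaseID string) (*govultr.DatabaseTopic, error) {
		req := &govultr.DatabaseTopicCreateReq{
			Name:           "orders",
			Partitions:     3,
			Replication:    2,
			RetentionHours: 168, // one week of time-based retention
			RetentionBytes: -1,  // no size-based retention
		}
		topic, _, err := client.Database.CreateTopic(ctx, databaseID, req)
		return topic, err
	}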
+
+// DatabaseQuotaCreateReq struct used to create a Kafka quota within a Managed Database.
+type DatabaseQuotaCreateReq struct {
+	ClientID          string `json:"client_id"`
+	ConsumerByteRate  int    `json:"consumer_byte_rate"`
+	ProducerByteRate  int    `json:"producer_byte_rate"`
+	RequestPercentage int    `json:"request_percentage"`
+	User              string `json:"user"`
+}
+
 // databaseUpdatesBase holds the API response for retrieving a list of available maintenance updates within a Managed Database
 type databaseUpdatesBase struct {
 	AvailableUpdates []string `json:"available_updates"`
 }
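A quota is keyed by the client ID / username pair. A minimal creation sketch under the same assumptions (names and rates are illustrative):

	package example

	import (
		"context"

		"github.com/vultr/govultr/v3"
	)

	// throttleAnalyticsClient is a hypothetical helper capping a client's throughput.
	func throttleAnalyticsClient(ctx context.Context, client *govultr.Client, databaseID string) (*govultr.DatabaseQuota, error) {
		req := &govultr.DatabaseQuotaCreateReq{
			ClientID:          "analytics",
			ConsumerByteRate:  1048576, // 1 MiB/s
			ProducerByteRate:  1048576, // 1 MiB/s
			RequestPercentage: 50,
			User:              "appuser",
		}
		quota, _, err := client.Database.CreateQuota(ctx, databaseID, req)
		return quota, err
	}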
`json:"max_heap_table_size,omitempty"` - MaxLocksPerTransaction int `json:"max_locks_per_transaction,omitempty"` - MaxLogicalReplicationWorkers int `json:"max_logical_replication_workers,omitempty"` - MaxParallelWorkers int `json:"max_parallel_workers,omitempty"` - MaxParallelWorkersPerGather int `json:"max_parallel_workers_per_gather,omitempty"` - MaxPredLocksPerTransaction int `json:"max_pred_locks_per_transaction,omitempty"` - MaxPreparedTransactions int `json:"max_prepared_transactions,omitempty"` - MaxReplicationSlots int `json:"max_replication_slots,omitempty"` - MaxStackDepth int `json:"max_stack_depth,omitempty"` - MaxStandbyArchiveDelay int `json:"max_standby_archive_delay,omitempty"` - MaxStandbyStreamingDelay int `json:"max_standby_streaming_delay,omitempty"` - MaxWalSenders int `json:"max_wal_senders,omitempty"` - MaxWorkerProcesses int `json:"max_worker_processes,omitempty"` - NetBufferLength int `json:"net_buffer_length,omitempty"` - NetReadTimeout int `json:"net_read_timeout,omitempty"` - NetWriteTimeout int `json:"net_write_timeout,omitempty"` - PGPartmanBGWInterval int `json:"pg_partman_bgw.interval,omitempty"` - PGPartmanBGWRole string `json:"pg_partman_bgw.role,omitempty"` - PGStateStatementsTrack string `json:"pg_stat_statements.track,omitempty"` - SortBufferSize int `json:"sort_buffer_size,omitempty"` - TempFileLimit int `json:"temp_file_limit,omitempty"` - TmpTableSize int `json:"tmp_table_size,omitempty"` - TrackActivityQuerySize int `json:"track_activity_query_size,omitempty"` - TrackCommitTimestamp string `json:"track_commit_timestamp,omitempty"` - TrackFunctions string `json:"track_functions,omitempty"` - TrackIOTiming string `json:"track_io_timing,omitempty"` - WaitTimeout int `json:"wait_timeout,omitempty"` - WALSenderTImeout int `json:"wal_sender_timeout,omitempty"` - WALWriterDelay int `json:"wal_writer_delay,omitempty"` + AutovacuumAnalyzeScaleFactor float32 `json:"autovacuum_analyze_scale_factor,omitempty"` + AutovacuumAnalyzeThreshold int `json:"autovacuum_analyze_threshold,omitempty"` + AutovacuumFreezeMaxAge int `json:"autovacuum_freeze_max_age,omitempty"` + AutovacuumMaxWorkers int `json:"autovacuum_max_workers,omitempty"` + AutovacuumNaptime int `json:"autovacuum_naptime,omitempty"` + AutovacuumVacuumCostDelay int `json:"autovacuum_vacuum_cost_delay,omitempty"` + AutovacuumVacuumCostLimit int `json:"autovacuum_vacuum_cost_limit,omitempty"` + AutovacuumVacuumScaleFactor float32 `json:"autovacuum_vacuum_scale_factor,omitempty"` + AutovacuumVacuumThreshold int `json:"autovacuum_vacuum_threshold,omitempty"` + BGWRITERDelay int `json:"bgwriter_delay,omitempty"` + BGWRITERFlushAFter int `json:"bgwriter_flush_after,omitempty"` + BGWRITERLRUMaxPages int `json:"bgwriter_lru_maxpages,omitempty"` + BGWRITERLRUMultiplier float32 `json:"bgwriter_lru_multiplier,omitempty"` + DeadlockTimeout int `json:"deadlock_timeout,omitempty"` + DefaultToastCompression string `json:"default_toast_compression,omitempty"` + IdleInTransactionSessionTimeout int `json:"idle_in_transaction_session_timeout,omitempty"` + Jit *bool `json:"jit,omitempty"` + LogAutovacuumMinDuration int `json:"log_autovacuum_min_duration,omitempty"` + LogErrorVerbosity string `json:"log_error_verbosity,omitempty"` + LogLinePrefix string `json:"log_line_prefix,omitempty"` + LogMinDurationStatement int `json:"log_min_duration_statement,omitempty"` + MaxFilesPerProcess int `json:"max_files_per_process,omitempty"` + MaxLocksPerTransaction int `json:"max_locks_per_transaction,omitempty"` + 
+	MaxLogicalReplicationWorkers int `json:"max_logical_replication_workers,omitempty"`
+	MaxParallelWorkers int `json:"max_parallel_workers,omitempty"`
+	MaxParallelWorkersPerGather int `json:"max_parallel_workers_per_gather,omitempty"`
+	MaxPredLocksPerTransaction int `json:"max_pred_locks_per_transaction,omitempty"`
+	MaxPreparedTransactions int `json:"max_prepared_transactions,omitempty"`
+	MaxReplicationSlots int `json:"max_replication_slots,omitempty"`
+	MaxStackDepth int `json:"max_stack_depth,omitempty"`
+	MaxStandbyArchiveDelay int `json:"max_standby_archive_delay,omitempty"`
+	MaxStandbyStreamingDelay int `json:"max_standby_streaming_delay,omitempty"`
+	MaxWalSenders int `json:"max_wal_senders,omitempty"`
+	MaxWorkerProcesses int `json:"max_worker_processes,omitempty"`
+	PGPartmanBGWInterval int `json:"pg_partman_bgw.interval,omitempty"`
+	PGPartmanBGWRole string `json:"pg_partman_bgw.role,omitempty"`
+	PGStateStatementsTrack string `json:"pg_stat_statements.track,omitempty"`
+	TempFileLimit int `json:"temp_file_limit,omitempty"`
+	TrackActivityQuerySize int `json:"track_activity_query_size,omitempty"`
+	TrackCommitTimestamp string `json:"track_commit_timestamp,omitempty"`
+	TrackFunctions string `json:"track_functions,omitempty"`
+	TrackIOTiming string `json:"track_io_timing,omitempty"`
+	WALSenderTImeout int `json:"wal_sender_timeout,omitempty"`
+	WALWriterDelay int `json:"wal_writer_delay,omitempty"`
+	ConnectTimeout int `json:"connect_timeout,omitempty"`
+	GroupConcatMaxLen int `json:"group_concat_max_len,omitempty"`
+	InnoDBChangeBufferMaxSize int `json:"innodb_change_buffer_max_size,omitempty"`
+	InnoDBFlushNeighbors int `json:"innodb_flush_neighbors,omitempty"`
+	InnoDBFTMinTokenSize int `json:"innodb_ft_min_token_size,omitempty"`
+	InnoDBFTServerStopwordTable string `json:"innodb_ft_server_stopword_table,omitempty"`
+	InnoDBLockWaitTimeout int `json:"innodb_lock_wait_timeout,omitempty"`
+	InnoDBLogBufferSize int `json:"innodb_log_buffer_size,omitempty"`
+	InnoDBOnlineAlterLogMaxSize int `json:"innodb_online_alter_log_max_size,omitempty"`
+	InnoDBPrintAllDeadlocks *bool `json:"innodb_print_all_deadlocks,omitempty"`
+	InnoDBReadIOThreads int `json:"innodb_read_io_threads,omitempty"`
+	InnoDBRollbackOnTimeout *bool `json:"innodb_rollback_on_timeout,omitempty"`
+	InnoDBThreadConcurrency int `json:"innodb_thread_concurrency,omitempty"`
+	InnoDBWriteIOThreads int `json:"innodb_write_io_threads,omitempty"`
+	InteractiveTimeout int `json:"interactive_timeout,omitempty"`
+	InternalTmpMemStorageEngine string `json:"internal_tmp_mem_storage_engine,omitempty"`
+	MaxAllowedPacket int `json:"max_allowed_packet,omitempty"`
+	MaxHeapTableSize int `json:"max_heap_table_size,omitempty"`
+	NetBufferLength int `json:"net_buffer_length,omitempty"`
+	NetReadTimeout int `json:"net_read_timeout,omitempty"`
+	NetWriteTimeout int `json:"net_write_timeout,omitempty"`
+	SortBufferSize int `json:"sort_buffer_size,omitempty"`
+	TmpTableSize int `json:"tmp_table_size,omitempty"`
+	WaitTimeout int `json:"wait_timeout,omitempty"`
+	CompressionType string `json:"compression_type,omitempty"`
+	GroupInitialRebalanceDelayMS int `json:"group_initial_rebalance_delay_ms,omitempty"`
+	GroupMinSessionTimeoutMS int `json:"group_min_session_timeout_ms,omitempty"`
+	GroupMaxSessionTimeoutMS int `json:"group_max_session_timeout_ms,omitempty"`
+	ConnectionsMaxIdleMS int `json:"connections_max_idle_ms,omitempty"`
+	MaxIncrementalFetchSessionCacheSlots int `json:"max_incremental_fetch_session_cache_slots,omitempty"`
+	MessageMaxBytes int `json:"message_max_bytes,omitempty"`
+	OffsetsRetentionMinutes int `json:"offsets_retention_minutes,omitempty"`
+	LogCleanerDeleteRetentionMS int `json:"log_cleaner_delete_retention_ms,omitempty"`
+	LogCleanerMinCleanableRatio float32 `json:"log_cleaner_min_cleanable_ratio,omitempty"`
+	LogCleanerMaxCompactionLagMS int `json:"log_cleaner_max_compaction_lag_ms,omitempty"`
+	LogCleanerMinCompactionLagMS int `json:"log_cleaner_min_compaction_lag_ms,omitempty"`
+	LogCleanupPolicy string `json:"log_cleanup_policy,omitempty"`
+	LogFlushIntervalMessages int `json:"log_flush_interval_messages,omitempty"`
+	LogFlushIntervalMS int `json:"log_flush_interval_ms,omitempty"`
+	LogIndexIntervalBytes int `json:"log_index_interval_bytes,omitempty"`
+	LogIndexSizeMaxBytes int `json:"log_index_size_max_bytes,omitempty"`
+	LogLocalRetentionMS int `json:"log_local_retention_ms,omitempty"`
+	LogLocalRetentionBytes int `json:"log_local_retention_bytes,omitempty"`
+	LogMessageDownconversionEnable *bool `json:"log_message_downconversion_enable,omitempty"`
+	LogMessageTimestampType string `json:"log_message_timestamp_type,omitempty"`
+	LogMessageTimestampDifferenceMaxMS int `json:"log_message_timestamp_difference_max_ms,omitempty"`
+	LogPreallocate *bool `json:"log_preallocate,omitempty"`
+	LogRetentionBytes int `json:"log_retention_bytes,omitempty"`
+	LogRetentionHours int `json:"log_retention_hours,omitempty"`
+	LogRetentionMS int `json:"log_retention_ms,omitempty"`
+	LogRollJitterMS int `json:"log_roll_jitter_ms,omitempty"`
+	LogRollMS int `json:"log_roll_ms,omitempty"`
+	LogSegmentBytes int `json:"log_segment_bytes,omitempty"`
+	LogSegmentDeleteDelayMS int `json:"log_segment_delete_delay_ms,omitempty"`
+	AutoCreateTopicsEnable *bool `json:"auto_create_topics_enable,omitempty"`
+	MinInsyncReplicas int `json:"min_insync_replicas,omitempty"`
+	NumPartitions int `json:"num_partitions,omitempty"`
+	DefaultReplicationFactor int `json:"default_replication_factor,omitempty"`
+	ReplicaFetchMaxBytes int `json:"replica_fetch_max_bytes,omitempty"`
+	ReplicaFetchResponseMaxBytes int `json:"replica_fetch_response_max_bytes,omitempty"`
+	MaxConnectionsPerIP int `json:"max_connections_per_ip,omitempty"`
+	ProducerPurgatoryPurgeIntervalRequests int `json:"producer_purgatory_purge_interval_requests,omitempty"`
+	SASLOauthbearerExpectedAudience string `json:"sasl_oauthbearer_expected_audience,omitempty"`
+	SASLOauthbearerExpectedIssuer string `json:"sasl_oauthbearer_expected_issuer,omitempty"`
+	SASLOauthbearerJwksEndpointURL string `json:"sasl_oauthbearer_jwks_endpoint_url,omitempty"`
+	SASLOauthbearerSubClaimName string `json:"sasl_oauthbearer_sub_claim_name,omitempty"`
+	SocketRequestMaxBytes int `json:"socket_request_max_bytes,omitempty"`
+	TransactionStateLogSegmentBytes int `json:"transaction_state_log_segment_bytes,omitempty"`
+	TransactionRemoveExpiredTransactionCleanupIntervalMS int `json:"transaction_remove_expired_transaction_cleanup_interval_ms,omitempty"`
+	TransactionPartitionVerificationEnable *bool `json:"transaction_partition_verification_enable,omitempty"`
 }
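The Kafka settings ride on the same advanced-options structure as the MySQL/PostgreSQL settings. A rough tuning sketch, assuming the library's existing UpdateAdvancedOptions method (its signature is taken from the current library surface, not from this patch; values are illustrative):

	package example

	import (
		"context"

		"github.com/vultr/govultr/v3"
	)

	// tuneKafkaCluster is a hypothetical helper; UpdateAdvancedOptions and its
	// four return values are assumed from the existing library, not this patch.
	func tuneKafkaCluster(ctx context.Context, client *govultr.Client, databaseID string) error {
		opts := &govultr.DatabaseAdvancedOptions{
			CompressionType:   "producer",
			LogRetentionHours: 168,
			NumPartitions:     3,
			MinInsyncReplicas: 2,
		}
		_, _, _, err := client.Database.UpdateAdvancedOptions(ctx, databaseID, opts)
		return err
	}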
`json:"max_value,omitempty"` AltValues []int `json:"alt_values,omitempty"` Units string `json:"units,omitempty"` } @@ -765,7 +897,7 @@ func (d *DatabaseServiceHandler) UpdateUser(ctx context.Context, databaseID, use return databaseUser.DatabaseUser, resp, nil } -// DeleteUser will delete a user within the Managed database. All data will be permanently lost. +// DeleteUser will delete a user within the Managed database func (d *DatabaseServiceHandler) DeleteUser(ctx context.Context, databaseID, username string) error { uri := fmt.Sprintf("%s/%s/users/%s", databasePath, databaseID, username) @@ -863,6 +995,158 @@ func (d *DatabaseServiceHandler) DeleteDB(ctx context.Context, databaseID, dbnam return err } +// ListTopics retrieves all Kafka topics on your Managed Database. +func (d *DatabaseServiceHandler) ListTopics(ctx context.Context, databaseID string) ([]DatabaseTopic, *Meta, *http.Response, error) { //nolint:dupl,lll + uri := fmt.Sprintf("%s/%s/topics", databasePath, databaseID) + + req, err := d.client.NewRequest(ctx, http.MethodGet, uri, nil) + if err != nil { + return nil, nil, nil, err + } + + databaseTopics := new(databaseTopicsBase) + resp, err := d.client.DoWithContext(ctx, req, databaseTopics) + if err != nil { + return nil, nil, nil, err + } + + return databaseTopics.DatabaseTopics, databaseTopics.Meta, resp, nil +} + +// CreateTopic will create a Kafka topic within the Managed Database with the given parameters +func (d *DatabaseServiceHandler) CreateTopic(ctx context.Context, databaseID string, databaseTopicReq *DatabaseTopicCreateReq) (*DatabaseTopic, *http.Response, error) { //nolint:lll + uri := fmt.Sprintf("%s/%s/topics", databasePath, databaseID) + + req, err := d.client.NewRequest(ctx, http.MethodPost, uri, databaseTopicReq) + if err != nil { + return nil, nil, err + } + + databaseTopic := new(databaseTopicBase) + resp, err := d.client.DoWithContext(ctx, req, databaseTopic) + if err != nil { + return nil, nil, err + } + + return databaseTopic.DatabaseTopic, resp, nil +} + +// GetTopic retrieves information on an individual Kafka topic within a Managed Database based on a topicName and databaseID +func (d *DatabaseServiceHandler) GetTopic(ctx context.Context, databaseID, topicName string) (*DatabaseTopic, *http.Response, error) { + uri := fmt.Sprintf("%s/%s/topics/%s", databasePath, databaseID, topicName) + + req, err := d.client.NewRequest(ctx, http.MethodGet, uri, nil) + if err != nil { + return nil, nil, err + } + + databaseTopic := new(databaseTopicBase) + resp, err := d.client.DoWithContext(ctx, req, databaseTopic) + if err != nil { + return nil, nil, err + } + + return databaseTopic.DatabaseTopic, resp, nil +} + +// UpdateTopic will update a Kafka topic within the Managed Database with the given parameters +func (d *DatabaseServiceHandler) UpdateTopic(ctx context.Context, databaseID, topicName string, databaseTopicReq *DatabaseTopicUpdateReq) (*DatabaseTopic, *http.Response, error) { //nolint:lll,dupl + uri := fmt.Sprintf("%s/%s/topics/%s", databasePath, databaseID, topicName) + + req, err := d.client.NewRequest(ctx, http.MethodPut, uri, databaseTopicReq) + if err != nil { + return nil, nil, err + } + + databaseTopic := new(databaseTopicBase) + resp, err := d.client.DoWithContext(ctx, req, databaseTopic) + if err != nil { + return nil, nil, err + } + + return databaseTopic.DatabaseTopic, resp, nil +} + +// DeleteTopic will delete a Kafka topic within the Managed database +func (d *DatabaseServiceHandler) DeleteTopic(ctx context.Context, databaseID, topicName 
+
+// DeleteTopic will delete a Kafka topic within the Managed database
+func (d *DatabaseServiceHandler) DeleteTopic(ctx context.Context, databaseID, topicName string) error {
+	uri := fmt.Sprintf("%s/%s/topics/%s", databasePath, databaseID, topicName)
+
+	req, err := d.client.NewRequest(ctx, http.MethodDelete, uri, nil)
+	if err != nil {
+		return err
+	}
+
+	_, err = d.client.DoWithContext(ctx, req, nil)
+	return err
+}
+
+// ListQuotas retrieves all Kafka quotas on your Managed Database.
+func (d *DatabaseServiceHandler) ListQuotas(ctx context.Context, databaseID string) ([]DatabaseQuota, *Meta, *http.Response, error) { //nolint:dupl,lll
+	uri := fmt.Sprintf("%s/%s/quotas", databasePath, databaseID)
+
+	req, err := d.client.NewRequest(ctx, http.MethodGet, uri, nil)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	databaseQuotas := new(databaseQuotasBase)
+	resp, err := d.client.DoWithContext(ctx, req, databaseQuotas)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	return databaseQuotas.DatabaseQuotas, databaseQuotas.Meta, resp, nil
+}
+
+// CreateQuota will create a Kafka quota within the Managed Database with the given parameters
+func (d *DatabaseServiceHandler) CreateQuota(ctx context.Context, databaseID string, databaseQuotaReq *DatabaseQuotaCreateReq) (*DatabaseQuota, *http.Response, error) { //nolint:lll
+	uri := fmt.Sprintf("%s/%s/quotas", databasePath, databaseID)
+
+	req, err := d.client.NewRequest(ctx, http.MethodPost, uri, databaseQuotaReq)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	databaseQuota := new(databaseQuotaBase)
+	resp, err := d.client.DoWithContext(ctx, req, databaseQuota)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return databaseQuota.DatabaseQuota, resp, nil
+}
+
+// GetQuota retrieves information on an individual Kafka quota within a Managed Database based on a databaseID, clientID, and username
+func (d *DatabaseServiceHandler) GetQuota(ctx context.Context, databaseID, clientID, username string) (*DatabaseQuota, *http.Response, error) { //nolint:lll
+	uri := fmt.Sprintf("%s/%s/quotas/%s/%s", databasePath, databaseID, clientID, username)
+
+	req, err := d.client.NewRequest(ctx, http.MethodGet, uri, nil)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	databaseQuota := new(databaseQuotaBase)
+	resp, err := d.client.DoWithContext(ctx, req, databaseQuota)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return databaseQuota.DatabaseQuota, resp, nil
+}
+
+// DeleteQuota will delete a Kafka quota within the Managed database
+func (d *DatabaseServiceHandler) DeleteQuota(ctx context.Context, databaseID, clientID, username string) error {
+	uri := fmt.Sprintf("%s/%s/quotas/%s/%s", databasePath, databaseID, clientID, username)
+
+	req, err := d.client.NewRequest(ctx, http.MethodDelete, uri, nil)
+	if err != nil {
+		return err
+	}
+
+	_, err = d.client.DoWithContext(ctx, req, nil)
+	return err
+}
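Quotas are addressed by the clientID/username pair in the URL, so lookups and deletes take both. A sketch (the helper and all identifiers are placeholders):

	package example

	import (
		"context"

		"github.com/vultr/govultr/v3"
	)

	// dropQuota is a hypothetical helper that removes a quota after confirming it exists.
	func dropQuota(ctx context.Context, client *govultr.Client, databaseID, clientID, username string) error {
		if _, _, err := client.Database.GetQuota(ctx, databaseID, clientID, username); err != nil {
			return err
		}
		return client.Database.DeleteQuota(ctx, databaseID, clientID, username)
	}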
 
 // ListMaintenanceUpdates retrieves all available maintenance updates for your Managed Database.
 func (d *DatabaseServiceHandler) ListMaintenanceUpdates(ctx context.Context, databaseID string) ([]string, *http.Response, error) {
 	uri := fmt.Sprintf("%s/%s/maintenance", databasePath, databaseID)
@@ -1124,7 +1408,7 @@ func (d *DatabaseServiceHandler) UpdateConnectionPool(ctx context.Context, datab
 	return databaseConnectionPool.ConnectionPool, resp, nil
 }
 
-// DeleteConnectionPool will delete a user within the Managed database. All data will be permanently lost.
+// DeleteConnectionPool will delete a connection pool within the Managed database
 func (d *DatabaseServiceHandler) DeleteConnectionPool(ctx context.Context, databaseID, poolName string) error {
 	uri := fmt.Sprintf("%s/%s/connection-pools/%s", databasePath, databaseID, poolName)
diff --git a/database_test.go b/database_test.go
index 020b7f5..832284d 100644
--- a/database_test.go
+++ b/database_test.go
@@ -120,7 +120,7 @@ func TestDatabaseServiceHandler_List(t *testing.T) {
 		PlanDisk:              80,
 		PlanRAM:               4096,
 		PlanVCPUs:             2,
-		PlanReplicas:          0,
+		PlanReplicas:          IntToIntPtr(0),
 		Region:                "EWR",
 		DatabaseEngine:        "mysql",
 		DatabaseEngineVersion: "8",
@@ -153,7 +153,7 @@ func TestDatabaseServiceHandler_List(t *testing.T) {
 		PlanDisk:              80,
 		PlanRAM:               4096,
 		PlanVCPUs:             2,
-		PlanReplicas:          1,
+		PlanReplicas:          IntToIntPtr(1),
 		Region:                "EWR",
 		DatabaseEngine:        "mysql",
 		DatabaseEngineVersion: "8",
@@ -268,7 +268,7 @@ func TestDatabaseServiceHandler_Create(t *testing.T) {
 		PlanDisk:              80,
 		PlanRAM:               4096,
 		PlanVCPUs:             2,
-		PlanReplicas:          1,
+		PlanReplicas:          IntToIntPtr(1),
 		Region:                "EWR",
 		DatabaseEngine:        "mysql",
 		DatabaseEngineVersion: "8",
@@ -405,7 +405,7 @@ func TestDatabaseServiceHandler_Get(t *testing.T) {
 		PlanDisk:              80,
 		PlanRAM:               4096,
 		PlanVCPUs:             2,
-		PlanReplicas:          0,
+		PlanReplicas:          IntToIntPtr(0),
 		Region:                "EWR",
 		DatabaseEngine:        "mysql",
 		DatabaseEngineVersion: "8",
@@ -437,7 +437,7 @@ func TestDatabaseServiceHandler_Get(t *testing.T) {
 		PlanDisk:              80,
 		PlanRAM:               4096,
 		PlanVCPUs:             2,
-		PlanReplicas:          1,
+		PlanReplicas:          IntToIntPtr(1),
 		Region:                "EWR",
 		DatabaseEngine:        "mysql",
 		DatabaseEngineVersion: "8",
@@ -584,7 +584,7 @@ func TestDatabaseServiceHandler_Update(t *testing.T) {
 		PlanDisk:              80,
 		PlanRAM:               4096,
 		PlanVCPUs:             2,
-		PlanReplicas:          0,
+		PlanReplicas:          IntToIntPtr(0),
 		Region:                "EWR",
 		DatabaseEngine:        "mysql",
 		DatabaseEngineVersion: "8",
@@ -617,7 +617,7 @@ func TestDatabaseServiceHandler_Update(t *testing.T) {
 		PlanDisk:              80,
 		PlanRAM:               4096,
 		PlanVCPUs:             2,
-		PlanReplicas:          1,
+		PlanReplicas:          IntToIntPtr(1),
 		Region:                "EWR",
 		DatabaseEngine:        "mysql",
 		DatabaseEngineVersion: "8",
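Since PlanReplicas is now a *int (Kafka plans report PlanBrokers instead, so the replica count can be absent), callers migrating across this change should nil-check before dereferencing; the tests above build such pointers with the library's IntToIntPtr helper. A sketch:

	package example

	import (
		"fmt"

		"github.com/vultr/govultr/v3"
	)

	// describePlan prints whichever sizing field the engine reports.
	func describePlan(db *govultr.Database) {
		if db.PlanReplicas != nil {
			fmt.Printf("replicas: %d\n", *db.PlanReplicas)
		}
		if db.PlanBrokers > 0 {
			fmt.Printf("brokers: %d\n", db.PlanBrokers)
		}
	}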