Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attach DLS station #145

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ try
IdempotencyWindowMs = 0,
SendPoisonMessageToDls = true,
SendSchemaFailedMessageToDls = true,
PartitionsNumber = 3 // defaults to 1
PartitionsNumber = 3, // defaults to 1
DlsStation = "<dls-station>" // If DlsStation is set, then DLS events will be sent to selected station as well. The default value is "" (no DLS station).
});
}
catch (Exception ex)
Expand Down
17 changes: 17 additions & 0 deletions src/Memphis.Client.IntegrationTests/Station/MemphisStationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@ public async Task GivenStationOptions_WhenCreateStation_ThenStationIsCreated(str
Assert.NotNull(result);
}

[Theory]
[InlineData("station_with_dls_a", "dls_station_a")]
[InlineData("station_with_dls_b", "dls_station_b")]
public async Task GivenStationOptionsWithDls_WhenCreateStation_ThenStationWithDlsIsCreated(string stationName, string dlsStationName)
{
using var client = await MemphisClientFactory.CreateClient(_fixture.MemphisClientOptions);
var stationOptions = _fixture.DefaultStationOptions;
stationOptions.Name = stationName;
stationOptions.DlsStation = dlsStationName;

var result = await client.CreateStation(stationOptions);

await result.DestroyAsync();
Assert.NotNull(result);
}


[Theory]
[InlineData("station_tst_station_name_x")]
[InlineData("station_tst_station_name_y")]
Expand Down
3 changes: 2 additions & 1 deletion src/Memphis.Client/MemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
{
string stationName = producerOptions.StationName;
string producerName = producerOptions.ProducerName.ToLower();
bool generateRandomSuffix = producerOptions.GenerateUniqueSuffix;

Check warning on line 132 in src/Memphis.Client/MemphisClient.cs

View workflow job for this annotation

GitHub Actions / build

'MemphisProducerOptions.GenerateUniqueSuffix' is obsolete: 'GenerateUniqueSuffix will be stopped to be supported after November 1'st, 2023.'
string internalStationName = MemphisUtil.GetInternalName(stationName);

if (_brokerConnection.IsClosed())
Expand Down Expand Up @@ -288,7 +288,7 @@
{
StationName = producer.StationName,
ProducerName = producer.ProducerName,
GenerateUniqueSuffix = false,

Check warning on line 291 in src/Memphis.Client/MemphisClient.cs

View workflow job for this annotation

GitHub Actions / build

'MemphisProducerOptions.GenerateUniqueSuffix' is obsolete: 'GenerateUniqueSuffix will be stopped to be supported after November 1'st, 2023.'
MaxAckTimeMs = ackWaitMs
};

Expand Down Expand Up @@ -348,7 +348,7 @@

consumerOptions.RealName = consumerOptions.ConsumerName.ToLower();

if (consumerOptions.GenerateUniqueSuffix)

Check warning on line 351 in src/Memphis.Client/MemphisClient.cs

View workflow job for this annotation

GitHub Actions / build

'MemphisConsumerOptions.GenerateUniqueSuffix' is obsolete: 'GenerateUniqueSuffix will be stopped to be supported after November 1'st, 2023.'
{
consumerOptions.ConsumerName = $"{consumerOptions.ConsumerName}_{MemphisUtil.GetUniqueKey(8)}";
}
Expand Down Expand Up @@ -452,7 +452,8 @@
},
UserName = _userName,
TieredStorageEnabled = stationOptions.TieredStorageEnabled,
PartitionsNumber = stationOptions.PartitionsNumber
PartitionsNumber = stationOptions.PartitionsNumber,
DlsStation = stationOptions.DlsStation
};

var createStationModelJson = JsonSerDes.PrepareJsonString<CreateStationRequest>(createStationModel);
Expand Down Expand Up @@ -810,7 +811,7 @@
BatchMaxTimeToWaitMs = fetchMessageOptions.BatchMaxTimeToWaitMs,
MaxAckTimeMs = fetchMessageOptions.MaxAckTimeMs,
MaxMsgDeliveries = fetchMessageOptions.MaxMsgDeliveries,
GenerateUniqueSuffix = fetchMessageOptions.GenerateUniqueSuffix,

Check warning on line 814 in src/Memphis.Client/MemphisClient.cs

View workflow job for this annotation

GitHub Actions / build

'MemphisConsumerOptions.GenerateUniqueSuffix' is obsolete: 'GenerateUniqueSuffix will be stopped to be supported after November 1'st, 2023.'

Check warning on line 814 in src/Memphis.Client/MemphisClient.cs

View workflow job for this annotation

GitHub Actions / build

'FetchMessageOptions.GenerateUniqueSuffix' is obsolete: 'GenerateUniqueSuffix will be stopped to be supported after November 1'st, 2023.'
StartConsumeFromSequence = fetchMessageOptions.StartConsumeFromSequence,
LastMessages = fetchMessageOptions.LastMessages,
});
Expand Down
96 changes: 49 additions & 47 deletions src/Memphis.Client/Models/Request/CreateStationRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,54 @@

#nullable disable

namespace Memphis.Client.Models.Request
namespace Memphis.Client.Models.Request;

[DataContract]
internal sealed class CreateStationRequest
{
[DataMember(Name = "name")]
public string StationName { get; set; }

[DataMember(Name = "retention_type")]
public string RetentionType { get; set; }

[DataMember(Name = "retention_value")]
public int RetentionValue { get; set; }

[DataMember(Name = "storage_type")]
public string StorageType { get; set; }

[DataMember(Name = "replicas")]
public int Replicas { get; set; }

[DataMember(Name = "idempotency_windows_in_ms")]
public int IdempotencyWindowsInMs { get; set; }

[DataMember(Name = "schema_name")]
public string SchemaName { get; set; }

[DataMember(Name = "dls_configuration")]
public DlsConfiguration DlsConfiguration { get; set; }

[DataMember(Name = "username")]
public string UserName { get; set; }

[DataMember(Name = "tiered_storage_enabled")]
public bool TieredStorageEnabled { get; set; }

[DataMember(Name = "partitions_number")]
public int PartitionsNumber { get; set; }

[DataMember(Name = "dls_station")]
public string DlsStation { get; set; }
}

[DataContract]
internal sealed class DlsConfiguration
{
[DataContract]
internal sealed class CreateStationRequest
{
[DataMember(Name = "name")]
public string StationName { get; set; }

[DataMember(Name = "retention_type")]
public string RetentionType { get; set; }

[DataMember(Name = "retention_value")]
public int RetentionValue { get; set; }

[DataMember(Name = "storage_type")]
public string StorageType { get; set; }

[DataMember(Name = "replicas")]
public int Replicas { get; set; }

[DataMember(Name = "idempotency_windows_in_ms")]
public int IdempotencyWindowsInMs { get; set; }

[DataMember(Name = "schema_name")]
public string SchemaName { get; set; }

[DataMember(Name = "dls_configuration")]
public DlsConfiguration DlsConfiguration { get; set; }

[DataMember(Name = "username")]
public string UserName { get; set; }

[DataMember(Name = "tiered_storage_enabled")]
public bool TieredStorageEnabled { get; set; }

[DataMember(Name = "partitions_number")]
public int PartitionsNumber { get; set; }
}

[DataContract]
internal sealed class DlsConfiguration
{
[DataMember(Name = "poison")]
public bool Poison { get; set; }

[DataMember(Name = "SchemaVerse")]
public bool SchemaVerse { get; set; }
}
[DataMember(Name = "poison")]
public bool Poison { get; set; }

[DataMember(Name = "SchemaVerse")]
public bool SchemaVerse { get; set; }
}
85 changes: 43 additions & 42 deletions src/Memphis.Client/Station/StationOptions.cs
Original file line number Diff line number Diff line change
@@ -1,48 +1,49 @@
#nullable disable

namespace Memphis.Client.Station
namespace Memphis.Client.Station;

public sealed class StationOptions
{
public sealed class StationOptions
{
public string Name { get; set; }
/// <summary>
/// The retention type to use for the station. Valid values are:
/// <list type="bullet">
/// <item><description>message_age_sec</description></item>
/// <item><description>messages</description></item>
/// <item><description>bytes</description></item>
/// <item><description>ack_based</description></item>
/// </list>
/// <description>
/// Use the <see cref="RetentionTypes"/> class for the valid values.
/// </description>
/// </summary>
public string RetentionType { get; set; } = RetentionTypes.MAX_MESSAGE_AGE_SECONDS;
public int RetentionValue { get; set; } = 604_800;
public string StorageType { get; set; } = StorageTypes.DISK;
public int Replicas { get; set; } = 1;
public int IdempotenceWindowMs { get; set; } = 120_000;
public string SchemaName { get; set; } = string.Empty;
public bool SendPoisonMessageToDls { get; set; } = false;
public bool SendSchemaFailedMessageToDls { get; set; } = true;
public bool TieredStorageEnabled { get; set; } = false;
public int PartitionsNumber { get; set; } = 1;
}
public string Name { get; set; }
/// <summary>
/// The retention type to use for the station. Valid values are:
/// <list type="bullet">
/// <item><description>message_age_sec</description></item>
/// <item><description>messages</description></item>
/// <item><description>bytes</description></item>
/// <item><description>ack_based</description></item>
/// </list>
/// <description>
/// Use the <see cref="RetentionTypes"/> class for the valid values.
/// </description>
/// </summary>
public string RetentionType { get; set; } = RetentionTypes.MAX_MESSAGE_AGE_SECONDS;
public int RetentionValue { get; set; } = 604_800;
public string StorageType { get; set; } = StorageTypes.DISK;
public int Replicas { get; set; } = 1;
public int IdempotenceWindowMs { get; set; } = 120_000;
public string SchemaName { get; set; } = string.Empty;
public bool SendPoisonMessageToDls { get; set; } = false;
public bool SendSchemaFailedMessageToDls { get; set; } = true;
public bool TieredStorageEnabled { get; set; } = false;
public int PartitionsNumber { get; set; } = 1;

public string DlsStation { get; set; } = string.Empty;
}

public class RetentionTypes
{
public const string MAX_MESSAGE_AGE_SECONDS = "message_age_sec";
public const string MESSAGES = "messages";
public const string BYTES = "bytes";
/// <summary>
/// Retention based on message acks. This is for cloud users only.
/// </summary>
public const string ACK_BASED = "ack_based";
}
public class RetentionTypes
{
public const string MAX_MESSAGE_AGE_SECONDS = "message_age_sec";
public const string MESSAGES = "messages";
public const string BYTES = "bytes";
/// <summary>
/// Retention based on message acks. This is for cloud users only.
/// </summary>
public const string ACK_BASED = "ack_based";
}

public class StorageTypes
{
public const string DISK = "file";
public const string MEMORY = "memory";
}
public class StorageTypes
{
public const string DISK = "file";
public const string MEMORY = "memory";
}
Loading