Skip to content

Commit

Permalink
Attach DLS station (superstreamlabs#145)
Browse files Browse the repository at this point in the history
* Added DLS station

* Added DLS station test

* Updated README
  • Loading branch information
tbazen authored Sep 20, 2023
1 parent d385ca7 commit 0040b2c
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 91 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ try
IdempotencyWindowMs = 0,
SendPoisonMessageToDls = true,
SendSchemaFailedMessageToDls = true,
PartitionsNumber = 3 // defaults to 1
PartitionsNumber = 3, // defaults to 1
DlsStation = "<dls-station>" // If DlsStation is set, then DLS events will be sent to selected station as well. The default value is "" (no DLS station).
});
}
catch (Exception ex)
Expand Down
17 changes: 17 additions & 0 deletions src/Memphis.Client.IntegrationTests/Station/MemphisStationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@ public async Task GivenStationOptions_WhenCreateStation_ThenStationIsCreated(str
Assert.NotNull(result);
}

[Theory]
[InlineData("station_with_dls_a", "dls_station_a")]
[InlineData("station_with_dls_b", "dls_station_b")]
public async Task GivenStationOptionsWithDls_WhenCreateStation_ThenStationWithDlsIsCreated(string stationName, string dlsStationName)
{
using var client = await MemphisClientFactory.CreateClient(_fixture.MemphisClientOptions);
var stationOptions = _fixture.DefaultStationOptions;
stationOptions.Name = stationName;
stationOptions.DlsStation = dlsStationName;

var result = await client.CreateStation(stationOptions);

await result.DestroyAsync();
Assert.NotNull(result);
}


[Theory]
[InlineData("station_tst_station_name_x")]
[InlineData("station_tst_station_name_y")]
Expand Down
3 changes: 2 additions & 1 deletion src/Memphis.Client/MemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,8 @@ public async Task<MemphisStation> CreateStation(StationOptions stationOptions, C
},
UserName = _userName,
TieredStorageEnabled = stationOptions.TieredStorageEnabled,
PartitionsNumber = stationOptions.PartitionsNumber
PartitionsNumber = stationOptions.PartitionsNumber,
DlsStation = stationOptions.DlsStation
};

var createStationModelJson = JsonSerDes.PrepareJsonString<CreateStationRequest>(createStationModel);
Expand Down
96 changes: 49 additions & 47 deletions src/Memphis.Client/Models/Request/CreateStationRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,54 @@

#nullable disable

namespace Memphis.Client.Models.Request
namespace Memphis.Client.Models.Request;

[DataContract]
internal sealed class CreateStationRequest
{
[DataMember(Name = "name")]
public string StationName { get; set; }

[DataMember(Name = "retention_type")]
public string RetentionType { get; set; }

[DataMember(Name = "retention_value")]
public int RetentionValue { get; set; }

[DataMember(Name = "storage_type")]
public string StorageType { get; set; }

[DataMember(Name = "replicas")]
public int Replicas { get; set; }

[DataMember(Name = "idempotency_windows_in_ms")]
public int IdempotencyWindowsInMs { get; set; }

[DataMember(Name = "schema_name")]
public string SchemaName { get; set; }

[DataMember(Name = "dls_configuration")]
public DlsConfiguration DlsConfiguration { get; set; }

[DataMember(Name = "username")]
public string UserName { get; set; }

[DataMember(Name = "tiered_storage_enabled")]
public bool TieredStorageEnabled { get; set; }

[DataMember(Name = "partitions_number")]
public int PartitionsNumber { get; set; }

[DataMember(Name = "dls_station")]
public string DlsStation { get; set; }
}

[DataContract]
internal sealed class DlsConfiguration
{
[DataContract]
internal sealed class CreateStationRequest
{
[DataMember(Name = "name")]
public string StationName { get; set; }

[DataMember(Name = "retention_type")]
public string RetentionType { get; set; }

[DataMember(Name = "retention_value")]
public int RetentionValue { get; set; }

[DataMember(Name = "storage_type")]
public string StorageType { get; set; }

[DataMember(Name = "replicas")]
public int Replicas { get; set; }

[DataMember(Name = "idempotency_windows_in_ms")]
public int IdempotencyWindowsInMs { get; set; }

[DataMember(Name = "schema_name")]
public string SchemaName { get; set; }

[DataMember(Name = "dls_configuration")]
public DlsConfiguration DlsConfiguration { get; set; }

[DataMember(Name = "username")]
public string UserName { get; set; }

[DataMember(Name = "tiered_storage_enabled")]
public bool TieredStorageEnabled { get; set; }

[DataMember(Name = "partitions_number")]
public int PartitionsNumber { get; set; }
}

[DataContract]
internal sealed class DlsConfiguration
{
[DataMember(Name = "poison")]
public bool Poison { get; set; }

[DataMember(Name = "SchemaVerse")]
public bool SchemaVerse { get; set; }
}
[DataMember(Name = "poison")]
public bool Poison { get; set; }

[DataMember(Name = "SchemaVerse")]
public bool SchemaVerse { get; set; }
}
85 changes: 43 additions & 42 deletions src/Memphis.Client/Station/StationOptions.cs
Original file line number Diff line number Diff line change
@@ -1,48 +1,49 @@
#nullable disable

namespace Memphis.Client.Station
namespace Memphis.Client.Station;

public sealed class StationOptions
{
public sealed class StationOptions
{
public string Name { get; set; }
/// <summary>
/// The retention type to use for the station. Valid values are:
/// <list type="bullet">
/// <item><description>message_age_sec</description></item>
/// <item><description>messages</description></item>
/// <item><description>bytes</description></item>
/// <item><description>ack_based</description></item>
/// </list>
/// <description>
/// Use the <see cref="RetentionTypes"/> class for the valid values.
/// </description>
/// </summary>
public string RetentionType { get; set; } = RetentionTypes.MAX_MESSAGE_AGE_SECONDS;
public int RetentionValue { get; set; } = 604_800;
public string StorageType { get; set; } = StorageTypes.DISK;
public int Replicas { get; set; } = 1;
public int IdempotenceWindowMs { get; set; } = 120_000;
public string SchemaName { get; set; } = string.Empty;
public bool SendPoisonMessageToDls { get; set; } = false;
public bool SendSchemaFailedMessageToDls { get; set; } = true;
public bool TieredStorageEnabled { get; set; } = false;
public int PartitionsNumber { get; set; } = 1;
}
public string Name { get; set; }
/// <summary>
/// The retention type to use for the station. Valid values are:
/// <list type="bullet">
/// <item><description>message_age_sec</description></item>
/// <item><description>messages</description></item>
/// <item><description>bytes</description></item>
/// <item><description>ack_based</description></item>
/// </list>
/// <description>
/// Use the <see cref="RetentionTypes"/> class for the valid values.
/// </description>
/// </summary>
public string RetentionType { get; set; } = RetentionTypes.MAX_MESSAGE_AGE_SECONDS;
public int RetentionValue { get; set; } = 604_800;
public string StorageType { get; set; } = StorageTypes.DISK;
public int Replicas { get; set; } = 1;
public int IdempotenceWindowMs { get; set; } = 120_000;
public string SchemaName { get; set; } = string.Empty;
public bool SendPoisonMessageToDls { get; set; } = false;
public bool SendSchemaFailedMessageToDls { get; set; } = true;
public bool TieredStorageEnabled { get; set; } = false;
public int PartitionsNumber { get; set; } = 1;

public string DlsStation { get; set; } = string.Empty;
}

public class RetentionTypes
{
public const string MAX_MESSAGE_AGE_SECONDS = "message_age_sec";
public const string MESSAGES = "messages";
public const string BYTES = "bytes";
/// <summary>
/// Retention based on message acks. This is for cloud users only.
/// </summary>
public const string ACK_BASED = "ack_based";
}
public class RetentionTypes
{
public const string MAX_MESSAGE_AGE_SECONDS = "message_age_sec";
public const string MESSAGES = "messages";
public const string BYTES = "bytes";
/// <summary>
/// Retention based on message acks. This is for cloud users only.
/// </summary>
public const string ACK_BASED = "ack_based";
}

public class StorageTypes
{
public const string DISK = "file";
public const string MEMORY = "memory";
}
public class StorageTypes
{
public const string DISK = "file";
public const string MEMORY = "memory";
}

0 comments on commit 0040b2c

Please sign in to comment.