Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
2 parents 59ecadd + 056c33a commit 315fe7b
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 28 deletions.
21 changes: 21 additions & 0 deletions examples/DisposableTokens/DisposableTokens.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<None Remove="Momento.Sdk" />
<None Remove="Microsoft.SourceLink.GitHub" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Momento.Sdk" Version="1.20.0" />
</ItemGroup>
</Project>
42 changes: 42 additions & 0 deletions examples/DisposableTokens/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using Momento.Sdk;
using Momento.Sdk.Auth;
using Momento.Sdk.Auth.AccessControl;
using Momento.Sdk.Config;
using Momento.Sdk.Responses;

ICredentialProvider authProvider = new EnvMomentoTokenProvider("MOMENTO_AUTH_TOKEN");

IAuthClient client = new AuthClient(AuthConfigurations.Default.Latest(), authProvider);
var scope = new DisposableTokenScope(Permissions: new List<DisposableTokenPermission>
{
new DisposableToken.CacheItemPermission(
CacheRole.ReadWrite,
CacheSelector.ByName("cache"),
CacheItemSelector.AllCacheItems
),
new DisposableToken.CachePermission(
CacheRole.ReadOnly,
CacheSelector.ByName("topsecret")
),
new DisposableToken.TopicPermission(
TopicRole.PublishSubscribe,
CacheSelector.ByName("cache"),
TopicSelector.ByName("example-topic")
)
});
var tokenResponse = await client.GenerateDisposableTokenAsync(
scope,
ExpiresIn.Minutes(5)
);

if (tokenResponse is GenerateDisposableTokenResponse.Success token)
{
Console.WriteLine("The generated disposable token is: " + token.AuthToken);
Console.WriteLine("The token endpoint is: " + token.Endpoint);
Console.WriteLine("The token expires at (epoch timestamp): " + token.ExpiresAt.Epoch());
}
else if (tokenResponse is GenerateDisposableTokenResponse.Error err)
{
Console.WriteLine("Error generating disposable token: " + err.Message);
}
15 changes: 15 additions & 0 deletions examples/DisposableTokens/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<img src="https://docs.momentohq.com/img/logo.svg" alt="logo" width="400"/>

# Disposable Tokens Example

This example program demonstrates how to generate disposable Momento auth tokens.

# Usage

The program assumes that your Momento auth token is available in the `MOMENTO_AUTH_TOKEN` environment variable:

```bash
MOMENTO_AUTH_TOKEN=<YOUR_AUTH_TOKEN> dotnet run
```

The example generates a disposable expiring auth token using the enumerated permissions and expiry defined in the program and prints its attributes to the console.
34 changes: 34 additions & 0 deletions src/Momento.Sdk/Internal/LoggingUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,40 @@ public static void LogTraceTopicMessageReceived(this ILogger logger, string mess
logger.LogTrace("Received '{}' message on: cacheName: {}; topicName: {}", messageType, cacheName, topicName);
}
}

/// <summary>
/// Logs a message at TRACE level that indicates that a discontinuity was received.
/// </summary>
/// <param name="logger"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="lastSequenceNumber"></param>
/// <param name="newSequenceNumber"></param>
public static void LogTraceTopicDiscontinuityReceived(this ILogger logger, string cacheName, string topicName, ulong lastSequenceNumber, ulong newSequenceNumber)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("Received discontinuity: cacheName: {}; topicName: {}, lastSequenceNumber: {}, newSequenceNumber: {}", cacheName, topicName, lastSequenceNumber, newSequenceNumber);
}
}

/// <summary>
/// Logs a message at TRACE level that indicates that a topic subscription received an error.
/// </summary>
/// <typeparam name="TError"></typeparam>
/// <param name="logger"></param>
/// <param name="cacheName"></param>
/// <param name="topicName"></param>
/// <param name="error"></param>
/// <returns></returns>
public static TError LogTraceTopicSubscriptionError<TError>(this ILogger logger, string cacheName, string topicName, TError error)
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("An error was received by a subscription: cacheName: {}; topicName: {}; error: {}", cacheName, topicName, error);
}
return error;
}
#endif

/// <summary>
Expand Down
110 changes: 82 additions & 28 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
request.ResumeAtTopicSequenceNumber = resumeAtTopicSequenceNumber.Value;
}

AsyncServerStreamingCall<_SubscriptionItem> subscription;
SubscriptionWrapper subscriptionWrapper;
try
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
subscription = grpcManager.Client.subscribe(request, new CallOptions());
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
}
catch (Exception e)
{
Expand All @@ -132,72 +132,126 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
}

var response = new TopicSubscribeResponse.Subscription(
cancellationToken => GetNextRelevantMessageFromGrpcStreamAsync(subscription, cancellationToken, cacheName, topicName),
subscription.Dispose);
cancellationToken => subscriptionWrapper.GetNextRelevantMessageFromGrpcStreamAsync(cancellationToken),
subscriptionWrapper.Dispose);
return _logger.LogTraceTopicRequestSuccess(RequestTypeTopicSubscribe, cacheName, topicName,
response);
}

private async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(AsyncServerStreamingCall<_SubscriptionItem> subscription,
CancellationToken cancellationToken, string cacheName, string topicName)
private class SubscriptionWrapper : IDisposable
{
if (cancellationToken.IsCancellationRequested)
private readonly TopicGrpcManager _grpcManager;
private readonly string _cacheName;
private readonly string _topicName;
private readonly CacheExceptionMapper _exceptionMapper;
private readonly ILogger _logger;

private AsyncServerStreamingCall<_SubscriptionItem> _subscription;
private ulong? _lastSequenceNumber;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
{
return null;
_grpcManager = grpcManager;
_cacheName = cacheName;
_topicName = topicName;
_exceptionMapper = exceptionMapper;
_logger = logger;

_subscription = Subscribe();
}

try
private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
{
var request = new _SubscriptionRequest
{
CacheName = _cacheName,
Topic = _topicName
};
if (_lastSequenceNumber != null)
{
request.ResumeAtTopicSequenceNumber = _lastSequenceNumber.Value;
}

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
return _grpcManager.Client.subscribe(request, new CallOptions());
}

public async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(
CancellationToken cancellationToken)
{
while (await subscription.ResponseStream.MoveNext(cancellationToken))
while (!cancellationToken.IsCancellationRequested)
{
var message = subscription.ResponseStream.Current;
try
{
await _subscription.ResponseStream.MoveNext(cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception e)
{
var sdkException = _exceptionMapper.Convert(e);
if (sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR)
{
break;
}
//TODO: Are there other errors that we should break for?

_logger.LogTraceTopicSubscriptionError(_cacheName, _topicName, sdkException);

await Task.Delay(5_000, cancellationToken);
Dispose();
_subscription = Subscribe();
continue;
}

var message = _subscription.ResponseStream.Current;

switch (message.KindCase)
{
case _SubscriptionItem.KindOneofCase.Item:
_lastSequenceNumber = message.Item.TopicSequenceNumber;
switch (message.Item.Value.KindCase)
{
case _TopicValue.KindOneofCase.Text:
_logger.LogTraceTopicMessageReceived("text", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("text", _cacheName, _topicName);
return new TopicMessage.Text(message.Item.Value);
case _TopicValue.KindOneofCase.Binary:
_logger.LogTraceTopicMessageReceived("binary", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("binary", _cacheName, _topicName);
return new TopicMessage.Binary(message.Item.Value);
case _TopicValue.KindOneofCase.None:
default:
_logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}

break;
case _SubscriptionItem.KindOneofCase.Discontinuity:
_logger.LogTraceTopicMessageReceived("discontinuity", cacheName, topicName);
_logger.LogTraceTopicDiscontinuityReceived(_cacheName, _topicName,
message.Discontinuity.LastTopicSequence, message.Discontinuity.NewTopicSequence);
_lastSequenceNumber = message.Discontinuity.NewTopicSequence;
break;
case _SubscriptionItem.KindOneofCase.Heartbeat:
_logger.LogTraceTopicMessageReceived("heartbeat", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("heartbeat", _cacheName, _topicName);
break;
case _SubscriptionItem.KindOneofCase.None:
_logger.LogTraceTopicMessageReceived("none", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("none", _cacheName, _topicName);
break;
default:
_logger.LogTraceTopicMessageReceived("unknown", cacheName, topicName);
_logger.LogTraceTopicMessageReceived("unknown", _cacheName, _topicName);
break;
}
}
}
catch (OperationCanceledException)
{

return null;
}
catch (Exception e)

public void Dispose()
{
var sdkException = _exceptionMapper.Convert(e);
return sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR
? null
: new TopicMessage.Error(sdkException);
_subscription.Dispose();
}

return null;
}
}
#endif

0 comments on commit 315fe7b

Please sign in to comment.