Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
2 parents 89f0e4b + 2814d9f commit e704c40
Show file tree
Hide file tree
Showing 10 changed files with 1,451 additions and 46 deletions.
82 changes: 82 additions & 0 deletions src/Momento.Sdk/Auth/AccessControl/DisposableTokenScopes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,86 @@ public static DisposableTokenScope TopicPublishOnly(CacheSelector cacheSelector,
)
});
}


public static DisposableTokenScope TopicNamePrefixPublishSubscribe(string cacheName, string topicNamePrefix)
{
return TopicNamePrefixPublishSubscribe(CacheSelector.ByName(cacheName), TopicSelector.ByTopicNamePrefix(topicNamePrefix));
}

public static DisposableTokenScope TopicNamePrefixPublishSubscribe(CacheSelector cacheSelector, string topicNamePrefix)
{
return TopicNamePrefixPublishSubscribe(cacheSelector, TopicSelector.ByTopicNamePrefix(topicNamePrefix));
}

public static DisposableTokenScope TopicNamePrefixPublishSubscribe(string cacheName, TopicSelector topicSelector)
{
return TopicNamePrefixPublishSubscribe(CacheSelector.ByName(cacheName), topicSelector);
}

public static DisposableTokenScope TopicNamePrefixPublishSubscribe(CacheSelector cacheSelector, TopicSelector topicSelector)
{
return new DisposableTokenScope(Permissions: new List<DisposableTokenPermission>
{
new DisposableToken.TopicPermission(
TopicRole.PublishSubscribe,
cacheSelector,
topicSelector
)
});
}

public static DisposableTokenScope TopicNamePrefixSubscribeOnly(string cacheName, string topicNamePrefix)
{
return TopicNamePrefixSubscribeOnly(CacheSelector.ByName(cacheName), TopicSelector.ByTopicNamePrefix(topicNamePrefix));
}

public static DisposableTokenScope TopicNamePrefixSubscribeOnly(CacheSelector cacheSelector, string topicNamePrefix)
{
return TopicNamePrefixSubscribeOnly(cacheSelector, TopicSelector.ByTopicNamePrefix(topicNamePrefix));
}

public static DisposableTokenScope TopicNamePrefixSubscribeOnly(string cacheName, TopicSelector topicSelector)
{
return TopicNamePrefixSubscribeOnly(CacheSelector.ByName(cacheName), topicSelector);
}

public static DisposableTokenScope TopicNamePrefixSubscribeOnly(CacheSelector cacheSelector, TopicSelector topicSelector)
{
return new DisposableTokenScope(Permissions: new List<DisposableTokenPermission>
{
new DisposableToken.TopicPermission(
TopicRole.SubscribeOnly,
cacheSelector,
topicSelector
)
});
}

public static DisposableTokenScope TopicNamePrefixPublishOnly(string cacheName, string topicNamePrefix)
{
return TopicNamePrefixPublishOnly(CacheSelector.ByName(cacheName), TopicSelector.ByTopicNamePrefix(topicNamePrefix));
}

public static DisposableTokenScope TopicNamePrefixPublishOnly(CacheSelector cacheSelector, string topicNamePrefix)
{
return TopicNamePrefixPublishOnly(cacheSelector, TopicSelector.ByTopicNamePrefix(topicNamePrefix));
}

public static DisposableTokenScope TopicNamePrefixPublishOnly(string cacheName, TopicSelector topicSelector)
{
return TopicNamePrefixPublishOnly(CacheSelector.ByName(cacheName), topicSelector);
}

public static DisposableTokenScope TopicNamePrefixPublishOnly(CacheSelector cacheSelector, TopicSelector topicSelector)
{
return new DisposableTokenScope(Permissions: new List<DisposableTokenPermission>
{
new DisposableToken.TopicPermission(
TopicRole.PublishOnly,
cacheSelector,
topicSelector
)
});
}
}
15 changes: 15 additions & 0 deletions src/Momento.Sdk/Auth/AccessControl/Permissions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,19 @@ public static SelectByTopicName ByName(string topicName)
{
return new SelectByTopicName(topicName);
}

// This selector is currently only used to generate disposable tokens. There are plans to
// support topic name prefix in `generateApiToken` as well, but it is not yet implemented.
// Because we are only generating disposable tokens in the SDK now, this is fine. However,
// if topic name prefix is still unsupported server side when we add the code to generate
// API keys, we will need to add a second selector specific to API key generation that does
// not include `SelectByTopicNamePrefix` and refer to that selector when setting up API key
// permissions like the ones in `DisposableToken.cs`. That will ensure that usage of the
// correct type can be verified at compile time.
public record SelectByTopicNamePrefix(string TopicNamePrefix) : TopicSelector;

public static SelectByTopicNamePrefix ByTopicNamePrefix(string topicNamePrefix)
{
return new SelectByTopicNamePrefix(topicNamePrefix);
}
}
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Exceptions/InternalServerException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Momento.Sdk.Exceptions;
public class InternalServerException : SdkException
{
/// <include file="../docs.xml" path='docs/class[@name="SdkException"]/constructor/*' />
public InternalServerException(string message, MomentoErrorTransportDetails transportDetails, Exception? e = null) : base(MomentoErrorCode.INTERNAL_SERVER_ERROR, message, transportDetails, e)
public InternalServerException(string message, MomentoErrorTransportDetails? transportDetails = null, Exception? e = null) : base(MomentoErrorCode.INTERNAL_SERVER_ERROR, message, transportDetails, e)
{
this.MessageWrapper = "An unexpected error occurred while trying to fulfill the request; please contact us at [email protected]";
}
Expand Down
33 changes: 27 additions & 6 deletions src/Momento.Sdk/Internal/ScsTokenClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ScsTokenClient(IAuthConfiguration config, string authToken, string endpoi
this._exceptionMapper = new CacheExceptionMapper(config.LoggerFactory);
}

protected DateTime CalculateDeadline()
private DateTime CalculateDeadline()
{
return DateTime.UtcNow.Add(authClientOperationTimeout);
}
Expand All @@ -39,14 +39,28 @@ protected DateTime CalculateDeadline()
public async Task<GenerateDisposableTokenResponse> GenerateDisposableToken(
DisposableTokenScope scope, ExpiresIn expiresIn
) {
_GenerateDisposableTokenRequest request = new _GenerateDisposableTokenRequest
Permissions permissions;
try
{
permissions = PermissionsFromDisposableTokenScope(scope);
}
catch (ArgumentNullException e)
{
Expires = new _GenerateDisposableTokenRequest.Types.Expires() { ValidForSeconds = (uint)expiresIn.Seconds() },
AuthToken = this.authToken,
Permissions = PermissionsFromDisposableTokenScope(scope)
};
return _logger.LogTraceAuthRequestError(RequestTypeAuthGenerateDisposableToken,
new GenerateDisposableTokenResponse.Error(
new InvalidArgumentException("Permissions parameters may not be null", null, e)
)
);
}

try
{
_GenerateDisposableTokenRequest request = new _GenerateDisposableTokenRequest
{
Expires = new _GenerateDisposableTokenRequest.Types.Expires() { ValidForSeconds = (uint)expiresIn.Seconds() },
AuthToken = this.authToken,
Permissions = permissions
};
_logger.LogTraceExecutingAuthRequest(RequestTypeAuthGenerateDisposableToken);
var response = await grpcManager.Client.generateDisposableToken(
request, new CallOptions(deadline: CalculateDeadline())
Expand Down Expand Up @@ -208,6 +222,13 @@ DisposableToken.TopicPermission permission
TopicName = byName.TopicName
};
}
else if (permission.TopicSelector is TopicSelector.SelectByTopicNamePrefix byTopicNamePrefix)
{
grpcPermission.TopicSelector = new PermissionsType.Types.TopicSelector
{
TopicNamePrefix = byTopicNamePrefix.TopicNamePrefix
};
}
else
{
throw new UnknownException(
Expand Down
55 changes: 43 additions & 12 deletions src/Momento.Sdk/Internal/ScsTopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public async Task<TopicSubscribeResponse> Subscribe(string cacheName, string top

private async Task<TopicPublishResponse> SendPublish(string cacheName, string topicName, _TopicValue value)
{
_PublishRequest request = new _PublishRequest
var request = new _PublishRequest
{
CacheName = cacheName,
Topic = topicName,
Expand Down Expand Up @@ -124,6 +124,7 @@ private async Task<TopicSubscribeResponse> SendSubscribe(string cacheName, strin
{
_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, cacheName, topicName);
subscriptionWrapper = new SubscriptionWrapper(grpcManager, cacheName, topicName, _exceptionMapper, _logger);
await subscriptionWrapper.Subscribe();
}
catch (Exception e)
{
Expand All @@ -146,8 +147,9 @@ private class SubscriptionWrapper : IDisposable
private readonly CacheExceptionMapper _exceptionMapper;
private readonly ILogger _logger;

private AsyncServerStreamingCall<_SubscriptionItem> _subscription;
private AsyncServerStreamingCall<_SubscriptionItem>? _subscription;
private ulong? _lastSequenceNumber;
private bool _subscribed;

public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
string topicName, CacheExceptionMapper exceptionMapper, ILogger logger)
Expand All @@ -157,11 +159,9 @@ public SubscriptionWrapper(TopicGrpcManager grpcManager, string cacheName,
_topicName = topicName;
_exceptionMapper = exceptionMapper;
_logger = logger;

_subscription = Subscribe();
}

private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
public async Task Subscribe()
{
var request = new _SubscriptionRequest
{
Expand All @@ -174,7 +174,19 @@ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
}

_logger.LogTraceExecutingTopicRequest(RequestTypeTopicSubscribe, _cacheName, _topicName);
return _grpcManager.Client.subscribe(request, new CallOptions());
var subscription = _grpcManager.Client.subscribe(request, new CallOptions());

await subscription.ResponseStream.MoveNext();
var firstMessage = subscription.ResponseStream.Current;
// The first message to a new subscription will always be a heartbeat.
if (firstMessage.KindCase is not _SubscriptionItem.KindOneofCase.Heartbeat)
{
throw new InternalServerException(
$"Expected heartbeat message for topic {_topicName} on cache {_cacheName}. Got: {firstMessage.KindCase}");
}

_subscription = subscription;
_subscribed = true;
}

public async ValueTask<TopicMessage?> GetNextRelevantMessageFromGrpcStreamAsync(
Expand All @@ -184,7 +196,13 @@ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
{
try
{
await _subscription.ResponseStream.MoveNext(cancellationToken);
if (!_subscribed)
{
await Subscribe();
_subscribed = true;
}

await _subscription!.ResponseStream.MoveNext(cancellationToken);
}
catch (OperationCanceledException)
{
Expand All @@ -193,17 +211,24 @@ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
catch (Exception e)
{
var sdkException = _exceptionMapper.Convert(e);
if (sdkException.ErrorCode == MomentoErrorCode.CANCELLED_ERROR)
if (sdkException.ErrorCode is MomentoErrorCode.CANCELLED_ERROR)
{
break;
}
//TODO: Are there other errors that we should break for?

_logger.LogTraceTopicSubscriptionError(_cacheName, _topicName, sdkException);

await Task.Delay(5_000, cancellationToken);
// Certain errors can never be recovered
if (!IsErrorRecoverable(sdkException))
{
return new TopicMessage.Error(sdkException);
}

// If the error is recoverable, wait and attempt to resubscribe
Dispose();
_subscription = Subscribe();
await Task.Delay(5_000, cancellationToken);

_subscribed = false;
continue;
}

Expand Down Expand Up @@ -248,9 +273,15 @@ private AsyncServerStreamingCall<_SubscriptionItem> Subscribe()
return null;
}

private static bool IsErrorRecoverable(SdkException exception)
{
return exception.ErrorCode is not (MomentoErrorCode.PERMISSION_ERROR
or MomentoErrorCode.AUTHENTICATION_ERROR);
}

public void Dispose()
{
_subscription.Dispose();
_subscription?.Dispose();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Momento.Sdk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.49.0" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="7.0.0" />
<PackageReference Include="Momento.Protos" Version="0.78.2" />
<PackageReference Include="Momento.Protos" Version="0.79.0" />
<PackageReference Include="JWT" Version="9.0.3" />
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
Expand Down
2 changes: 1 addition & 1 deletion src/Momento.Sdk/Responses/TopicSubscribeResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public async ValueTask<bool> MoveNextAsync()
return true;
case TopicMessage.Error:
Current = nextMessage;
return true;
return false;
default:
Current = null;
return false;
Expand Down
Loading

0 comments on commit e704c40

Please sign in to comment.