Skip to content

Commit

Permalink
Merge pull request superstreamlabs#201 from memphisdev/fix-_batchMaxT…
Browse files Browse the repository at this point in the history
…imeToWaitMs

BatchMaxTimeToWaitMs min 100
  • Loading branch information
idanasulin2706 authored Feb 7, 2024
2 parents f27e100 + d1bc659 commit 6dfd4d7
Show file tree
Hide file tree
Showing 251 changed files with 43,432 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/Memphis.Client/Consumer/FetchMessageOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public sealed class FetchMessageOptions
public int BatchMaxTimeToWaitMs
{
get => _batchMaxTimeToWaitMs;
set =>_batchMaxTimeToWaitMs = (value < 1000) ? 1000 : value;
set =>_batchMaxTimeToWaitMs = (value < 100) ? 100 : value;
}

public int MaxAckTimeMs { get; set; } = 30_000;
Expand Down
1 change: 0 additions & 1 deletion src/Memphis.Client/Consumer/MemphisConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ public async Task<IEnumerable<MemphisMessage>> FetchMessages(
internal IEnumerable<MemphisMessage> Fetch(FetchMessageOptions fetchMessageOptions)
{
MemphisClient.EnsureBatchSizeIsValid(fetchMessageOptions.BatchSize);

try
{
var batchSize = fetchMessageOptions.BatchSize;
Expand Down
2 changes: 1 addition & 1 deletion src/Memphis.Client/Consumer/MemphisConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public sealed class MemphisConsumerOptions
public int BatchMaxTimeToWaitMs
{
get => _batchMaxTimeToWaitMs;
set => _batchMaxTimeToWaitMs = (value < 1_000) ? 1_000 : value;
set => _batchMaxTimeToWaitMs = (value < 100) ? 100 : value;
}
public int MaxAckTimeMs { get; set; } = 30_000;
public int MaxMsgDeliveries { get; set; } = 2;
Expand Down
17 changes: 16 additions & 1 deletion src/Memphis.Client/Memphis.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<PackageReference Include="GraphQL" Version="7.2.1" />
<PackageReference Include="GraphQL.SystemTextJson" Version="7.2.1" />
<PackageReference Include="murmurhash" Version="1.0.3" />
<PackageReference Include="NATS.Client" Version="1.1.0" />
<!-- <ProjectReference Include="..\NATS.Client\NATS.Client.csproj" /> NATS 1.1.0 -->
<PackageReference Include="NJsonSchema" Version="10.8.0" />
<PackageReference Include="DotNet.ReproducibleBuilds" Version="1.1.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand All @@ -31,6 +31,21 @@
<PackageReference Include="ProtoBufEval" Version="0.1.5" />
</ItemGroup>

<!-- Deps-->
<!-- <ItemGroup Condition="$(TargetFramework) == 'netstandard1.6'">
<PackageReference Include="System.Net.Security" Version="4.3.2" />
<PackageReference Include="System.Reflection.Extensions" Version="4.3.0" />
<PackageReference Include="System.Runtime" Version="4.3.1" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
</ItemGroup>
<ItemGroup Condition="$(TargetFramework) == 'netstandard1.6' Or $(TargetFramework) == 'net46'">
<PackageReference Include="System.Collections.Specialized" Version="4.3.0" />
<PackageReference Include="System.Net.Http" Version="4.3.4" />
<PackageReference Include="System.Text.RegularExpressions" Version="4.3.1" />
</ItemGroup> -->
<!--Deps-->

<ItemGroup>
<!-- Make assembly visible to test assembly -->
<InternalsVisibleTo Include="$(AssemblyName).UnitTests" />
Expand Down
40 changes: 40 additions & 0 deletions src/Memphis.Client/NATS.Client/AckType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Text;

namespace NATS.Client
{
public sealed class AckType
{
public static AckType AckAck = new AckType("+ACK", true);
public static AckType AckNak = new AckType("-NAK", true);
public static AckType AckProgress = new AckType("+WPI", false);
public static AckType AckTerm = new AckType("+TERM", true);

public string Text { get; }
public byte[] Bytes { get; }
public bool IsTerminal { get; }

public AckType(string text, bool terminal)
{
Text = text;
Bytes = Encoding.ASCII.GetBytes(text);
IsTerminal = terminal;
}

public byte[] BodyBytes(long delayNanoseconds) {
return delayNanoseconds < 1 ? Bytes : Encoding.ASCII.GetBytes($"{Text} {{\"delay\": {delayNanoseconds}}}");
}
}
}
202 changes: 202 additions & 0 deletions src/Memphis.Client/NATS.Client/AsyncSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright 2015-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading;
using System.Threading.Tasks;

// disable XML comment warnings
#pragma warning disable 1591

namespace NATS.Client
{
/// <summary>
/// <see cref="AsyncSubscription"/> asynchronously delivers messages to listeners of the <see cref="MessageHandler"/>
/// event. This class should not be used directly.
/// </summary>
/// <remarks>
/// If the <see cref="AsyncSubscription"/> is created without listening to the <see cref="MessageHandler"/>
/// event, no messages will be received until <see cref="Start()"/> has been called.
/// </remarks>
public class AsyncSubscription : Subscription, IAsyncSubscription, ISubscription
{
/// <summary>
/// Occurs when the <see cref="AsyncSubscription"/> receives a message from the
/// underlying <see cref="Subscription"/>.
/// </summary>
public event EventHandler<MsgHandlerEventArgs> MessageHandler;

private Task msgFeeder = null;

private bool started = false;

internal AsyncSubscription(Connection conn, string subject, string queue, bool forceOwnChannel = false)
: base(conn, subject, queue)
{
mch = forceOwnChannel ? null : conn.getMessageChannel();
ownsChannel = mch == null;
if (ownsChannel)
{
mch = new Channel<Msg>(SubName());
}
}

internal override bool processMsg(Msg msg)
{
Connection localConn;
EventHandler<MsgHandlerEventArgs> localHandler;
long localMax;
long d;

lock (mu)
{
if (closed)
return false;

// the message handler has not been setup yet, drop the
// message.
if (MessageHandler == null)
return true;

if (conn == null)
return false;

d = tallyDeliveredMessage(msg);

localConn = conn;
localHandler = MessageHandler;
localMax = max;
}

if (localMax <= 0 || d <= localMax)
{
try
{
if (localHandler != null)
{
var msgHandlerEventArgs = new MsgHandlerEventArgs(msg);

localHandler(this, msgHandlerEventArgs);
}
}
catch (Exception) { }

if (d == max)
{
unsubscribe(false);
lock (mu)
{
conn = null;
}
}
}

return true;
}

internal bool isStarted()
{
return started;
}

internal void enableAsyncProcessing()
{
if (ownsChannel && msgFeeder == null)
{
// Use the default task scheduler and do not let child tasks launched
// when delivering messages to attach to this task (Issue #273)
msgFeeder = Task.Factory.StartNew(
doAsyncProcessing,
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default);
}
started = true;
}

private void doAsyncProcessing() => conn.deliverMsgs(mch);

internal void disableAsyncProcessing()
{
lock (mu)
{
if (msgFeeder != null)
{
mch.close();
msgFeeder = null;
}
MessageHandler = null;
started = false;
}
}

/// <summary>
/// Starts delivering received messages to listeners on <see cref="MessageHandler"/>
/// from a separate thread.
/// </summary>
/// <remarks>
/// If the <see cref="IAsyncSubscription"/> has already started delivering messages, this
/// method is a no-op.
/// </remarks>
/// <exception cref="NATSBadSubscriptionException">There is no longer an associated <see cref="Connection"/>
/// for this <see cref="AsyncSubscription"/>.</exception>
public void Start()
{
if (started)
return;

lock (mu)
{
if (conn == null)
throw new NATSBadSubscriptionException();

conn.sendSubscriptionMessage(this);
enableAsyncProcessing();
}
}

/// <summary>
/// Removes interest in the given subject.
/// </summary>
/// <exception cref="NATSBadSubscriptionException">There is no longer an associated <see cref="Connection"/>
/// for this <see cref="AsyncSubscription"/>.</exception>
public override void Unsubscribe()
{
disableAsyncProcessing();
base.Unsubscribe();
}

/// <summary>
/// Issues an automatic call to <see cref="Unsubscribe"/> when <paramref name="max"/> messages have been
/// received.
/// </summary>
/// <remarks><para>This can be useful when sending a request to an unknown number of subscribers.
/// <see cref="Connection"/>'s Request methods use this functionality.</para>
/// <para>Calling this method will invoke <see cref="Start"/> if it has not already been called.</para></remarks>
/// <param name="max">The maximum number of messages to receive on the subscription before calling
/// <see cref="Unsubscribe"/>. Values less than or equal to zero (<c>0</c>) unsubscribe immediately.</param>
/// <exception cref="NATSBadSubscriptionException">There is no longer an associated <see cref="Connection"/>
/// for this <see cref="AsyncSubscription"/>.</exception>
public override void AutoUnsubscribe(int max)
{
Start();
base.AutoUnsubscribe(max);
}

internal override void close()
{
disableAsyncProcessing();
close(ownsChannel);
}
}
}
Loading

0 comments on commit 6dfd4d7

Please sign in to comment.