Skip to content

Commit

Permalink
add some comment and some clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
omwok committed Jun 1, 2012
1 parent a247883 commit 8a2a7b5
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 117 deletions.
6 changes: 3 additions & 3 deletions src/dotnet/ZooKeeperNet/ClientConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ClientConnection : IClientConnection
{
private static readonly ILog LOG = LogManager.GetLogger(typeof(ClientConnection));

//doesn't need this restriction, heck no limmit
//TODO find an elegant way to set this parameter
public const int packetLen = 4096 * 1024;
internal static readonly bool disableAutoWatchReset = false;
internal const int maxSpin = 30;
Expand Down Expand Up @@ -122,7 +122,7 @@ private void CreateProducer()

private string SetChrootPath()
{
int off = hosts.IndexOf('/');
int off = hosts.IndexOf(PathUtils.PathSeparatorChar);
if (off >= 0)
{
string path = hosts.Substring(off);
Expand Down Expand Up @@ -216,6 +216,7 @@ public ReplyHeader SubmitRequest(RequestHeader h, IRecord request, IRecord respo
Packet p = QueuePacket(h, r, request, response, null, null, watchRegistration, null, null);
SpinWait spin = new SpinWait();
DateTime start = DateTime.Now;
// now wait for reply for the packet
while (!p.Finished)
{
spin.SpinOnce();
Expand All @@ -239,7 +240,6 @@ public Packet QueuePacket(RequestHeader h, ReplyHeader r, IRecord request, IReco
/// </summary>
private void InternalDispose()
{
//if (!closing)
if(Interlocked.CompareExchange(ref isClosed,1,0) == 0)
{
//closing = true;
Expand Down
20 changes: 6 additions & 14 deletions src/dotnet/ZooKeeperNet/ClientConnectionEventConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ClientConnectionEventConsumer : IStartable, IDisposable

private readonly ClientConnection conn;
private readonly Thread eventThread;

//ConcurrentQueue gives us the non-blocking way of processing, it reduced the contention so much
internal readonly ConcurrentQueue<ClientConnection.WatcherSetEventPair> waitingEvents = new ConcurrentQueue<ClientConnection.WatcherSetEventPair>();

/** This is really the queued session state until the event
Expand All @@ -52,7 +52,6 @@ public void Start()

private static void ProcessWatcher(IEnumerable<IWatcher> watchers,WatchedEvent watchedEvent)
{
// // each watcher will process the event
foreach (IWatcher watcher in watchers)
{
try
Expand All @@ -70,13 +69,12 @@ public void PollEvents()
{
try
{
//while (!waitingEvents.IsCompleted)
SpinWait spin = new SpinWait();
while(Interlocked.CompareExchange(ref isDisposed, 1, 1) == 0)
{
try
{
ClientConnection.WatcherSetEventPair pair;// = waitingEvents.Take();
ClientConnection.WatcherSetEventPair pair;
if (waitingEvents.TryDequeue(out pair))
ProcessWatcher(pair.Watchers, pair.WatchedEvent);
else
Expand Down Expand Up @@ -131,17 +129,11 @@ private void InternalDispose()
{
if (Interlocked.CompareExchange(ref isDisposed, 1, 0) == 0)
{
//waitingEvents.CompleteAdding();
eventThread.Join();
while (!waitingEvents.IsEmpty)
{
ClientConnection.WatcherSetEventPair pair;// = waitingEvents.Take();
if (waitingEvents.TryDequeue(out pair))
{
ProcessWatcher(pair.Watchers,pair.WatchedEvent);
}
}
//waitingEvents.Dispose();
//process any unprocessed event
ClientConnection.WatcherSetEventPair pair;
while(waitingEvents.TryDequeue(out pair))
ProcessWatcher(pair.Watchers, pair.WatchedEvent);
}
}

Expand Down
105 changes: 53 additions & 52 deletions src/dotnet/ZooKeeperNet/ClientConnectionRequestProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,12 @@ public Packet QueuePacket(RequestHeader h, ReplyHeader r, IRecord request, IReco
h.Xid = Xid;

Packet p = new Packet(h, r, request, response, null, watchRegistration, clientPath, serverPath);

if (!zooKeeper.State.IsAlive() || Interlocked.CompareExchange(ref isDisposed, 0, 0) == 1)
ConLossPacket(p);
else
{
// enqueue the packet when zookeeper is connected
outgoingQueue.Enqueue(p);
//queueEvent.Set();
}
return p;
}

Expand All @@ -103,7 +101,6 @@ private void SendRequests()
if (client == null)
{
// don't re-establish connection if we are closing
//if (conn.closing)
if(conn.IsClosed)
break;

Expand Down Expand Up @@ -132,14 +129,15 @@ private void SendRequests()
}
else
{
// spin the processor
spin.SpinOnce();
if (spin.Count > ClientConnection.maxSpin)
// reset the spinning counter
spin.Reset();
}
}
catch (Exception e)
{
//if (conn.closing)
if(conn.IsClosed)
{
if (LOG.IsDebugEnabled)
Expand All @@ -159,16 +157,22 @@ private void SendRequests()
LOG.InfoFormat("{0}{1}", e.Message, RETRY_CONN_MSG);
else
LOG.WarnFormat("Session 0x{0:X} for server {1}, unexpected error{2}, detail:{3}-{4}", conn.SessionId, null, RETRY_CONN_MSG, e.Message, e.StackTrace);
// a safe-net ...there's a packet about to send when an exception happen
if(packet != null)
ConLossPacket(packet);
// clean up any queued packet
Cleanup();
// inform client using event about the exception
if (zooKeeper.State.IsAlive())
conn.consumer.QueueEvent(new WatchedEvent(KeeperState.Disconnected, EventType.None, null));
}
}
// safe-net to ensure everything is clean up properly
Cleanup();
if (zooKeeper.State.IsAlive())
conn.consumer.QueueEvent(new WatchedEvent(KeeperState.Disconnected, EventType.None, null));

// i don't think this is necessary, when we reached this block ....the state is surely not alive
//if (zooKeeper.State.IsAlive())
// conn.consumer.QueueEvent(new WatchedEvent(KeeperState.Disconnected, EventType.None, null));

if (LOG.IsDebugEnabled)
LOG.Debug("SendThread exitedloop.");
Expand All @@ -180,6 +184,7 @@ private void Cleanup()
{
try
{
// close the connection
client.Close();
}
catch (IOException e)
Expand All @@ -192,7 +197,7 @@ private void Cleanup()
client = null;
}
}
Packet pack;
Packet pack;
while (pendingQueue.TryDequeue(out pack))
ConLossPacket(pack);

Expand Down Expand Up @@ -242,12 +247,12 @@ private void StartConnect()
client = new TcpClient();
client.LingerState = new LingerOption(false, 0);
client.NoDelay = true;
//initialized =

Interlocked.Exchange(ref initialized, 0);
IsConnectionClosedByServer = false;
//ConnectSocket(addr);

client.Connect(addr);
//initialized = false;

client.GetStream().BeginRead(incomingBuffer, 0, incomingBuffer.Length, ReceiveAsynch, incomingBuffer);
PrimeConnection();
}
Expand All @@ -256,6 +261,16 @@ private void StartConnect()

private int currentLen;

/// <summary>
/// process the receiving mechanism in asynchronous manner.
/// Zookeeper server sent data in two main parts
/// part(1) -> contain the length of the part(2)
/// part(2) -> contain the interest information
///
/// Part(2) may deliver in two or more segments so it is important to
/// handle this situation properly
/// </summary>
/// <param name="ar">The asynchronous result</param>
private void ReceiveAsynch(IAsyncResult ar)
{
if (Interlocked.CompareExchange(ref isDisposed, 0, 0) == 1)
Expand All @@ -264,37 +279,43 @@ private void ReceiveAsynch(IAsyncResult ar)
try
{
len = client.GetStream().EndRead(ar);
if (len == 0) //server closed the connection
{
if (len == 0) //server closed the connection...
{
zooKeeper.State = ZooKeeper.States.CLOSED;
IsConnectionClosedByServer = true;
return;
}
byte[] bData = (byte[])ar.AsyncState;

recvCount++;
if (bData == incomingBuffer)
{

if (bData == incomingBuffer) // if bData is incoming then surely it is a length information
{
currentLen = 0;
juteBuffer = null;
// get the length information from the stream
juteBuffer = new byte[ReadLength(bData)];
// try getting other info from the stream
client.GetStream().BeginRead(juteBuffer, 0, juteBuffer.Length, ReceiveAsynch, juteBuffer);
}
else
else // not an incoming buffer then it is surely a zookeeper process information
{
if (Interlocked.CompareExchange(ref initialized,1,0) == 0)
{
// haven't been initialized so read the authentication negotiation result
ReadConnectResult(bData);
// reading length information
client.GetStream().BeginRead(incomingBuffer, 0, incomingBuffer.Length, ReceiveAsynch, incomingBuffer);
}
else
{
currentLen += len;
if (juteBuffer.Length > currentLen)
if (juteBuffer.Length > currentLen) // stream haven't been completed so read any left bytes
client.GetStream().BeginRead(juteBuffer, currentLen, juteBuffer.Length - currentLen, ReceiveAsynch, juteBuffer);
else
{
// stream is complete so read the response
ReadResponse(bData);
// everything is fine, now read the stream again for length information
client.GetStream().BeginRead(incomingBuffer, 0, incomingBuffer.Length, ReceiveAsynch, incomingBuffer);
}
}
Expand All @@ -306,25 +327,6 @@ private void ReceiveAsynch(IAsyncResult ar)
}
}

//private void ConnectAsynch(IAsyncResult ar)
//{
// client.EndConnect(ar);
// ManualResetEvent evt = (ManualResetEvent)ar.AsyncState;
// evt.Set();
//}

//private void ConnectSocket(IPEndPoint addr)
//{
// client.Connect(addr);
//using (ManualResetEvent socketConnectTimeout = new ManualResetEvent(false))
//{
// client.BeginConnect(addr.Address, addr.Port, ConnectAsynch, socketConnectTimeout);
// if (socketConnectTimeout.WaitOne(conn.connectTimeout))
// return;
// throw new InvalidOperationException(string.Format("Could not make socket connection to {0}:{1}", addr.Address, addr.Port));
//}
//}

private void PrimeConnection()
{
LOG.InfoFormat("Socket connection established to {0}, initiating session", client.Client.RemoteEndPoint);
Expand All @@ -336,11 +338,11 @@ private void PrimeConnection()
using (EndianBinaryWriter writer = new EndianBinaryWriter(EndianBitConverter.Big, ms, Encoding.UTF8))
{
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(writer);
boa.WriteInt(-1, "len");
boa.WriteInt(-1, "len"); // we'll fill this latter
conReq.Serialize(boa, "connect");
ms.Position = 0;
int len = Convert.ToInt32(ms.Length);
writer.Write(len - 4);
int len = Convert.ToInt32(ms.Length); // now we have the real length
writer.Write(len - 4); // write the length info
buffer = ms.ToArray();
}
outgoingQueue.Enqueue((new Packet(null, null, null, null, buffer, null, null, null)));
Expand Down Expand Up @@ -368,15 +370,16 @@ private void SendPing()
RequestHeader h = new RequestHeader(-2, (int)OpCode.Ping);
conn.QueuePacket(h, null, null, null, null, null, null, null, null);
}

// send packet to server
// there's posibility when server closed the socket and client try to send some packet, when this happen it will throw exception
// the exception is either IOException, NullReferenceException and/or ObjectDisposedException
// so it is mandatory to catch these excepetions

/// <summary>
/// send packet to server
/// there's posibility when server closed the socket and client try to send some packet, when this happen it will throw exception
/// the exception is either IOException, NullReferenceException and/or ObjectDisposedException
/// so it is mandatory to catch these excepetions
/// </summary>
/// <param name="packet">The packet to send</param>
private void DoSend(Packet packet)
{
//if (client == null)
// throw new IOException("Socket is null!");
if (packet.header != null
&& packet.header.Type != (int)OpCode.Ping
&& packet.header.Type != (int)OpCode.Auth)
Expand Down Expand Up @@ -411,6 +414,7 @@ private void ReadConnectResult(byte[] content)
throw new SessionExpiredException(new StringBuilder().AppendFormat("Unable to reconnect to ZooKeeper service, session 0x{0:X} has expired", conn.SessionId).ToString());
}
conn.readTimeout = new TimeSpan(0, 0, 0, 0, negotiatedSessionTimeout * 2 / 3);
// commented...we haven't need this information yet...
//conn.connectTimeout = new TimeSpan(0, 0, 0, negotiatedSessionTimeout / conn.serverAddrs.Count);
conn.SessionId = conRsp.SessionId;
conn.SessionPassword = conRsp.Passwd;
Expand Down Expand Up @@ -457,7 +461,7 @@ private void ReadResponse(byte[] content)
{
string serverPath = @event.Path;
if (serverPath.CompareTo(conn.ChrootPath) == 0)
@event.Path = "/";
@event.Path = PathUtils.PathSeparator;
else
@event.Path = serverPath.Substring(conn.ChrootPath.Length);
}
Expand Down Expand Up @@ -527,7 +531,6 @@ private static void FinishPacket(Packet p)
p.watchRegistration.Register(p.replyHeader.Err);

p.Finished = true;
//conn.consumer.QueuePacket(p);
}

private int isDisposed = 0;
Expand All @@ -536,9 +539,7 @@ private void InternalDispose()
if (Interlocked.CompareExchange(ref isDisposed, 1, 0) == 0)
{
zooKeeper.State = ZooKeeper.States.CLOSED;
//queueEvent.Set();
requestThread.Join();
//queueEvent.Dispose();
incomingBuffer = juteBuffer = null;
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/dotnet/ZooKeeperNet/DataTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ public class DataTree
//private ZKWatchManager dataWatches = new ZKWatchManager();

//private ZKWatchManager childWatches = new ZKWatchManager();

/** the root of zookeeper tree */
//private static string rootZookeeper = "/";


/** the zookeeper nodes that acts as the management and status node **/
//private static string procZookeeper = Quotas.procZookeeper;

Expand Down
2 changes: 1 addition & 1 deletion src/dotnet/ZooKeeperNet/IO/EndianBinaryReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public byte[] ReadBytes(int count)
//byte[] copy = new byte[index];
//Buffer.BlockCopy(ret, 0, copy, 0, index);
//return copy;
Array.Resize(ref ret, index);
Array.Resize(ref ret, index); // change to array resize...simpler way
return ret;
}
index += read;
Expand Down
4 changes: 2 additions & 2 deletions src/dotnet/ZooKeeperNet/Packet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ internal Packet(RequestHeader header, ReplyHeader replyHeader, IRecord request,
request.Serialize(boa, "request");
}
ms.Position = 0;
int len = Convert.ToInt32(ms.Length);
writer.Write(len - 4);
int len = Convert.ToInt32(ms.Length); // now we have the real length
writer.Write(len - 4); // update the length info
this.data = ms.ToArray();
}
}
Expand Down
Loading

0 comments on commit 8a2a7b5

Please sign in to comment.