diff --git a/src/dotnet/SolutionVersion.cs b/src/dotnet/SolutionVersion.cs index c6d7315ddf3..7b48807905b 100644 --- a/src/dotnet/SolutionVersion.cs +++ b/src/dotnet/SolutionVersion.cs @@ -5,10 +5,10 @@ [assembly: AssemblyDescription("ZooKeeper Client for .NET http://zookeeper.apache.org")] [assembly: AssemblyProduct("ZooKeeperNet")] [assembly: AssemblyCopyright("Copyright 2007-2012 ewhauser, omwok. - All rights reserved.")] -[assembly: AssemblyVersion("0.5.3")] -[assembly: AssemblyFileVersion("0.5.3")] - -[assembly: AssemblyInformationalVersion("0.5.3.343")] +[assembly: AssemblyVersion("3.3.4.6")] +[assembly: AssemblyFileVersion("3.3.4.6")] + +[assembly: AssemblyInformationalVersion("3.3.4.6")] [assembly: ComVisibleAttribute(false)] [assembly: CLSCompliantAttribute(false)] diff --git a/src/dotnet/ZooKeeperNet.Recipes/DistributedQueue.cs b/src/dotnet/ZooKeeperNet.Recipes/DistributedQueue.cs index 64097b80253..6c53f05767f 100644 --- a/src/dotnet/ZooKeeperNet.Recipes/DistributedQueue.cs +++ b/src/dotnet/ZooKeeperNet.Recipes/DistributedQueue.cs @@ -197,7 +197,7 @@ public ResetChildWatcher() public void Process(WatchedEvent @event) { - LOG.DebugFormat("Watcher fired on path: {0} state: {1} type {2}", @event.Path, @event.State, @event.EventType); + LOG.DebugFormat("Watcher fired on path: {0} state: {1} type {2}", @event.Path, @event.State, @event.Type); reset.Set(); } diff --git a/src/dotnet/ZooKeeperNet.Recipes/WriteLock.cs b/src/dotnet/ZooKeeperNet.Recipes/WriteLock.cs index f4fb563a0a5..6b8aa7d07f5 100644 --- a/src/dotnet/ZooKeeperNet.Recipes/WriteLock.cs +++ b/src/dotnet/ZooKeeperNet.Recipes/WriteLock.cs @@ -206,7 +206,7 @@ public LockWatcher(WriteLock writeLock) public void Process(WatchedEvent @event) { if (LOG.IsDebugEnabled) - LOG.DebugFormat("Watcher fired on path: {0} state: {1} type {2}", @event.Path, @event.State, @event.EventType); + LOG.DebugFormat("Watcher fired on path: {0} state: {1} type {2}", @event.Path, @event.State, @event.Type); try { writeLock.Lock(); diff --git a/src/dotnet/ZooKeeperNet.Tests/ClientTests.cs b/src/dotnet/ZooKeeperNet.Tests/ClientTests.cs index 7e1c36b62f4..3edc4eca451 100644 --- a/src/dotnet/ZooKeeperNet.Tests/ClientTests.cs +++ b/src/dotnet/ZooKeeperNet.Tests/ClientTests.cs @@ -153,7 +153,7 @@ private class MyWatcher : CountdownWatcher public override void Process(WatchedEvent @event) { base.Process(@event); - if (@event.EventType != EventType.None) + if (@event.Type != EventType.None) { try { @@ -212,7 +212,7 @@ public void testMutipleWatcherObjs() WatchedEvent @event; watchers[i].events.TryTake(out @event, TimeSpan.FromSeconds(3d)); Assert.AreEqual(name + i, @event.Path); - Assert.AreEqual(EventType.NodeDataChanged, @event.EventType); + Assert.AreEqual(EventType.NodeDataChanged, @event.Type); Assert.AreEqual(KeeperState.SyncConnected, @event.State); // small chance that an unexpected message was delivered @@ -241,7 +241,7 @@ public void testMutipleWatcherObjs() WatchedEvent @event; watchers[i].events.TryTake(out @event, TimeSpan.FromSeconds(10d)); Assert.AreEqual(name + i, @event.Path); - Assert.AreEqual(EventType.NodeDataChanged, @event.EventType); + Assert.AreEqual(EventType.NodeDataChanged, @event.Type); Assert.AreEqual(KeeperState.SyncConnected, @event.State); // small chance that an unexpected message was delivered @@ -269,7 +269,7 @@ public void testMutipleWatcherObjs() WatchedEvent @event; watchers[i].events.TryTake(out @event, TimeSpan.FromSeconds(3000)); Assert.AreEqual(name + i, @event.Path); - Assert.AreEqual(EventType.NodeDataChanged, @event.EventType); + Assert.AreEqual(EventType.NodeDataChanged, @event.Type); Assert.AreEqual(KeeperState.SyncConnected, @event.State); // small chance that an unexpected message was delivered @@ -281,7 +281,7 @@ public void testMutipleWatcherObjs() WatchedEvent event2; watchers2[i].events.TryTake(out @event2, TimeSpan.FromSeconds(3000)); Assert.AreEqual(name + i, event2.Path); - Assert.AreEqual(EventType.NodeDataChanged, event2.EventType); + Assert.AreEqual(EventType.NodeDataChanged, event2.Type); Assert.AreEqual(KeeperState.SyncConnected, event2.State); // small chance that an unexpected message was delivered @@ -387,7 +387,7 @@ private void performClientTest(bool withWatcherObj) WatchedEvent @event; watcher.events.TryTake(out @event, TimeSpan.FromSeconds(3000)); Assert.AreEqual(frog, @event.Path); - Assert.AreEqual(EventType.NodeCreated, @event.EventType); + Assert.AreEqual(EventType.NodeCreated, @event.Type); Assert.AreEqual(KeeperState.SyncConnected, @event.State); // Test child watch and Create with sequence zk.GetChildren(patPlusBen, true); @@ -429,18 +429,18 @@ private void performClientTest(bool withWatcherObj) watcher.events.TryTake(out @event, TimeSpan.FromSeconds(3)); Assert.AreEqual(patPlusBen, @event.Path); - Assert.AreEqual(EventType.NodeChildrenChanged, @event.EventType); + Assert.AreEqual(EventType.NodeChildrenChanged, @event.Type); Assert.AreEqual(KeeperState.SyncConnected, @event.State); for (int i = 0; i < 10; i++) { watcher.events.TryTake(out @event, TimeSpan.FromSeconds(3)); string name = children.ElementAt(i); Assert.AreEqual(patPlusBen + "/" + name, @event.Path); - Assert.AreEqual(EventType.NodeDataChanged, @event.EventType); + Assert.AreEqual(EventType.NodeDataChanged, @event.Type); Assert.AreEqual(KeeperState.SyncConnected, @event.State); watcher.events.TryTake(out @event, TimeSpan.FromSeconds(3)); Assert.AreEqual(patPlusBen + "/" + name, @event.Path); - Assert.AreEqual(EventType.NodeDeleted, @event.EventType); + Assert.AreEqual(EventType.NodeDeleted, @event.Type); Assert.AreEqual(KeeperState.SyncConnected, @event.State); } zk.Create("/good" + Guid.NewGuid() + "\u0040path", "".GetBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent); diff --git a/src/dotnet/ZooKeeperNet.Tests/RecoveryTest.cs b/src/dotnet/ZooKeeperNet.Tests/RecoveryTest.cs new file mode 100755 index 00000000000..097636ce09a --- /dev/null +++ b/src/dotnet/ZooKeeperNet.Tests/RecoveryTest.cs @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Org.Apache.Zookeeper.Data; + +namespace ZooKeeperNet.Tests +{ + public class RecoveryTest : AbstractZooKeeperTests, IWatcher + { + private readonly ILog LOG = LogManager.GetLogger(typeof(RecoveryTest)); + private int setDataCount; + private int processCount; + private readonly string testPath = "/unittests/recoverytests/" + Guid.NewGuid(); + + [Test, Explicit] + public void ReconnectsWhenDisconnected() + { + using (CancellationTokenSource token = new CancellationTokenSource()) + { + Thread thread = new Thread(Run) + { + IsBackground = true, + Name = "RecoveryTest.Run" + }; + thread.Start(token.Token); + Thread.Sleep(15*1000); + LOG.Error("STOP ZK!!!! Count: " + processCount); + Thread.Sleep(20*1000); + LOG.Error("START ZK!!! Count: " + processCount); + Thread.Sleep(30*1000); + LOG.Error("Stopping ZK client."); + token.Cancel(); + LOG.Error("Waiting for thread to stop..." + processCount); + thread.Join(); + if (thread.IsAlive) + Assert.Fail("Thread still alive"); + Assert.AreEqual(setDataCount, processCount, "setDataCount == processCount"); + LOG.Error("Finished:" + setDataCount + ":" + processCount); + } + } + + private void Run(object sender) + { + try + { + CancellationToken token = (CancellationToken)sender; + using (ZooKeeper zooKeeper = CreateClient(this)) + { + Stat stat = new Stat(); + if (zooKeeper.Exists("/unittests/recoverytests", false) == null) + { + zooKeeper.Create("/unittests", new byte[] {0}, Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent); + zooKeeper.Create("/unittests/recoverytests", new byte[] {0}, Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent); + } + if (zooKeeper.Exists(testPath, false) == null) + { + zooKeeper.Create(testPath, new byte[] {0}, Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent); + } + while (zooKeeper.State.IsAlive() && !token.IsCancellationRequested) + { + try + { + zooKeeper.GetData(testPath, true, stat); + zooKeeper.SetData(testPath, Guid.NewGuid().ToByteArray(), -1); + setDataCount++; + } + catch(KeeperException ke) + { + LOG.Error(ke); + } + } + LOG.Error("Waiting for dispose."); + } + } + catch(Exception ex) + { + LOG.Error(ex); + } + } + + public void Process(WatchedEvent @event) + { + LOG.Debug(@event); + if (@event.Type == EventType.NodeCreated || @event.Type == EventType.NodeDataChanged) + { + Interlocked.Increment(ref processCount); + } + } + } +} \ No newline at end of file diff --git a/src/dotnet/ZooKeeperNet.Tests/ZooKeeperNet.Tests.csproj b/src/dotnet/ZooKeeperNet.Tests/ZooKeeperNet.Tests.csproj index 7fe5748ee1e..0bfd960da72 100644 --- a/src/dotnet/ZooKeeperNet.Tests/ZooKeeperNet.Tests.csproj +++ b/src/dotnet/ZooKeeperNet.Tests/ZooKeeperNet.Tests.csproj @@ -1,122 +1,123 @@ - - - - Debug - AnyCPU - 9.0.30729 - 2.0 - {F1BEB393-A3A0-4A6E-A9B2-C58493B4EE00} - Library - Properties - ZooKeeperNet.Tests - ZooKeeperNet.Tests - v4.0 - 512 - - - - - 3.5 - publish\ - true - Disk - false - Foreground - 7 - Days - false - false - true - 0 - 1.0.0.%2a - false - false - true - - - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - AllRules.ruleset - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - AllRules.ruleset - - - - False - ..\lib\log4net.dll - - - False - ..\lib\nunit.framework.dll - - - False - ..\lib\pnunit.framework.dll - - - - 3.5 - - - - - - - - - - - - - - - - - - - {5C6774FB-2350-46B2-B1DF-1CCC757C7727} - ZooKeeperNet - - - - - False - .NET Framework 3.5 SP1 Client Profile - false - - - False - .NET Framework 3.5 SP1 - true - - - False - Windows Installer 3.1 - true - - - - - - + + + + Debug + AnyCPU + 9.0.30729 + 2.0 + {F1BEB393-A3A0-4A6E-A9B2-C58493B4EE00} + Library + Properties + ZooKeeperNet.Tests + ZooKeeperNet.Tests + v4.0 + 512 + + + + + 3.5 + publish\ + true + Disk + false + Foreground + 7 + Days + false + false + true + 0 + 1.0.0.%2a + false + false + true + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + AllRules.ruleset + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + AllRules.ruleset + + + + False + ..\lib\log4net.dll + + + False + ..\lib\nunit.framework.dll + + + False + ..\lib\pnunit.framework.dll + + + + 3.5 + + + + + + + + + + + + + + + + + + + + False + .NET Framework 3.5 SP1 Client Profile + false + + + False + .NET Framework 3.5 SP1 + true + + + False + Windows Installer 3.1 + true + + + + + + + + {5C6774FB-2350-46B2-B1DF-1CCC757C7727} + ZooKeeperNet + + + + --> \ No newline at end of file diff --git a/src/dotnet/ZooKeeperNet/ClientConnection.cs b/src/dotnet/ZooKeeperNet/ClientConnection.cs index 78ed3370baf..93d6c5a2b60 100644 --- a/src/dotnet/ZooKeeperNet/ClientConnection.cs +++ b/src/dotnet/ZooKeeperNet/ClientConnection.cs @@ -261,6 +261,10 @@ private void InternalDispose() { // ignore, close the send/event threads } + catch (Exception ex) + { + LOG.WarnFormat("Error disposing {0} : {1}", this.GetType().FullName, ex.Message); + } finally { producer.Dispose(); @@ -294,8 +298,8 @@ public override string ToString() .Append(" xid:").Append(producer.xid) .Append(" sent:").Append(producer.sentCount) .Append(" recv:").Append(producer.recvCount) - .Append(" queuedpkts:").Append(producer.outgoingQueue.Count) - .Append(" pendingresp:").Append(producer.pendingQueue.Count) + .Append(" queuedpkts:").Append(producer.OutgoingQueueCount) + .Append(" pendingresp:").Append(producer.PendingQueueCount) .Append(" queuedevents:").Append(consumer.waitingEvents.Count); return sb.ToString(); diff --git a/src/dotnet/ZooKeeperNet/ClientConnectionEventConsumer.cs b/src/dotnet/ZooKeeperNet/ClientConnectionEventConsumer.cs index 818636eb27a..113a3794f84 100644 --- a/src/dotnet/ZooKeeperNet/ClientConnectionEventConsumer.cs +++ b/src/dotnet/ZooKeeperNet/ClientConnectionEventConsumer.cs @@ -111,7 +111,7 @@ public void PollEvents() public void QueueEvent(WatchedEvent @event) { - if (@event.EventType == EventType.None && sessionState == @event.State) return; + if (@event.Type == EventType.None && sessionState == @event.State) return; if (Interlocked.CompareExchange(ref isDisposed, 0, 0) == 1) throw new InvalidOperationException("consumer has been disposed"); @@ -119,7 +119,7 @@ public void QueueEvent(WatchedEvent @event) sessionState = @event.State; // materialize the watchers based on the event - var pair = new ClientConnection.WatcherSetEventPair(conn.watcher.Materialize(@event.State, @event.EventType,@event.Path), @event); + var pair = new ClientConnection.WatcherSetEventPair(conn.watcher.Materialize(@event.State, @event.Type,@event.Path), @event); // queue the pair (watch set & event) for later processing waitingEvents.Enqueue(pair); } @@ -129,11 +129,21 @@ private void InternalDispose() { if (Interlocked.CompareExchange(ref isDisposed, 1, 0) == 0) { - eventThread.Join(); - //process any unprocessed event - ClientConnection.WatcherSetEventPair pair; - while(waitingEvents.TryDequeue(out pair)) - ProcessWatcher(pair.Watchers, pair.WatchedEvent); + try + { + if (eventThread.IsAlive) + { + eventThread.Join(); + } + //process any unprocessed event + ClientConnection.WatcherSetEventPair pair; + while (waitingEvents.TryDequeue(out pair)) + ProcessWatcher(pair.Watchers, pair.WatchedEvent); + } + catch (Exception ex) + { + LOG.WarnFormat("Error disposing {0} : {1}", this.GetType().FullName, ex.Message); + } } } diff --git a/src/dotnet/ZooKeeperNet/ClientConnectionRequestProducer.cs b/src/dotnet/ZooKeeperNet/ClientConnectionRequestProducer.cs index 7d81401af0a..3c74e9ade06 100644 --- a/src/dotnet/ZooKeeperNet/ClientConnectionRequestProducer.cs +++ b/src/dotnet/ZooKeeperNet/ClientConnectionRequestProducer.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using ZooKeeperNet.IO; namespace ZooKeeperNet @@ -22,9 +23,17 @@ public class ClientConnectionRequestProducer : IStartable, IDisposable private readonly ClientConnection conn; private readonly ZooKeeper zooKeeper; private readonly Thread requestThread; - - internal readonly System.Collections.Concurrent.ConcurrentQueue pendingQueue = new System.Collections.Concurrent.ConcurrentQueue(); - internal readonly System.Collections.Concurrent.ConcurrentQueue outgoingQueue = new System.Collections.Concurrent.ConcurrentQueue(); + + private readonly ConcurrentQueue pendingQueue = new ConcurrentQueue(); + private readonly LinkedList outgoingQueue = new LinkedList(); + public int PendingQueueCount + { + get + { + return pendingQueue.Count; + } + } + public int OutgoingQueueCount { get { return outgoingQueue.Count; } } private TcpClient client; private int lastConnectIndex; @@ -35,6 +44,7 @@ public class ClientConnectionRequestProducer : IStartable, IDisposable internal long lastZxid; private long lastPingSentNs; internal int xid = 1; + private volatile bool closing; private byte[] incomingBuffer = new byte[4]; internal int sentCount; @@ -78,12 +88,23 @@ public Packet QueuePacket(RequestHeader h, ReplyHeader r, IRecord request, IReco h.Xid = Xid; Packet p = new Packet(h, r, request, response, null, watchRegistration, clientPath, serverPath); - - if (!zooKeeper.State.IsAlive() || Interlocked.CompareExchange(ref isDisposed, 0, 0) == 1) + + if (!zooKeeper.State.IsAlive() || closing || Interlocked.CompareExchange(ref isDisposed, 0, 0) == 1) + { + if(LOG.IsDebugEnabled) + LOG.DebugFormat("Connection closing. Sending ConLossPacket. IsAlive: {0}, closing: {1}", zooKeeper.State.IsAlive(), closing); ConLossPacket(p); + } else + { + if (h.Type == (int)OpCode.CloseSession) + closing = true; // enqueue the packet when zookeeper is connected - outgoingQueue.Enqueue(p); + lock (outgoingQueue) + { + outgoingQueue.AddLast(p); + } + } return p; } @@ -98,10 +119,10 @@ private void SendRequests() try { now = DateTime.Now; - if (client == null) + if (client == null || !client.Connected || zooKeeper.State == ZooKeeper.States.NOT_CONNECTED) { // don't re-establish connection if we are closing - if(conn.IsClosed) + if(conn.IsClosed || closing) break; StartConnect(); @@ -119,26 +140,31 @@ private void SendRequests() // Why we just have to do this once, here packet = null; - if (outgoingQueue.TryDequeue(out packet)) + lock (outgoingQueue) { - // We have something to send so it's the same - // as if we do the send now. - DoSend(packet); - lastSend = DateTime.Now; - packet = null; - } - else - { - // spin the processor - spin.SpinOnce(); - if (spin.Count > ClientConnection.maxSpin) - // reset the spinning counter - spin.Reset(); + if(!outgoingQueue.IsEmpty()) + { + packet = outgoingQueue.First(); + outgoingQueue.RemoveFirst(); + // We have something to send so it's the same + // as if we do the send now. + DoSend(packet); + lastSend = DateTime.Now; + packet = null; + } + else + { + // spin the processor + spin.SpinOnce(); + if (spin.Count > ClientConnection.maxSpin) + // reset the spinning counter + spin.Reset(); + } } } catch (Exception e) { - if(conn.IsClosed) + if (conn.IsClosed || closing) { if (LOG.IsDebugEnabled) { @@ -147,34 +173,38 @@ private void SendRequests() } break; } - - // this is ugly, you have a better way speak up - if (e is KeeperException.SessionExpiredException) - LOG.InfoFormat("{0}, closing socket connection",e.Message); - else if (e is SessionTimeoutException) - LOG.InfoFormat("{0}{1}",e.Message,RETRY_CONN_MSG); - else if (e is System.IO.EndOfStreamException) - LOG.InfoFormat("{0}{1}", e.Message, RETRY_CONN_MSG); else - LOG.WarnFormat("Session 0x{0:X} for server {1}, unexpected error{2}, detail:{3}-{4}", conn.SessionId, null, RETRY_CONN_MSG, e.Message, e.StackTrace); - // a safe-net ...there's a packet about to send when an exception happen - if(packet != null) - ConLossPacket(packet); - // clean up any queued packet - Cleanup(); - // inform client using event about the exception - if (zooKeeper.State.IsAlive()) - conn.consumer.QueueEvent(new WatchedEvent(KeeperState.Disconnected, EventType.None, null)); + { + // this is ugly, you have a better way speak up + if (e is KeeperException.SessionExpiredException) + LOG.InfoFormat("{0}, closing socket connection", e.Message); + else if (e is SessionTimeoutException) + LOG.InfoFormat("{0}{1}", e.Message, RETRY_CONN_MSG); + else if (e is System.IO.EndOfStreamException) + LOG.InfoFormat("{0}{1}", e.Message, RETRY_CONN_MSG); + else + LOG.InfoFormat("Session 0x{0:X} for server {1}, unexpected error{2}, detail:{3}-{4}", conn.SessionId, null, RETRY_CONN_MSG, e.Message, e.StackTrace); + // a safe-net ...there's a packet about to send when an exception happen + if (packet != null) + ConLossPacket(packet); + // clean up any queued packet + Cleanup(); + if(zooKeeper.State.IsAlive()) + { + conn.consumer.QueueEvent(new WatchedEvent(KeeperState.Disconnected, EventType.None, null)); + } + } } } + // safe-net to ensure everything is clean up properly Cleanup(); // i don't think this is necessary, when we reached this block ....the state is surely not alive - //if (zooKeeper.State.IsAlive()) - // conn.consumer.QueueEvent(new WatchedEvent(KeeperState.Disconnected, EventType.None, null)); - - if (LOG.IsDebugEnabled) + if (zooKeeper.State.IsAlive()) + conn.consumer.QueueEvent(new WatchedEvent(KeeperState.Disconnected, EventType.None, null)); + + if (LOG.IsDebugEnabled) LOG.Debug("SendThread exitedloop."); } @@ -197,11 +227,18 @@ private void Cleanup() client = null; } } - Packet pack; - while (pendingQueue.TryDequeue(out pack)) - ConLossPacket(pack); + + lock (outgoingQueue) + { + foreach (var packet in outgoingQueue) + { + ConLossPacket(packet); + } + outgoingQueue.Clear(); + } - while (outgoingQueue.TryDequeue(out pack)) + Packet pack; + while (pendingQueue.TryDequeue(out pack)) ConLossPacket(pack); } @@ -243,16 +280,17 @@ private void StartConnect() if (nextAddrToTry == conn.serverAddrs.Count) nextAddrToTry = 0; + Cleanup(); LOG.InfoFormat("Opening socket connection to server {0}", addr); client = new TcpClient(); client.LingerState = new LingerOption(false, 0); - client.NoDelay = true; + client.NoDelay = true; Interlocked.Exchange(ref initialized, 0); IsConnectionClosedByServer = false; client.Connect(addr); - + client.GetStream().BeginRead(incomingBuffer, 0, incomingBuffer.Length, ReceiveAsynch, incomingBuffer); PrimeConnection(); } @@ -274,14 +312,20 @@ private void StartConnect() private void ReceiveAsynch(IAsyncResult ar) { if (Interlocked.CompareExchange(ref isDisposed, 0, 0) == 1) + { return; + } int len = 0; try { - len = client.GetStream().EndRead(ar); + if (client == null) + return; + NetworkStream stream = client.GetStream(); + len = stream.EndRead(ar); if (len == 0) //server closed the connection... - { - zooKeeper.State = ZooKeeper.States.CLOSED; + { + LOG.Debug("TcpClient connection lost."); + zooKeeper.State = ZooKeeper.States.NOT_CONNECTED; IsConnectionClosedByServer = true; return; } @@ -295,7 +339,7 @@ private void ReceiveAsynch(IAsyncResult ar) // get the length information from the stream juteBuffer = new byte[ReadLength(bData)]; // try getting other info from the stream - client.GetStream().BeginRead(juteBuffer, 0, juteBuffer.Length, ReceiveAsynch, juteBuffer); + stream.BeginRead(juteBuffer, 0, juteBuffer.Length, ReceiveAsynch, juteBuffer); } else // not an incoming buffer then it is surely a zookeeper process information { @@ -304,19 +348,19 @@ private void ReceiveAsynch(IAsyncResult ar) // haven't been initialized so read the authentication negotiation result ReadConnectResult(bData); // reading length information - client.GetStream().BeginRead(incomingBuffer, 0, incomingBuffer.Length, ReceiveAsynch, incomingBuffer); + stream.BeginRead(incomingBuffer, 0, incomingBuffer.Length, ReceiveAsynch, incomingBuffer); } else { currentLen += len; if (juteBuffer.Length > currentLen) // stream haven't been completed so read any left bytes - client.GetStream().BeginRead(juteBuffer, currentLen, juteBuffer.Length - currentLen, ReceiveAsynch, juteBuffer); + stream.BeginRead(juteBuffer, currentLen, juteBuffer.Length - currentLen, ReceiveAsynch, juteBuffer); else { // stream is complete so read the response ReadResponse(bData); // everything is fine, now read the stream again for length information - client.GetStream().BeginRead(incomingBuffer, 0, incomingBuffer.Length, ReceiveAsynch, incomingBuffer); + stream.BeginRead(incomingBuffer, 0, incomingBuffer.Length, ReceiveAsynch, incomingBuffer); } } } @@ -333,31 +377,23 @@ private void PrimeConnection() lastConnectIndex = currentConnectIndex; ConnectRequest conReq = new ConnectRequest(0, lastZxid, Convert.ToInt32(conn.SessionTimeout.TotalMilliseconds), conn.SessionId, conn.SessionPassword); - byte[] buffer; - MemoryStream ms = new MemoryStream(); - using (EndianBinaryWriter writer = new EndianBinaryWriter(EndianBitConverter.Big, ms, Encoding.UTF8)) + lock (outgoingQueue) { - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(writer); - boa.WriteInt(-1, "len"); // we'll fill this latter - conReq.Serialize(boa, "connect"); - ms.Position = 0; - int len = Convert.ToInt32(ms.Length); // now we have the real length - writer.Write(len - 4); // write the length info - buffer = ms.ToArray(); - } - outgoingQueue.Enqueue((new Packet(null, null, null, null, buffer, null, null, null))); + if (!ClientConnection.disableAutoWatchReset && (!zooKeeper.DataWatches.IsEmpty() || !zooKeeper.ExistWatches.IsEmpty() || !zooKeeper.ChildWatches.IsEmpty())) + { + var sw = new SetWatches(lastZxid, zooKeeper.DataWatches, zooKeeper.ExistWatches, zooKeeper.ChildWatches); + var h = new RequestHeader(); + h.Type = (int)OpCode.SetWatches; + h.Xid = -8; + Packet packet = new Packet(h, new ReplyHeader(), sw, null, null, null, null, null); + outgoingQueue.AddFirst(packet); + } - foreach (ClientConnection.AuthData id in conn.authInfo) - outgoingQueue.Enqueue(new Packet(new RequestHeader(-4, (int)OpCode.Auth), null, new AuthPacket(0, id.Scheme, id.GetData()), null, null, null, null, null)); + foreach (ClientConnection.AuthData id in conn.authInfo) + outgoingQueue.AddFirst( + new Packet(new RequestHeader(-4, (int) OpCode.Auth), null, new AuthPacket(0, id.Scheme, id.GetData()), null, null, null, null, null)); - if (!ClientConnection.disableAutoWatchReset && (!zooKeeper.DataWatches.IsEmpty() || !zooKeeper.ExistWatches.IsEmpty() || !zooKeeper.ChildWatches.IsEmpty())) - { - var sw = new SetWatches(lastZxid, zooKeeper.DataWatches, zooKeeper.ExistWatches, zooKeeper.ChildWatches); - var h = new RequestHeader(); - h.Type = (int)OpCode.SetWatches; - h.Xid = -8; - Packet packet = new Packet(h, new ReplyHeader(), sw, null, null, null, null, null); - outgoingQueue.Enqueue(packet); + outgoingQueue.AddFirst(new Packet(null, null, conReq, null, null, null, null, null)); } if (LOG.IsDebugEnabled) @@ -380,10 +416,12 @@ private void SendPing() /// The packet to send private void DoSend(Packet packet) { - if (packet.header != null - && packet.header.Type != (int)OpCode.Ping + if (packet.header != null + && packet.header.Type != (int)OpCode.Ping && packet.header.Type != (int)OpCode.Auth) + { pendingQueue.Enqueue(packet); + } client.GetStream().Write(packet.data, 0, packet.data.Length); sentCount++; } @@ -506,7 +544,9 @@ private void ReadResponse(byte[] content) } } else + { throw new IOException(new StringBuilder("Nothing in the queue, but got ").Append(replyHdr.Xid).ToString()); + } } } @@ -539,7 +579,18 @@ private void InternalDispose() if (Interlocked.CompareExchange(ref isDisposed, 1, 0) == 0) { zooKeeper.State = ZooKeeper.States.CLOSED; - requestThread.Join(); + try + { + if (requestThread.IsAlive) + { + requestThread.Join(); + } + } + catch (Exception ex) + { + LOG.WarnFormat("Error disposing {0} : {1}", this.GetType().FullName, ex.Message); + } + incomingBuffer = juteBuffer = null; } } diff --git a/src/dotnet/ZooKeeperNet/Packet.cs b/src/dotnet/ZooKeeperNet/Packet.cs index fe5a9459e1c..02071dab04c 100644 --- a/src/dotnet/ZooKeeperNet/Packet.cs +++ b/src/dotnet/ZooKeeperNet/Packet.cs @@ -38,6 +38,7 @@ public class Packet private int finished; internal ZooKeeper.WatchRegistration watchRegistration; internal readonly byte[] data; + public readonly DateTime DateCreated; /** Client's view of the path (may differ due to chroot) **/ private string clientPath; @@ -65,7 +66,10 @@ internal Packet(RequestHeader header, ReplyHeader replyHeader, IRecord request, { BinaryOutputArchive boa = BinaryOutputArchive.getArchive(writer); boa.WriteInt(-1, "len"); // We'll fill this in later - header.Serialize(boa, "header"); + if(header != null) + { + header.Serialize(boa, "header"); + } if (request != null) { request.Serialize(boa, "request"); diff --git a/src/dotnet/ZooKeeperNet/WatchedEvent.cs b/src/dotnet/ZooKeeperNet/WatchedEvent.cs index 71cf7949761..c087d55e665 100644 --- a/src/dotnet/ZooKeeperNet/WatchedEvent.cs +++ b/src/dotnet/ZooKeeperNet/WatchedEvent.cs @@ -46,7 +46,7 @@ public KeeperState State get { return state; } } - public EventType EventType + public EventType Type { get { return type; } } diff --git a/src/dotnet/ZooKeeperNet/ZooKeeper.cs b/src/dotnet/ZooKeeperNet/ZooKeeper.cs index 7d6b807fb70..ea2af563db1 100644 --- a/src/dotnet/ZooKeeperNet/ZooKeeper.cs +++ b/src/dotnet/ZooKeeperNet/ZooKeeper.cs @@ -179,6 +179,7 @@ public class States : IEquatable public static readonly States CONNECTED = new States("CONNECTED"); public static readonly States CLOSED = new States("CLOSED"); public static readonly States AUTH_FAILED = new States("AUTH_FAILED"); + public static readonly States NOT_CONNECTED = new States("NOT_CONNECTED"); private string state; @@ -216,6 +217,11 @@ public override int GetHashCode() { return (state != null ? state.GetHashCode() : 0); } + + public override string ToString() + { + return this.state; + } } private Guid id = Guid.NewGuid(); diff --git a/src/dotnet/ZooKeeperNet/ZooKeeperEx.cs b/src/dotnet/ZooKeeperNet/ZooKeeperEx.cs index 87939f55135..da46788b8bc 100644 --- a/src/dotnet/ZooKeeperNet/ZooKeeperEx.cs +++ b/src/dotnet/ZooKeeperNet/ZooKeeperEx.cs @@ -16,6 +16,7 @@ * */ using System.Collections.Generic; +using log4net; namespace ZooKeeperNet { @@ -26,6 +27,8 @@ namespace ZooKeeperNet public static class ZooKeeperEx { + private static readonly ILog LOG = LogManager.GetLogger(typeof(ZooKeeperEx)); + public static TValue GetAndRemove(this IDictionary dictionary, TKey key) { TValue value; @@ -88,8 +91,16 @@ public Disposable(Action action) public void Dispose() { - action(); - sentinel.Dispose(); + try + { + action(); + sentinel.Dispose(); + } + catch (Exception ex) + { + LOG.WarnFormat("Error disposing {0} : {1}", this.GetType().FullName, ex.Message); + } + } } diff --git a/src/dotnet/ZooKeeperNet/ZooKeeperNet.csproj b/src/dotnet/ZooKeeperNet/ZooKeeperNet.csproj index 7e6ef4e1231..c8e1f53813b 100644 --- a/src/dotnet/ZooKeeperNet/ZooKeeperNet.csproj +++ b/src/dotnet/ZooKeeperNet/ZooKeeperNet.csproj @@ -1,203 +1,203 @@ - - - - Debug - AnyCPU - 9.0.30729 - 2.0 - {5C6774FB-2350-46B2-B1DF-1CCC757C7727} - Library - Properties - ZooKeeperNet - ZooKeeperNet - v4.0 - 512 - false - ..\ZooKeeperKey.snk - - - - - 3.5 - publish\ - true - Disk - false - Foreground - 7 - Days - false - false - true - 0 - 1.0.0.%2a - false - false - true - - - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - AllRules.ruleset - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - AllRules.ruleset - - - - False - ..\lib\log4net.dll - - - - - 3.5 - - - - - - SolutionVersion.cs - - - - Code - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - False - .NET Framework 3.5 SP1 Client Profile - false - - - False - .NET Framework 3.5 SP1 - true - - - False - Windows Installer 3.1 - true - - - - + + + + Debug + AnyCPU + 9.0.30729 + 2.0 + {5C6774FB-2350-46B2-B1DF-1CCC757C7727} + Library + Properties + ZooKeeperNet + ZooKeeperNet + v4.0 + 512 + true + ..\ZooKeeperKey.snk + + + + + 3.5 + publish\ + true + Disk + false + Foreground + 7 + Days + false + false + true + 0 + 1.0.0.%2a + false + false + true + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + AllRules.ruleset + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + AllRules.ruleset + + + + False + ..\lib\log4net.dll + + + + + 3.5 + + + + + + SolutionVersion.cs + + + + Code + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + False + .NET Framework 3.5 SP1 Client Profile + false + + + False + .NET Framework 3.5 SP1 + true + + + False + Windows Installer 3.1 + true + + + + + --> \ No newline at end of file diff --git a/src/dotnet/lib/log4net.dll b/src/dotnet/lib/log4net.dll index 1e66c82abda..ffc57e11254 100644 Binary files a/src/dotnet/lib/log4net.dll and b/src/dotnet/lib/log4net.dll differ