Skip to content

Commit

Permalink
Add handling for tcp register timeout leaving connection dead (#1319)
Browse files Browse the repository at this point in the history
  • Loading branch information
nvollmar authored May 8, 2024
1 parent 840b6c0 commit d0a7293
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import java.net.InetSocketAddress
import scala.collection.immutable.Seq

import org.apache.pekko
import pekko.actor.Props
import pekko.actor.{ ActorKilledException, Kill, Props }
import pekko.io.Tcp
import pekko.io.Tcp.{ Connected, PeerClosed, Register }
import pekko.io.dns.{ RecordClass, RecordType }
import pekko.io.dns.internal.DnsClient.Answer
import pekko.testkit.{ ImplicitSender, PekkoSpec, TestProbe }
import pekko.testkit.{ EventFilter, ImplicitSender, PekkoSpec, TestProbe }

class TcpDnsClientSpec extends PekkoSpec with ImplicitSender {
import TcpDnsClient._
Expand Down Expand Up @@ -107,5 +107,24 @@ class TcpDnsClientSpec extends PekkoSpec with ImplicitSender {
answerProbe.expectMsg(Answer(42, Nil))
answerProbe.expectMsg(Answer(43, Nil))
}

"fail when the connection just terminates" in {
val tcpExtensionProbe = TestProbe()
val answerProbe = TestProbe()
val connectionProbe = TestProbe()

val client = system.actorOf(Props(new TcpDnsClient(tcpExtensionProbe.ref, dnsServerAddress, answerProbe.ref)))

client ! exampleRequestMessage

tcpExtensionProbe.expectMsg(Tcp.Connect(dnsServerAddress))
connectionProbe.send(tcpExtensionProbe.lastSender, Connected(dnsServerAddress, localAddress))
connectionProbe.expectMsgType[Register]

EventFilter[ActorKilledException](occurrences = 1).intercept {
// simulate connection stopping due to register timeout => client must fail
connectionProbe.ref ! Kill
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
package org.apache.pekko.io.dns.internal

import java.net.InetSocketAddress

import org.apache.pekko
import pekko.PekkoException
import pekko.actor.{ Actor, ActorLogging, ActorRef, Stash }
import pekko.actor.{ Actor, ActorLogging, ActorRef, Stash, Terminated }
import pekko.annotation.InternalApi
import pekko.io.Tcp
import pekko.io.dns.internal.DnsClient.Answer
Expand Down Expand Up @@ -49,6 +48,7 @@ import pekko.util.ByteString
log.debug("Connected to TCP address [{}]", ns)
val connection = sender()
context.become(ready(connection))
context.watch(connection)
connection ! Tcp.Register(self)
unstashAll()
case _: Message =>
Expand Down Expand Up @@ -80,7 +80,10 @@ import pekko.util.ByteString
}
}
case Tcp.PeerClosed =>
context.unwatch(connection)
context.become(idle)
case Terminated(`connection`) =>
throwFailure("TCP connection terminated without closing (register timeout?)", None)
}

private def parseResponse(data: ByteString) = {
Expand Down

0 comments on commit d0a7293

Please sign in to comment.