diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index b8dd889633e..3af15701bde 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -1036,48 +1036,13 @@ func (m *Launcher) run(ctx context.Context) (err error) { // NATS streaming server natsOpts := nats.NewDefaultServerOptions() - - // Welcome to ghetto land. It doesn't seem possible to tell NATS to initialise - // a random port. In some integration-style tests, this launcher gets initialised - // multiple times, and sometimes the port from the previous instantiation is - // still open. - // - // This atrocity checks if the port is free, and if it's not, moves on to the - // next one. This best-effort approach may still fail occasionally when, for example, - // two tests race on isAddressPortAvailable. - var total int - for { - portAvailable, err := isAddressPortAvailable(natsOpts.Host, natsOpts.Port) - if err != nil { - return err - } - if portAvailable && natsOpts.Host == "" { - // Double-check localhost to accommodate tests - time.Sleep(100 * time.Millisecond) - portAvailable, err = isAddressPortAvailable("localhost", natsOpts.Port) - if err != nil { - return err - } - } - if portAvailable { - break - } - - time.Sleep(100 * time.Millisecond) - natsOpts.Port++ - total++ - if total > 50 { - return errors.New("unable to find free port for Nats server") - } - } + natsOpts.Port = nats.RandomPort m.natsServer = nats.NewServer(&natsOpts) - m.natsPort = natsOpts.Port - if err := m.natsServer.Open(); err != nil { m.log.Error("Failed to start nats streaming server", zap.Error(err)) return err } - + m.natsPort = natsOpts.Port // updated with random port publisher := nats.NewAsyncPublisher(m.log, fmt.Sprintf("nats-publisher-%d", m.natsPort), m.NatsURL()) if err := publisher.Open(); err != nil { m.log.Error("Failed to connect to streaming server", zap.Error(err)) @@ -1557,18 +1522,6 @@ func checkForPriorVersion(ctx context.Context, log *zap.Logger, boltPath string, return nil } -// isAddressPortAvailable checks whether the address:port is available to listen, -// by using net.Listen to verify that the port opens successfully, then closes the listener. -func isAddressPortAvailable(address string, port int) (bool, error) { - if l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)); err == nil { - if err := l.Close(); err != nil { - return false, err - } - return true, nil - } - return false, nil -} - // OrganizationService returns the internal organization service. func (m *Launcher) OrganizationService() platform.OrganizationService { return m.apibackend.OrganizationService diff --git a/go.mod b/go.mod index 21c17ddda21..59282472768 100644 --- a/go.mod +++ b/go.mod @@ -125,3 +125,5 @@ require ( // if that version does not include PR 8112. In that event, someone (perhaps Mark R again) // will need to apply the change in 8112 on top of the newer version of Arrow. replace github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db => github.com/influxdata/arrow/go/arrow v0.0.0-20200917142114-986e413c1705 + +replace github.com/nats-io/nats-streaming-server v0.11.2 => github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803 diff --git a/go.sum b/go.sum index 9b1020342c7..7072b05e74f 100644 --- a/go.sum +++ b/go.sum @@ -332,6 +332,8 @@ github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/q github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= +github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803 h1:LpaVAM5Www2R7M0GJAxAdL3swBvmna8Pyzw6F7o+j04= +github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803/go.mod h1:qgAMR6M9EokX+R5X7jUQfubwBdS1tBIl4yVJ3shhcWk= github.com/influxdata/pkg-config v0.2.5 h1:iC19aXlkUPiwxjxeeKk8TT8S5s3pargNPLgZE/rvOzc= github.com/influxdata/pkg-config v0.2.5/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8= @@ -426,8 +428,6 @@ github.com/nats-io/go-nats v1.7.0 h1:oQOfHcLr8hb43QG8yeVyY2jtarIaTjOv41CGdF3tTvQ github.com/nats-io/go-nats v1.7.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= github.com/nats-io/go-nats-streaming v0.4.0 h1:00wOBnTKzZGvQOFRSxj18kUm4X2TvXzv8LS0skZegPc= github.com/nats-io/go-nats-streaming v0.4.0/go.mod h1:gfq4R3c9sKAINOpelo0gn/b9QDMBZnmrttcsNF+lqyo= -github.com/nats-io/nats-streaming-server v0.11.2 h1:UCqZbfXUKs9Ejw7KiNaFZEbbiVbK7uA8jbK2TsdGbqg= -github.com/nats-io/nats-streaming-server v0.11.2/go.mod h1:RyqtDJZvMZO66YmyjIYdIvS69zu/wDAkyNWa8PIUa5c= github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M= github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs= diff --git a/nats/server.go b/nats/server.go index 7e0f139ef94..f99078fe9ab 100644 --- a/nats/server.go +++ b/nats/server.go @@ -10,6 +10,13 @@ import ( const ServerName = "platform" +const ( + // RandomPort is the value for port that, when supplied, will cause the + // server to listen on a randomly-chosen available port. The resolved port + // will be reassigned to the Port field of server.Options. + RandomPort = server.RANDOM_PORT +) + var ErrNoNatsConnection = errors.New("nats connection has not been established. Call Open() first") // Server wraps a connection to a NATS streaming server