Skip to content

Commit

Permalink
Add SASL handshake negotiation
Browse files Browse the repository at this point in the history
This patch adds the mandatory SASL handshake for SASL negotiation.
  • Loading branch information
guillaumebreton committed Oct 20, 2016
1 parent af0513c commit 8561c1d
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ func (b *Broker) Open(conf *Config) error {
}

if conf.Net.SASL.Enable {
b.connErr = b.sendAndReceiveSASLPlainHandshake()
if b.connErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return
}
b.connErr = b.sendAndReceiveSASLPlainAuth()
if b.connErr != nil {
err = b.conn.Close()
Expand Down Expand Up @@ -514,6 +519,53 @@ func (b *Broker) responseReceiver() {
close(b.done)
}

func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
rb := &SaslHandshakeRequest{"PLAIN"}
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req)
if err != nil {
return err
}

err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
return err
}

bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
return err
}
b.correlationID++
//wait for the response
header := make([]byte, 8) // response header
n, err := io.ReadFull(b.conn, header)
b.updateIncomingCommunicationMetrics(n)
length := binary.BigEndian.Uint32(header[:4])
payload := make([]byte, length-4)
n, err = io.ReadFull(b.conn, payload)
if err != nil {
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
b.updateIncomingCommunicationMetrics(n)
res := &SaslHandshakeResponse{}
err = versionedDecode(payload, res, 0)
if err != nil {
Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
return err
}
if res.Err != ErrNoError {
Logger.Printf("Invalid SASL Mechanism : %s\n", err.Error())
return res.Err
}
Logger.Print("Successful SASL handshake")
return nil

}

// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
//
Expand All @@ -533,6 +585,12 @@ func (b *Broker) responseReceiver() {
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
// of responding to bad credentials but thats how its being done today.
func (b *Broker) sendAndReceiveSASLPlainAuth() error {

handshakeErr := b.sendAndReceiveSASLPlainHandshake()
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
}
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
Expand Down

0 comments on commit 8561c1d

Please sign in to comment.