Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SASL handshake negotiation #748

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,53 @@ func (b *Broker) responseReceiver() {
close(b.done)
}

func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
rb := &SaslHandshakeRequest{"PLAIN"}
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req)
if err != nil {
return err
}

err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
return err
}

bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
return err
}
b.correlationID++
//wait for the response
header := make([]byte, 8) // response header
n, err := io.ReadFull(b.conn, header)
b.updateIncomingCommunicationMetrics(n)
length := binary.BigEndian.Uint32(header[:4])
payload := make([]byte, length-4)
n, err = io.ReadFull(b.conn, payload)
if err != nil {
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
b.updateIncomingCommunicationMetrics(n)
res := &SaslHandshakeResponse{}
err = versionedDecode(payload, res, 0)
if err != nil {
Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
return err
}
if res.Err != ErrNoError {
Logger.Printf("Invalid SASL Mechanism : %s\n", err.Error())
return res.Err
}
Logger.Print("Successful SASL handshake")
return nil

}

// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
//
Expand All @@ -533,6 +580,11 @@ func (b *Broker) responseReceiver() {
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
// of responding to bad credentials but thats how its being done today.
func (b *Broker) sendAndReceiveSASLPlainAuth() error {
handshakeErr := b.sendAndReceiveSASLPlainHandshake()
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
}
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
Expand Down