From 79a050c631f81e8bc6f699f783a4623f17066f82 Mon Sep 17 00:00:00 2001 From: Matthew Topol Date: Wed, 1 Jul 2020 12:42:01 -0400 Subject: [PATCH] add support for data transfer encryption via rc4 and aes --- .travis.yml | 2 + client.go | 42 +++ cmd/hdfs/test/helper.bash | 4 + file_reader.go | 5 +- file_reader_test.go | 4 +- file_writer.go | 7 +- fixtures.sh | 4 + internal/rpc/aes_conn.go | 65 ++++ internal/rpc/block_write_stream.go | 19 +- internal/rpc/challenge.go | 86 +++++- internal/rpc/checksum_reader.go | 3 +- internal/rpc/digest_md5.go | 426 +++++++++++++++++++++++++++ internal/rpc/digest_md5_integrity.go | 152 ++++++++++ internal/rpc/digest_md5_privacy.go | 175 +++++++++++ internal/rpc/digest_md5_test.go | 185 ++++++++++++ internal/rpc/kerberos.go | 2 +- internal/rpc/namenode.go | 18 ++ internal/rpc/sasl_dialer.go | 42 +++ travis-setup-cdh5.sh | 23 ++ travis-setup-cdh6.sh | 27 ++ 20 files changed, 1263 insertions(+), 28 deletions(-) create mode 100644 internal/rpc/aes_conn.go create mode 100644 internal/rpc/digest_md5.go create mode 100644 internal/rpc/digest_md5_integrity.go create mode 100644 internal/rpc/digest_md5_privacy.go create mode 100644 internal/rpc/digest_md5_test.go create mode 100644 internal/rpc/sasl_dialer.go diff --git a/.travis.yml b/.travis.yml index 1bf5636b..dcdc2248 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,9 +6,11 @@ env: - PLATFORM=hdp2 - PLATFORM=cdh5 - PLATFORM=cdh6 +- PLATFORM=cdh6 ENCRYPT_DATA_TRANSFER=true - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=authentication - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=integrity - PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy +- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy ENCRYPT_DATA_TRANSFER=true before_install: - export GO111MODULE=on # Travis installs into $GOPATH/src, which disables module support by default. install: diff --git a/client.go b/client.go index a865b181..b1b40380 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/colinmarc/hdfs/v2/hadoopconf" + "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs" "github.com/colinmarc/hdfs/v2/internal/rpc" krb "gopkg.in/jcmturner/gokrb5.v7/client" @@ -68,6 +69,13 @@ type ClientOptions struct { // multi-namenode setup (for example: 'nn/_HOST'). It is required if // KerberosClient is provided. KerberosServicePrincipleName string + // EncryptDataTransfer specifies whether or not data transfer for datanodes + // needs to utilize encryption and thus do a negotiation to get data + // this is specified by dfs.encrypt.data.transfer in the config + EncryptDataTransfer bool + // SecureDataNode specifies whether we're protecting our data transfer + // communication via dfs.data.transfer.protection + SecureDataNode bool } // ClientOptionsFromConf attempts to load any relevant configuration options @@ -116,6 +124,26 @@ func ClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions { options.KerberosServicePrincipleName = strings.Split(conf["dfs.namenode.kerberos.principal"], "@")[0] } + dataTransferProt := strings.Split(strings.ToLower(conf["dfs.data.transfer.protection"]), ",") + for _, val := range dataTransferProt { + switch val { + case "privacy": + options.EncryptDataTransfer = true + fallthrough + case "integrity", "authentication": + options.SecureDataNode = true + } + } + + // dfs.encrypt.data.transfer set to true overrides dfs.data.transfer.protection and + // requires both privacy and integrity for communication. 
If dfs.encrypt.data.transfer is + // "true" we explicitly set EncryptDataTransfer and SecureDataNode to true, regardless of + // what they already were. + if conf["dfs.encrypt.data.transfer"] == "true" { + options.EncryptDataTransfer = true + options.SecureDataNode = true + } + return options } @@ -148,6 +176,20 @@ func NewClient(options ClientOptions) (*Client, error) { return &Client{namenode: namenode, options: options}, nil } +func (c *Client) datanodeDialFunc(token *hadoop_common.TokenProto) func(ctx context.Context, network, addr string) (net.Conn, error) { + if c.options.EncryptDataTransfer || c.options.SecureDataNode { + return (&rpc.DatanodeSaslDialer{ + Dialer: c.options.DatanodeDialFunc, + Key: c.namenode.GetEncryptionKeys(), + Privacy: c.options.EncryptDataTransfer, + Integrity: c.options.SecureDataNode, + Token: token, + }).DialContext + } + + return c.options.DatanodeDialFunc +} + // New returns Client connected to the namenode(s) specified by address, or an // error if it can't connect. Multiple namenodes can be specified by separating // them with commas, for example "nn1:9000,nn2:9000". diff --git a/cmd/hdfs/test/helper.bash b/cmd/hdfs/test/helper.bash index eaa85b82..1c8406d8 100644 --- a/cmd/hdfs/test/helper.bash +++ b/cmd/hdfs/test/helper.bash @@ -4,6 +4,10 @@ export HADOOP_FS=${HADOOP_FS-"hadoop fs"} export ROOT_TEST_DIR="$BATS_TEST_DIRNAME/../../.." export HDFS="$ROOT_TEST_DIR/hdfs" +# jdk11 is missing some APIs that the older jars here rely on +# so point at openjdk8 for now +export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 + # stolen from https://github.com/sstephenson/rbenv/blob/master/test/test_helper.bash flunk() { diff --git a/file_reader.go b/file_reader.go index 99c918f3..71f5237e 100644 --- a/file_reader.go +++ b/file_reader.go @@ -97,11 +97,12 @@ func (f *FileReader) Checksum() ([]byte, error) { paddedLength := 32 totalLength := 0 checksum := md5.New() + for _, block := range f.blocks { cr := &rpc.ChecksumReader{ Block: block, UseDatanodeHostname: f.client.options.UseDatanodeHostname, - DialFunc: f.client.options.DatanodeDialFunc, + DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()), } err := cr.SetDeadline(f.deadline) @@ -405,7 +406,7 @@ func (f *FileReader) getNewBlockReader() error { Block: block, Offset: int64(off - start), UseDatanodeHostname: f.client.options.UseDatanodeHostname, - DialFunc: f.client.options.DatanodeDialFunc, + DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()), } return f.SetDeadline(f.deadline) diff --git a/file_reader_test.go b/file_reader_test.go index 0b8b6f23..f9ac8a3e 100644 --- a/file_reader_test.go +++ b/file_reader_test.go @@ -362,11 +362,11 @@ func TestFileReadDeadline(t *testing.T) { file, err := client.Open("/_test/foo.txt") require.NoError(t, err) - file.SetDeadline(time.Now().Add(100 * time.Millisecond)) + file.SetDeadline(time.Now().Add(200 * time.Millisecond)) _, err = file.Read([]byte{0, 0}) assert.NoError(t, err) - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) _, err = file.Read([]byte{0, 0}) assert.NotNil(t, err) } diff --git a/file_writer.go b/file_writer.go index b7d0c462..3070d01b 100644 --- a/file_writer.go +++ b/file_writer.go @@ -119,7 +119,7 @@ func (c *Client) Append(name string) (*FileWriter, error) { Offset: int64(block.B.GetNumBytes()), Append: true, UseDatanodeHostname: f.client.options.UseDatanodeHostname, - DialFunc: f.client.options.DatanodeDialFunc, + DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()), } err = f.blockWriter.SetDeadline(f.deadline) 
@@ -262,12 +262,13 @@ func (f *FileWriter) startNewBlock() error { return &os.PathError{"create", f.name, interpretException(err)} } + block := addBlockResp.GetBlock() f.blockWriter = &rpc.BlockWriter{ ClientName: f.client.namenode.ClientName, - Block: addBlockResp.GetBlock(), + Block: block, BlockSize: f.blockSize, UseDatanodeHostname: f.client.options.UseDatanodeHostname, - DialFunc: f.client.options.DatanodeDialFunc, + DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()), } return f.blockWriter.SetDeadline(f.deadline) diff --git a/fixtures.sh b/fixtures.sh index 6ff221bc..064c51ab 100755 --- a/fixtures.sh +++ b/fixtures.sh @@ -1,5 +1,9 @@ set -e +# jdk11 is missing some APIs that the older jars here rely on +# so point at openjdk8 for now +export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 + HADOOP_FS=${HADOOP_FS-"hadoop fs"} $HADOOP_FS -mkdir -p "/_test" $HADOOP_FS -chmod 777 "/_test" diff --git a/internal/rpc/aes_conn.go b/internal/rpc/aes_conn.go new file mode 100644 index 00000000..f913fa21 --- /dev/null +++ b/internal/rpc/aes_conn.go @@ -0,0 +1,65 @@ +package rpc + +import ( + "crypto/aes" + "crypto/cipher" + "net" + "time" +) + +type aesConn struct { + conn net.Conn + + encStream cipher.StreamWriter + decStream cipher.StreamReader +} + +func newAesConn(conn net.Conn, inKey, outKey, inIv, outIv []byte) (net.Conn, error) { + c := &aesConn{conn: conn} + + encBlock, err := aes.NewCipher(inKey) + if err != nil { + return nil, err + } + + decBlock, err := aes.NewCipher(outKey) + if err != nil { + return nil, err + } + + c.encStream = cipher.StreamWriter{S: cipher.NewCTR(encBlock, inIv), W: conn} + c.decStream = cipher.StreamReader{S: cipher.NewCTR(decBlock, outIv), R: conn} + return c, nil +} + +func (d *aesConn) Close() error { + return d.conn.Close() +} + +func (d *aesConn) LocalAddr() net.Addr { + return d.conn.LocalAddr() +} + +func (d *aesConn) RemoteAddr() net.Addr { + return d.conn.RemoteAddr() +} + +func (d *aesConn) SetDeadline(t time.Time) error { + return d.conn.SetDeadline(t) +} + +func (d *aesConn) SetReadDeadline(t time.Time) error { + return d.conn.SetReadDeadline(t) +} + +func (d *aesConn) SetWriteDeadline(t time.Time) error { + return d.conn.SetWriteDeadline(t) +} + +func (d *aesConn) Write(b []byte) (n int, err error) { + return d.encStream.Write(b) +} + +func (d *aesConn) Read(b []byte) (n int, err error) { + return d.decStream.Read(b) +} diff --git a/internal/rpc/block_write_stream.go b/internal/rpc/block_write_stream.go index 397b3a4a..f1b9d7b1 100644 --- a/internal/rpc/block_write_stream.go +++ b/internal/rpc/block_write_stream.go @@ -330,33 +330,26 @@ func (s *blockWriteStream) writePacket(p outboundPacket) error { DataLen: proto.Int32(int32(len(p.data))), } - header := make([]byte, 6) + // Don't ask me why this doesn't include the header proto... + totalLength := len(p.data) + len(p.checksums) + 4 + + header := make([]byte, 6, 6+totalLength) infoBytes, err := proto.Marshal(headerInfo) if err != nil { return err } - // Don't ask me why this doesn't include the header proto... - totalLength := len(p.data) + len(p.checksums) + 4 binary.BigEndian.PutUint32(header, uint32(totalLength)) binary.BigEndian.PutUint16(header[4:], uint16(len(infoBytes))) header = append(header, infoBytes...) + header = append(header, p.checksums...) + header = append(header, p.data...) 
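+ // Note: the length prefix, header proto, checksums and data are now buffered into
+ // one slice and sent with a single Write. On a SASL-wrapped connection (see
+ // sasl_dialer.go) every Write becomes its own length-prefixed, HMAC'd frame, so
+ // this presumably keeps each packet in one wrapped frame instead of three.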
_, err = s.conn.Write(header) if err != nil { return err } - _, err = s.conn.Write(p.checksums) - if err != nil { - return err - } - - _, err = s.conn.Write(p.data) - if err != nil { - return err - } - return nil } diff --git a/internal/rpc/challenge.go b/internal/rpc/challenge.go index 5cf3d483..76875a0f 100644 --- a/internal/rpc/challenge.go +++ b/internal/rpc/challenge.go @@ -32,14 +32,12 @@ type tokenChallenge struct { algorithm string } -// parseChallenge returns a tokenChallenge parsed from a challenge response from -// the namenode. -func parseChallenge(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) { +func parseChallenge(challenge []byte) (*tokenChallenge, error) { tokenChallenge := tokenChallenge{} - matched := challengeRegexp.FindAllSubmatch(auth.Challenge, -1) + matched := challengeRegexp.FindAllSubmatch(challenge, -1) if matched == nil { - return nil, fmt.Errorf("invalid token challenge: %s", auth.Challenge) + return nil, fmt.Errorf("invalid token challenge: %s", challenge) } for _, m := range matched { @@ -64,3 +62,81 @@ func parseChallenge(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) return &tokenChallenge, nil } + +// parseChallengeAuth returns a tokenChallenge parsed from a challenge response from +// the namenode. +func parseChallengeAuth(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) { + return parseChallenge(auth.Challenge) +} + +type cipherType uint8 + +const ( + cipherUnknown cipherType = 0 + cipherDES cipherType = 1 << iota + cipher3DES + cipherRC4 + cipherRC440 + cipherRC456 + cipherAESCBC +) + +func (c cipherType) String() string { + switch c { + case cipherDES: + return "des" + case cipher3DES: + return "3des" + case cipherRC4: + return "rc4" + case cipherRC440: + return "rc4-40" + case cipherRC456: + return "rc4-56" + case cipherAESCBC: + return "aes-cbc" + } + return "" +} + +func getCipher(s string) cipherType { + switch s { + case "des": + return cipherDES + case "3des": + return cipher3DES + case "rc4": + return cipherRC4 + case "rc4-40": + return cipherRC440 + case "rc4-56": + return cipherRC456 + case "aes-cbc": + return cipherAESCBC + } + return 0 +} + +func chooseCipher(cipherOpts []string) cipherType { + var avail cipherType + for _, c := range cipherOpts { + avail |= getCipher(c) + } + + if avail&cipherRC4 != 0 { + return cipherRC4 + } + // if Has(avail, cipher3DES) { + // return cipher3DES + // } + if avail&cipherRC456 != 0 { + return cipherRC456 + } + if avail&cipherRC440 != 0 { + return cipherRC440 + } + // if Has(avail, cipherDES) { + // return cipherDES + // } + return 0 +} diff --git a/internal/rpc/checksum_reader.go b/internal/rpc/checksum_reader.go index 777a189d..0bdd203f 100644 --- a/internal/rpc/checksum_reader.go +++ b/internal/rpc/checksum_reader.go @@ -19,8 +19,7 @@ type ChecksumReader struct { // UseDatanodeHostname specifies whether the datanodes should be connected to // via their hostnames (if true) or IP addresses (if false). UseDatanodeHostname bool - // DialFunc is used to connect to the datanodes. If nil, then - // (&net.Dialer{}).DialContext is used. + // DialFunc is used to connect to the datanodes. 
If nil, then (&net.Dialer{}).DialContext is used DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) deadline time.Time diff --git a/internal/rpc/digest_md5.go b/internal/rpc/digest_md5.go new file mode 100644 index 00000000..17138483 --- /dev/null +++ b/internal/rpc/digest_md5.go @@ -0,0 +1,426 @@ +package rpc + +import ( + "bytes" + "crypto/md5" + "encoding/base64" + "encoding/binary" + "encoding/hex" + "fmt" + "hash" + "io" + "math" + "math/rand" + "net" + "strings" + + "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" + hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs" +) + +// EncryptionKey contains the information from the Namenode EncryptionKeys +// which will be used to establish sasl handshakes +type EncryptionKey struct { + KeyID int32 + ExpiryDate uint64 + BlockPoolID string + Nonce []byte + EncryptionKey []byte + EncryptionAlg string +} + +// our default auth configuration +var ( + authMethod = "TOKEN" + authMechanism = "DIGEST-MD5" + authServer = "0" + authProtocol = "hdfs" +) + +// some constants for handling the encoding and decoding of +// data for sasl communication +const ( + saslIntegrityPrefixLength = 4 + macDataLen = 4 + macHMACLen = 10 + macMsgTypeLen = 2 + macSeqNumLen = 4 +) + +var macMsgType = [2]byte{0x00, 0x01} + +// lenEncodeBytes writes the input integer encoded as a 4 byte slice. +func lenEncodeBytes(seqnum int) (out [4]byte) { + out[0] = byte((seqnum >> 24) & 0xFF) + out[1] = byte((seqnum >> 16) & 0xFF) + out[2] = byte((seqnum >> 8) & 0xFF) + out[3] = byte(seqnum & 0xFF) + return +} + +type dataTransferStatus *hdfs.DataTransferEncryptorMessageProto_DataTransferEncryptorStatus + +// sendSaslMsg is a helper function to encode and write a message for our sasl communication +func sendSaslMsg(w io.Writer, status dataTransferStatus, payload []byte, message string, secure bool) error { + msg := &hdfs.DataTransferEncryptorMessageProto{} + msg.Status = status + msg.Payload = payload + msg.Message = &message + + if secure { + // if we want a secure connection, tell the server that we want AES + opt := &hdfs.CipherOptionProto{} + opt.Suite = hdfs.CipherSuiteProto_AES_CTR_NOPADDING.Enum() + msg.CipherOption = append(msg.CipherOption, opt) + } + + data, err := makePrefixedMessage(msg) + if err != nil { + return err + } + + _, err = w.Write(data) + return err +} + +// digestMD5 holds the information for the token-digestmd5 authentication +// flow for us to handle the challenges and get the integrity and privacy +// key pairs for encoding and decoding messages for the server +type digestMD5 struct { + authID []byte + passwd string + hostname string + service string + + token *tokenChallenge + + cnonce string + cipher cipherType +} + +// convenience function to calling the a1 function for evaluating challenges +func (d *digestMD5) a1() string { + return a1(string(d.authID), d.token.realm, d.passwd, d.token.nonce, d.cnonce) +} + +// convenience function for calling the a2 function for evaluating challenges +func (d *digestMD5) a2(first bool) string { + return a2(d.service+"/"+d.hostname, d.token.qop, first) +} + +// challengeStep1 implements Using Digest Authentication as a SASAL Mechanism +// as per RFC2831. 
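+// Step 1 parses the server's challenge directives (realm, nonce, qop and cipher
+// list), generates a client nonce, selects a cipher, and returns the
+// digest-response directive string containing the computed response value.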
+func (d *digestMD5) challengeStep1(challenge []byte) (string, error) { + ncval := fmt.Sprintf("%08x", 1) + var err error + d.token, err = parseChallenge(challenge) + if err != nil { + return "", err + } + + d.cnonce, err = genCnonce() + if err != nil { + return "", err + } + + d.cipher = chooseCipher(d.token.cipher) + rspdigest := compute(d.a1(), d.a2(true), d.token.nonce, d.cnonce, ncval, d.token.qop) + + ret := fmt.Sprintf(`username="%s", realm="%s", nonce="%s", cnonce="%s", nc=%08x, qop=%s, digest-uri="%s/%s", response=%s, charset=utf-8`, + d.authID, d.token.realm, d.token.nonce, d.cnonce, 1, d.token.qop, d.service, d.hostname, rspdigest) + + if d.cipher != cipherUnknown { + ret += ", cipher=" + d.cipher.String() + } + return ret, nil +} + +// challengeStep2 implements Using Digest Authentication as a SASAL Mechanism +// as per RFC2831. +func (d *digestMD5) challengeStep2(challenge []byte) (string, error) { + ncval := fmt.Sprintf("%08x", 1) + rspdigest := compute(d.a1(), d.a2(false), d.token.nonce, d.cnonce, ncval, d.token.qop) + + rspauth := strings.Split(string(challenge), "=") + if rspauth[0] != "rspauth" { + return "", fmt.Errorf("Could not find rspauth in '%s'", string(challenge)) + } + if rspauth[1] != rspdigest { + return "", fmt.Errorf("rspauth did not match digest") + } + return "", nil +} + +// make this easy to mock out for testing by having it defined here where +// unit tests can override it +var genCnonce = func() (string, error) { + ret := make([]byte, 12) + if _, err := rand.Read(ret); err != nil { + return "", err + } + return base64.StdEncoding.EncodeToString(ret), nil +} + +// newDigestMD5Conn performs the necessary handshake based on the configuration and then returns +// a wrapped connection or error, in the case of our auth set to only authentication, then it just +// returns the connection unmodified after authenticating +func newDigestMD5Conn(conn net.Conn, token *hadoop_common.TokenProto, key *EncryptionKey, useSecure bool) (net.Conn, error) { + auth := &hadoop_common.RpcSaslProto_SaslAuth{} + auth.Method = &authMethod + auth.Mechanism = &authMechanism + auth.ServerId = &authServer + auth.Protocol = &authProtocol + + ourToken := &hadoop_common.TokenProto{} + ourToken.Kind = token.Kind + ourToken.Password = token.Password[:] + ourToken.Service = token.Service + ourToken.Identifier = token.GetIdentifier() + + // if the server didn't send us a Nonce, then the data isn't encrypted + // but we will still attempt to authenticate + if useSecure && len(key.Nonce) == 0 { + useSecure = false + } + + if useSecure { + nonceBase64Len := int(math.Ceil(4 * (float64(len(key.Nonce)) / 3))) + buf := new(bytes.Buffer) + buf.Grow(6 + nonceBase64Len + len(key.BlockPoolID)) + buf.WriteString(fmt.Sprintf("%d", key.KeyID)) + buf.WriteString(" ") + buf.WriteString(key.BlockPoolID) + buf.WriteString(" ") + buf.WriteString(base64.StdEncoding.EncodeToString(key.Nonce)) + ourToken.Identifier = buf.Bytes() + ourToken.Password = key.EncryptionKey + } else { + ourToken.Identifier = make([]byte, base64.StdEncoding.EncodedLen(len(token.GetIdentifier()))) + base64.StdEncoding.Encode(ourToken.Identifier, token.GetIdentifier()) + } + + dgst := digestMD5{ + authID: ourToken.Identifier, + passwd: base64.StdEncoding.EncodeToString(ourToken.Password), + hostname: auth.GetServerId(), + service: auth.GetProtocol(), + } + + // start handshake + var err error + if err = binary.Write(conn, binary.BigEndian, uint32(0xDEADBEEF)); err != nil { + return nil, err + } + + if err = sendSaslMsg(conn, 
hdfs.DataTransferEncryptorMessageProto_SUCCESS.Enum(), []byte{}, "", false); err != nil { + return nil, err + } + + msg := &hdfs.DataTransferEncryptorMessageProto{} + if err = readPrefixedMessage(conn, msg); err != nil { + return nil, err + } + + challengeResponse, err := dgst.challengeStep1(msg.Payload) + if err != nil { + return nil, err + } + + if err = sendSaslMsg(conn, hdfs.DataTransferEncryptorMessageProto_SUCCESS.Enum(), []byte(challengeResponse), "", useSecure); err != nil { + return nil, err + } + + if err = readPrefixedMessage(conn, msg); err != nil { + return nil, err + } + + // the result of this challenge evaluation should be an empty string + if challengeResponse, err = dgst.challengeStep2(msg.Payload); err != nil || len(challengeResponse) != 0 { + return nil, err + } + + // we're only authenticating, no RPC protection involved, so no wrapping the connection + // just return the connection as is, we're done here! + if dgst.token.qop == qopAuthentication { + return conn, nil + } + + kic, kis := generateIntegrityKeyPair(dgst.a1()) + + var d digestMD5Handler + if dgst.token.qop == qopPrivacy { + if dgst.cipher == cipherUnknown { + return nil, fmt.Errorf("Could not find implemented cipher among choices: %v", dgst.token.cipher) + } + kcc, kcs := generatePrivacyKeyPair(dgst.a1(), dgst.cipher) + d, err = newDigestMD5PrivacyConn(conn, kic[:], kis[:], kcc[:], kcs[:]) + if err != nil { + return nil, err + } + } else if dgst.token.qop == qopIntegrity { + d = newDigestMD5IntegrityConn(conn, kic[:], kis[:]) + } + + if len(msg.GetCipherOption()) > 0 { + cipher := msg.GetCipherOption()[0] + var outKey []byte + + decoded, err := d.decode(cipher.InKey) + if err != nil { + return nil, err + } + // we reuse the buffer that is returned from decoding, so we need to copy + // the decoded output so the next call to decode doesn't blow it away + inKey := make([]byte, len(decoded)) + copy(inKey, decoded) + + if outKey, err = d.decode(cipher.OutKey); err != nil { + return nil, err + } + + return newAesConn(conn, inKey, outKey, cipher.InIv, cipher.OutIv) + } + return d, nil +} + +// helper interface for decoding the cipher keys if we need to construct +// an aes connection +type digestMD5Handler interface { + net.Conn + decode(input []byte) ([]byte, error) +} + +// compute implements the computation of md5 digest authentication as per RFC2831 +// using the same terms as described in the RFC in order to make it easier to +// understand and ensure it maintains proper functionality. The response value +// computation is defined as: +// +// HEX( KD ( HEX(H(A1)), { nonce-value, ":", nc-value, ":", cnonce-value, ":", qop-value, ":", HEX(H(A2)) })) +// +// A1 = { H( { username-value, ":", realm-value, ":", passwd }), ":", nonce-value, ":", cnonce-value } +// +// If "qop" is "auth": +// +// A2 = { "AUTHENTICATE:", digest-uri-value } +// +// If "qop" is "auth-int" or "auth-conf": +// +// A2 = { "AUTHENTICATE:", digest-uri-value, ":00000000000000000000000000000000" } +// +// Where: +// +// { a, b, ... } is the concatenation of the octet strings a, b, ... 
+// +// H(s) is the 16 octet MD5 Hash [RFC1321] of the octet string s +// +// KD(k, s) is H({k, ":", s}), i.e., the 16 octet hash of the string k, a colon, and the string s +// +// HEX(n) is the representation of the 16 octet MD5 hash n as a string of 32 hex digits (with alphabetic characters +// in lower case) +func compute(a1, a2, nonce, cnonce, ncvalue, qop string) string { + x := hexfn(h(a1)) + y := strings.Join([]string{nonce, ncvalue, cnonce, qop, hexfn(h(a2))}, ":") + return hexfn(kd(x, y)) +} + +func h(s string) [md5.Size]byte { + return md5.Sum([]byte(s)) +} + +func kd(k, s string) [md5.Size]byte { + return h(strings.Join([]string{k, s}, ":")) +} + +func hexfn(data [md5.Size]byte) string { + return hex.EncodeToString(data[:]) +} + +func a1(username, realm, password, nonce, cnonce string) string { + x := h(strings.Join([]string{username, realm, password}, ":")) + return strings.Join([]string{string(x[:]), nonce, cnonce}, ":") +} + +func a2(digestURI, qop string, initial bool) string { + var a2 string + // when computing the initial response digest, we use the "AUTHENTICATE" piece + // but when validating the server response-auth, the "AUTHENTICATE:" string is + // left out in taht calculation to confirm the digest-response + if initial { + a2 = strings.Join([]string{"AUTHENTICATE", digestURI}, ":") + } else { + a2 = ":" + digestURI + } + if qop == qopPrivacy || qop == qopIntegrity { + a2 = a2 + ":00000000000000000000000000000000" + } + return a2 +} + +// If the server offered qop=auth-int and we replied with qop=auth-int then subsequent +// messages need to be integrity protected. The base session key is H(A1) as defined above +// this function will generate the pair of integrity keys for both client to server (kic) +// and server to client (kis) as defined in the RFC with the specified magic constants +func generateIntegrityKeyPair(a1 string) ([md5.Size]byte, [md5.Size]byte) { + clientIntMagicStr := []byte("Digest session key to client-to-server signing key magic constant") + serverIntMagicStr := []byte("Digest session key to server-to-client signing key magic constant") + + sum := h(a1) + kic := md5.Sum(append(sum[:], clientIntMagicStr...)) + kis := md5.Sum(append(sum[:], serverIntMagicStr...)) + + return kic, kis +} + +// If message integrity is negotiated, there will be a MAC block for each message appended +// to the message. The MAC block is 16 bytes, the first 10 bytes of the HMAC-MD5 of the message +// plus a 2 byte message type number and a 4 byte sequence number. This function provides the +// HMAC as defined in the RFC as HMAC(ki, {seqnum, msg})[0..9] +func getHMAC(mac hash.Hash, seq, msg []byte) []byte { + data := append(seq, msg...) + + mac.Reset() + mac.Write(data) + + hash := mac.Sum(nil) + return hash[0:10] +} + +// If the server sent a cipher-opts directive and we respond with a cipher directive, then +// subsequent messages between the client and server must be confidentiality protected via +// privacy keys. 
Again we use the base session key of H(A1) as defined above, and then we +// calculate the privacy keys: +// +// For the Client-to-Server: +// +// kcc = MD5({H(A1)[0..n], "Digest H(A1) to client-to-server sealing key magic constant"}) +// +// For the Server-to-Client: +// +// kcs = MD5({H(A1)[0..n], "Digest H(A1) to server-to-client sealing key magic constant"}) +// +// Where n is based on the cipher we choose: +// rc4-40: n == 5 +// rc4-56: n == 7 +// for all others n == 16 +// +// For now, I've only implemented handling for rc4, not des or 3des +func generatePrivacyKeyPair(a1 string, useCipher cipherType) ([md5.Size]byte, [md5.Size]byte) { + clientConfMagicStr := []byte("Digest H(A1) to client-to-server sealing key magic constant") + serverConfMagicStr := []byte("Digest H(A1) to server-to-client sealing key magic constant") + + sum := h(a1) + var n int + switch useCipher { + case cipherRC440: + n = 5 + case cipherRC456: + n = 7 + default: + n = md5.Size + } + + kcc := md5.Sum(append(sum[:n], clientConfMagicStr...)) + kcs := md5.Sum(append(sum[:n], serverConfMagicStr...)) + + return kcc, kcs +} diff --git a/internal/rpc/digest_md5_integrity.go b/internal/rpc/digest_md5_integrity.go new file mode 100644 index 00000000..e705359e --- /dev/null +++ b/internal/rpc/digest_md5_integrity.go @@ -0,0 +1,152 @@ +package rpc + +import ( + "bytes" + "crypto/hmac" + "crypto/md5" + "encoding/binary" + "errors" + "fmt" + "hash" + "io" + "net" + "syscall" + "time" +) + +type digestMD5IntegrityConn struct { + conn net.Conn + readDeadline time.Time + + readBuf bytes.Buffer + writeBuf bytes.Buffer + + kic []byte + kis []byte + + sendSeqNum int + readSeqNum int + + encodeMAC hash.Hash + decodeMAC hash.Hash +} + +func newDigestMD5IntegrityConn(conn net.Conn, kic, kis []byte) digestMD5Handler { + return &digestMD5IntegrityConn{ + conn: conn, + kic: kic, + kis: kis, + encodeMAC: hmac.New(md5.New, kic), + decodeMAC: hmac.New(md5.New, kis), + } +} + +func (d *digestMD5IntegrityConn) Close() error { + return d.conn.Close() +} + +func (d *digestMD5IntegrityConn) LocalAddr() net.Addr { + return d.conn.LocalAddr() +} + +func (d *digestMD5IntegrityConn) RemoteAddr() net.Addr { + return d.conn.RemoteAddr() +} + +func (d *digestMD5IntegrityConn) SetDeadline(t time.Time) error { + d.readDeadline = t + return d.conn.SetDeadline(t) +} + +func (d *digestMD5IntegrityConn) SetReadDeadline(t time.Time) error { + d.readDeadline = t + return d.conn.SetReadDeadline(t) +} + +func (d *digestMD5IntegrityConn) SetWriteDeadline(t time.Time) error { + return d.conn.SetWriteDeadline(t) +} + +func (d *digestMD5IntegrityConn) Write(b []byte) (n int, err error) { + inputLen := len(b) + seqBuf := lenEncodeBytes(d.sendSeqNum) + outputLen := macDataLen + inputLen + macHMACLen + macMsgTypeLen + macSeqNumLen + + d.writeBuf.Reset() + d.writeBuf.Grow(outputLen) + + binary.Write(&d.writeBuf, binary.BigEndian, int32(outputLen-macDataLen)) + d.writeBuf.Write(b) + + hmac := getHMAC(d.encodeMAC, seqBuf[:], b) + d.writeBuf.Write(hmac) + d.writeBuf.Write(macMsgType[:]) + binary.Write(&d.writeBuf, binary.BigEndian, int32(d.sendSeqNum)) + + d.sendSeqNum++ + wr, err := d.writeBuf.WriteTo(d.conn) + return int(wr), err +} + +// Read will decode the underlying bytes and then copy them from our +// buffer into the provided byte slice +func (d *digestMD5IntegrityConn) Read(b []byte) (int, error) { + if !d.readDeadline.IsZero() && d.readDeadline.Before(time.Now()) { + return 0, syscall.ETIMEDOUT + } + + n, err := d.readBuf.Read(b) + if len(b) == n || (err != nil 
&& err != io.EOF) { + return n, err + } + + var sz int32 + err = binary.Read(d.conn, binary.BigEndian, &sz) + if err != nil { + return n, err + } + + d.readBuf.Reset() + d.readBuf.Grow(int(sz)) + _, err = io.CopyN(&d.readBuf, d.conn, int64(sz)) + if err != nil { + return n, err + } + + decoded, err := d.decode(d.readBuf.Bytes()) + if err != nil { + return n, err + } + + d.readBuf.Truncate(len(decoded)) + return d.readBuf.Read(b[n:]) +} + +// decode will decode a message from the server and perform the integrity +// protection check, removing the verification and mac data in what is returned +// the slice returned is an alias to the buffer and must be either used or +// copied to a new slice before calling decode again +func (d *digestMD5IntegrityConn) decode(input []byte) ([]byte, error) { + inputLen := len(input) + if inputLen < saslIntegrityPrefixLength { + return nil, fmt.Errorf("Input length smaller than the integrity prefix") + } + + seqBuf := lenEncodeBytes(d.readSeqNum) + + dataLen := inputLen - macHMACLen - macMsgTypeLen - macSeqNumLen + hmac := getHMAC(d.decodeMAC, seqBuf[:], input[:dataLen]) + + seqNumStart := inputLen - macSeqNumLen + msgTypeStart := seqNumStart - macMsgTypeLen + origHashStart := msgTypeStart - macHMACLen + + if !bytes.Equal(hmac, input[origHashStart:origHashStart+macHMACLen]) || + !bytes.Equal(macMsgType[:], input[msgTypeStart:msgTypeStart+macMsgTypeLen]) || + !bytes.Equal(seqBuf[:], input[seqNumStart:seqNumStart+macSeqNumLen]) { + return nil, errors.New("HMAC Integrity Check failed") + } + + d.readSeqNum++ + return input[:dataLen], nil +} diff --git a/internal/rpc/digest_md5_privacy.go b/internal/rpc/digest_md5_privacy.go new file mode 100644 index 00000000..f351bedb --- /dev/null +++ b/internal/rpc/digest_md5_privacy.go @@ -0,0 +1,175 @@ +package rpc + +import ( + "bytes" + "crypto/hmac" + "crypto/md5" + "crypto/rc4" + "encoding/binary" + "fmt" + "hash" + "io" + "net" + "syscall" + "time" +) + +func newDigestMD5PrivacyConn(conn net.Conn, kic, kis, kcc, kcs []byte) (digestMD5Handler, error) { + encryptor, err := rc4.NewCipher(kcc) + if err != nil { + return nil, err + } + + decryptor, err := rc4.NewCipher(kcs) + if err != nil { + return nil, err + } + + return &digestMD5PrivacyConn{ + conn: conn, + kic: kic, + kis: kis, + kcc: kcc, + kcs: kcs, + encryptor: encryptor, + decryptor: decryptor, + decodeMAC: hmac.New(md5.New, kis), + encodeMAC: hmac.New(md5.New, kic), + }, nil +} + +type digestMD5PrivacyConn struct { + conn net.Conn + readDeadline time.Time + + sendSeqNum int + readSeqNum int + + kic []byte + kis []byte + + kcc []byte + kcs []byte + + decodeMAC hash.Hash + encodeMAC hash.Hash + + decryptor *rc4.Cipher + encryptor *rc4.Cipher + + readBuf bytes.Buffer + writeBuf bytes.Buffer +} + +func (d *digestMD5PrivacyConn) Close() error { + return d.conn.Close() +} + +func (d *digestMD5PrivacyConn) LocalAddr() net.Addr { + return d.conn.LocalAddr() +} + +func (d *digestMD5PrivacyConn) RemoteAddr() net.Addr { + return d.conn.RemoteAddr() +} + +func (d *digestMD5PrivacyConn) SetDeadline(t time.Time) error { + d.readDeadline = t + return d.conn.SetDeadline(t) +} + +func (d *digestMD5PrivacyConn) SetReadDeadline(t time.Time) error { + d.readDeadline = t + return d.conn.SetReadDeadline(t) +} + +func (d *digestMD5PrivacyConn) SetWriteDeadline(t time.Time) error { + return d.conn.SetWriteDeadline(t) +} + +func (d *digestMD5PrivacyConn) Read(b []byte) (int, error) { + if !d.readDeadline.IsZero() && d.readDeadline.Before(time.Now()) { + return 0, syscall.ETIMEDOUT + } + 
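+ // Serve any bytes buffered from the previously decoded frame first; if they
+ // don't fill b, read the next length-prefixed wrapped frame from the wire,
+ // decode and verify it, then copy what fits into the rest of b.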
+ n, err := d.readBuf.Read(b) + if len(b) == n || (err != nil && err != io.EOF) { + return n, err + } + + var sz int32 + err = binary.Read(d.conn, binary.BigEndian, &sz) + if err != nil { + return 0, err + } + + d.readBuf.Reset() + d.readBuf.Grow(int(sz)) + _, err = io.CopyN(&d.readBuf, d.conn, int64(sz)) + if err != nil { + return n, err + } + + decoded, err := d.decode(d.readBuf.Bytes()) + if err != nil { + return n, err + } + + d.readBuf.Truncate(len(decoded)) + return d.readBuf.Read(b[n:]) +} + +func (d *digestMD5PrivacyConn) decode(input []byte) (out []byte, err error) { + inputLen := len(input) + if inputLen < saslIntegrityPrefixLength { + return nil, fmt.Errorf("Input length smaller than the integrity prefix") + } + + seqNumStart := inputLen - macSeqNumLen + msgTypeStart := seqNumStart - macMsgTypeLen + + encryptedLen := inputLen - macMsgTypeLen - macSeqNumLen + d.decryptor.XORKeyStream(input[:encryptedLen], input[:encryptedLen]) + + origHash := input[encryptedLen-macHMACLen : encryptedLen] + encryptedLen -= macHMACLen + + seqBuf := lenEncodeBytes(d.readSeqNum) + hmac := getHMAC(d.decodeMAC, seqBuf[:], input[:encryptedLen]) + + msgType := input[msgTypeStart : msgTypeStart+macMsgTypeLen] + seqNum := input[seqNumStart : seqNumStart+macSeqNumLen] + + if !bytes.Equal(hmac, origHash) || !bytes.Equal(macMsgType[:], msgType) || !bytes.Equal(seqNum, seqBuf[:]) { + return nil, fmt.Errorf("HMAC Integrity Check Failed") + } + + d.readSeqNum++ + return input[:encryptedLen], nil +} + +func (d *digestMD5PrivacyConn) Write(b []byte) (int, error) { + inputLen := len(b) + seqBuf := lenEncodeBytes(d.sendSeqNum) + + encryptedLen := inputLen + macHMACLen + outputLen := macDataLen + encryptedLen + macMsgTypeLen + macSeqNumLen + d.writeBuf.Reset() + d.writeBuf.Grow(outputLen) + + finalLength := encryptedLen + macMsgTypeLen + macSeqNumLen + binary.Write(&d.writeBuf, binary.BigEndian, int32(finalLength)) + d.writeBuf.Write(b) + + hmac := getHMAC(d.encodeMAC, seqBuf[:], b) + d.writeBuf.Write(hmac) + + toEncrypt := d.writeBuf.Bytes()[macDataLen:] + d.encryptor.XORKeyStream(toEncrypt, toEncrypt) + d.writeBuf.Write(macMsgType[:]) + binary.Write(&d.writeBuf, binary.BigEndian, int32(d.sendSeqNum)) + + d.sendSeqNum++ + n, err := d.writeBuf.WriteTo(d.conn) + return int(n), err +} diff --git a/internal/rpc/digest_md5_test.go b/internal/rpc/digest_md5_test.go new file mode 100644 index 00000000..95eb4e1f --- /dev/null +++ b/internal/rpc/digest_md5_test.go @@ -0,0 +1,185 @@ +package rpc + +import ( + "bytes" + "encoding/binary" + "net" + "testing" + + "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" + hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs" + "github.com/stretchr/testify/assert" +) + +func getTestDigest() *digestMD5 { + return &digestMD5{ + passwd: "secret", + authID: []byte("chris"), + hostname: "elwood.innosoft.com", + service: "imap", + } +} + +func TestMD5DigestResponse(t *testing.T) { + dgst := getTestDigest() + + origGenCnonce := genCnonce + genCnonce = func() (string, error) { + return "OA6MHXh6VqTrRk", nil + } + defer func() { + genCnonce = origGenCnonce + }() + + // example pulled from page 19 of RFC 2831 + challenge := `realm="elwood.innosoft.com", nonce="OA6MG9tEQGm2hh", qop="auth", algorithm=md5-sess, charset=utf-8, cipher="rc4"` + ret, err := dgst.challengeStep1([]byte(challenge)) + if assert.NoError(t, err) { + assert.Equal(t, `username="chris", realm="elwood.innosoft.com", nonce="OA6MG9tEQGm2hh", cnonce="OA6MHXh6VqTrRk", nc=00000001, qop=auth, 
digest-uri="imap/elwood.innosoft.com", response=d388dad90d4bbd760a152321f2143af7, charset=utf-8, cipher=rc4`, ret) + assert.Equal(t, cipherRC4, dgst.cipher) + } +} + +func TestMD5DigestRspAuth(t *testing.T) { + dgst := getTestDigest() + + // setup state as it would be after the first challenge + dgst.token = &tokenChallenge{ + algorithm: "md5-sess", + charset: "utf-8", + nonce: "OA6MG9tEQGm2hh", + qop: qopAuthentication, + realm: "elwood.innosoft.com", + } + dgst.cnonce = "OA6MHXh6VqTrRk" + + // evaluate the rspauth as per the example in RFC 2831 + ret, err := dgst.challengeStep2([]byte("rspauth=ea40f60335c427b5527b84dbabcdfffd")) + if assert.NoError(t, err) { + assert.Empty(t, ret) + } +} + +func TestDigestMD5Conn(t *testing.T) { + // Let's use a sample captured sasl handshake complete with cipher key + // to ensure that our handshake is working properly and that we get the + // proper encrypted data back and forth + key := &EncryptionKey{} + key.EncryptionKey = []byte{ + 0x4d, 0xed, 0xaa, 0xd4, 0xf0, 0xf8, 0xec, 0x7d, + 0xfd, 0xf7, 0x76, 0xaf, 0xbc, 0x93, 0xba, 0x8e, + 0xd1, 0xc3, 0xb3, 0xb7} + key.Nonce = []byte{0x79, 0x0c, 0xc3, 0xa6, 0x31, 0x7f, 0x5b, 0xd7} + key.KeyID = 388373981 + key.BlockPoolID = "BP-529865118-10.129.176.136-1582635112897" + + empty := "" + blockKind := "HDFS_BLOCK_TOKEN" + token := &hadoop_common.TokenProto{} + token.Kind = &blockKind + token.Service = &empty + + origGenCnonce := genCnonce + genCnonce = func() (string, error) { + return "dqNZ/hGooPsuK3iWPeDFeQ==", nil + } + defer func() { + genCnonce = origGenCnonce + }() + + server, client := net.Pipe() + go func() { + b := make([]byte, 4) + server.Read(b) + + d := new(uint32) + binary.Read(bytes.NewReader(b), binary.BigEndian, d) + // handshake starts by first passing 0xDEADBEEF to the server + assert.Equal(t, uint32(0xDEADBEEF), *d) + + var err error + msg := &hdfs.DataTransferEncryptorMessageProto{} + err = readPrefixedMessage(server, msg) + assert.NoError(t, err) + + // the server then responds with the initial challenge + err = sendSaslMsg(server, hdfs.DataTransferEncryptorMessageProto_SUCCESS.Enum(), + []byte(`username="388373981 BP-529865118-10.129.176.136-1582635112897 eQzDpjF/W9c=", realm="0", nonce="8iQSCAmYohP0K4dBX4Z2cxYC4CFJjfVp3aATEHNN", qop="auth-conf", cipher="rc4"`), + "", false) + + assert.NoError(t, err) + + err = readPrefixedMessage(server, msg) + assert.NoError(t, err) + // our client should respond appropriately with the correct challenge response + assert.Equal(t, `username="388373981 BP-529865118-10.129.176.136-1582635112897 eQzDpjF/W9c=", realm="0", nonce="8iQSCAmYohP0K4dBX4Z2cxYC4CFJjfVp3aATEHNN", cnonce="dqNZ/hGooPsuK3iWPeDFeQ==", nc=00000001, qop=auth-conf, digest-uri="hdfs/0", response=c4669d46e21197923d3e98e53e6dd543, charset=utf-8, cipher=rc4`, + string(msg.Payload)) + + // finally the server responds with a rspauth and the cipher information + msg.Status = hdfs.DataTransferEncryptorMessageProto_SUCCESS.Enum() + msg.Payload = []byte("rspauth=830abc648a95a91e9ff1d594cdbca222") + opt := &hdfs.CipherOptionProto{} + opt.Suite = hdfs.CipherSuiteProto_AES_CTR_NOPADDING.Enum() + // these are the encoded cipher keys, InKey and OutKey will need to be + // decoded by the client before they can be used + opt.InKey = []byte{ + 0xbb, 0x5e, 0xcf, 0x32, 0x55, 0xe7, 0x59, 0x5b, + 0xe5, 0xf9, 0xd7, 0xd2, 0x1e, 0x29, 0xb8, 0xeb, + 0x04, 0x93, 0x8b, 0x74, 0x58, 0xbd, 0x77, 0x79, + 0x8f, 0xfd, 0xf2, 0xe3, 0xb9, 0xbd, 0x70, 0xa7, + 0x3b, 0xbc, 0xf4, 0xa2, 0xf3, 0xa1, 0x8a, 0x51, + 0x83, 0x3e, 0x00, 
0x01, 0x00, 0x00, 0x00, 0x00} + opt.InIv = []byte{ + 0xc9, 0x50, 0x0c, 0xa0, 0xcc, 0x10, 0x13, 0x37, + 0x06, 0x21, 0x1e, 0x76, 0xf8, 0x64, 0xea, 0x37, + } + opt.OutKey = []byte{ + 0x63, 0x50, 0x62, 0xfe, 0x18, 0xed, 0xb9, 0xf6, + 0x27, 0x92, 0x45, 0x6f, 0xa6, 0xdc, 0x9c, 0x6e, + 0x71, 0x5e, 0x4a, 0xcb, 0x92, 0x97, 0xa4, 0xcb, + 0xa1, 0x56, 0xe3, 0x4f, 0x25, 0x5d, 0xfb, 0xd1, + 0x65, 0x81, 0x12, 0xe5, 0xd9, 0xe0, 0x12, 0x33, + 0x53, 0xef, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, + } + opt.OutIv = []byte{ + 0xe2, 0xcb, 0xcd, 0xe2, 0x03, 0x20, 0x8e, 0x37, + 0x74, 0x02, 0x11, 0x66, 0x66, 0x9c, 0xd9, 0xa0, + } + msg.CipherOption = []*hdfs.CipherOptionProto{opt} + + data, _ := makePrefixedMessage(msg) + server.Write(data) + + actualInKey := []byte{ + 0xe6, 0xfb, 0x59, 0xb1, 0x7e, 0xd7, 0xdf, 0x11, + 0x3a, 0xf3, 0xac, 0x62, 0xef, 0xc0, 0x86, 0x3d, + 0x92, 0x74, 0x7d, 0xd9, 0x3f, 0xae, 0xbc, 0x62, + 0xf2, 0xb5, 0x68, 0x7b, 0x10, 0x6f, 0xa3, 0x53, + } + actualInIv := opt.OutIv + actualOutKey := []byte{ + 0x7b, 0x91, 0xb6, 0x66, 0x60, 0xab, 0xff, 0x8c, + 0x80, 0x48, 0xe2, 0x0c, 0xef, 0x24, 0x0c, 0xc9, + 0x0b, 0xc5, 0xd7, 0x92, 0x14, 0x9c, 0x6f, 0xea, + 0xb9, 0x12, 0x1a, 0x48, 0xc4, 0x85, 0x5f, 0x43, + } + actualOutIv := opt.InIv + + wrapped, _ := newAesConn(server, actualInKey, actualOutKey, actualInIv, actualOutIv) + n, err := wrapped.Read(b) + assert.NoError(t, err) + assert.Equal(t, 4, n) + + assert.True(t, bytes.Equal(b, []byte{0xDE, 0xAD, 0xBE, 0xEF})) + + server.Close() + }() + + wrapped, err := newDigestMD5Conn(client, token, key, true) + assert.NoError(t, err) + + n, err := wrapped.Write([]byte{0xDE, 0xAD, 0xBE, 0xEF}) + assert.NoError(t, err) + assert.Equal(t, 4, n) + wrapped.Close() +} diff --git a/internal/rpc/kerberos.go b/internal/rpc/kerberos.go index 2244fa15..72c9e861 100644 --- a/internal/rpc/kerberos.go +++ b/internal/rpc/kerberos.go @@ -56,7 +56,7 @@ func (c *NamenodeConnection) doKerberosHandshake() error { } if tokenAuth != nil { - challenge, err := parseChallenge(tokenAuth) + challenge, err := parseChallengeAuth(tokenAuth) if err != nil { return err } diff --git a/internal/rpc/namenode.go b/internal/rpc/namenode.go index c5767f2a..330c488d 100644 --- a/internal/rpc/namenode.go +++ b/internal/rpc/namenode.go @@ -223,6 +223,24 @@ func (c *NamenodeConnection) Execute(method string, req proto.Message, resp prot return nil } +// GetEncryptionKeys will use the `getDataEncryptionKey` operation on the +// namenode in order to fetch the current data encryption keys +func (c *NamenodeConnection) GetEncryptionKeys() *EncryptionKey { + req := &hdfs.GetDataEncryptionKeyRequestProto{} + resp := &hdfs.GetDataEncryptionKeyResponseProto{} + + c.Execute("getDataEncryptionKey", req, resp) + + enc := &EncryptionKey{} + enc.KeyID = int32(resp.DataEncryptionKey.GetKeyId()) + enc.ExpiryDate = resp.DataEncryptionKey.GetExpiryDate() + enc.BlockPoolID = resp.DataEncryptionKey.GetBlockPoolId() + enc.Nonce = resp.DataEncryptionKey.GetNonce() + enc.EncryptionKey = resp.DataEncryptionKey.GetEncryptionKey() + enc.EncryptionAlg = resp.DataEncryptionKey.GetEncryptionAlgorithm() + return enc +} + // A handshake packet: // +-----------------------------------------------------------+ // | Header, 4 bytes ("hrpc") | diff --git a/internal/rpc/sasl_dialer.go b/internal/rpc/sasl_dialer.go new file mode 100644 index 00000000..4dbe132d --- /dev/null +++ b/internal/rpc/sasl_dialer.go @@ -0,0 +1,42 @@ +package rpc + +import ( + "context" + "net" + + "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" +) + +// 
DatanodeSaslDialer is a dialer that will use the Privacy and Integrity +// data members in order to decide whether or not it needs to wrap the connection +// to the datanode for proper handling of encryption / integrity protection +// after it's been constructed it can be used as one would use net.Dialer +type DatanodeSaslDialer struct { + Dialer func(ctx context.Context, network, addr string) (net.Conn, error) + Key *EncryptionKey + Privacy bool + Integrity bool + Token *hadoop_common.TokenProto +} + +// DialContext fits the Dialer interface as per net.Dialer, if the Dialer provided +// is nil, then (&net.Dialer{}).DialContext will be used as the underlying connection +// to the datanode, afterwards if we are configured for privacy / integrity we will +// wrap the connection object here after performing the handshake so that any consumers +// of the connection don't have to care about the encryption / integrity protection +func (d *DatanodeSaslDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + if d.Dialer == nil { + d.Dialer = (&net.Dialer{}).DialContext + } + + conn, err := d.Dialer(ctx, network, addr) + if err != nil { + return nil, err + } + + if d.Privacy || d.Integrity { + return newDigestMD5Conn(conn, d.Token, d.Key, d.Privacy) + } + + return conn, err +} diff --git a/travis-setup-cdh5.sh b/travis-setup-cdh5.sh index defde3d3..ad89a831 100755 --- a/travis-setup-cdh5.sh +++ b/travis-setup-cdh5.sh @@ -3,6 +3,12 @@ set -e KERBEROS=${KERBEROS-"false"} +ENCRYPT_DATA_TRANSFER=${ENCRYPT_DATA_TRANSFER:-"false"} +if [ "$ENCRYPT_DATA_TRANSFER" = "true" ]; then + KERBEROS="true" # must enable kerberos for encrypted data transfer + ENCRYPT_DATA_TRANSFER_ALG="rc4" + ENCRYPT_DATA_TRANSFER_CIPHER="AES/CTR/NoPadding" +fi UBUNTU_CODENAME=$(lsb_release -c | awk '{print $2}') @@ -144,6 +150,18 @@ sudo tee /etc/hadoop/conf.gohdfs/hdfs-site.xml <ignore.secure.ports.for.testing true + + dfs.encrypt.data.transfer + $ENCRYPT_DATA_TRANSFER + + + dfs.encrypt.data.transfer.algorithm + $ENCRYPT_DATA_TRANSFER_ALG + + + dfs.encrypt.data.transfer.cipher.suites + $ENCRYPT_DATA_TRANSFER_CIPHER + EOF @@ -156,6 +174,11 @@ sudo -u hdfs hdfs namenode -format -nonInteractive sudo adduser travis hadoop +if [ $ENCRYPT_DATA_TRANSFER = "true" ]; then + sudo apt-get install -y --allow-unauthenticated hadoop-kms hadoop-kms-server + sudo service hadoop-kms-server restart +fi + sudo service hadoop-hdfs-datanode restart sudo service hadoop-hdfs-namenode restart diff --git a/travis-setup-cdh6.sh b/travis-setup-cdh6.sh index 2ad9b2db..7d3e06d1 100755 --- a/travis-setup-cdh6.sh +++ b/travis-setup-cdh6.sh @@ -3,6 +3,12 @@ set -e KERBEROS=${KERBEROS-"false"} +ENCRYPT_DATA_TRANSFER=${ENCRYPT_DATA_TRANSFER:-"false"} +if [ "$ENCRYPT_DATA_TRANSFER" = "true" ]; then + KERBEROS="true" # kerberos has to be enabled for encrypted data transfer + ENCRYPT_DATA_TRANSFER_ALG="rc4" + ENCRYPT_DATA_TRANSFER_CIPHER="AES/CTR/NoPadding" +fi UBUNTU_CODENAME=$(lsb_release -cs) UBUNTU_VERSION=$(lsb_release -rs | sed s/\\.//) @@ -146,6 +152,22 @@ sudo tee /etc/hadoop/conf.gohdfs/hdfs-site.xml <ignore.secure.ports.for.testing true + + dfs.data.transfer.protection + $RPC_PROTECTION + + + dfs.encrypt.data.transfer + $ENCRYPT_DATA_TRANSFER + + + dfs.encrypt.data.transfer.algorithm + $ENCRYPT_DATA_TRANSFER_ALG + + + dfs.encrypt.data.transfer.cipher.suites + $ENCRYPT_DATA_TRANSFER_CIPHER + EOF @@ -158,6 +180,11 @@ sudo -u hdfs hdfs namenode -format -nonInteractive sudo adduser travis hadoop +if [ $ENCRYPT_DATA_TRANSFER = "true" 
]; then + sudo apt-get install -y --allow-unauthenticated hadoop-kms hadoop-kms-server + sudo service hadoop-kms-server restart +fi + sudo service hadoop-hdfs-datanode restart sudo service hadoop-hdfs-namenode restart
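A minimal usage sketch, assuming a cluster with dfs.data.transfer.protection or dfs.encrypt.data.transfer enabled and HADOOP_CONF_DIR pointing at its configuration: ClientOptionsFromConf picks up the new EncryptDataTransfer and SecureDataNode options, and NewClient wires the SASL datanode dialer in automatically. The namenode address below is a hypothetical fallback, and a Kerberized cluster would additionally need KerberosClient and KerberosServicePrincipleName set on the options.

    package main

    import (
    	"log"

    	"github.com/colinmarc/hdfs/v2"
    	"github.com/colinmarc/hdfs/v2/hadoopconf"
    )

    func main() {
    	// Load core-site.xml / hdfs-site.xml from HADOOP_CONF_DIR (or HADOOP_HOME).
    	conf, err := hadoopconf.LoadFromEnvironment()
    	if err != nil {
    		log.Fatal(err)
    	}

    	// Sets EncryptDataTransfer / SecureDataNode from dfs.encrypt.data.transfer
    	// and dfs.data.transfer.protection, along with the namenode addresses.
    	options := hdfs.ClientOptionsFromConf(conf)
    	if len(options.Addresses) == 0 {
    		options.Addresses = []string{"namenode:8020"} // hypothetical fallback
    	}

    	client, err := hdfs.NewClient(options)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer client.Close()

    	// Datanode reads and writes now negotiate DIGEST-MD5 SASL (and AES, if
    	// the server offers a cipher option) transparently via the wrapped DialFunc.
    	data, err := client.ReadFile("/_test/foo.txt")
    	if err != nil {
    		log.Fatal(err)
    	}
    	log.Printf("read %d bytes", len(data))
    }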