Skip to content

Commit

Permalink
add support for data transfer encryption via rc4 and aes
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Topol committed Jul 15, 2020
1 parent 15f6da0 commit 79a050c
Show file tree
Hide file tree
Showing 20 changed files with 1,263 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ env:
- PLATFORM=hdp2
- PLATFORM=cdh5
- PLATFORM=cdh6
- PLATFORM=cdh6 ENCRYPT_DATA_TRANSFER=true
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=authentication
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=integrity
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy ENCRYPT_DATA_TRANSFER=true
before_install:
- export GO111MODULE=on # Travis installs into $GOPATH/src, which disables module support by default.
install:
Expand Down
42 changes: 42 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"github.com/colinmarc/hdfs/v2/hadoopconf"
"github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/v2/internal/rpc"
krb "gopkg.in/jcmturner/gokrb5.v7/client"
Expand Down Expand Up @@ -68,6 +69,13 @@ type ClientOptions struct {
// multi-namenode setup (for example: 'nn/_HOST'). It is required if
// KerberosClient is provided.
KerberosServicePrincipleName string
// EncryptDataTransfer specifies whether or not data transfer for datanodes
// needs to utilize encryption and thus do a negotiation to get data
// this is specified by dfs.encrypt.data.transfer in the config
EncryptDataTransfer bool
// SecureDataNode specifies whether we're protecting our data transfer
// communication via dfs.data.transfer.protection
SecureDataNode bool
}

// ClientOptionsFromConf attempts to load any relevant configuration options
Expand Down Expand Up @@ -116,6 +124,26 @@ func ClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions {
options.KerberosServicePrincipleName = strings.Split(conf["dfs.namenode.kerberos.principal"], "@")[0]
}

dataTransferProt := strings.Split(strings.ToLower(conf["dfs.data.transfer.protection"]), ",")
for _, val := range dataTransferProt {
switch val {
case "privacy":
options.EncryptDataTransfer = true
fallthrough
case "integrity", "authentication":
options.SecureDataNode = true
}
}

// dfs.encrypt.data.transfer set to true overrides dfs.data.transfer.protection and
// requires both privacy and integrity for communication. If dfs.encrypt.data.transfer is
// "true" we explicitly set EncryptDataTransfer and SecureDataNode to true, regardless of
// what they already were.
if conf["dfs.encrypt.data.transfer"] == "true" {
options.EncryptDataTransfer = true
options.SecureDataNode = true
}

return options
}

Expand Down Expand Up @@ -148,6 +176,20 @@ func NewClient(options ClientOptions) (*Client, error) {
return &Client{namenode: namenode, options: options}, nil
}

func (c *Client) datanodeDialFunc(token *hadoop_common.TokenProto) func(ctx context.Context, network, addr string) (net.Conn, error) {
if c.options.EncryptDataTransfer || c.options.SecureDataNode {
return (&rpc.DatanodeSaslDialer{
Dialer: c.options.DatanodeDialFunc,
Key: c.namenode.GetEncryptionKeys(),
Privacy: c.options.EncryptDataTransfer,
Integrity: c.options.SecureDataNode,
Token: token,
}).DialContext
}

return c.options.DatanodeDialFunc
}

// New returns Client connected to the namenode(s) specified by address, or an
// error if it can't connect. Multiple namenodes can be specified by separating
// them with commas, for example "nn1:9000,nn2:9000".
Expand Down
4 changes: 4 additions & 0 deletions cmd/hdfs/test/helper.bash
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ export HADOOP_FS=${HADOOP_FS-"hadoop fs"}
export ROOT_TEST_DIR="$BATS_TEST_DIRNAME/../../.."
export HDFS="$ROOT_TEST_DIR/hdfs"

# jdk11 is missing some APIs that the older jars here rely on
# so point at openjdk8 for now
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# stolen from https://github.com/sstephenson/rbenv/blob/master/test/test_helper.bash

flunk() {
Expand Down
5 changes: 3 additions & 2 deletions file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ func (f *FileReader) Checksum() ([]byte, error) {
paddedLength := 32
totalLength := 0
checksum := md5.New()

for _, block := range f.blocks {
cr := &rpc.ChecksumReader{
Block: block,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()),
}

err := cr.SetDeadline(f.deadline)
Expand Down Expand Up @@ -405,7 +406,7 @@ func (f *FileReader) getNewBlockReader() error {
Block: block,
Offset: int64(off - start),
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()),
}

return f.SetDeadline(f.deadline)
Expand Down
4 changes: 2 additions & 2 deletions file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ func TestFileReadDeadline(t *testing.T) {
file, err := client.Open("/_test/foo.txt")
require.NoError(t, err)

file.SetDeadline(time.Now().Add(100 * time.Millisecond))
file.SetDeadline(time.Now().Add(200 * time.Millisecond))
_, err = file.Read([]byte{0, 0})
assert.NoError(t, err)

time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
_, err = file.Read([]byte{0, 0})
assert.NotNil(t, err)
}
Expand Down
7 changes: 4 additions & 3 deletions file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Client) Append(name string) (*FileWriter, error) {
Offset: int64(block.B.GetNumBytes()),
Append: true,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()),
}

err = f.blockWriter.SetDeadline(f.deadline)
Expand Down Expand Up @@ -262,12 +262,13 @@ func (f *FileWriter) startNewBlock() error {
return &os.PathError{"create", f.name, interpretException(err)}
}

block := addBlockResp.GetBlock()
f.blockWriter = &rpc.BlockWriter{
ClientName: f.client.namenode.ClientName,
Block: addBlockResp.GetBlock(),
Block: block,
BlockSize: f.blockSize,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()),
}

return f.blockWriter.SetDeadline(f.deadline)
Expand Down
4 changes: 4 additions & 0 deletions fixtures.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
set -e

# jdk11 is missing some APIs that the older jars here rely on
# so point at openjdk8 for now
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

HADOOP_FS=${HADOOP_FS-"hadoop fs"}
$HADOOP_FS -mkdir -p "/_test"
$HADOOP_FS -chmod 777 "/_test"
Expand Down
65 changes: 65 additions & 0 deletions internal/rpc/aes_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rpc

import (
"crypto/aes"
"crypto/cipher"
"net"
"time"
)

type aesConn struct {
conn net.Conn

encStream cipher.StreamWriter
decStream cipher.StreamReader
}

func newAesConn(conn net.Conn, inKey, outKey, inIv, outIv []byte) (net.Conn, error) {
c := &aesConn{conn: conn}

encBlock, err := aes.NewCipher(inKey)
if err != nil {
return nil, err
}

decBlock, err := aes.NewCipher(outKey)
if err != nil {
return nil, err
}

c.encStream = cipher.StreamWriter{S: cipher.NewCTR(encBlock, inIv), W: conn}
c.decStream = cipher.StreamReader{S: cipher.NewCTR(decBlock, outIv), R: conn}
return c, nil
}

func (d *aesConn) Close() error {
return d.conn.Close()
}

func (d *aesConn) LocalAddr() net.Addr {
return d.conn.LocalAddr()
}

func (d *aesConn) RemoteAddr() net.Addr {
return d.conn.RemoteAddr()
}

func (d *aesConn) SetDeadline(t time.Time) error {
return d.conn.SetDeadline(t)
}

func (d *aesConn) SetReadDeadline(t time.Time) error {
return d.conn.SetReadDeadline(t)
}

func (d *aesConn) SetWriteDeadline(t time.Time) error {
return d.conn.SetWriteDeadline(t)
}

func (d *aesConn) Write(b []byte) (n int, err error) {
return d.encStream.Write(b)
}

func (d *aesConn) Read(b []byte) (n int, err error) {
return d.decStream.Read(b)
}
19 changes: 6 additions & 13 deletions internal/rpc/block_write_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,33 +330,26 @@ func (s *blockWriteStream) writePacket(p outboundPacket) error {
DataLen: proto.Int32(int32(len(p.data))),
}

header := make([]byte, 6)
// Don't ask me why this doesn't include the header proto...
totalLength := len(p.data) + len(p.checksums) + 4

header := make([]byte, 6, 6+totalLength)
infoBytes, err := proto.Marshal(headerInfo)
if err != nil {
return err
}

// Don't ask me why this doesn't include the header proto...
totalLength := len(p.data) + len(p.checksums) + 4
binary.BigEndian.PutUint32(header, uint32(totalLength))
binary.BigEndian.PutUint16(header[4:], uint16(len(infoBytes)))
header = append(header, infoBytes...)
header = append(header, p.checksums...)
header = append(header, p.data...)

_, err = s.conn.Write(header)
if err != nil {
return err
}

_, err = s.conn.Write(p.checksums)
if err != nil {
return err
}

_, err = s.conn.Write(p.data)
if err != nil {
return err
}

return nil
}

Expand Down
86 changes: 81 additions & 5 deletions internal/rpc/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ type tokenChallenge struct {
algorithm string
}

// parseChallenge returns a tokenChallenge parsed from a challenge response from
// the namenode.
func parseChallenge(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) {
func parseChallenge(challenge []byte) (*tokenChallenge, error) {
tokenChallenge := tokenChallenge{}

matched := challengeRegexp.FindAllSubmatch(auth.Challenge, -1)
matched := challengeRegexp.FindAllSubmatch(challenge, -1)
if matched == nil {
return nil, fmt.Errorf("invalid token challenge: %s", auth.Challenge)
return nil, fmt.Errorf("invalid token challenge: %s", challenge)
}

for _, m := range matched {
Expand All @@ -64,3 +62,81 @@ func parseChallenge(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error)

return &tokenChallenge, nil
}

// parseChallengeAuth returns a tokenChallenge parsed from a challenge response from
// the namenode.
func parseChallengeAuth(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) {
return parseChallenge(auth.Challenge)
}

type cipherType uint8

const (
cipherUnknown cipherType = 0
cipherDES cipherType = 1 << iota
cipher3DES
cipherRC4
cipherRC440
cipherRC456
cipherAESCBC
)

func (c cipherType) String() string {
switch c {
case cipherDES:
return "des"
case cipher3DES:
return "3des"
case cipherRC4:
return "rc4"
case cipherRC440:
return "rc4-40"
case cipherRC456:
return "rc4-56"
case cipherAESCBC:
return "aes-cbc"
}
return ""
}

func getCipher(s string) cipherType {
switch s {
case "des":
return cipherDES
case "3des":
return cipher3DES
case "rc4":
return cipherRC4
case "rc4-40":
return cipherRC440
case "rc4-56":
return cipherRC456
case "aes-cbc":
return cipherAESCBC
}
return 0
}

func chooseCipher(cipherOpts []string) cipherType {
var avail cipherType
for _, c := range cipherOpts {
avail |= getCipher(c)
}

if avail&cipherRC4 != 0 {
return cipherRC4
}
// if Has(avail, cipher3DES) {
// return cipher3DES
// }
if avail&cipherRC456 != 0 {
return cipherRC456
}
if avail&cipherRC440 != 0 {
return cipherRC440
}
// if Has(avail, cipherDES) {
// return cipherDES
// }
return 0
}
3 changes: 1 addition & 2 deletions internal/rpc/checksum_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ type ChecksumReader struct {
// UseDatanodeHostname specifies whether the datanodes should be connected to
// via their hostnames (if true) or IP addresses (if false).
UseDatanodeHostname bool
// DialFunc is used to connect to the datanodes. If nil, then
// (&net.Dialer{}).DialContext is used.
// DialFunc is used to connect to the datanodes. If nil, then (&net.Dialer{}).DialContext is used
DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)

deadline time.Time
Expand Down
Loading

0 comments on commit 79a050c

Please sign in to comment.