Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support hdfs kerberos #277

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,28 @@ sftp:
It only needs to be configured for hdfs data sources.

```yaml
sftp:
hdfs:
address: 192.168.0.10:8020
user: <user>
servicePrincipalName: <Kerberos Service Principal Name>
krb5ConfigFile: <Kerberos config file>
ccacheFile: <Kerberos ccache file>
keyTabFile: <Kerberos keytab file>
password: <Kerberos password>
dataTransferProtection: <Kerberos Data Transfer Protection>
disablePAFXFAST: false
path: <path of file>
```

* `address`: **Required**. The address of hdfs service.
* `user`: **Optional**. The user of hdfs service.
* `servicePrincipalName`: **Optional**. The kerberos service principal name of hdfs service when enable kerberos.
* `krb5ConfigFile`: **Optional**. The kerberos config file of hdfs service when enable kerberos, default is `/etc/krb5.conf`.
* `ccacheFile`: **Optional**. The ccache file of hdfs service when enable kerberos.
* `keyTabFile`: **Optional**. The keytab file of hdfs service when enable kerberos.
* `password`: **Optional**. The kerberos password of hdfs service when enable kerberos.
* `dataTransferProtection`: **Optional**. The data transfer protection of hdfs service.
* `disablePAFXFAST`: **Optional**. Whether to prohibit the client to use PA_FX_FAST.
* `path`: **Required**. The path of file in the sftp service.

#### batch
Expand Down
190 changes: 101 additions & 89 deletions docs/configuration-reference.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/fclairamb/ftpserverlib v0.21.0
github.com/golang/mock v1.6.0
github.com/jcmturner/gokrb5/v8 v8.4.2
github.com/jlaffaye/ftp v0.1.0
github.com/onsi/ginkgo/v2 v2.4.0
github.com/onsi/gomega v1.24.0
Expand Down Expand Up @@ -40,7 +41,6 @@ require (
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/fs v0.1.0 // indirect
Expand Down
100 changes: 95 additions & 5 deletions pkg/source/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,34 @@ package source

import (
"fmt"
"os"
"os/user"
"strings"

"github.com/colinmarc/hdfs/v2"
"github.com/colinmarc/hdfs/v2/hadoopconf"
krb "github.com/jcmturner/gokrb5/v8/client"
"github.com/jcmturner/gokrb5/v8/config"
"github.com/jcmturner/gokrb5/v8/credentials"
"github.com/jcmturner/gokrb5/v8/keytab"
)

const defaultKrb5ConfigFile = "/etc/krb5.conf"

var _ Source = (*hdfsSource)(nil)

type (
HDFSConfig struct {
Address string `yaml:"address,omitempty"`
User string `yaml:"user,omitempty"`
Path string `yaml:"path,omitempty"`
Address string `yaml:"address,omitempty"`
User string `yaml:"user,omitempty"`
ServicePrincipalName string `yaml:"servicePrincipalName,omitempty"`
Krb5ConfigFile string `yaml:"krb5ConfigFile,omitempty"`
CCacheFile string `yaml:"ccacheFile,omitempty"`
KeyTabFile string `yaml:"keyTabFile,omitempty"`
Password string `yaml:"password,omitempty"`
DataTransferProtection string `yaml:"dataTransferProtection,omitempty"`
DisablePAFXFAST bool `yaml:"disablePAFXFAST,omitempty"`
Path string `yaml:"path,omitempty"`
}

hdfsSource struct {
Expand All @@ -35,7 +50,6 @@ func (s *hdfsSource) Name() string {
}

func (s *hdfsSource) Open() error {
// TODO: support kerberos
conf, err := hadoopconf.LoadFromEnvironment()
if err != nil {
return err
Expand All @@ -45,7 +59,20 @@ func (s *hdfsSource) Open() error {
if s.c.HDFS.Address != "" {
options.Addresses = strings.Split(s.c.HDFS.Address, ",")
}
options.User = s.c.HDFS.User

if s.c.HDFS.ServicePrincipalName != "" {
options.KerberosClient, err = s.c.HDFS.getKerberosClient()
if err != nil {
return err
}

options.KerberosServicePrincipleName = s.c.HDFS.ServicePrincipalName
if s.c.HDFS.DataTransferProtection != "" {
options.DataTransferProtection = s.c.HDFS.DataTransferProtection
}
} else {
options.User = s.c.HDFS.User
}

cli, err := hdfs.NewClient(options)
if err != nil {
Expand Down Expand Up @@ -85,3 +112,66 @@ func (s *hdfsSource) Close() error {
func (c *HDFSConfig) String() string {
return fmt.Sprintf("hdfs %s %s", c.Address, c.Path)
}
func (c *HDFSConfig) getKerberosClient() (*krb.Client, error) {
krb5ConfigFile := c.Krb5ConfigFile
if krb5ConfigFile == "" {
krb5ConfigFile = os.Getenv("KRB5_CONFIG")
}
if krb5ConfigFile == "" {
krb5ConfigFile = defaultKrb5ConfigFile
}
krb5conf, err := config.Load(krb5ConfigFile)
if err != nil {
return nil, err
}

settings := []func(*krb.Settings){
krb.DisablePAFXFAST(c.DisablePAFXFAST),
}

var krb5client *krb.Client
var needLogin = true
if c.Password != "" {
krb5client = krb.NewWithPassword(c.User, krb5conf.LibDefaults.DefaultRealm, c.Password, krb5conf, settings...)
} else if c.KeyTabFile != "" {
var kt *keytab.Keytab
if kt, err = keytab.Load(c.KeyTabFile); err != nil {
return nil, err
}
krb5client = krb.NewWithKeytab(c.User, krb5conf.LibDefaults.DefaultRealm, kt, krb5conf, settings...)
} else {
ccacheFile := c.CCacheFile
if ccacheFile == "" {
ccacheFile = os.Getenv("KRB5CCNAME")
if strings.Contains(ccacheFile, ":") {
if strings.HasPrefix(ccacheFile, "FILE:") {
ccacheFile = strings.SplitN(ccacheFile, ":", 2)[1]
}
}
}

if ccacheFile == "" {
var u *user.User
if u, err = user.Current(); err != nil {
return nil, err
}
ccacheFile = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid)
}
var ccache *credentials.CCache
if ccache, err = credentials.LoadCCache(ccacheFile); err != nil {
return nil, err
}
krb5client, err = krb.NewFromCCache(ccache, krb5conf, settings...)
if err != nil {
return nil, err
}
needLogin = false
}

if needLogin {
if err = krb5client.Login(); err != nil {
return nil, err
}
}
return krb5client, nil
}
Loading