Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hadoop: support conf val with unit #4655

Merged
merged 2 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pkg/utils/humanize.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ func ParseBytes(ctx *cli.Context, key string, unit byte) uint64 {
if len(str) == 0 {
return 0
}
return ParseBytesStr(key, str, unit)
}

func ParseBytesStr(key, str string, unit byte) uint64 {
s := str
if c := s[len(s)-1]; c < '0' || c > '9' {
unit = c
Expand Down Expand Up @@ -56,9 +59,8 @@ func ParseBytes(ctx *cli.Context, key string, unit byte) uint64 {
val *= float64(uint64(1) << shift)
}
if err != nil {
logger.Fatalf("Invalid value \"%s\" for option \"--%s\"", str, key)
logger.Fatalf("Invalid value \"%s\" for \"%s\"", str, key)
}

return uint64(val)
}

Expand All @@ -68,6 +70,10 @@ func ParseMbps(ctx *cli.Context, key string) int64 {
return 0
}

return ParseMbpsStr(key, str)
}

func ParseMbpsStr(key, str string) int64 {
s := str
var unit byte = 'M'
if c := s[len(s)-1]; c < '0' || c > '9' {
Expand All @@ -89,9 +95,8 @@ func ParseMbps(ctx *cli.Context, key string) int64 {
}
}
if err != nil {
logger.Fatalf("Invalid value \"%s\" for option \"--%s\"", str, key)
logger.Fatalf("Invalid value \"%s\" for \"%s\"", str, key)
}

return int64(val)
}

Expand Down
131 changes: 64 additions & 67 deletions sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,50 +272,50 @@ func freeHandle(fd int) {
}

type javaConf struct {
MetaURL string `json:"meta"`
Bucket string `json:"bucket"`
StorageClass string `json:"storageClass"`
ReadOnly bool `json:"readOnly"`
NoSession bool `json:"noSession"`
NoBGJob bool `json:"noBGJob"`
OpenCache float64 `json:"openCache"`
BackupMeta int64 `json:"backupMeta"`
BackupSkipTrash bool `json:"backupSkipTrash"`
Heartbeat int `json:"heartbeat"`
CacheDir string `json:"cacheDir"`
CacheSize int64 `json:"cacheSize"`
FreeSpace string `json:"freeSpace"`
AutoCreate bool `json:"autoCreate"`
CacheFullBlock bool `json:"cacheFullBlock"`
CacheChecksum string `json:"cacheChecksum"`
CacheEviction string `json:"cacheEviction"`
CacheScanInterval int `json:"cacheScanInterval"`
CacheExpire int64 `json:"cacheExpire"`
Writeback bool `json:"writeback"`
MemorySize int `json:"memorySize"`
Prefetch int `json:"prefetch"`
Readahead int `json:"readahead"`
UploadLimit int `json:"uploadLimit"`
DownloadLimit int `json:"downloadLimit"`
MaxUploads int `json:"maxUploads"`
MaxDeletes int `json:"maxDeletes"`
SkipDirNlink int `json:"skipDirNlink"`
SkipDirMtime string `json:"skipDirMtime"`
IORetries int `json:"ioRetries"`
GetTimeout int `json:"getTimeout"`
PutTimeout int `json:"putTimeout"`
FastResolve bool `json:"fastResolve"`
AttrTimeout float64 `json:"attrTimeout"`
EntryTimeout float64 `json:"entryTimeout"`
DirEntryTimeout float64 `json:"dirEntryTimeout"`
Debug bool `json:"debug"`
NoUsageReport bool `json:"noUsageReport"`
AccessLog string `json:"accessLog"`
PushGateway string `json:"pushGateway"`
PushInterval int `json:"pushInterval"`
PushAuth string `json:"pushAuth"`
PushLabels string `json:"pushLabels"`
PushGraphite string `json:"pushGraphite"`
MetaURL string `json:"meta"`
Bucket string `json:"bucket"`
StorageClass string `json:"storageClass"`
ReadOnly bool `json:"readOnly"`
NoSession bool `json:"noSession"`
NoBGJob bool `json:"noBGJob"`
OpenCache string `json:"openCache"`
BackupMeta string `json:"backupMeta"`
BackupSkipTrash bool `json:"backupSkipTrash"`
Heartbeat string `json:"heartbeat"`
CacheDir string `json:"cacheDir"`
CacheSize string `json:"cacheSize"`
FreeSpace string `json:"freeSpace"`
AutoCreate bool `json:"autoCreate"`
CacheFullBlock bool `json:"cacheFullBlock"`
CacheChecksum string `json:"cacheChecksum"`
CacheEviction string `json:"cacheEviction"`
CacheScanInterval string `json:"cacheScanInterval"`
CacheExpire string `json:"cacheExpire"`
Writeback bool `json:"writeback"`
MemorySize string `json:"memorySize"`
Prefetch int `json:"prefetch"`
Readahead string `json:"readahead"`
UploadLimit string `json:"uploadLimit"`
DownloadLimit string `json:"downloadLimit"`
MaxUploads int `json:"maxUploads"`
MaxDeletes int `json:"maxDeletes"`
SkipDirNlink int `json:"skipDirNlink"`
SkipDirMtime string `json:"skipDirMtime"`
IORetries int `json:"ioRetries"`
GetTimeout string `json:"getTimeout"`
PutTimeout string `json:"putTimeout"`
FastResolve bool `json:"fastResolve"`
AttrTimeout string `json:"attrTimeout"`
EntryTimeout string `json:"entryTimeout"`
DirEntryTimeout string `json:"dirEntryTimeout"`
Debug bool `json:"debug"`
NoUsageReport bool `json:"noUsageReport"`
AccessLog string `json:"accessLog"`
PushGateway string `json:"pushGateway"`
PushInterval string `json:"pushInterval"`
PushAuth string `json:"pushAuth"`
PushLabels string `json:"pushLabels"`
PushGraphite string `json:"pushGraphite"`
}

func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.FileSystem) int64 {
Expand Down Expand Up @@ -445,8 +445,8 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
metaConf.SkipDirMtime = utils.Duration(jConf.SkipDirMtime)
metaConf.ReadOnly = jConf.ReadOnly
metaConf.NoBGJob = jConf.NoBGJob || jConf.NoSession
metaConf.OpenCache = time.Duration(jConf.OpenCache * 1e9)
metaConf.Heartbeat = time.Second * time.Duration(jConf.Heartbeat)
metaConf.OpenCache = utils.Duration(jConf.OpenCache)
metaConf.Heartbeat = utils.Duration(jConf.Heartbeat)
m := meta.NewClient(jConf.MetaURL, metaConf)
format, err := m.Load(true)
if err != nil {
Expand Down Expand Up @@ -479,10 +479,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
registerer.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registerer.MustRegister(collectors.NewGoCollector())

var interval time.Duration
if jConf.PushInterval > 0 {
interval = time.Second * time.Duration(jConf.PushInterval)
}
var interval = utils.Duration(jConf.PushInterval)
if jConf.PushGraphite != "" {
push2Graphite(jConf.PushGraphite, interval, registry, commonLabels)
}
Expand Down Expand Up @@ -517,25 +514,25 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
Compress: format.Compression,
CacheDir: jConf.CacheDir,
CacheMode: 0644, // all user can read cache
CacheSize: uint64(jConf.CacheSize << 20),
CacheSize: utils.ParseBytesStr("cache-size", jConf.CacheSize, 'M'),
FreeSpace: float32(freeSpaceRatio),
AutoCreate: jConf.AutoCreate,
CacheFullBlock: jConf.CacheFullBlock,
CacheChecksum: jConf.CacheChecksum,
CacheEviction: jConf.CacheEviction,
CacheScanInterval: time.Second * time.Duration(jConf.CacheScanInterval),
CacheExpire: time.Second * time.Duration(jConf.CacheExpire),
CacheScanInterval: utils.Duration(jConf.CacheScanInterval),
CacheExpire: utils.Duration(jConf.CacheExpire),
MaxUpload: jConf.MaxUploads,
MaxRetries: jConf.IORetries,
UploadLimit: int64(jConf.UploadLimit) * 1e6 / 8,
DownloadLimit: int64(jConf.DownloadLimit) * 1e6 / 8,
UploadLimit: utils.ParseMbpsStr("upload-limit", jConf.UploadLimit),
DownloadLimit: utils.ParseMbpsStr("download-limit", jConf.DownloadLimit),
Prefetch: jConf.Prefetch,
Writeback: jConf.Writeback,
HashPrefix: format.HashPrefix,
GetTimeout: time.Second * time.Duration(jConf.GetTimeout),
PutTimeout: time.Second * time.Duration(jConf.PutTimeout),
BufferSize: uint64(jConf.MemorySize << 20),
Readahead: jConf.Readahead << 20,
GetTimeout: utils.Duration(jConf.GetTimeout),
PutTimeout: utils.Duration(jConf.PutTimeout),
BufferSize: utils.ParseBytesStr("memory-size", jConf.MemorySize, 'M'),
Readahead: int(utils.ParseBytesStr("max-readahead", jConf.Readahead, 'M')),
}
if chunkConf.UploadLimit == 0 {
chunkConf.UploadLimit = format.UploadLimit * 1e6 / 8
Expand All @@ -561,11 +558,11 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
return nil
}
m.OnReload(func(fmt *meta.Format) {
if jConf.UploadLimit > 0 {
fmt.UploadLimit = int64(jConf.UploadLimit)
if chunkConf.UploadLimit > 0 {
fmt.UploadLimit = chunkConf.UploadLimit
}
if jConf.DownloadLimit > 0 {
fmt.DownloadLimit = int64(jConf.DownloadLimit)
if chunkConf.DownloadLimit > 0 {
fmt.DownloadLimit = chunkConf.DownloadLimit
}
store.UpdateLimit(fmt.UploadLimit, fmt.DownloadLimit)
})
Expand All @@ -574,12 +571,12 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
Meta: metaConf,
Format: *format,
Chunk: &chunkConf,
AttrTimeout: time.Millisecond * time.Duration(jConf.AttrTimeout*1000),
EntryTimeout: time.Millisecond * time.Duration(jConf.EntryTimeout*1000),
DirEntryTimeout: time.Millisecond * time.Duration(jConf.DirEntryTimeout*1000),
AttrTimeout: utils.Duration(jConf.AttrTimeout),
EntryTimeout: utils.Duration(jConf.EntryTimeout),
DirEntryTimeout: utils.Duration(jConf.DirEntryTimeout),
AccessLog: jConf.AccessLog,
FastResolve: jConf.FastResolve,
BackupMeta: time.Second * time.Duration(jConf.BackupMeta),
BackupMeta: utils.Duration(jConf.BackupMeta),
BackupSkipTrash: jConf.BackupSkipTrash,
}
if !jConf.ReadOnly && !jConf.NoSession && !jConf.NoBGJob && conf.BackupMeta > 0 {
Expand Down
33 changes: 16 additions & 17 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,35 +372,34 @@ public void initialize(URI uri, Configuration conf) throws IOException {
obj.put("noSession", Boolean.valueOf(getConf(conf, "no-session", "false")));
obj.put("noBGJob", Boolean.valueOf(getConf(conf, "no-bgjob", "false")));
obj.put("cacheDir", getConf(conf, "cache-dir", "memory"));
obj.put("cacheSize", Integer.valueOf(getConf(conf, "cache-size", "100")));
obj.put("openCache", Float.valueOf(getConf(conf, "open-cache", "0.0")));
obj.put("backupMeta", Integer.valueOf(getConf(conf, "backup-meta", "3600")));
obj.put("cacheSize", getConf(conf, "cache-size", "100"));
obj.put("openCache", getConf(conf, "open-cache", "0.0"));
obj.put("backupMeta", getConf(conf, "backup-meta", "3600"));
obj.put("backupSkipTrash", Boolean.valueOf(getConf(conf, "backup-skip-trash", "false")));
obj.put("heartbeat", Integer.valueOf(getConf(conf, "heartbeat", "12")));
obj.put("attrTimeout", Float.valueOf(getConf(conf, "attr-cache", "0.0")));
obj.put("entryTimeout", Float.valueOf(getConf(conf, "entry-cache", "0.0")));
obj.put("dirEntryTimeout", Float.valueOf(getConf(conf, "dir-entry-cache", "0.0")));
obj.put("heartbeat", getConf(conf, "heartbeat", "12"));
obj.put("attrTimeout", getConf(conf, "attr-cache", "0.0"));
obj.put("entryTimeout", getConf(conf, "entry-cache", "0.0"));
obj.put("dirEntryTimeout", getConf(conf, "dir-entry-cache", "0.0"));
obj.put("cacheFullBlock", Boolean.valueOf(getConf(conf, "cache-full-block", "true")));
obj.put("cacheChecksum", getConf(conf, "verify-cache-checksum", "full"));
obj.put("cacheEviction", getConf(conf, "cache-eviction", "2-random"));
obj.put("cacheScanInterval", Integer.valueOf(getConf(conf, "cache-scan-interval", "300")));
obj.put("cacheExpire", Integer.valueOf(getConf(conf, "cache-expire", "0")));
obj.put("metacache", Boolean.valueOf(getConf(conf, "metacache", "true")));
obj.put("cacheScanInterval", getConf(conf, "cache-scan-interval", "300"));
obj.put("cacheExpire", getConf(conf, "cache-expire", "0"));
obj.put("autoCreate", Boolean.valueOf(getConf(conf, "auto-create-cache-dir", "true")));
obj.put("maxUploads", Integer.valueOf(getConf(conf, "max-uploads", "20")));
obj.put("maxDeletes", Integer.valueOf(getConf(conf, "max-deletes", "10")));
obj.put("skipDirNlink", Integer.valueOf(getConf(conf, "skip-dir-nlink", "20")));
obj.put("skipDirMtime", getConf(conf, "skip-dir-mtime", "100ms"));
obj.put("uploadLimit", Integer.valueOf(getConf(conf, "upload-limit", "0")));
obj.put("downloadLimit", Integer.valueOf(getConf(conf, "download-limit", "0")));
obj.put("uploadLimit", getConf(conf, "upload-limit", "0"));
obj.put("downloadLimit", getConf(conf, "download-limit", "0"));
obj.put("ioRetries", Integer.valueOf(getConf(conf, "io-retries", "10")));
obj.put("getTimeout", Integer.valueOf(getConf(conf, "get-timeout", getConf(conf, "object-timeout", "5"))));
obj.put("putTimeout", Integer.valueOf(getConf(conf, "put-timeout", getConf(conf, "object-timeout", "60"))));
obj.put("memorySize", Integer.valueOf(getConf(conf, "memory-size", "300")));
obj.put("getTimeout", getConf(conf, "get-timeout", getConf(conf, "object-timeout", "5")));
obj.put("putTimeout", getConf(conf, "put-timeout", getConf(conf, "object-timeout", "60")));
obj.put("memorySize", getConf(conf, "memory-size", "300"));
obj.put("prefetch", Integer.valueOf(getConf(conf, "prefetch", "1")));
obj.put("readahead", Integer.valueOf(getConf(conf, "max-readahead", "0")));
obj.put("readahead", getConf(conf, "max-readahead", "0"));
obj.put("pushGateway", getConf(conf, "push-gateway", ""));
obj.put("pushInterval", Integer.valueOf(getConf(conf, "push-interval", "10")));
obj.put("pushInterval", getConf(conf, "push-interval", "10"));
obj.put("pushAuth", getConf(conf, "push-auth", ""));
obj.put("pushLabels", getConf(conf, "push-labels", ""));
obj.put("pushGraphite", getConf(conf, "push-graphite", ""));
Expand Down
Loading