From bbe00fcbdfa79bcee7139cb28c2c609e58290115 Mon Sep 17 00:00:00 2001 From: zh-jn Date: Wed, 19 Jan 2022 10:05:40 +0800 Subject: [PATCH] update 46 version modifications --- hadoop-huaweicloud/.gitignore | 1 - hadoop-huaweicloud/README.md | 566 ++++++----- hadoop-huaweicloud/pom.xml | 893 +++++++++--------- .../fs/obs/DefaultOBSClientFactory.java | 4 + .../hadoop/fs/obs/OBSBlockOutputStream.java | 40 +- .../apache/hadoop/fs/obs/OBSConstants.java | 22 +- .../apache/hadoop/fs/obs/OBSFileSystem.java | 124 ++- .../hadoop/fs/obs/OBSObjectBucketUtils.java | 7 +- .../hadoop/fs/obs/OBSPosixBucketUtils.java | 4 +- .../hadoop/fs/obs/security/AccessType.java | 8 + .../fs/obs/security/AuthorizeProvider.java | 17 + .../security/DelegationTokenCapability.java | 16 + .../security/OBSAuthorizationException.java | 24 + .../services/org.apache.hadoop.fs.FileSystem | 32 +- 14 files changed, 987 insertions(+), 771 deletions(-) delete mode 100644 hadoop-huaweicloud/.gitignore create mode 100644 hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/AccessType.java create mode 100644 hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/AuthorizeProvider.java create mode 100644 hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/DelegationTokenCapability.java create mode 100644 hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/OBSAuthorizationException.java diff --git a/hadoop-huaweicloud/.gitignore b/hadoop-huaweicloud/.gitignore deleted file mode 100644 index ae3c172..0000000 --- a/hadoop-huaweicloud/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/bin/ diff --git a/hadoop-huaweicloud/README.md b/hadoop-huaweicloud/README.md index 4396722..31d23d2 100644 --- a/hadoop-huaweicloud/README.md +++ b/hadoop-huaweicloud/README.md @@ -1,287 +1,279 @@ -### hadoop-obs 使用指南 -见华为云OBS服务官方文档:https://support.huaweicloud.com/bestpractice-obs/obs_05_1507.html - ----------------------------------------- - -### hadoop-obs 源码编译指南 -示例:mvn clean install -Pdist -Dhadoop.plat.version=3.1.1 -Dhadoop.version=3.1.1 -Dmaven.test.skip=true - -- Pdist:将hadoop-obs依赖的obs java sdk以及okttp等依赖进行了shade,方便部署安装使用 -- Dhadoop.version:定义了依赖的hadoop版本,目前仅支持依赖hadoop-2.8.x及以上版本 -- Dhadoop.plat.version:定义了jar的命名规范,和-Dhadoop.version保持一致即可 -- jar包命名规范:hadoop-huaweicloud-x.x.x-hw-y.jar包含义:前三位x.x.x为依赖的hadoop版本;最后一位y为hadoop-obs版本,例如:hadoop-huaweicloud-3.1.1-hw-45.jar,3.1.1是配套的hadoop版本,45是hadoop-obs的版本 - ----------------------------------------- - -### hadoop-obs release -Version 3.1.1.45/2.8.3.45/2.7.2.45 - -修复问题: -1. 【功能】新增预读策略,优化顺序读性能 -2. 【优化】修复文件桶追加输出流的getPos语义的正确性 -3. 【优化】将文件桶truncate接口中的HadoopIllegalArgumentException异常封装为IOException -4. 【优化】快速删除trash目标对象存在时,添加时间戳后缀到ms级,并按最大时间做冲突重试 -5. 【优化】文件桶递归删除目录409冲突时按最大时间重试 -6. 【优化】create接口添加文件立即可见性开关,默认关闭 -7. 【优化】去掉HDFSWrapper中的默认shema必须为hdfs的检查 -8. 【优化】升级obs sdk版本为3.21.4.1,修复列举操作时最后文件的lastmodifytime字段不正确 -9. 【优化】修改HDFSWrapper中的rename抛出异常为IOException - -========================================================================= - -Version 3.1.1.43/2.8.3.43/2.7.2.43 - -修复问题: -1. 【功能】新增OBSWrapper特性,用于支持已hdfs schema方式对接OBS -2. 【功能】针对对象桶修复目录过大rename性能问题 -3. 【功能】修复快速删除功能trash时并发冲突问题 -4. 【用例】补充OBSWrapper功能用例 -5. 【用例】补充trash并发冲突场景用例 - -========================================================================= - -Version 3.1.1.42/2.8.3.42/2.7.2.42 - -修复问题: -1. 【功能】解决本地写缓存文件名字符串,进程退出才清理导致的OOM问题 -2. 【功能】替换依赖的hadoop-common2.8.3为修复安全漏洞的2.8.3.0101-hw-ei-14 -3. 【功能】Metrics统计删除冗余的参数,并提供close接口用于退出时释放资源 -4. 【功能】排除hadoop-common中对jetty非安全包的依赖 - -========================================================================= - -Version 3.1.1.41/2.8.3.41/2.7.2.41 - -修复问题: -1. 【功能】支持public接口的metrics统计 -2. 【功能】调用create()接口后,文件立即可见 -3. 【功能】支持FileSystem/InputStream/OutputStream关闭后拒绝接口访问 -4. 【功能】finalize()接口不再调用带I/O操作的close(),以防止上层误用 -5. 【功能】pom文件依赖排除hadoop-common 3.1.1引入的jackson-databind不合规版本 -6. 【功能】实现getCanonicalServiceName接口,用于提升HBase BulkLoad场景同一桶内加载性能 -7. 【用例】补充完善重试机制测试用例 - -========================================================================= - -Version 3.1.1.40/2.8.3.40/2.7.2.40 - -修复问题: -1. 【功能】接口语义排查,按原生HDFS语义实现 -2. 【功能】开源社区规范整改 -3. 【功能】truncate功能支持 -4. 【功能】目录名不带/细粒度授权policy支持 -5. 【功能】递归列举接口未更新目录lasymodifyTime问题修复 -6. 【功能】重试机制增强,最大支持180s时间重试 -7. 【功能】初始化支持探测写缓存的可访问性,开关控制,默认关闭 -8. 【功能】OBS JAVA SDK调整为3.20.6.1版本 - -========================================================================= - -Version 3.1.1.39/2.8.3.39/2.7.2.39 - -修复问题: -1. 【功能】静态检查和安全扫描整改 -2. 【功能】接口测试用例补充 - - -========================================================================= - - -Version 3.1.1.38/2.8.3.38/2.7.2.38 - -修复问题: -1. 【功能】修改listStatus接口返回的目录的lastModifyTime为准确值 - - -========================================================================= - - -Version 3.1.1.37/2.8.3.37/2.7.2.37 - -修复问题: -1. 【功能】seekInStream方法中skip改为while循环,及去掉available判断 -2. 【功能】ObsClient及socket buffer大小改为256KB -3. 【功能】OBS SDK 连接池优化开关可配置 -4. 【功能】OBSBlockOutputStream日志打印优化 - - -========================================================================= - -Version 3.1.1.36/2.8.3.36/2.7.2.36 - -修复问题: -1. 【功能】eSDK鉴权协商增加开关,默认关闭 -2. 【功能】初始化时head bucket增加异常重试逻辑 -3. 【功能】onReadFailure中reopen异常时增加重试逻辑 -4. 【功能】append stream在客户端记录文件长度,修复可能的异常 -5. 【功能】增加递归listStatus接口,提升大目录列举性能 -6. 【功能】trash对象时,如果trash目录下存在同名对象,新对象时间戳后缀不带冒号 -7. 【用例】 - (1)增加用例验证eSDK鉴权协商开关有效性 - (2)增加head bucket重试机制测试用例 - (3)增加onReadFailure中reopen重试机制测试用例 - (4)增加head文件长度无效时,append stream用例 - (5)增加listStatus递归列举用例 - (6)增加trash文件和trash目录的测试用例 - - -========================================================================= - -Version 3.1.1.35/2.8.3.35/2.7.2.35 - -修复问题: -1. 【功能】OBSFileInputStream.java中文件桶的rename()接口新增重试逻辑 - - -========================================================================= - -Version 3.1.1.33/2.8.3.33/2.7.2.33 - -修复问题: -1. 【功能】OBSBlockOutputStream finalize方法中关闭流,清理写缓存文件 -2. 【功能】解决listFiles接口递归列举时遇到子目录抛空指针异常问题 -3. 【功能】合入eSDK3.19.11.1版本,解决EcsObsCredentialsProvider偶现请求失败问题 -4. 【用例】 - (1)增加listFiles接口测试用例 - - -========================================================================= - -Version 3.1.1.32/2.8.3.32/2.7.2.32 - -修复问题: -1. 【功能】list超时客户端并发列举优化 -2. 【功能】read(ByteBuffer buffer)接口重载 -3. 【功能】对象桶rename接口部分场景未按HDFS原生语义实现修复 -4. 【功能】OBSFileInputStream.java中四个参数配置开关,默认走父类方法 -5. 【功能】fs.obs.read.buff.size和fs.obs.write.buff.size默认值调整为64KB -6. 【功能】readaheadInputStreamEnabled预读相关代码注释 -7. 【用例】 - (1)对象桶rename增加四个用例 - (2)增加read(ByteBuffer buffer)接口测试用例 - (3)增加四个参数read接口开关测试用例 - - -========================================================================= - -Version 3.1.1.31/2.8.3.31/2.7.2.31 - -修复问题: -1. 【功能】OBSFileInputStream.java中带四个参数read接口未按HDFS语义实现及返回长度bug修复 -2. 【功能】list和head接口返回对象最后修改时间不一致问题修复 -3. 【功能】Rename接口源和目的相同时,返回值未按原生HDFS语义实现问题修复 -4. 【功能】递归删除BUG解决 -5. 【用例】 - (1)完善getContentSummary用例,增加对递归删除的校验 - (2)增加四个参数read接口对不同入参处理和返回值判断用例 - (3)增加list和head对应用返回对象最后修改时间比较用例 - (4)增加rename接口源和目的相同时的用例 - - -========================================================================= - -Version 3.1.1.30/2.8.3.30/2.7.2.30 - -修复问题: -1. 【功能】OBSFileSystem.java中接口getFileStatus、copyFile、listObjects、continueListObjects增加异常重试逻辑 -2. 【功能】OBSFileSystem.java中renameFolder中删除源目录实现由异步删除为批删 -3. 【功能】OBSInputStream.java中read相关的3个接口增加异常重试逻辑 -4. 【功能】对象桶getContentSummary性能优化 -5. 【功能】支持uper jar包编译 -6. 【用例】 - (1)增加getContentSummary用例 - (2)增加重试接口mock用例 - - -========================================================================= - -Version 3.1.1.29/2.8.3.27/2.7.2.29 - -修复问题: -1. 【功能】修改fs.obs.readahead.range默认值为1MB; -2. 【功能】当fs.obs.fast.upload.buffer为array时,申请的第一个block的大小默认修改为1MB,加了参数fs.obs.fast.upload.array.first.buffer; -3. 【功能】OBSBlockOutputStream增加多段上传过程中异常快速抛出,异常后,write、close都抛异常; -4. 【功能】OBSInputStream在seekInStream方法中,incrementBytesRead修改读取数据的值为真实skipped的字节数; -5. 【功能】fs.obs.connection.establish.timeout和fs.obs.connection.timeout默认值修改为120S -6. 【功能】修改deleteObejcts()接口,批删失败转单点删除,单点删除失败重试3次; -7. 【功能】修改批删触发条件,当开启enableMultiObjectsDelete开关,并且删除对象数大于等于3时才走批删 -8. 【用例】 - (1)增加批量删除用例一个,覆盖批量请求目标对象数小于100、1000~2000、大于2000场景 - (2)增加array的block用例1个和putPart快速失败用例1个,覆盖array的第一个block size为默认1MB,以及覆盖putPart快速失败用例。 - (3)增加readahead默认值为1MB用例1个 - - -========================================================================= - -Version 3.1.1.28/2.8.3.28/2.7.2.28 - -修复问题: -1. 【功能】copyFromLocalFile对目录支持,直接调用super的copyFromLocalFile方法。 -3. 【用例】增加copyFromLocalFile目录的用例 - -========================================================================= - -Version 3.1.1.27/2.8.3.27/2.7.2.27 - -修复问题: -1. 【功能】增加匿名访问方式的obsclient初始化。 -2. 【功能】将23版本的KMS和26版本的append stream只走posix桶的append方法合并。 -3. 【用例】(1)增加MainTest方法,后面流水线就通过执行该方法完成跑自动化用例。 - (2)增加assembly.xml,在编译test时,需要使用assemly方式将test打包。 - 打包方法:“compile assembly:single -Ddescriptor=E:\bigdata\obs_bigdataonobs\hadoop-tools\hadoop-huaweicloud\src\test\resources\assembly.xml --settings C:\Users\user\apache-maven-3.5.4\conf\settings_cloudmonitor.xml” - 跑用例:java -cp hadoop-huaweicloud-2.8.3.26-assembly.jar org.apache.hadoop.fs.obs.MainTest - -========================================================================= - -Version 2.8.3.26/2.7.2.26 补丁版本 - -修复问题: -1. 【功能】针对posix桶,在rename被使用为move场景,需要加上move的key。修改方式为:依照S3方法,先做HEAD,再调rename接口。 - -========================================================================= - -Version 2.8.3.25/2.7.2.25 补丁版本 - -修复问题: -1. 【功能】修改obsblockoutputstream的append模式,在大于阈值时,直接调用appendFS接口,不再走多段上传; -2. 【用例】新增append和hflush场景的4个用例: - (1). hflush大对象(20MB) - (2). hflush中对象(5MB) - (3). hflush小对象(2MB) - (4). appendstream根据缓存大小做了4次append操作; -3. 【优化】优化DELETE在409情况下,3次重试机制; -4. 【遗留问题】修改obsclient初始化方式,在credential provider\aksk\security provider都没有设置时,使用匿名方式初始化obsclient。 - -========================================================================= - -Version 2.8.3.23/2.7.2.23 - -新增需求: -1. 【功能】新增参数对象加密功能,新增需参数: -(1)fs.obs.server-side-encryption-type:加密算法,参数值为:sse-kms or sse-c; -(2)fs.obs.server-side-encryption-key:当参数(1)的值为sse-kms时,该值可选,表示kms加密key id;当参数(1)为sse-c时,该值必选,表示base64 encoded content。 -(3)kms加密需要将fs.obs.connection.ssl.enabled设为true,走https加密方式。 - -修复问题: -1. 【优化】 delete 方法加了在409的时候3次最大重试功能,减小外层任务失败的概率。 - -========================================================================= -Version 2.8.3.22/2.7.2.22 - -修复问题: -1. 【功能】优化Posix桶时的性能,减少元数据HEAD次数; - -新增需求: -1. 【功能】新增参数fs.obs.security.provider,配合ESDK从环境变量和ECS获取AK SK信息。 - -========================================================================= -Version 2.8.3.20/2.7.2.20 - -修复问题: -1. 【功能】OBSFileSystem被GC,导致的OBSInputStream中obsclient的引用为空,引起空指针异常; - -========================================================================= -Version 2.8.3.19/2.7.2.19 - -修复问题: -1. 【功能】HBASE场景,未关闭JAVA SDK的连接,导致EOF异常; +Version 3.1.1.46/2.8.3.46/2.7.2.46 +【BUG】“对象桶”场景下在服务端持续返回503的情况下rename操作依然显示成功 +【优化】在flink或是flume等场景中将频繁调用append接口,此接口因输出流position判断不准确导致出现很多不必要的warn级别的日志 +【优化】hadoop-obs 访问OBS的TCP建链超时由120s改为5s +【优化】添加aksk获取方式的INFO级别日志 +【优化】升级obs sdk为3.21.8.2,解决XXE漏洞 +【新增】新增fs.obs.outputstream.hflush.policy参数控制hflush和hsync方法的行为 + +Version 3.1.1.45/2.8.3.45/2.7.2.45 + +修复问题: +1. 【功能】新增预读策略,优化顺序读性能 +2. 【优化】修复文件桶追加输出流的getPos语义的正确性 +3. 【优化】将文件桶truncate接口中的HadoopIllegalArgumentException异常封装为IOException +4. 【优化】快速删除trash目标对象存在时,添加时间戳后缀到ms级,并按最大时间做冲突重试 +5. 【优化】文件桶递归删除目录409冲突时按最大时间重试 +6. 【优化】create接口添加文件立即可见性开关,默认关闭 +7. 【优化】去掉HDFSWrapper中的默认shema必须为hdfs的检查 +8. 【优化】升级obs sdk版本为3.21.4.1,修复列举操作时最后文件的lastmodifytime字段不正确 +9. 【优化】修改HDFSWrapper中的rename抛出异常为IOException + +========================================================================= + +Version 3.1.1.43/2.8.3.43/2.7.2.43 + +修复问题: +1. 【功能】新增OBSWrapper特性,用于支持已hdfs schema方式对接OBS +2. 【功能】针对对象桶修复目录过大rename性能问题 +3. 【功能】修复快速删除功能trash时并发冲突问题 +4. 【用例】补充OBSWrapper功能用例 +5. 【用例】补充trash并发冲突场景用例 + +========================================================================= + +Version 3.1.1.42/2.8.3.42/2.7.2.42 + +修复问题: +1. 【功能】解决本地写缓存文件名字符串,进程退出才清理导致的OOM问题 +2. 【功能】替换依赖的hadoop-common2.8.3为修复安全漏洞的2.8.3.0101-hw-ei-14 +3. 【功能】Metrics统计删除冗余的参数,并提供close接口用于退出时释放资源 +4. 【功能】排除hadoop-common中对jetty非安全包的依赖 + +========================================================================= + +Version 3.1.1.41/2.8.3.41/2.7.2.41 + +修复问题: +1. 【功能】支持public接口的metrics统计 +2. 【功能】调用create()接口后,文件立即可见 +3. 【功能】支持FileSystem/InputStream/OutputStream关闭后拒绝接口访问 +4. 【功能】finalize()接口不再调用带I/O操作的close(),以防止上层误用 +5. 【功能】pom文件依赖排除hadoop-common 3.1.1引入的jackson-databind不合规版本 +6. 【功能】实现getCanonicalServiceName接口,用于提升HBase BulkLoad场景同一桶内加载性能 +7. 【用例】补充完善重试机制测试用例 + +========================================================================= + +Version 3.1.1.40/2.8.3.40/2.7.2.40 + +修复问题: +1. 【功能】接口语义排查,按原生HDFS语义实现 +2. 【功能】开源社区规范整改 +3. 【功能】truncate功能支持 +4. 【功能】目录名不带/细粒度授权policy支持 +5. 【功能】递归列举接口未更新目录lasymodifyTime问题修复 +6. 【功能】重试机制增强,最大支持180s时间重试 +7. 【功能】初始化支持探测写缓存的可访问性,开关控制,默认关闭 +8. 【功能】OBS JAVA SDK调整为3.20.6.1版本 + +========================================================================= + +Version 3.1.1.39/2.8.3.39/2.7.2.39 + +修复问题: +1. 【功能】静态检查和安全扫描整改 +2. 【功能】接口测试用例补充 + + +========================================================================= + + +Version 3.1.1.38/2.8.3.38/2.7.2.38 + +修复问题: +1. 【功能】修改listStatus接口返回的目录的lastModifyTime为准确值 + + +========================================================================= + + +Version 3.1.1.37/2.8.3.37/2.7.2.37 + +修复问题: +1. 【功能】seekInStream方法中skip改为while循环,及去掉available判断 +2. 【功能】ObsClient及socket buffer大小改为256KB +3. 【功能】OBS SDK 连接池优化开关可配置 +4. 【功能】OBSBlockOutputStream日志打印优化 + + +========================================================================= + +Version 3.1.1.36/2.8.3.36/2.7.2.36 + +修复问题: +1. 【功能】eSDK鉴权协商增加开关,默认关闭 +2. 【功能】初始化时head bucket增加异常重试逻辑 +3. 【功能】onReadFailure中reopen异常时增加重试逻辑 +4. 【功能】append stream在客户端记录文件长度,修复可能的异常 +5. 【功能】增加递归listStatus接口,提升大目录列举性能 +6. 【功能】trash对象时,如果trash目录下存在同名对象,新对象时间戳后缀不带冒号 +7. 【用例】 + (1)增加用例验证eSDK鉴权协商开关有效性 + (2)增加head bucket重试机制测试用例 + (3)增加onReadFailure中reopen重试机制测试用例 + (4)增加head文件长度无效时,append stream用例 + (5)增加listStatus递归列举用例 + (6)增加trash文件和trash目录的测试用例 + + +========================================================================= + +Version 3.1.1.35/2.8.3.35/2.7.2.35 + +修复问题: +1. 【功能】OBSFileInputStream.java中文件桶的rename()接口新增重试逻辑 + + +========================================================================= + +Version 3.1.1.33/2.8.3.33/2.7.2.33 + +修复问题: +1. 【功能】OBSBlockOutputStream finalize方法中关闭流,清理写缓存文件 +2. 【功能】解决listFiles接口递归列举时遇到子目录抛空指针异常问题 +3. 【功能】合入eSDK3.19.11.1版本,解决EcsObsCredentialsProvider偶现请求失败问题 +4. 【用例】 + (1)增加listFiles接口测试用例 + + +========================================================================= + +Version 3.1.1.32/2.8.3.32/2.7.2.32 + +修复问题: +1. 【功能】list超时客户端并发列举优化 +2. 【功能】read(ByteBuffer buffer)接口重载 +3. 【功能】对象桶rename接口部分场景未按HDFS原生语义实现修复 +4. 【功能】OBSFileInputStream.java中四个参数配置开关,默认走父类方法 +5. 【功能】fs.obs.read.buff.size和fs.obs.write.buff.size默认值调整为64KB +6. 【功能】readaheadInputStreamEnabled预读相关代码注释 +7. 【用例】 + (1)对象桶rename增加四个用例 + (2)增加read(ByteBuffer buffer)接口测试用例 + (3)增加四个参数read接口开关测试用例 + + +========================================================================= + +Version 3.1.1.31/2.8.3.31/2.7.2.31 + +修复问题: +1. 【功能】OBSFileInputStream.java中带四个参数read接口未按HDFS语义实现及返回长度bug修复 +2. 【功能】list和head接口返回对象最后修改时间不一致问题修复 +3. 【功能】Rename接口源和目的相同时,返回值未按原生HDFS语义实现问题修复 +4. 【功能】递归删除BUG解决 +5. 【用例】 + (1)完善getContentSummary用例,增加对递归删除的校验 + (2)增加四个参数read接口对不同入参处理和返回值判断用例 + (3)增加list和head对应用返回对象最后修改时间比较用例 + (4)增加rename接口源和目的相同时的用例 + + +========================================================================= + +Version 3.1.1.30/2.8.3.30/2.7.2.30 + +修复问题: +1. 【功能】OBSFileSystem.java中接口getFileStatus、copyFile、listObjects、continueListObjects增加异常重试逻辑 +2. 【功能】OBSFileSystem.java中renameFolder中删除源目录实现由异步删除为批删 +3. 【功能】OBSInputStream.java中read相关的3个接口增加异常重试逻辑 +4. 【功能】对象桶getContentSummary性能优化 +5. 【功能】支持uper jar包编译 +6. 【用例】 + (1)增加getContentSummary用例 + (2)增加重试接口mock用例 + + +========================================================================= + +Version 3.1.1.29/2.8.3.27/2.7.2.29 + +修复问题: +1. 【功能】修改fs.obs.readahead.range默认值为1MB; +2. 【功能】当fs.obs.fast.upload.buffer为array时,申请的第一个block的大小默认修改为1MB,加了参数fs.obs.fast.upload.array.first.buffer; +3. 【功能】OBSBlockOutputStream增加多段上传过程中异常快速抛出,异常后,write、close都抛异常; +4. 【功能】OBSInputStream在seekInStream方法中,incrementBytesRead修改读取数据的值为真实skipped的字节数; +5. 【功能】fs.obs.connection.establish.timeout和fs.obs.connection.timeout默认值修改为120S +6. 【功能】修改deleteObejcts()接口,批删失败转单点删除,单点删除失败重试3次; +7. 【功能】修改批删触发条件,当开启enableMultiObjectsDelete开关,并且删除对象数大于等于3时才走批删 +8. 【用例】 + (1)增加批量删除用例一个,覆盖批量请求目标对象数小于100、1000~2000、大于2000场景 + (2)增加array的block用例1个和putPart快速失败用例1个,覆盖array的第一个block size为默认1MB,以及覆盖putPart快速失败用例。 + (3)增加readahead默认值为1MB用例1个 + + +========================================================================= + +Version 3.1.1.28/2.8.3.28/2.7.2.28 + +修复问题: +1. 【功能】copyFromLocalFile对目录支持,直接调用super的copyFromLocalFile方法。 +3. 【用例】增加copyFromLocalFile目录的用例 + +========================================================================= + +Version 3.1.1.27/2.8.3.27/2.7.2.27 + +修复问题: +1. 【功能】增加匿名访问方式的obsclient初始化。 +2. 【功能】将23版本的KMS和26版本的append stream只走posix桶的append方法合并。 +3. 【用例】(1)增加MainTest方法,后面流水线就通过执行该方法完成跑自动化用例。 + (2)增加assembly.xml,在编译test时,需要使用assemly方式将test打包。 + 打包方法:“compile assembly:single -Ddescriptor=E:\bigdata\obs_bigdataonobs\hadoop-tools\hadoop-huaweicloud\src\test\resources\assembly.xml --settings C:\Users\user\apache-maven-3.5.4\conf\settings_cloudmonitor.xml” + 跑用例:java -cp hadoop-huaweicloud-2.8.3.26-assembly.jar org.apache.hadoop.fs.obs.MainTest + +========================================================================= + +Version 2.8.3.26/2.7.2.26 补丁版本 + +修复问题: +1. 【功能】针对posix桶,在rename被使用为move场景,需要加上move的key。修改方式为:依照S3方法,先做HEAD,再调rename接口。 + +========================================================================= + +Version 2.8.3.25/2.7.2.25 补丁版本 + +修复问题: +1. 【功能】修改obsblockoutputstream的append模式,在大于阈值时,直接调用appendFS接口,不再走多段上传; +2. 【用例】新增append和hflush场景的4个用例: + (1). hflush大对象(20MB) + (2). hflush中对象(5MB) + (3). hflush小对象(2MB) + (4). appendstream根据缓存大小做了4次append操作; +3. 【优化】优化DELETE在409情况下,3次重试机制; +4. 【遗留问题】修改obsclient初始化方式,在credential provider\aksk\security provider都没有设置时,使用匿名方式初始化obsclient。 + +========================================================================= + +Version 2.8.3.23/2.7.2.23 + +新增需求: +1. 【功能】新增参数对象加密功能,新增需参数: +(1)fs.obs.server-side-encryption-type:加密算法,参数值为:sse-kms or sse-c; +(2)fs.obs.server-side-encryption-key:当参数(1)的值为sse-kms时,该值可选,表示kms加密key id;当参数(1)为sse-c时,该值必选,表示base64 encoded content。 +(3)kms加密需要将fs.obs.connection.ssl.enabled设为true,走https加密方式。 + +修复问题: +1. 【优化】 delete 方法加了在409的时候3次最大重试功能,减小外层任务失败的概率。 + +========================================================================= +Version 2.8.3.22/2.7.2.22 + +修复问题: +1. 【功能】优化Posix桶时的性能,减少元数据HEAD次数; + +新增需求: +1. 【功能】新增参数fs.obs.security.provider,配合ESDK从环境变量和ECS获取AK SK信息。 + +========================================================================= +Version 2.8.3.20/2.7.2.20 + +修复问题: +1. 【功能】OBSFileSystem被GC,导致的OBSInputStream中obsclient的引用为空,引起空指针异常; + +========================================================================= +Version 2.8.3.19/2.7.2.19 + +修复问题: +1. 【功能】HBASE场景,未关闭JAVA SDK的连接,导致EOF异常; diff --git a/hadoop-huaweicloud/pom.xml b/hadoop-huaweicloud/pom.xml index 5b2555a..c0e2e3f 100644 --- a/hadoop-huaweicloud/pom.xml +++ b/hadoop-huaweicloud/pom.xml @@ -1,447 +1,446 @@ - - - - 4.0.0 - com.huaweicloud - hadoop-huaweicloud - 2.8.3-hw-45 - hadoop-huaweicloud - - This module contains code to support integration with OBS. - - jar - - - UTF-8 - 2.8.3.0101-hw-ei-14 - 3.21.4.1 - 2.8.3 - 45 - - - - - dist - - 2.8.3 - 2.8.3.0101-hw-ei-14 - 45 - 3.21.4.1 - obs.shaded - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.2.1 - - false - - - - shade-obs-fs - package - - shade - - - - - - com.jamesmurty.utils - ${shading.prefix}.com.jamesmurty.utils - - - - okio - ${shading.prefix}.okio - - - - okhttp3 - ${shading.prefix}.okhttp3 - - - - com.fasterxml.jackson - ${shading.prefix}.com.fasterxml.jackson - - - - - - - - com.jamesmurty.utils:* - com.squareup.okio:* - com.squareup.okhttp3:* - com.huaweicloud:esdk-obs-java-optimised - com.fasterxml.jackson.core:* - - - - - - com.squareup.okhttp3:* - - okhttp3/internal/connection/ExchangeFinder.class - okhttp3/internal/connection/Transmitter.class - okhttp3/internal/http/RetryAndFollowUpInterceptor.class - - - - com.squareup.okio:* - - okio/AsyncTimeout.class - okio/SegmentPool.class - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - log4j2.xml - - - - - - - - - - - - - - hadoop-huaweicloud-${hadoop.plat.version}-hw-${obsa.version} - - - org.codehaus.mojo - findbugs-maven-plugin - 3.0.0 - - true - true - ${basedir}/dev-support/findbugs-exclude.xml - - Max - - - - org.apache.maven.plugins - maven-project-info-reports-plugin - 2.7 - - false - - - - org.apache.maven.plugins - maven-surefire-plugin - 2.12.4 - - 3600 - - - - - org.apache.maven.plugins - maven-jar-plugin - 2.5 - - - org.apache.maven.plugins - maven-dependency-plugin - 2.8 - - - copy - package - - copy-dependencies - - - ${project.build.directory}/lib - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - 8 - 8 - -Xlint:unchecked - - - - - - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - provided - - - jdk.tools - jdk.tools - - - commons-beanutils - commons-beanutils - - - commons-beanutils-core - commons-beanutils - - - jackson-core-asl - org.codehaus.jackson - - - jackson-mapper-asl - org.codehaus.jackson - - - jetty-util - org.mortbay.jetty - - - netty - io.netty - - - nimbus-jose-jwt - com.nimbusds - - - protobuf-java - com.google.protobuf - - - zookeeper - org.apache.zookeeper - - - jackson-databind - com.fasterxml.jackson.core - - - jetty-server - org.eclipse.jetty - - - jetty-servlet - org.eclipse.jetty - - - jetty-util - org.eclipse.jetty - - - jetty-util-ajax - org.eclipse.jetty - - - jetty-webapp - org.eclipse.jetty - - - - - org.apache.hadoop - hadoop-hdfs-client - ${hadoop.version} - provided - - - com.huaweicloud - esdk-obs-java-optimised - ${esdk.version} - - - org.apache.logging.log4j - log4j-api - - - org.apache.logging.log4j - log4j-core - - - - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - test - - - jackson-core-asl - org.codehaus.jackson - - - jackson-mapper-asl - org.codehaus.jackson - - - jetty-util - org.mortbay.jetty - - - netty - io.netty - - - nimbus-jose-jwt - com.nimbusds - - - protobuf-java - com.google.protobuf - - - zookeeper - org.apache.zookeeper - - - jackson-databind - com.fasterxml.jackson.core - - - jetty-server - org.eclipse.jetty - - - jetty-util - org.eclipse.jetty - - - jetty-util-ajax - org.eclipse.jetty - - - test-jar - - - junit - junit - 4.12 - test - - - org.mockito - mockito-all - 1.10.19 - test - - - org.apache.hadoop - hadoop-mapreduce-client-jobclient - ${hadoop.version} - test - - - netty - io.netty - - - protobuf-java - com.google.protobuf - - - - - org.apache.hadoop - hadoop-yarn-server-tests - ${hadoop.version} - test - - - jackson-core-asl - org.codehaus.jackson - - - jackson-mapper-asl - org.codehaus.jackson - - - jetty-util - org.mortbay.jetty - - - netty - io.netty - - - protobuf-java - com.google.protobuf - - - zookeeper - org.apache.zookeeper - - - jetty-util - org.eclipse.jetty - - - jetty-server - org.eclipse.jetty - - - jetty-util-ajax - org.eclipse.jetty - - - test-jar - - - org.apache.hadoop - hadoop-mapreduce-examples - ${hadoop.version} - test - jar - - - org.powermock - powermock-api-mockito - 1.7.4 - test - - - org.powermock - powermock-module-junit4 - 1.7.4 - test - - - org.apache.hadoop - hadoop-minicluster - ${hadoop.version} - test - - - + + + + 4.0.0 + com.huaweicloud + hadoop-huaweicloud + 2.8.3-hw-46 + hadoop-huaweicloud + + This module contains code to support integration with OBS. + + jar + + + UTF-8 + 2.8.3.0101-hw-ei-14 + 3.21.8.2 + 2.8.3 + 46 + obs.shaded + + + + + dist + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + false + + + + shade-obs-fs + package + + shade + + + + + + com.jamesmurty.utils + ${shading.prefix}.com.jamesmurty.utils + + + + okio + ${shading.prefix}.okio + + + + okhttp3 + ${shading.prefix}.okhttp3 + + + + com.fasterxml.jackson + ${shading.prefix}.com.fasterxml.jackson + + + + + + + + com.jamesmurty.utils:* + com.squareup.okio:* + com.squareup.okhttp3:* + com.huaweicloud:esdk-obs-java-optimised + com.fasterxml.jackson.core:* + + + + + + com.squareup.okhttp3:* + + okhttp3/internal/connection/ExchangeFinder.class + okhttp3/internal/connection/Transmitter.class + okhttp3/internal/http/RetryAndFollowUpInterceptor.class + + + + com.squareup.okio:* + + okio/AsyncTimeout.class + okio/SegmentPool.class + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + log4j2.xml + + + + + + + + + + + + + + hadoop-huaweicloud-${hadoop.plat.version}-hw-${obsa.version} + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.0 + + true + true + ${basedir}/dev-support/findbugs-exclude.xml + + Max + + + + org.apache.maven.plugins + maven-project-info-reports-plugin + 2.7 + + false + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.12.4 + + 3600 + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.5 + + + org.apache.maven.plugins + maven-dependency-plugin + 2.8 + + + copy + package + + copy-dependencies + + + ${project.build.directory}/lib + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 8 + 8 + -Xlint:unchecked + + + + org.codehaus.mojo + versions-maven-plugin + 2.8.1 + + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + provided + + + jdk.tools + jdk.tools + + + commons-beanutils + commons-beanutils + + + commons-beanutils-core + commons-beanutils + + + jackson-core-asl + org.codehaus.jackson + + + jackson-mapper-asl + org.codehaus.jackson + + + jetty-util + org.mortbay.jetty + + + netty + io.netty + + + nimbus-jose-jwt + com.nimbusds + + + protobuf-java + com.google.protobuf + + + zookeeper + org.apache.zookeeper + + + jackson-databind + com.fasterxml.jackson.core + + + jetty-server + org.eclipse.jetty + + + jetty-servlet + org.eclipse.jetty + + + jetty-util + org.eclipse.jetty + + + jetty-util-ajax + org.eclipse.jetty + + + jetty-webapp + org.eclipse.jetty + + + + + org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + provided + + + com.huaweicloud + esdk-obs-java-optimised + ${esdk.version} + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + test + + + jackson-core-asl + org.codehaus.jackson + + + jackson-mapper-asl + org.codehaus.jackson + + + jetty-util + org.mortbay.jetty + + + netty + io.netty + + + nimbus-jose-jwt + com.nimbusds + + + protobuf-java + com.google.protobuf + + + zookeeper + org.apache.zookeeper + + + jackson-databind + com.fasterxml.jackson.core + + + jetty-server + org.eclipse.jetty + + + jetty-util + org.eclipse.jetty + + + jetty-util-ajax + org.eclipse.jetty + + + test-jar + + + junit + junit + 4.12 + test + + + org.mockito + mockito-all + 1.10.19 + test + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + ${hadoop.version} + test + + + netty + io.netty + + + protobuf-java + com.google.protobuf + + + + + org.apache.hadoop + hadoop-yarn-server-tests + ${hadoop.version} + test + + + jackson-core-asl + org.codehaus.jackson + + + jackson-mapper-asl + org.codehaus.jackson + + + jetty-util + org.mortbay.jetty + + + netty + io.netty + + + protobuf-java + com.google.protobuf + + + zookeeper + org.apache.zookeeper + + + jetty-util + org.eclipse.jetty + + + jetty-server + org.eclipse.jetty + + + jetty-util-ajax + org.eclipse.jetty + + + test-jar + + + org.apache.hadoop + hadoop-mapreduce-examples + ${hadoop.version} + test + jar + + + org.powermock + powermock-api-mockito + 1.7.4 + test + + + org.powermock + powermock-module-junit4 + 1.7.4 + test + + + org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + test + + + diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/DefaultOBSClientFactory.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/DefaultOBSClientFactory.java index 2c9efc6..3bffb3a 100644 --- a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/DefaultOBSClientFactory.java +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/DefaultOBSClientFactory.java @@ -215,6 +215,7 @@ private static ObsClient createHuaweiObsClient(final Configuration conf, final O throw new IOException("From option " + OBSConstants.OBS_CREDENTIALS_PROVIDER + ' ' + c, c); } + LOG.info("create ObsClient using credentialsProvider: {}", credentialsProviderClass.getName()); String sessionToken = credentialsProvider.getSessionToken(); String ak = credentialsProvider.getOBSAccessKeyId(); String sk = credentialsProvider.getOBSSecretKey(); @@ -241,6 +242,7 @@ private static ObsClient createObsClientWithoutCredentialsProvider(final Configu obsConf.setEndPoint(endPoint); if (!StringUtils.isEmpty(ak) || !StringUtils.isEmpty(sk)) { + LOG.info("create ObsClient using aksk from configuration"); obsClient = new ObsClient(ak, sk, token, obsConf); return obsClient; } @@ -255,10 +257,12 @@ private static ObsClient createObsClientWithoutCredentialsProvider(final Configu } if (securityProviderClass == null) { + LOG.info("create ObsClient when securityProviderClass is null"); obsClient = new ObsClient(ak, sk, token, obsConf); return obsClient; } + LOG.info("create ObsClient using securityProvider {}", securityProviderClass.getName()); IObsCredentialsProvider securityProvider; try { Optional cons = tryGetConstructor(securityProviderClass, diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSBlockOutputStream.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSBlockOutputStream.java index 27aabfa..2d98bce 100644 --- a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSBlockOutputStream.java +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSBlockOutputStream.java @@ -149,6 +149,8 @@ class OBSBlockOutputStream extends OutputStream implements Syncable { */ private boolean mockUploadPartError = false; + private String hflushPolicy = OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY_SYNC; + /** * An OBS output stream which uploads partitions in a separate pool of * threads; different {@link OBSDataBlocks.BlockFactory} instances can @@ -175,6 +177,8 @@ class OBSBlockOutputStream extends OutputStream implements Syncable { "Block size is too small: %d", owner.getPartSize()); this.executorService = MoreExecutors.listeningDecorator(execService); this.multiPartUpload = null; + this.hflushPolicy = owner.getConf() + .get(OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY, OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY_SYNC); // create that first block. This guarantees that an open + close // sequence writes a 0-byte entry. createBlockIfNeeded(); @@ -505,6 +509,7 @@ private synchronized void putObject() throws IOException { // the putObject call automatically closes the input // stream afterwards. writeOperationHelper.putObject(putObjectRequest); + objectLen += size; } finally { OBSCommonUtils.closeAll(block); } @@ -534,8 +539,22 @@ public synchronized void hflush() throws IOException { fs.checkOpen(); checkStreamOpen(); long startTime = System.currentTimeMillis(); - // hflush hsyn same - flushOrSync(); + switch (this.hflushPolicy) { + case OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY_SYNC: + // hflush hsyn same + flushOrSync(); + break; + case OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY_FLUSH: + flush(); + break; + + case OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY_EMPTY: + // do nothing + break; + + default: + throw new IOException(String.format("unsupported downgrade policy '%s'", this.hflushPolicy)); + } long endTime = System.currentTimeMillis(); if (fs.getMetricSwitch()) { @@ -661,7 +680,22 @@ public synchronized void hsync() throws IOException { fs.checkOpen(); checkStreamOpen(); long startTime = System.currentTimeMillis(); - flushOrSync(); + switch (this.hflushPolicy) { + case OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY_SYNC: + // hflush hsyn same + flushOrSync(); + break; + case OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY_FLUSH: + sync(); + break; + + case OBSConstants.OUTPUT_STREAM_HFLUSH_POLICY_EMPTY: + // do nothing + break; + + default: + throw new IOException(String.format("unsupported downgrade policy '%s'", this.hflushPolicy)); + } long endTime = System.currentTimeMillis(); if (fs.getMetricSwitch()) { BasicMetricsConsumer.MetricRecord record = new BasicMetricsConsumer.MetricRecord(null, diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSConstants.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSConstants.java index 144df0d..b06940b 100644 --- a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSConstants.java +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSConstants.java @@ -167,7 +167,7 @@ public final class OBSConstants { /** * Default value of {@link #ESTABLISH_TIMEOUT}. */ - static final int DEFAULT_ESTABLISH_TIMEOUT = 120000; + static final int DEFAULT_ESTABLISH_TIMEOUT = 5000; /** * Seconds until we give up on a connection to obs. @@ -762,6 +762,14 @@ public final class OBSConstants { */ static final boolean DEFAULT_FILE_VISIBILITY_AFTER_CREATE_ENABLE = false; + public static final String AUTHORIZER_PROVIDER = "fs.obs.authorize.provider"; + + public static final String AUTHORIZE_FAIL_FALLBACK= "fs.obs.authorize.fail.fallback"; + public static final boolean DEFAULT_AUTHORIZE_FAIL_FALLBACK = false; + + public static final String AUTHORIZE_EXCEPTION_FALLBACK= "fs.obs.authorize.exception.fallback"; + public static final boolean DEFAULT_AUTHORIZE_EXCEPTION_FALLBACK = true; + /** * Second to millisecond factor. */ @@ -778,6 +786,18 @@ public final class OBSConstants { static final boolean DEFAULT_METRICS_SWITCH = false; + /** + * OBSBlockOutputStream implement the Syncable interface with its full semantic, + * but this will lead to low performance in some scenario, for detail see [BUG2021092400077]. + */ + static final String OUTPUT_STREAM_HFLUSH_POLICY = "fs.obs.outputstream.hflush.policy"; + + static final String OUTPUT_STREAM_HFLUSH_POLICY_SYNC = "sync"; // use original policy + + static final String OUTPUT_STREAM_HFLUSH_POLICY_FLUSH = "flush"; // downgrade hflush/hsync to the buffer's flush + + static final String OUTPUT_STREAM_HFLUSH_POLICY_EMPTY = "empty"; // downgrade hflush/hsync to empty func, which means calling hflush/hsync will do nothing + private OBSConstants() { } } diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSFileSystem.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSFileSystem.java index 5464922..eb7dea2 100644 --- a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSFileSystem.java +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSFileSystem.java @@ -46,9 +46,14 @@ import org.apache.hadoop.fs.obs.input.InputPolicyFactory; import org.apache.hadoop.fs.obs.input.InputPolicys; import org.apache.hadoop.fs.obs.input.OBSInputStream; +import org.apache.hadoop.fs.obs.security.AccessType; +import org.apache.hadoop.fs.obs.security.AuthorizeProvider; +import org.apache.hadoop.fs.obs.security.DelegationTokenCapability; +import org.apache.hadoop.fs.obs.security.OBSAuthorizationException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; @@ -270,6 +275,8 @@ public class OBSFileSystem extends FileSystem { */ private boolean enableCanonicalServiceName = false; + AuthorizeProvider authorizer; + /** * A map from file names to {@link FSDataOutputStream} objects that are * currently being written by this client. Note that a file can only be @@ -446,6 +453,10 @@ public void initialize(final URI name, final Configuration originalConf) throws enableFileVisibilityAfterCreate = conf.getBoolean(OBSConstants.FILE_VISIBILITY_AFTER_CREATE_ENABLE, OBSConstants.DEFAULT_FILE_VISIBILITY_AFTER_CREATE_ENABLE); + enableCanonicalServiceName = conf.getBoolean(OBSConstants.GET_CANONICAL_SERVICE_NAME_ENABLE, + OBSConstants.DEFAULT_GET_CANONICAL_SERVICE_NAME_ENABLE); + + this.authorizer = initAuthorizeProvider(conf); } catch (ObsException e) { throw OBSCommonUtils.translateException("initializing ", new Path(name), e); } @@ -453,6 +464,72 @@ public void initialize(final URI name, final Configuration originalConf) throws LOG.info("Finish initializing filesystem instance for uri: {}", uri); } + private AuthorizeProvider initAuthorizeProvider(Configuration conf) throws IOException { + AuthorizeProvider authorizer = null; + Class authClassName = conf.getClass(OBSConstants.AUTHORIZER_PROVIDER, null); + if (authClassName != null) { + try { + LOG.info("authorize provider is " + authClassName.getName()); + authorizer = (AuthorizeProvider)authClassName.newInstance(); + authorizer.init(conf); + } catch (Exception e) { + LOG.error(String.format("init %s failed", OBSConstants.AUTHORIZER_PROVIDER), e); + throw new IOException(String.format("init %s failed", OBSConstants.AUTHORIZER_PROVIDER), e); + } + } + + return authorizer; + } + + private void checkPermission(Path path, AccessType accessType) throws IOException { + if (authorizer == null) { + LOG.debug("authorize provider is not initialized. No authorization will be performed."); + } else { + boolean failFallback = this.getConf().getBoolean(OBSConstants.AUTHORIZE_FAIL_FALLBACK, + OBSConstants.DEFAULT_AUTHORIZE_FAIL_FALLBACK); + boolean exceptionFallback = this.getConf().getBoolean(OBSConstants.AUTHORIZE_EXCEPTION_FALLBACK, + OBSConstants.DEFAULT_AUTHORIZE_EXCEPTION_FALLBACK); + + String key = OBSCommonUtils.pathToKey(this, path); + Boolean result = true; + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + try { + long st = System.currentTimeMillis(); + result = authorizer.isAuthorized(this.bucket, key, accessType); + long et = System.currentTimeMillis(); + if (LOG.isDebugEnabled()){ + LOG.debug("authorizing:[user: {}], [action: {}], " + + "[bucket: {}], [path: {}], [result: {}], [cost: {}]", + currentUser, + accessType.toString(), + this.bucket, + path.toString(), + result, et - st); + } + } catch (Exception e) { + if (exceptionFallback) { + LOG.warn("authorize exception fallback:[user: {}], [action: {}], [bucket: {}], [key: {}]", + currentUser, accessType.toString(),this.bucket,key); + } else { + throw e; + } + } + if (!result) { + if (failFallback) { + LOG.warn("authorize fail fallback:[user: {}], [action: {}], [bucket: {}], [key: {}]", + currentUser, accessType.toString(),this.bucket,key); + } else { + throw new OBSAuthorizationException(String.format("permission denied:[user: %s], [action: %s], " + + "[bucket: %s], [key: %s]", + currentUser, + accessType.toString(), + this.bucket, + key)); + } + } + } + } + private void initThreadPools(final Configuration conf) { long keepAliveTime = OBSCommonUtils.longOption(conf, OBSConstants.KEEPALIVE_TIME, OBSConstants.DEFAULT_KEEPALIVE_TIME, 0); @@ -716,11 +793,7 @@ public FSDataInputStream open(final Path f, final int bufferSize) throws IOExcep } throw new FileNotFoundException("Can't open " + f + " because it is a directory"); } - // FSDataInputStream fsDataInputStream = new FSDataInputStream( - // new OBSInputStream(bucket, OBSCommonUtils.pathToKey(this, f), - // fileStatus.getLen(), - // obs, statistics, readAheadRange, this)); - + checkPermission(f, AccessType.READ); FSInputStream fsInputStream = inputPolicyFactory.create(this, bucket, OBSCommonUtils.pathToKey(this, f), fileStatus.getLen(), statistics, boundedMultipartUploadThreadPool); FSDataInputStream fsDataInputStream = new FSDataInputStream(fsInputStream); @@ -806,6 +879,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi exist = false; } + checkPermission(f,AccessType.WRITE); FSDataOutputStream outputStream = new FSDataOutputStream(new OBSBlockOutputStream(this, key, 0, new SemaphoredDelegatingExecutor(boundedMultipartUploadThreadPool, blockOutputActiveBlocks, true), false), null); @@ -923,6 +997,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi exist = false; } + checkPermission(f,AccessType.WRITE); outputStream = new FSDataOutputStream(new OBSBlockOutputStream(this, key, 0, new SemaphoredDelegatingExecutor(boundedMultipartUploadThreadPool, blockOutputActiveBlocks, true), true), null); @@ -1055,6 +1130,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr throw new IOException("Cannot append " + f + " that is being written."); } + checkPermission(f,AccessType.WRITE); FSDataOutputStream outputStream = new FSDataOutputStream(new OBSBlockOutputStream(this, key, objectLen, new SemaphoredDelegatingExecutor(boundedMultipartUploadThreadPool, blockOutputActiveBlocks, true), true), null,objectLen); @@ -1096,7 +1172,6 @@ public boolean truncate(Path f, long newLength) throws IOException { if (!enablePosix) { super.truncate(f, newLength); } - if (newLength < 0) { throw new IOException(new HadoopIllegalArgumentException( "Cannot truncate " + f + " to a negative file size: " + newLength + ".")); @@ -1131,6 +1206,7 @@ public boolean truncate(Path f, long newLength) throws IOException { + newLength + ".")); } + checkPermission(f, AccessType.WRITE); OBSPosixBucketUtils.innerFsTruncateWithRetry(this, f, newLength); return true; @@ -1170,6 +1246,8 @@ public boolean rename(final Path src, final Path dst) throws IOException { LOG.debug("Rename path {} to {} start", src, dst); try { if (enablePosix) { + checkPermission(src, AccessType.WRITE); + checkPermission(dst,AccessType.WRITE); boolean success = OBSPosixBucketUtils.renameBasedOnPosix(this, src, dst); endTime = System.currentTimeMillis(); if (getMetricSwitch()) { @@ -1179,6 +1257,8 @@ public boolean rename(final Path src, final Path dst) throws IOException { } return success; } else { + checkPermission(src, AccessType.WRITE); + checkPermission(dst, AccessType.WRITE); boolean success = OBSObjectBucketUtils.renameBasedOnObject(this, src, dst); endTime = System.currentTimeMillis(); if (getMetricSwitch()) { @@ -1307,6 +1387,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException LOG.debug("delete: path {} - recursive {}", status.getPath(), recursive); if (enablePosix) { + checkPermission(f,AccessType.WRITE); boolean success = OBSPosixBucketUtils.fsDelete(this, status, recursive); endTime = System.currentTimeMillis(); if (getMetricSwitch()) { @@ -1316,7 +1397,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException } return success; } - + checkPermission(f,AccessType.WRITE); boolean success = OBSObjectBucketUtils.objectDelete(this, status, recursive); endTime = System.currentTimeMillis(); if (getMetricSwitch()) { @@ -1383,6 +1464,7 @@ String getTrashDir() { @Override public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException { checkOpen(); + checkPermission(f,AccessType.READ); long startTime = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); long endTime; @@ -1423,6 +1505,7 @@ public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOExc */ public FileStatus[] listStatus(final Path f, final boolean recursive) throws FileNotFoundException, IOException { checkOpen(); + checkPermission(f,AccessType.READ); long startTime = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); long endTime; @@ -1508,6 +1591,7 @@ String getShortUserName() { public boolean mkdirs(final Path path, final FsPermission permission) throws IOException, FileAlreadyExistsException { checkOpen(); + checkPermission(path,AccessType.WRITE); long startTime = System.currentTimeMillis(); long endTime; try { @@ -1727,19 +1811,30 @@ public void close() throws IOException { LOG.info("Finish closing filesystem instance for uri: {}", uri); } - /** - * Override {@code getCanonicalServiceName} and return {@code null} since - * delegation token is not supported. - */ @Override public String getCanonicalServiceName() { - // Does not support Token, only enable for HBase BulkLoad - if (enableCanonicalServiceName) { + if (authorizer != null && authorizer instanceof DelegationTokenCapability) { + LOG.debug("getting CanonicalServiceName"); + return ((DelegationTokenCapability)authorizer).getCanonicalServiceName(); + } else if (enableCanonicalServiceName) { + // Does not support Token, only enable for HBase BulkLoad return getScheme() + "://" + bucket; } return null; } + @Override + public Token getDelegationToken(String renewer) throws IOException { + if (authorizer != null && authorizer instanceof DelegationTokenCapability) { + long st = System.currentTimeMillis(); + Token delegationToken = ((DelegationTokenCapability) authorizer).getDelegationToken(renewer); + long et = System.currentTimeMillis(); + LOG.debug("getDelegationToken:[renewer: {}], [cost: {}]", renewer, et - st); + return delegationToken; + } + return super.getDelegationToken(renewer); + } + /** * Return copy part size. * @@ -1853,6 +1948,7 @@ int getMaxKeys() { public RemoteIterator listFiles(final Path f, final boolean recursive) throws FileNotFoundException, IOException { checkOpen(); + checkPermission(f,AccessType.READ); long startTime = System.currentTimeMillis(); long endTime; RemoteIterator locatedFileStatus; @@ -1873,7 +1969,6 @@ public RemoteIterator listFiles(final Path f, final boolean r } throw new AccessControlException(e); } - if (fileStatus.isFile()) { locatedFileStatus = new OBSListing.SingleStatusRemoteIterator( OBSCommonUtils.toLocatedFileStatus(this, fileStatus)); @@ -1951,6 +2046,7 @@ public RemoteIterator listLocatedStatus(final Path f) throws public RemoteIterator listLocatedStatus(final Path f, final PathFilter filter) throws FileNotFoundException, IOException { checkOpen(); + checkPermission(f,AccessType.READ); Path path = OBSCommonUtils.qualify(this, f); LOG.debug("listLocatedStatus({}, {}", path, filter); long startTime = System.currentTimeMillis(); diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSObjectBucketUtils.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSObjectBucketUtils.java index d705b25..e1cba55 100644 --- a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSObjectBucketUtils.java +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSObjectBucketUtils.java @@ -406,6 +406,7 @@ static void createEmptyObject(final OBSFileSystem owner, final String objectName long delayMs; int retryTime = 0; long startTime = System.currentTimeMillis(); + IOException lastException = null; while (System.currentTimeMillis() - startTime <= OBSCommonUtils.MAX_TIME_IN_MILLISECONDS_TO_RETRY) { InputStream im = null; try { @@ -422,7 +423,7 @@ public int read() { owner.getSchemeStatistics().incrementBytesWritten(putObjectRequest.getMetadata().getContentLength()); return; } catch (ObsException e) { - LOG.debug("Delete path failed with [{}], " + "retry time [{}] - request id [{}] - " + LOG.debug("create empty obj failed with [{}], " + "retry time [{}] - request id [{}] - " + "error code [{}] - error message [{}]", e.getResponseCode(), retryTime, e.getErrorRequestId(), e.getErrorCode(), e.getErrorMessage()); @@ -431,6 +432,8 @@ public int read() { throw ioException; } + lastException = ioException; + delayMs = OBSCommonUtils.getSleepTimeInMs(retryTime); retryTime++; if (System.currentTimeMillis() - startTime + delayMs @@ -447,6 +450,7 @@ public int read() { } } } + throw lastException; } /** @@ -487,6 +491,7 @@ static void copyFile(final OBSFileSystem owner, final String srcKey, final Strin } } } + innerCopyFile(owner, srcKey, dstKey, size); } private static void innerCopyFile(final OBSFileSystem owner, final String srcKey, final String dstKey, diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSPosixBucketUtils.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSPosixBucketUtils.java index c641a4d..035070c 100644 --- a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSPosixBucketUtils.java +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSPosixBucketUtils.java @@ -500,7 +500,7 @@ static void fsCreateFolder(final OBSFileSystem owner, final String objectName) t final NewFolderRequest newFolderRequest = new NewFolderRequest(owner.getBucket(), newObjectName); newFolderRequest.setAcl(owner.getCannedACL()); long len = newFolderRequest.getObjectKey().length(); - + IOException lastException = null; long delayMs; int retryTime = 0; long startTime = System.currentTimeMillis(); @@ -518,6 +518,7 @@ static void fsCreateFolder(final OBSFileSystem owner, final String objectName) t if (!(ioException instanceof OBSIOException)) { throw ioException; } + lastException = ioException; delayMs = OBSCommonUtils.getSleepTimeInMs(retryTime); retryTime++; @@ -531,6 +532,7 @@ static void fsCreateFolder(final OBSFileSystem owner, final String objectName) t } } } + throw lastException; } // Used to get the status of a file or folder in a file-gateway bucket. diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/AccessType.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/AccessType.java new file mode 100644 index 0000000..a197cf6 --- /dev/null +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/AccessType.java @@ -0,0 +1,8 @@ +package org.apache.hadoop.fs.obs.security; + +public enum AccessType { + LIST, + WRITE, + READ, + DELETE, +} \ No newline at end of file diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/AuthorizeProvider.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/AuthorizeProvider.java new file mode 100644 index 0000000..fff73f9 --- /dev/null +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/AuthorizeProvider.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.fs.obs.security; + +import org.apache.hadoop.conf.Configuration; +import java.io.IOException; + +/** + * 功能描述 + * + * @since 2021-06-21 + */ +public interface AuthorizeProvider { + + void init(Configuration conf) throws IOException; + + boolean isAuthorized(String bucket, String key, AccessType action) throws IOException; + +} diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/DelegationTokenCapability.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/DelegationTokenCapability.java new file mode 100644 index 0000000..e525c6f --- /dev/null +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/DelegationTokenCapability.java @@ -0,0 +1,16 @@ +package org.apache.hadoop.fs.obs.security; + +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; + +/** + * 功能描述 + * + * @since 2021-09-15 + */ +public interface DelegationTokenCapability { + String getCanonicalServiceName(); + + Token getDelegationToken(String renewer) throws IOException; +} diff --git a/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/OBSAuthorizationException.java b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/OBSAuthorizationException.java new file mode 100644 index 0000000..8179ce4 --- /dev/null +++ b/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/security/OBSAuthorizationException.java @@ -0,0 +1,24 @@ +package org.apache.hadoop.fs.obs.security; + +import java.io.IOException; + +/** + * 功能描述 + * + * @since 2021-09-15 + */ +public class OBSAuthorizationException extends IOException { + private static final long serialVersionUID = 1L; + + public OBSAuthorizationException(String message, Exception e) { + super(message, e); + } + + public OBSAuthorizationException(String message) { + super(message); + } + + public OBSAuthorizationException(Throwable e) { + super(e); + } +} diff --git a/hadoop-huaweicloud/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-huaweicloud/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem index e77425a..ec0387a 100644 --- a/hadoop-huaweicloud/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem +++ b/hadoop-huaweicloud/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -1,16 +1,16 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.hadoop.fs.obs.OBSFileSystem +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.hadoop.fs.obs.OBSFileSystem