Skip to content

Commit

Permalink
[Improvement][RemoteLogging]Support AbsRemoteLogHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
pegasas committed Mar 29, 2024
1 parent ae1fe84 commit 8c01e2c
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 13 deletions.
12 changes: 6 additions & 6 deletions docs/docs/en/guide/remote-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ If you deploy DolphinScheduler in `Standalone` mode, you only need to configure
```properties
# Whether to enable remote logging
remote.logging.enable=false
# if remote.logging.enable = true, set the target of remote logging
# if remote.logging.enable = true, set the target of remote logging, currently support OSS, S3, GCS, ABS
remote.logging.target=OSS
# if remote.logging.enable = true, set the log base directory
remote.logging.base.dir=logs
Expand Down Expand Up @@ -66,12 +66,12 @@ remote.logging.google.cloud.storage.bucket.name=<your-bucket>
Configure `common.properties` as follows:

```properties
# abs container name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.container.name=<your-container>
# abs account name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.account.name=<your-account-name>
# abs connection string, required if you set resource.storage.type=ABS
resource.azure.blob.storage.connection.string=<your-connection-string>
remote.logging.abs.account.name=<your-account-name>
# abs account key, required if you set resource.storage.type=ABS
remote.logging.abs.account.key=<your-account-key>
# abs container name, required if you set resource.storage.type=ABS
remote.logging.abs.container.name=<your-container-name>
```

### Notice
Expand Down
12 changes: 6 additions & 6 deletions docs/docs/zh/guide/remote-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Apache DolphinScheduler支持将任务日志传输到远端存储上。当配置
```properties
# 是否开启远程日志存储
remote.logging.enable=true
# 任务日志写入的远端存储,目前支持OSS, S3, GCS
# 任务日志写入的远端存储,目前支持OSS, S3, GCS, ABS
remote.logging.target=OSS
# 任务日志在远端存储上的目录
remote.logging.base.dir=logs
Expand Down Expand Up @@ -66,12 +66,12 @@ remote.logging.google.cloud.storage.bucket.name=<your-bucket>
配置`common.propertis`如下:

```properties
# abs container name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.container.name=<your-container>
# abs account name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.account.name=<your-account-name>
# abs connection string, required if you set resource.storage.type=ABS
resource.azure.blob.storage.connection.string=<your-connection-string>
remote.logging.abs.account.name=<your-account-name>
# abs account key, required if you set resource.storage.type=ABS
remote.logging.abs.account.key=<your-account-key>
# abs container name, required if you set resource.storage.type=ABS
remote.logging.abs.container.name=<your-container-name>
```

### 注意事项
Expand Down
5 changes: 5 additions & 0 deletions dolphinscheduler-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
<artifactId>esdk-obs-java-bundle</artifactId>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>

<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,13 @@ private Constants() {

public static final String REMOTE_LOGGING_GCS_BUCKET_NAME = "remote.logging.google.cloud.storage.bucket.name";

/**
* remote logging for ABS
*/
public static final String REMOTE_LOGGING_ABS_ACCOUNT_NAME = "remote.logging.abs.account.name";
public static final String REMOTE_LOGGING_ABS_ACCOUNT_KEY = "remote.logging.abs.account.key";
public static final String REMOTE_LOGGING_ABS_CONTAINER_NAME = "remote.logging.abs.container.name";

/**
* data quality
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import org.apache.commons.lang3.StringUtils;

import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.specialized.BlobInputStream;
import com.azure.storage.common.StorageSharedKeyCredential;

@Slf4j
public class AbsRemoteLogHandler implements RemoteLogHandler, Closeable {

private String accountName;

private String accountKey;

private String containerName;

private BlobContainerClient blobContainerClient;

private static AbsRemoteLogHandler instance;

private AbsRemoteLogHandler() {
accountName = readAccountName();
accountKey = readAccountKey();
containerName = readContainerName();
blobContainerClient = buildBlobContainerClient();
}

public static synchronized AbsRemoteLogHandler getInstance() {
if (instance == null) {
instance = new AbsRemoteLogHandler();
}

return instance;
}

protected BlobContainerClient buildBlobContainerClient() {

BlobServiceClient serviceClient = new BlobServiceClientBuilder()
.endpoint(String.format("https://%s.blob.core.windows.net/", accountName))
.credential(new StorageSharedKeyCredential(accountName, accountKey))
.buildClient();

if (StringUtils.isBlank(containerName)) {
throw new IllegalArgumentException("remote.logging.abs.container.name is blank");
}

try {
this.blobContainerClient = serviceClient.getBlobContainerClient(containerName);
} catch (Exception ex) {
throw new IllegalArgumentException(
"containerName: " + containerName + " is not exists, you need to create them by yourself");
}

log.info("containerName: {} has been found.", containerName);

return blobContainerClient;
}

@Override
public void close() throws IOException {
// no need to close blobContainerClient
}

@Override
public void sendRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

try {
log.info("send remote log {} to Azure Blob {}", logPath, objectName);
blobContainerClient.getBlobClient(objectName).uploadFromFile(logPath);
} catch (Exception e) {
log.error("error while sending remote log {} to Azure Blob {}", logPath, objectName, e);
}
}

@Override
public void getRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

try {
log.info("get remote log on Azure Blob {} to {}", objectName, logPath);

try (
BlobInputStream bis = blobContainerClient.getBlobClient(objectName).openInputStream();
FileOutputStream fos = new FileOutputStream(logPath)) {
byte[] readBuf = new byte[1024];
int readLen = 0;
while ((readLen = bis.read(readBuf)) > 0) {
fos.write(readBuf, 0, readLen);
}
}
} catch (Exception e) {
log.error("error while getting remote log on Azure Blob {} to {}", objectName, logPath, e);
}
}

protected String readAccountName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME);
}

protected String readAccountKey() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY);
}

protected String readContainerName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public RemoteLogHandler getRemoteLogHandler() {
return S3RemoteLogHandler.getInstance();
} else if ("GCS".equals(target)) {
return GcsRemoteLogHandler.getInstance();
} else if ("ABS".equals(target)) {
return AbsRemoteLogHandler.getInstance();
}

log.error("No suitable remote logging target for {}", target);
Expand Down
7 changes: 6 additions & 1 deletion dolphinscheduler-common/src/main/resources/common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,9 @@ remote.logging.s3.region=<region>
remote.logging.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.bucket.name=<your-bucket>

# abs account name, required if you set resource.storage.type=ABS
remote.logging.abs.account.name=<your-account-name>
# abs account key, required if you set resource.storage.type=ABS
remote.logging.abs.account.key=<your-account-key>
# abs container name, required if you set resource.storage.type=ABS
remote.logging.abs.container.name=<your-container-name>
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import java.nio.file.Path;
import java.nio.file.Paths;

import lombok.extern.slf4j.Slf4j;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.*;

@Slf4j
@ExtendWith(MockitoExtension.class)
public class AbsRemoteLogHandlerTest {

@Mock
BlobServiceClient blobServiceClient;

@Mock
BlobContainerClient blobContainerClient;

@Mock
BlobClient blobClient;

@Test
public void testAbsRemoteLogHandlerContainerNameBlack() {
try (
MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class);
MockedStatic<LogUtils> remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) {
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME))
.thenReturn("account_name");
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY))
.thenReturn("account_key");
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME))
.thenReturn("");
remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs");

IllegalArgumentException thrown = Assertions.assertThrows(IllegalArgumentException.class, () -> {
AbsRemoteLogHandler.getInstance();
});
Assertions.assertEquals("remote.logging.abs.container.name is blank", thrown.getMessage());
}
}

@Test
public void testAbsRemoteLogHandlerContainerNotExists() {
try (
MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class);
MockedStatic<LogUtils> remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class);
MockedConstruction<BlobServiceClientBuilder> k8sClientWrapperMockedConstruction =
Mockito.mockConstruction(BlobServiceClientBuilder.class, (mock, context) -> {
when(mock.endpoint(any(String.class))).thenReturn(mock);
when(mock.credential(any(StorageSharedKeyCredential.class))).thenReturn(mock);
when(mock.buildClient())
.thenReturn(blobServiceClient);
})) {
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME))
.thenReturn("account_name");
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY))
.thenReturn("account_key");
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME))
.thenReturn("container_name");
remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs");

when(blobServiceClient.getBlobContainerClient(any(String.class))).thenThrow(
new NullPointerException("container not exists"));
IllegalArgumentException thrown = Assertions.assertThrows(IllegalArgumentException.class, () -> {
AbsRemoteLogHandler.getInstance();
});
Assertions.assertEquals("containerName: container_name is not exists, you need to create them by yourself", thrown.getMessage());
}
}

@Test
public void testAbsRemoteLogHandler() {

try (
MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class);
MockedStatic<LogUtils> remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class);
MockedConstruction<BlobServiceClientBuilder> blobServiceClientBuilderMockedConstruction =
Mockito.mockConstruction(BlobServiceClientBuilder.class, (mock, context) -> {
when(mock.endpoint(any(String.class))).thenReturn(mock);
when(mock.credential(any(StorageSharedKeyCredential.class))).thenReturn(mock);
when(mock.buildClient())
.thenReturn(blobServiceClient);
});
MockedStatic<RemoteLogUtils> remoteLogUtilsMockedStatic1 = Mockito.mockStatic(RemoteLogUtils.class)) {
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME))
.thenReturn("account_name");
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY))
.thenReturn("account_key");
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME))
.thenReturn("container_name");
remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs");
String logPath = "logpath";
String objectName = "objectname";
remoteLogUtilsMockedStatic1.when(() -> RemoteLogUtils.getObjectNameFromLogPath(logPath)).thenReturn(objectName);

when(blobServiceClient.getBlobContainerClient(any(String.class))).thenReturn(blobContainerClient);
when(blobContainerClient.getBlobClient(objectName)).thenReturn(blobClient);

AbsRemoteLogHandler absRemoteLogHandler = AbsRemoteLogHandler.getInstance();
Assertions.assertNotNull(absRemoteLogHandler);

absRemoteLogHandler.sendRemoteLog(logPath);
Mockito.verify(blobClient, times(1)).uploadFromFile(logPath);
}
}
}

0 comments on commit 8c01e2c

Please sign in to comment.