Skip to content

Commit

Permalink
[Improvement][RemoteLogging] Add AbsRemoteLogHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
pegasas committed Mar 26, 2024
1 parent 2e16907 commit 35b160c
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 0 deletions.
5 changes: 5 additions & 0 deletions dolphinscheduler-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
<artifactId>esdk-obs-java-bundle</artifactId>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>

<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,13 @@ private Constants() {

public static final String REMOTE_LOGGING_GCS_BUCKET_NAME = "remote.logging.google.cloud.storage.bucket.name";

/**
* remote logging for ABS
*/
public static final String REMOTE_LOGGING_ABS_ACCOUNT_NAME = "remote.logging.abs.account.name";
public static final String REMOTE_LOGGING_ABS_ACCOUNT_KEY = "remote.logging.abs.account.key";
public static final String REMOTE_LOGGING_ABS_CONTAINER_NAME = "remote.logging.abs.container.name";

/**
* data quality
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.specialized.BlobInputStream;
import com.azure.storage.common.StorageSharedKeyCredential;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.IOException;

@Slf4j
public class AbsRemoteLogHandler implements RemoteLogHandler, Closeable {

private String accountName;

private String accountKey;

private String containerName;

private BlobContainerClient blobContainerClient;

private static AbsRemoteLogHandler instance;

private AbsRemoteLogHandler() {

}

public static synchronized AbsRemoteLogHandler getInstance() {
if (instance == null) {
instance = new AbsRemoteLogHandler();
instance.init();
}

return instance;
}

public void init() {
accountName = readAccountName();
accountKey = readAccountKey();
containerName = readContainerName();
blobContainerClient = buildBlobContainerClient();
}

protected BlobContainerClient buildBlobContainerClient() {

BlobServiceClient serviceClient = new BlobServiceClientBuilder()
.endpoint(String.format("https://%s.blob.core.windows.net/", accountName))
.credential(new StorageSharedKeyCredential(accountName, accountKey))
.buildClient();

if (StringUtils.isBlank(containerName)) {
throw new IllegalArgumentException("remote.logging.s3.bucket.name is blank");
}

try {
BlobContainerClient blobContainerClient = serviceClient.getBlobContainerClient(containerName);
} catch (Exception ex) {
throw new IllegalArgumentException(
"containerName: " + containerName + " is not exists, you need to create them by yourself");
}

log.info("containerName: {} has been found.", containerName);

return blobContainerClient;
}

@Override
public void close() throws IOException {
// no need to close blobContainerClient
}

@Override
public void sendRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

try {
log.info("send remote log {} to Azure Blob {}", logPath, objectName);
blobContainerClient.getBlobClient(objectName).uploadFromFile(logPath);
} catch (Exception e) {
log.error("error while sending remote log {} to Azure Blob {}", logPath, objectName, e);
}
}

@Override
public void getRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

try {
log.info("get remote log on Azure Blob {} to {}", objectName, logPath);

try (
BlobInputStream bis = blobContainerClient.getBlobClient(objectName).openInputStream();
FileOutputStream fos = new FileOutputStream(logPath)) {
byte[] readBuf = new byte[1024];
int readLen = 0;
while ((readLen = bis.read(readBuf)) > 0) {
fos.write(readBuf, 0, readLen);
}
}
} catch (Exception e) {
log.error("error while getting remote log on Azure Blob {} to {}", objectName, logPath, e);
}
}

protected String readAccountName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME);
}

protected String readAccountKey() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY);
}

protected String readContainerName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public RemoteLogHandler getRemoteLogHandler() {
return S3RemoteLogHandler.getInstance();
} else if ("GCS".equals(target)) {
return GcsRemoteLogHandler.getInstance();
} else if ("ABS".equals(target)) {
return AbsRemoteLogHandler.getInstance();
}

log.error("No suitable remote logging target for {}", target);
Expand Down

0 comments on commit 35b160c

Please sign in to comment.