Skip to content

Commit

Permalink
log broker load
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Feb 21, 2024
1 parent ad64923 commit 419d693
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
task.init(loadId, attachment.getFileStatusByTable(aggKey),
attachment.getFileNumByTable(aggKey), getUserInfo());
task.settWorkloadGroups(tWorkloadGroups);
task.setAuditEvent(loadAuditEvent);
return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.plugin.audit.AuditEvent;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
Expand Down Expand Up @@ -137,6 +138,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
protected String comment = "";

protected List<TPipelineWorkloadGroup> tWorkloadGroups = null;
protected AuditEvent loadAuditEvent = null; // log after broker load task finish

public LoadJob(EtlJobType jobType) {
this.jobType = jobType;
Expand Down Expand Up @@ -1172,4 +1174,8 @@ public LoadStatistic getLoadStatistic() {
public void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}

public void setLoadAuditEvent(AuditEvent auditEvent) {
this.loadAuditEvent = auditEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
Expand All @@ -31,6 +32,7 @@
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.FailMsg;
import org.apache.doris.plugin.audit.AuditEvent;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo.QueryType;
Expand Down Expand Up @@ -83,6 +85,8 @@ public class LoadLoadingTask extends LoadTask {

private List<TPipelineWorkloadGroup> tWorkloadGroups = null;

private AuditEvent auditEvent = null;

public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate,
Expand Down Expand Up @@ -176,6 +180,15 @@ private void executeOnce() throws Exception {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator, QueryType.BROKER_LOAD);
actualExecute(curCoordinator, timeoutS);
} finally {
try {
if (auditEvent != null) {
auditEvent.queryId = DebugUtil.printId(loadId);
auditEvent.queryTime = System.currentTimeMillis() - beginTime;
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEvent);
}
} catch (Throwable t) {
LOG.info("broker load audit log failed, ", t);
}
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}
}
Expand Down Expand Up @@ -234,4 +247,8 @@ void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}

void setAuditEvent(AuditEvent auditEvent) {
this.auditEvent = auditEvent;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -149,6 +150,7 @@ public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserExcept
loadJob.settWorkloadGroups(
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get()));
}
loadJob.setLoadAuditEvent(AuditLogHelper.getAuditEvent(ConnectContext.get(), stmt.toSql(), stmt, null, false));

Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.plugin.audit.AuditEvent;
import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder;
import org.apache.doris.plugin.audit.AuditEvent.EventType;
import org.apache.doris.qe.QueryState.MysqlStateType;
Expand All @@ -38,6 +39,12 @@ public class AuditLogHelper {

public static void logAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt,
org.apache.doris.proto.Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
AuditEvent auditEvent = getAuditEvent(ctx, origStmt, parsedStmt, statistics, printFuzzyVariables);
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEvent);
}

public static AuditEvent getAuditEvent(ConnectContext ctx, String origStmt, StatementBase parsedStmt,
org.apache.doris.proto.Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
origStmt = origStmt.replace("\n", " ");
// slow query
long endTime = System.currentTimeMillis();
Expand Down Expand Up @@ -119,6 +126,7 @@ public static void logAuditLog(ConnectContext ctx, String origStmt, StatementBas
}
}
}
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build());
return auditEventBuilder.build();
}

}

0 comments on commit 419d693

Please sign in to comment.