Skip to content

Commit

Permalink
merge to d228001a62c9c4f8e87f82eef9a88804dfed2831 (#195)
Browse files Browse the repository at this point in the history
1. modify FE audit log format, for parsing it more convenient
2. fix bug: evaluate conjuncts assigned to SortNode in InlineViewRef
3. fix bug: invalid apache hdfs broker jar in apache_hdfs_broker_java_libraries.tar.gz
  • Loading branch information
morningman authored and imay committed May 22, 2018
1 parent 5065d63 commit 8e6eee3
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 21 deletions.
1 change: 0 additions & 1 deletion fe/src/com/baidu/palo/mysql/MysqlChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public MysqlChannel(SocketChannel channel) {
try {
if (channel.getRemoteAddress() instanceof InetSocketAddress) {
InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();

// avoid calling getHostName() which may trigger a name service reverse lookup
remoteHostString = address.getHostString() + ":" + address.getPort();
remoteIp = address.getAddress().getHostAddress();
Expand Down
3 changes: 3 additions & 0 deletions fe/src/com/baidu/palo/planner/SingleNodePlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, long default
}
Preconditions.checkState(root.hasValidStats());
root.init(analyzer);
// TODO chenhao16, before merge ValueTransferGraph, force evaluate conjuncts
// from SelectStmt outside
root = addUnassignedConjuncts(analyzer, root);
} else {
root.setLimit(stmt.getLimit());
root.computeStats(analyzer);
Expand Down
6 changes: 1 addition & 5 deletions fe/src/com/baidu/palo/planner/SortNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public SortNode(PlanNodeId id, PlanNode input, SortInfo info, boolean useTopN,
this.useTopN = useTopN;
this.isDefaultLimit = isDefaultLimit;
this.tupleIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId()));
this.tblRefIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId()));
this.nullableTupleIds.addAll(input.getNullableTupleIds());
this.children.add(input);
this.offset = offset;
Expand Down Expand Up @@ -198,7 +199,6 @@ public int getNumInstances() {
}

public void init(Analyzer analyzer) throws InternalException {
assignConjuncts(analyzer);
// Compute the memory layout for the generated tuple.
computeStats(analyzer);
// createDefaultSmap(analyzer);
Expand Down Expand Up @@ -233,10 +233,6 @@ public void init(Analyzer analyzer) throws InternalException {
outputSmap = ExprSubstitutionMap.compose(childSmap, outputSmap, analyzer);
info.substituteOrderingExprs(outputSmap, analyzer);

if (info.getSortTupleDescriptor() != null) {
// info.checkConsistency();
}

if (LOG.isDebugEnabled()) {
LOG.debug("sort id " + tupleIds.get(0).toString() + " smap: "
+ outputSmap.debugString());
Expand Down
3 changes: 2 additions & 1 deletion fe/src/com/baidu/palo/qe/AuditBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ public void reset() {
}

public void put(String key, Object value) {
sb.append('[').append(key).append('=').append(value.toString()).append(']');
sb.append("|").append(key).append("=").append(value.toString());
}

@Override
public String toString() {
return sb.toString();
}
Expand Down
23 changes: 13 additions & 10 deletions fe/src/com/baidu/palo/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void handlePing() {
ctx.getState().setOk();
}

private void auditAfterExec() {
private void auditAfterExec(String origStmt) {
MetricRepo.COUNTER_REQUEST_ALL.inc();

// slow query
Expand All @@ -98,21 +98,24 @@ private void auditAfterExec() {
ctx.getAuditBuilder().put("state", ctx.getState());
ctx.getAuditBuilder().put("time", elapseMs);
ctx.getAuditBuilder().put("returnRows", ctx.getReturnRows());
String auditString = ctx.getAuditBuilder().toString();

if (auditString.toLowerCase().contains("select") && auditString.toLowerCase().contains("from")) {
if (ctx.getState().isQuery()) {
MetricRepo.COUNTER_QUERY_ALL.inc();
if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR
&& ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) {
// err query
MetricRepo.COUNTER_QUERY_ERR.inc();
ctx.getAuditBuilder().put("monitor", "yes");
} else {
// ok query
MetricRepo.METER_QUERY.mark();
MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
}
ctx.getAuditBuilder().put("is_query", 1);
} else {
ctx.getAuditBuilder().put("is_query", 0);
}
// We put origin query stmt at the end of audit log, for parsing the log more convenient.
ctx.getAuditBuilder().put("stmt", origStmt);

AuditLog.getQueryAudit().log(ctx.getAuditBuilder().toString());

Expand Down Expand Up @@ -146,14 +149,13 @@ private void handleQuery() {
ctx.getAuditBuilder().put("client", ctx.getMysqlChannel().getRemoteHostString());
ctx.getAuditBuilder().put("user", ctx.getUser());
ctx.getAuditBuilder().put("db", ctx.getDatabase());
ctx.getAuditBuilder().put("query", stmt.replace("\n", "\\n"));

// execute this query.
try {
executor = new StmtExecutor(ctx, stmt);
executor.execute();
// needForward = executor.isForwardtoMaster();
// outputPacket = executor.getOutputPacket();
// set if this is a QueryStmt
ctx.getState().setQuery(executor.isQueryStmt());
} catch (DdlException e) {
LOG.warn("Process one query failed because DdlException: ", e);
ctx.getState().setError(e.getMessage());
Expand All @@ -165,11 +167,11 @@ private void handleQuery() {
// Catch all throwable.
// If reach here, maybe palo bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError("Maybe palo bug");
ctx.getState().setError("Unexpected exception: " + e.getMessage());
}

// audit after exec
auditAfterExec();
auditAfterExec(stmt.replace("\n", "\\n"));
}

// Get the column definitions of a table
Expand Down Expand Up @@ -332,7 +334,7 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) {
// Catch all throwable.
// If reach here, maybe palo bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError("Maybe palo bug");
ctx.getState().setError("Unexpected exception: " + e.getMessage());
}
// no matter the master execute success or fail, the master must transfer the result to follower
// and tell the follwer the current jounalID.
Expand Down Expand Up @@ -388,3 +390,4 @@ public void loop() {
}
}
}

9 changes: 9 additions & 0 deletions fe/src/com/baidu/palo/qe/QueryState.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public enum ErrType {
private ErrorCode errorCode;
private String infoMessage;
private ErrType errType = ErrType.OTHER_ERR;
private boolean isQuery = false;

public QueryState() {
}
Expand Down Expand Up @@ -85,6 +86,14 @@ public ErrType getErrType() {
return errType;
}

public void setQuery(boolean isQuery) {
this.isQuery = isQuery;
}

public boolean isQuery() {
return isQuery;
}

public String getInfoMessage() {
return infoMessage;
}
Expand Down
8 changes: 7 additions & 1 deletion fe/src/com/baidu/palo/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import com.baidu.palo.thrift.TResultBatch;
import com.baidu.palo.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -167,6 +166,13 @@ public ShowResultSet getShowResultSet() {
}
}

public boolean isQueryStmt() {
if (parsedStmt != null && parsedStmt instanceof QueryStmt) {
return true;
}
return false;
}

// Execute one statement.
// Exception:
// IOException: talk with client failed.
Expand Down
4 changes: 2 additions & 2 deletions fe/src/com/baidu/palo/service/FrontendServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,12 @@ public static String getMiniLoadStmt(TMiniLoadRequest request) throws UnknownHos
private void logMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException {
String stmt = getMiniLoadStmt(request);
AuditBuilder auditBuilder = new AuditBuilder();
auditBuilder.put("client", request.getBackend().toString());
auditBuilder.put("client", request.getBackend().getHostname() + ":" + request.getBackend().getPort());
auditBuilder.put("user", request.user);
auditBuilder.put("db", request.db);
auditBuilder.put("query", stmt);
auditBuilder.put("state", TStatusCode.OK);
auditBuilder.put("time", "0");
auditBuilder.put("stmt", stmt);

AuditLog.getQueryAudit().log(auditBuilder.toString());
}
Expand Down
Binary file not shown.
2 changes: 1 addition & 1 deletion fs_brokers/apache_hdfs_broker/deps/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ DEPSDIR=`dirname "$0"`
DEPSDIR=`cd ${DEPSDIR}; pwd`

CURDIR=`pwd`
if [ ! -f ${DEPSDIR}/lib/jar/apache_hdfs_broker.jar ]
if [ ! -f ${DEPSDIR}/lib/jar/commons-cli-1.2.jar ]
then
echo "Unpacking dependency libraries..."
cd ${DEPSDIR}
Expand Down

0 comments on commit 8e6eee3

Please sign in to comment.