Skip to content

Commit

Permalink
Merge pull request #828 from lcybo/master
Browse files Browse the repository at this point in the history
canal 性能指标采集
  • Loading branch information
agapple authored Aug 10, 2018
2 parents d7f48f5 + ec00792 commit fb1126c
Show file tree
Hide file tree
Showing 40 changed files with 13,910 additions and 13,369 deletions.
12 changes: 6 additions & 6 deletions deployer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
</dependency>

<!-- 这里指定runtime的metrics provider-->
<!--<dependency>-->
<!--<groupId>com.alibaba.otter</groupId>-->
<!--<artifactId>canal.prometheus</artifactId>-->
<!--<version>${project.version}</version>-->
<!--<scope>runtime</scope>-->
<!--</dependency>-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.prometheus</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
Expand Down
15 changes: 0 additions & 15 deletions deployer/src/main/bin/metrics_env.sh

This file was deleted.

7 changes: 1 addition & 6 deletions deployer/src/main/bin/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,7 @@ then
echo LOG CONFIGURATION : $logback_configurationFile
echo canal conf : $canal_conf
echo CLASSPATH :$CLASSPATH
# metrics support options
# if [ -x $base/bin/metrics_env.sh ]; then
# . $base/bin/metrics_env.sh
# echo METRICS_OPTS $METRICS_OPTS
# fi
$JAVA $JAVA_OPTS $METRICS_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal.log 2>&1 &
echo $! > $base/bin/canal.pid

echo "cd to $current_path for continue"
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ EXPOSE 2222 11111 8000 8080
WORKDIR /home/admin

ENTRYPOINT [ "/alidata/bin/main.sh" ]
CMD [ "/home/admin/app.sh" ]
CMD [ "/home/admin/app.sh" ]
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Collections;
import java.util.List;

import com.alibaba.otter.canal.meta.FileMixedMetaManager;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -38,12 +39,7 @@
import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
import com.alibaba.otter.canal.parse.index.*;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.sink.entry.EntryEventSink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.math.RandomUtils;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void uncaughtException(Thread t,




protected abstract BinlogParser buildParser();

protected abstract ErosaConnection buildErosaConnection();
Expand Down Expand Up @@ -643,4 +645,5 @@ public long getServerId() {
public void setServerId(long serverId) {
this.serverId = serverId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public void add(CanalEntry.Entry entry) throws InterruptedException {
flush();
}
break;
case HEARTBEAT:
// master过来的heartbeat,说明binlog已经读完了,是idle状态
put(entry);
flush();
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.alibaba.otter.canal.parse.inbound.mysql;

import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,23 +22,24 @@

public abstract class AbstractMysqlEventParser extends AbstractEventParser {

protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected static final long BINLOG_START_OFFEST = 4L;
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected static final long BINLOG_START_OFFEST = 4L;

protected TableMetaTSDBFactory tableMetaTSDBFactory = new DefaultTableMetaTSDBFactory();
protected boolean enableTsdb = false;
protected TableMetaTSDBFactory tableMetaTSDBFactory = new DefaultTableMetaTSDBFactory();
protected boolean enableTsdb = false;
protected String tsdbSpringXml;
protected TableMetaTSDB tableMetaTSDB;

// 编码信息
protected byte connectionCharsetNumber = (byte) 33;
protected Charset connectionCharset = Charset.forName("UTF-8");
protected boolean filterQueryDcl = false;
protected boolean filterQueryDml = false;
protected boolean filterQueryDdl = false;
protected boolean filterRows = false;
protected boolean filterTableError = false;
protected boolean useDruidDdlFilter = true;
protected byte connectionCharsetNumber = (byte) 33;
protected Charset connectionCharset = Charset.forName("UTF-8");
protected boolean filterQueryDcl = false;
protected boolean filterQueryDml = false;
protected boolean filterQueryDdl = false;
protected boolean filterRows = false;
protected boolean filterTableError = false;
protected boolean useDruidDdlFilter = true;
private final AtomicLong eventsPublishBlockingTime = new AtomicLong(0L);

protected BinlogParser buildParser() {
LogEventConvert convert = new LogEventConvert();
Expand Down Expand Up @@ -131,11 +133,13 @@ public void setEventBlackFilter(CanalEventFilter eventBlackFilter) {
}

protected MultiStageCoprocessor buildMultiStageCoprocessor() {
return new MysqlMultiStageCoprocessor(parallelBufferSize,
parallelThreadSize,
(LogEventConvert) binlogParser,
transactionBuffer,
destination);
MysqlMultiStageCoprocessor mysqlMultiStageCoprocessor = new MysqlMultiStageCoprocessor(parallelBufferSize,
parallelThreadSize,
(LogEventConvert) binlogParser,
transactionBuffer,
destination);
mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime);
return mysqlMultiStageCoprocessor;
}

// ============================ setter / getter =========================
Expand Down Expand Up @@ -204,4 +208,8 @@ public void setTableMetaTSDBFactory(TableMetaTSDBFactory tableMetaTSDBFactory) {
this.tableMetaTSDBFactory = tableMetaTSDBFactory;
}

public AtomicLong getEventsPublishBlockingTime() {
return this.eventsPublishBlockingTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
Expand Down Expand Up @@ -39,18 +40,20 @@

public class MysqlConnection implements ErosaConnection {

private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

private MysqlConnector connector;
private long slaveId;
private Charset charset = Charset.forName("UTF-8");
private BinlogFormat binlogFormat;
private BinlogImage binlogImage;
private MysqlConnector connector;
private long slaveId;
private Charset charset = Charset.forName("UTF-8");
private BinlogFormat binlogFormat;
private BinlogImage binlogImage;

// tsdb releated
private AuthenticationInfo authInfo;
protected int connTimeout = 5 * 1000; // 5秒
protected int soTimeout = 60 * 60 * 1000; // 1小时
private AuthenticationInfo authInfo;
protected int connTimeout = 5 * 1000; // 5秒
protected int soTimeout = 60 * 60 * 1000; // 1小时
// dump binlog bytes, 暂不包括meta与TSDB
private AtomicLong receivedBinlogBytes;

public MysqlConnection(){
}
Expand Down Expand Up @@ -126,6 +129,7 @@ public void seek(String binlogfilename, Long binlogPosition, SinkFunction func)
decoder.handle(LogEvent.XID_EVENT);
LogContext context = new LogContext();
while (fetcher.fetch()) {
accumulateReceivedBytes(fetcher.limit());
LogEvent event = null;
event = decoder.decode(fetcher, context);

Expand All @@ -148,6 +152,7 @@ public void dump(String binlogfilename, Long binlogPosition, SinkFunction func)
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();
while (fetcher.fetch()) {
accumulateReceivedBytes(fetcher.limit());
LogEvent event = null;
event = decoder.decode(fetcher, context);

Expand Down Expand Up @@ -176,6 +181,7 @@ public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();
while (fetcher.fetch()) {
accumulateReceivedBytes(fetcher.limit());
LogEvent event = null;
event = decoder.decode(fetcher, context);

Expand Down Expand Up @@ -206,6 +212,7 @@ public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocess
try {
fetcher.start(connector.getChannel());
while (fetcher.fetch()) {
accumulateReceivedBytes(fetcher.limit());
LogBuffer buffer = fetcher.duplicate();
fetcher.consume(fetcher.limit());
if (!coprocessor.publish(buffer)) {
Expand All @@ -232,6 +239,7 @@ public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOEx
try {
fetcher.start(connector.getChannel());
while (fetcher.fetch()) {
accumulateReceivedBytes(fetcher.limit());
LogBuffer buffer = fetcher.duplicate();
fetcher.consume(fetcher.limit());
if (!coprocessor.publish(buffer)) {
Expand Down Expand Up @@ -346,7 +354,6 @@ public long queryServerId() throws IOException {
* <li>net_read_timeout</li>
* </ol>
*
* @param channel
* @throws IOException
*/
private void updateSettings() throws IOException {
Expand Down Expand Up @@ -465,6 +472,14 @@ private void loadBinlogImage() {
}
}

private void accumulateReceivedBytes(long x) {
if (receivedBinlogBytes != null) {
receivedBinlogBytes.addAndGet(x);
}
}



public static enum BinlogFormat {

STATEMENT("STATEMENT"), ROW("ROW"), MIXED("MIXED");
Expand Down Expand Up @@ -604,4 +619,8 @@ public void setAuthInfo(AuthenticationInfo authInfo) {
this.authInfo = authInfo;
}

public void setReceivedBinlogBytes(AtomicLong receivedBinlogBytes) {
this.receivedBinlogBytes = receivedBinlogBytes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
private BinlogImage[] supportBinlogImages; // 支持的binlogImage,如果设置会执行强校验

// update by yishun.chen,特殊异常处理参数
private int dumpErrorCount = 0; // binlogDump失败异常计数
private int dumpErrorCountThreshold = 2; // binlogDump失败异常计数阀值
private int dumpErrorCount = 0; // binlogDump失败异常计数
private int dumpErrorCountThreshold = 2; // binlogDump失败异常计数阀值

// instance received binlog bytes
private final AtomicLong receivedBinlogBytes = new AtomicLong(0L);

protected ErosaConnection buildErosaConnection() {
return buildMysqlConnection(this.runningInfo);
Expand Down Expand Up @@ -313,6 +316,7 @@ private MysqlConnection buildMysqlConnection(AuthenticationInfo runningInfo) {
connection.getConnector().setSendBufferSize(sendBufferSize);
connection.getConnector().setSoTimeout(defaultConnectionTimeoutInSeconds * 1000);
connection.setCharset(connectionCharset);
connection.setReceivedBinlogBytes(receivedBinlogBytes);
// 随机生成slaveId
if (this.slaveId <= 0) {
this.slaveId = generateUniqueServerId();
Expand Down Expand Up @@ -511,7 +515,7 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
private Long findTransactionBeginPosition(ErosaConnection mysqlConnection, final EntryPosition entryPosition)
throws IOException {
// 针对开始的第一条为非Begin记录,需要从该binlog扫描
final AtomicLong preTransactionStartPosition = new AtomicLong(0L);
final java.util.concurrent.atomic.AtomicLong preTransactionStartPosition = new java.util.concurrent.atomic.AtomicLong(0L);
mysqlConnection.reconnect();
mysqlConnection.seek(entryPosition.getJournalName(), 4L, new SinkFunction<LogEvent>() {

Expand Down Expand Up @@ -910,4 +914,10 @@ public void setDumpErrorCountThreshold(int dumpErrorCountThreshold) {
this.dumpErrorCountThreshold = dumpErrorCountThreshold;
}



public AtomicLong getReceivedBinlogBytes() {
return this.receivedBinlogBytes;
}

}
Loading

0 comments on commit fb1126c

Please sign in to comment.