-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathexample.ts
89 lines (77 loc) · 2.79 KB
/
example.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import CanalConnector from "../src/canal-connector";
import CanalConnectors from "../src/canal-connectors";
import Message from "../src/Message";
import { com } from '../src/gen/canal-proto'
import protocol = com.alibaba.otter.canal.protocol;
(async function () {
const host: string = '127.0.0.1';
const port: number = 11111;
const destination: string = 'example';
const username: string = '';
const password: string = '';
let connector: CanalConnector = CanalConnectors.newSingleConnector({
host,
port,
destination,
username,
password,
filter: '.*\..*'
});
try {
await connector.connect();
while (true) {
let message: Message = await connector.getWithoutAck(1);
let batchId: number = message.id;
if (batchId != -1 && message.entries?.length) {
try {
printEntry(message.entries);
} finally {
await connector.ack(batchId);
}
}
await sleep(3_000);
}
} catch(e: any) {
console.error('Occur error', e as Error);
} finally {
connector.isConnect() && connector.disconnect();
}
})();
function sleep(millis: number): Promise<void> {
return new Promise((resolve, _) => setTimeout(resolve, millis));
}
function printEntry(entries: protocol.Entry[]): void {
for (let entry of entries) {
if (entry.entryType === protocol.EntryType.TRANSACTIONBEGIN ||
entry.entryType === protocol.EntryType.TRANSACTIONEND) {
console.log('Transaction entry, continue.');
continue;
}
let rowChage = protocol.RowChange.decode(entry.storeValue);
if (rowChage.isDdl) {
console.log('Ddl entry, sql:', rowChage.sql);
}
for (let rowData of rowChage.rowDatas) {
switch (rowChage.eventType) {
case protocol.EventType.DELETE:
printColumns(rowChage.eventType, rowData.beforeColumns);
break;
case protocol.EventType.INSERT:
printColumns(rowChage.eventType, rowData.afterColumns);
break;
default:
printColumns(rowChage.eventType, rowData.beforeColumns);
printColumns(rowChage.eventType, rowData.afterColumns);
break;
}
}
}
}
function printColumns(eventType: protocol.EventType | null | undefined,
columns: protocol.IColumn[] | null | undefined): void {
if (columns) {
for (const column of columns) {
console.log(`[${eventType}] ${column.name} : ${column.value} <${column.updated ? 'updated' : ''}>`)
}
}
}