# docker-compose.yml example
# NOTE: the top-level `version` key is obsolete under the Compose Specification
# (Docker Compose v2 ignores it); it is kept only for legacy docker-compose v1.
version: '3'
services:
  mysql:
    image: mysql:8.0
    environment:
      # Quoted so the value is a string, not the integer 123456.
      MYSQL_ROOT_PASSWORD: "123456"
      MYSQL_DATABASE: test_db
    ports:
      - "3306:3306"
  elasticsearch:
    image: elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
  canal-server:
    image: canal/canal-server:v1.1.6
    environment:
      # `mysql` resolves to the mysql service on the compose network.
      canal.instance.master.address: "mysql:3306"
      canal.instance.dbUsername: root
      canal.instance.dbPassword: "123456"
    ports:
      - "11111:11111"  # canal TCP protocol port (clients / canal-adapter)
      - "11112:11112"  # canal Prometheus metrics port (canal.metrics.pull.port)
    depends_on:
      - mysql
  canal-adapter:
    image: slpcat/canal-adapter:latest
    environment:
      # Valid adapter modes are tcp / kafka / rocketMQ / rabbitMQ ("http" is not one).
      canal.conf.mode: tcp
      # Point the adapter at the canal-server container instead of its own localhost.
      # NOTE(review): adapters >= 1.1.5 may read
      # canal.conf.consumerProperties."canal.tcp.server.host" instead -- confirm for this image.
      canal.conf.canalServerHost: "canal-server:11111"
    depends_on:
      - canal-server
      - elasticsearch
# --- conf/example/instance.properties (per-instance settings) ---
# NOTE: the canal.instance.* keys are read from the instance configuration
# (conf/<destination>/instance.properties), not from conf/canal.properties.
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
canal.instance.connectionCharset = UTF-8
# Properties files unescape "\\" to "\", so the effective regex is .*\..*
# (every table in every schema).
canal.instance.filter.regex = .*\\..*

# --- conf/canal.properties (server-wide settings) ---
# Optional: MQ mode. Leave it disabled (default canal.serverMode = tcp) while the
# canal-adapter uses mode: tcp -- in MQ mode the server does not start its TCP
# endpoint, so TCP clients can no longer connect.
# Since canal 1.1.5 the Kafka broker list key is kafka.bootstrap.servers
# (canal.mq.servers is the pre-1.1.5 name).
#canal.serverMode = kafka
#kafka.bootstrap.servers = localhost:9092
# application.yml (canal-adapter)
canal.conf:
  mode: tcp # kafka, rocketMQ, rabbitMQ
  canalServerHost: "127.0.0.1:11111"
  srcDataSources:
    defaultDS:
      # allowPublicKeyRetrieval lets Connector/J authenticate against MySQL 8's
      # default caching_sha2_password plugin when SSL is disabled.
      url: "jdbc:mysql://127.0.0.1:3306/test_db?useSSL=false&allowPublicKeyRetrieval=true"
      username: root
      # Quoted so the password is bound as a string, not the integer 123456.
      password: "123456"
  canalAdapters:
    - instance: example
      groups:
        - groupId: g1
          outerAdapters:
            - name: es7
              hosts: "127.0.0.1:9200"
              properties:
                mode: rest
                # security.auth: test:123456
                cluster.name: elasticsearch
# es7/mytest_user.yml (canal-adapter ES mapping for the mytest_user index)
dataSourceKey: defaultDS  # key under canal.conf.srcDataSources
destination: example      # canal instance name (canalAdapters[].instance)
groupId: g1               # must match canalAdapters[].groups[].groupId
esMapping:
  _index: mytest_user
  _id: _id                # document id is the column aliased as _id in sql
  sql: "SELECT id AS _id, name, email, age, created_at FROM user"
  # Condition appended for full (ETL) imports; {} is replaced by the ETL parameter.
  etlCondition: "where created_at>={}"
  commitBatch: 3000
// 自定义Canal客户端示例
public class ESDataSyncClient {
public static void main(String[] args) {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
int batchSize = 1000;
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
if (batchId != -1) {
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
processRowData(entry);
}
}
connector.ack(batchId);
}
}
}
private static void processRowData(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(
entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
switch (rowChange.getEventType()) {
case INSERT:
handleInsert(rowData, entry.getHeader().getTableName());
break;
case UPDATE:
handleUpdate(rowData, entry.getHeader().getTableName());
break;
case DELETE:
handleDelete(rowData, entry.getHeader().getTableName());
break;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void handleInsert(CanalEntry.RowData rowData, String tableName) {
Map<String, Object> source = new HashMap<>();
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
for (CanalEntry.Column column : columns) {
source.put(column.getName(), column.getValue());
}
// 同步到Elasticsearch
IndexRequest request = new IndexRequest("mytest_user")
.id(source.get("id").toString())
.source(source);
// 执行ES操作
}
}
// Batch writes to ES for better throughput.
// BulkRequest rejects an empty batch ("no requests added"), so only send when there is data.
if (!documents.isEmpty()) {
    BulkRequest bulkRequest = new BulkRequest();
    for (Document doc : documents) {
        bulkRequest.add(new IndexRequest("index")
                .id(doc.getId())
                .source(doc.getSource()));
    }
    // The 7.x RestHighLevelClient only offers bulk(BulkRequest, RequestOptions).
    BulkResponse response = client.bulk(bulkRequest, org.elasticsearch.client.RequestOptions.DEFAULT);
    // The call can succeed overall while individual items fail; surface those failures.
    if (response.hasFailures()) {
        throw new IllegalStateException(response.buildFailureMessage());
    }
}
// 自定义数据转换
public class DataTransformer {
public static Map<String, Object> transform(Map<String, Object> source) {
Map<String, Object> target = new HashMap<>();
// 字段映射
target.put("userName", source.get("name"));
target.put("userAge", Integer.parseInt(source.get("age").toString()));
// 时间格式转换
target.put("createTime", formatDate(source.get("created_at")));
return target;
}
}
# Adapter retry configuration (merge into the canal-adapter application.yml).
# retries and timeout are global settings under canal.conf; canal-adapter does not
# read them from an individual canalAdapters entry.
canal.conf:
  retries: 3     # NOTE(review): canal's sample config uses -1 for unlimited retries -- confirm for your version
  timeout: 5000  # NOTE(review): presumably milliseconds -- confirm
  # NOTE(review): canal-adapter has no built-in errorHandler / dead-letter-queue
  # setting, and Spring's property binding silently ignores unknown keys.
  # Implement this behaviour in a custom outer adapter (or MQ consumer) if needed:
  # errorHandler:
  #   type: log_and_continue
  #   deadLetterQueue: dlq_es_sync
# Canal server status check. Port 11111 speaks canal's binary TCP protocol, not HTTP,
# so query the Prometheus metrics endpoint (canal.metrics.pull.port, default 11112).
curl "http://localhost:11112/metrics"
# ES index status (URL quoted so the shell does not glob-expand "?")
curl "http://localhost:9200/_cat/indices?v"
# Prometheus scrape configuration example (prometheus.yml)
scrape_configs:
  - job_name: 'canal'
    static_configs:
      # canal serves metrics on canal.metrics.pull.port (default 11112), not on the 11111 TCP port.
      - targets: ['localhost:11112']
  - job_name: 'elasticsearch'
    static_configs:
      # Elasticsearch has no built-in Prometheus endpoint; scrape an elasticsearch_exporter
      # (default listen port 9114) configured to query http://localhost:9200.
      - targets: ['localhost:9114']
最佳实践：
- 索引设计
- 同步策略
- 容错处理
- 性能优化

常见问题：
- 数据不一致
- 同步延迟
- 内存溢出
这种架构能够实现近实时的数据同步（端到端延迟主要取决于 binlog 的消费速度以及 Elasticsearch 的 refresh 间隔，默认为 1 秒），同时可以通过合理的配置保障系统的稳定性和可维护性。