ELK Study Notes (2): Data Synchronization

This article describes how to use the ELK stack to synchronize data from MySQL to Elasticsearch.

The article is divided into three parts:

  • Environment setup: start MySQL and the ELK stack, create the corresponding table in MySQL and index in ES, and write Python scripts to create the MySQL table and batch-insert data
  • Full synchronization: sync all rows of the MySQL table to ES
  • Incremental synchronization: sync newly added rows of the MySQL table to ES

(Figure: data synchronization with ELK)

Environment Setup

Start MySQL with docker-compose; the docker-compose.yml is as follows:

```yaml
version: '3'

services:
  mysql8.0:
    image: mysql:8.0
    container_name: mysql8.0
    restart: always
    environment:
      TZ: Asia/Shanghai
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: orm_test
    ports:
      - 3306:3306
    volumes:
      - ./mysql/data/:/var/lib/mysql/
      - ./mysql/conf/:/etc/mysql/conf.d/
      - ./mysql/init/:/docker-entrypoint-initdb.d/
    # Switch MySQL 8.0 back to the old authentication plugin
    # (the new default changed in 8.0 and can cause password mismatches)
    command:
      --default-authentication-plugin=mysql_native_password
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_general_ci
      --explicit_defaults_for_timestamp=true
```
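Before running the scripts below, it can help to wait until the container actually accepts connections. A minimal sketch, assuming the credentials from the compose file above (the helper names are my own, not from the original scripts):

```python
import time


def build_mysql_url(user="root", password="root", host="localhost",
                    port=3306, db="orm_test"):
    """Assemble the SQLAlchemy URL that the scripts below connect with."""
    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}"


def wait_for_mysql(url, attempts=10, delay=3):
    """Poll until MySQL accepts connections; container start-up takes a while."""
    from sqlalchemy import create_engine, text
    for _ in range(attempts):
        try:
            with create_engine(url).connect() as conn:
                conn.execute(text("SELECT 1"))
            return True
        except Exception:
            time.sleep(delay)
    return False
```

Calling `wait_for_mysql(build_mysql_url())` before the table-creation script avoids racing the container start.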

Create the users table in MySQL with Python's sqlalchemy + pymysql:

```python
# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: mysql_create_table.py
# @time: 2023/12/23 23:29
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR, DATETIME
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Users(Base):
    __tablename__ = 'users'

    id = Column(INTEGER, primary_key=True)
    name = Column(VARCHAR(256), nullable=False)
    age = Column(INTEGER)
    place = Column(VARCHAR(256), nullable=False)
    insert_time = Column(DATETIME)

    def __init__(self, id, name, age, place, insert_time):
        self.id = id
        self.name = name
        self.age = age
        self.place = place
        self.insert_time = insert_time


def init_db():
    engine = create_engine(
        "mysql+pymysql://root:root@localhost:3306/orm_test",
        echo=True
    )
    Base.metadata.create_all(engine)
    print('Create table successfully!')


if __name__ == '__main__':
    init_db()
```

The resulting users table structure is:

```
+-------------+--------------+------+-----+---------+----------------+
| Field       | Type         | Null | Key | Default | Extra          |
+-------------+--------------+------+-----+---------+----------------+
| id          | int          | NO   | PRI | NULL    | auto_increment |
| name        | varchar(256) | NO   |     | NULL    |                |
| age         | int          | YES  |     | NULL    |                |
| place       | varchar(256) | NO   |     | NULL    |                |
| insert_time | datetime     | YES  |     | NULL    |                |
+-------------+--------------+------+-----+---------+----------------+
```

Insert five rows of data with the following script:

```python
# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: mysql_insert_data.py
# @time: 2023/12/23 23:29
import time
from datetime import datetime as dt

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from mysql_create_table import Users


def get_time():
    time.sleep(5)
    return dt.now().strftime("%Y-%m-%d %H:%M:%S")


def insert_data():
    # Initialize the database connection
    engine = create_engine("mysql+pymysql://root:root@localhost:3306/orm_test")
    # Create the DBSession class
    DBSession = sessionmaker(bind=engine)

    # Create a session object
    session = DBSession()
    # Insert a single row: create a new Users object
    new_user = Users(id=1, name='Jack', age=25, place='USA', insert_time=get_time())
    # Add it to the session
    session.add(new_user)
    # Commit, i.e. persist to the database
    session.commit()

    time.sleep(5)

    # Insert multiple rows at once
    user_list = [Users(id=2, name='Green', age=26, place='UK', insert_time=get_time()),
                 Users(id=3, name='Alex', age=31, place='GER', insert_time=get_time()),
                 Users(id=4, name='Chen', age=52, place='CHN', insert_time=get_time()),
                 Users(id=5, name='Zhang', age=42, place='CHN', insert_time=get_time())]
    session.add_all(user_list)
    session.commit()
    # Close the session
    session.close()
    print('insert into db successfully!')


if __name__ == '__main__':
    insert_data()
```
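To sanity-check the insert, the rows can be read back and grouped, e.g. by place. A sketch (the helper names are my own; `fetch_places` expects the session and model from the scripts above):

```python
def summarize_users(rows):
    """Count rows per place, e.g. to sanity-check the batch insert."""
    counts = {}
    for r in rows:
        counts[r["place"]] = counts.get(r["place"], 0) + 1
    return counts


def fetch_places(session, Users):
    """Query back the inserted rows via the SQLAlchemy session."""
    return [{"place": u.place} for u in session.query(Users).all()]
```

With the five rows above, `summarize_users(fetch_places(session, Users))` should report two rows for CHN and one each for USA, UK, and GER.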

In the target ES, create the corresponding index mysql_users with the following mapping:

```json
{
  "mysql_users" : {
    "mappings" : {
      "properties" : {
        "age" : {
          "type" : "integer"
        },
        "insert_time" : {
          "type" : "date",
          "format" : "yyyy-MM-dd HH:mm:ss"
        },
        "name" : {
          "type" : "text"
        },
        "place" : {
          "type" : "text"
        },
        "user_id" : {
          "type" : "integer"
        }
      }
    }
  }
}
```
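The index can be created programmatically with the elasticsearch-py client. A sketch using the 7.x-style `body=` argument to match the 7.17 stack (the client address and helper name are assumptions):

```python
# Mapping for the mysql_users index, mirroring the JSON above.
MYSQL_USERS_MAPPINGS = {
    "properties": {
        "user_id": {"type": "integer"},
        "name": {"type": "text"},
        "age": {"type": "integer"},
        "place": {"type": "text"},
        "insert_time": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
    }
}


def create_mysql_users_index(es):
    """Create the index with a 7.x client, e.g.
    es = Elasticsearch("http://localhost:9200")."""
    return es.indices.create(index="mysql_users",
                             body={"mappings": MYSQL_USERS_MAPPINGS})
```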

Download mysql-connector-java-8.0.16.jar, the third-party JAR that lets Java (and hence Logstash) connect to MySQL.

Full Data Synchronization

Sync all rows of the users table in MySQL to the mysql_users index in ES. The docker-compose.yml is as follows:

```yaml
version: '3'

services:
  mysql8.0:
    image: mysql:8.0
    container_name: mysql8.0
    restart: always
    environment:
      TZ: Asia/Shanghai
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: orm_test
    ports:
      - 3306:3306
    volumes:
      - ./mysql/data/:/var/lib/mysql/
      - ./mysql/conf/:/etc/mysql/conf.d/
      - ./mysql/init/:/docker-entrypoint-initdb.d/
    # Switch MySQL 8.0 back to the old authentication plugin
    # (the new default changed in 8.0 and can cause password mismatches)
    command:
      --default-authentication-plugin=mysql_native_password
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_general_ci
      --explicit_defaults_for_timestamp=true
    networks:
      - mysql_elk_net

  logstash:
    container_name: logstash-7.17.0
    image: docker.elastic.co/logstash/logstash:7.17.0
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./logstash/data:/usr/share/logstash/data
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    networks:
      - mysql_elk_net
    depends_on:
      - elasticsearch

  elasticsearch:
    container_name: elasticsearch-7.17.0
    image: elasticsearch:7.17.0
    environment:
      - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
      - "http.host=0.0.0.0"
      - "node.name=elastic01"
      - "cluster.name=cluster_elasticsearch"
      - "discovery.type=single-node"
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - ./es/plugins:/usr/share/elasticsearch/plugins
      - ./es/data:/usr/share/elasticsearch/data
    networks:
      - mysql_elk_net
    depends_on:
      - mysql8.0

  kibana:
    container_name: kibana-7.17.0
    image: kibana:7.17.0
    ports:
      - "5601:5601"
    networks:
      - mysql_elk_net
    depends_on:
      - elasticsearch

# Network configuration
networks:
  mysql_elk_net:
    driver: bridge
```

The logstash.conf configuration is as follows:

```conf
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/data/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql8.0:3306/orm_test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "root"
    # Convert the date field to a string so it can be formatted
    statement => "SELECT id as user_id, name, age, place, date_format(insert_time, '%Y-%m-%d %H:%i:%S') as insert_time from users"
  }
}
filter {
  mutate {
    remove_field => ["@timestamp"]
  }
  mutate {
    remove_field => ["@version"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "mysql_users"
    document_id => "%{user_id}"
    action => "index"
  }
}
```
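Because `document_id => "%{user_id}"` derives each document's `_id` from the primary key, re-running the pipeline overwrites documents instead of duplicating them. The row-to-document shaping can be mimicked in plain Python (a sketch of the idea, not part of Logstash; the function name is my own):

```python
def row_to_es_action(row, index="mysql_users"):
    """Mimic the elasticsearch output: one index action per SQL row,
    with the document id taken from the user_id column."""
    return {
        "_op_type": "index",
        "_index": index,
        "_id": str(row["user_id"]),
        "_source": dict(row),
    }


row = {"user_id": 1, "name": "Jack", "age": 25, "place": "USA",
       "insert_time": "2023-12-23 23:35:10"}
action = row_to_es_action(row)
```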

Querying the mysql_users index in ES confirms the result:

(Figure: the full MySQL table data has been synced to ES)

Incremental Data Synchronization

The docker-compose.yml is the same as for full synchronization. For incremental synchronization, modify logstash.conf as follows:

```conf
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/data/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql8.0:3306/orm_test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "root"
    # Convert the date field to a string so it can be formatted
    statement => "SELECT id as user_id, name, age, place, date_format(insert_time, '%Y-%m-%d %H:%i:%S') as insert_time from users where insert_time > :sql_last_value"
    record_last_run => true
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "insert_time"
    last_run_metadata_path => "./user"
    schedule => "* * * * *"
  }
}
filter {
  mutate {
    remove_field => ["@timestamp"]
  }
  mutate {
    remove_field => ["@version"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "mysql_users"
    document_id => "%{user_id}"
    action => "index"
  }
}
```

Compared with the full-extraction setup, the jdbc plugin configuration in the input section changes:

  • The SQL in statement gains a WHERE clause at the end, so only rows whose insert_time is greater than the last extraction marker are queried.
  • record_last_run set to true saves the extraction marker; its file path is given by last_run_metadata_path.
  • tracking_column_type set to timestamp means the marker column holds a time value; if an auto-increment numeric primary key is used as the marker instead, set it to numeric.
  • tracking_column names the marker column, and use_column_value set to true uses that column's value as the marker; otherwise the time of the last query is used.
  • Finally, schedule is the cron expression for scheduled extraction; "* * * * *" runs once per minute.

On the first run there is no extraction marker yet, so the pipeline extracts all rows of users into the mysql_users index. After extraction, a file named user appears in the configured path; opening it shows the current extraction marker, which should equal the latest value of insert_time.
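The behaviour of the WHERE clause and the marker update on each scheduled run can be illustrated with a small pure-Python sketch (my own illustration, not Logstash code):

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)  # first run: no marker yet, so everything matches


def extract_increment(rows, sql_last_value=EPOCH):
    """Return rows newer than the marker plus the updated marker,
    mirroring `insert_time > :sql_last_value` and the user file."""
    fresh = [r for r in rows if r["insert_time"] > sql_last_value]
    marker = max((r["insert_time"] for r in fresh), default=sql_last_value)
    return fresh, marker


rows = [
    {"user_id": 1, "insert_time": datetime(2023, 12, 23, 23, 35, 10)},
    {"user_id": 2, "insert_time": datetime(2023, 12, 23, 23, 35, 20)},
]
first, marker = extract_increment(rows)          # first run: both rows extracted
later, marker = extract_increment(rows, marker)  # next run: nothing new
```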

(Figure: the incremental MySQL data has been synced to ES)

Summary

This article showed how to use ELK to synchronize table data from MySQL into Elasticsearch, covering both full and incremental synchronization. ELK is not limited to MySQL-to-ES: it can also synchronize between many kinds of databases, file systems, and other sources, which makes it a very powerful tool.

Thanks for reading!

Follow my WeChat official account NLP奇幻之旅 to get original technical articles as soon as they are published.

You are also welcome to join my Knowledge Planet community "自然语言处理奇幻之旅", where I am working to build my own technical community.


ELK学习笔记(二)数据同步
https://percent4.github.io/ELK学习笔记(二)数据同步/
Author: Jclian91
Published: January 11, 2024