ES 译文之如何使用 Logstash 实现关系型数据库与 ElasticSearch 之间的数据同

译者前言
近期的主要工作是在为公司的 APP 增加搜索功能。因为也遇到了需要把关系型数据库中的数据同步 ElasticSearch 中的问题,故抽了点时间翻译了这篇官方的博文。最近,在数据同步方面也有些思考。
本篇文章的重点不在 Logstash 的 JDBC 插件的使用方法,而是数据同步会遇到的一些细节问题如何处理。我觉得,这些设计思想是通用的,无论你使用的何种方式进行数据同步。
翻译正文

为了利用 ElasticSearch 强大的搜索能力,大部分的业务都会在关系型数据库的基础上部署 ElasticSearch。这类场景下,保持 ElasticSearch 和关系型数据库之间的数据同步是非常必要的。
本篇博文将会介绍如何通过 Logstash 实现在 MySQL 和 ElasticSearch 之间数据的高效复制与同步。
注:文中演示的代码和方法都经过在 MySQL 中的测试,理论上适应于所有的关系型数据库。
本文中,组件的相关信息如下:

MySQL:  8.0.16.
Elasticsearch: 7.1.1
Logstash: 7.1.1
Java: 1.8.0_162-b12
JDBC input plugin: v4.3.13
JDBC connector: Connector/J 8.0.16

数据同步概述
本文将会通过 Logstash 的 JDBC input 插件进行 ElasticSearch 和 MySQL 之间的数据同步。从概念上讲,JDBC 插件将通过周期性的轮询以发现上次迭代后的新增和更新的数据。为了正常工作,几个条件需要满足:
ElasticSearch 中 _id 设置必须来自 MySQL 中 id 字段。它提供了 MySQL 和 ElasticSearch 之间文档数据的映射关系。如果一条记录在 MySQL 更新,那么,ElasticSearch 所有关联文档都应该被重写。要说明的是,重写 ElasticSearch 中的文档和更新操作的效率相同。在内部实现上,一个更新操作由删除一个旧文档和创建一个新文档两部分组成。
当 MySQL 中插入或更新一条记录时,必须包含一个字段用于保存字段的插入或更新时间。如此一来, Logstash 就可以实现每次请求只获取上次轮询后更新或插入的记录。Logstash 每次轮询都会保存从 MySQL 中读取到的最新的插入或更新时间,该时间大于上次轮询最新时间。
如果满足了上述条件,我们就可以配置 Logstash 周期性的从 MySQL 中读取所有最新更新或插入的记录,然后写入到 Elasticsearch 中。
关于 Logstash 的配置代码,本文稍后会给出。
MySQL 设置
MySQL 库和表的配置如下:

CREATE DATABASE es_db

USE es_db

DROP TABLE IF EXISTS es_table

CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

配置中有几点需要说明,如下:

es_table,MySQL 的数据表,我们将把它的数据同步到 ElasticSearch 中;
id,记录的唯一标识。注意,id 定义为主键的同时,也定义为唯一建,可以保证每个 id 在表中只出现一次。同步 ElasticSearch 时,将会转化为文档的 _id;
client_name,表示用户定义用来保存数据的字段,为使博文保持简洁,我们只定义了一个字段,更多字段也很容易加入。接下来的演示,我们会更新该字段,用以说明不仅仅新插入记录会同步到 MySQL,更新记录同样会同步到 MySQL;
modification_time,用于保存记录的更新或插入时间,它使得 Logstash 可以在每次轮询时只请求上次轮询后新增更新的记录;
insertion_time,该字段用于一条记录插入时间,主要是为演示方便,对同步而言,并非必须;

MySQL 操作
前面设置完成,我们可以通过如下命令插入记录:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);`

使用如下命令更新记录:

UPDATE es_table SET client_name = <new client name> WHERE id=<id>;

使用如下命令更新插入记录:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client_name when created>) ON DUPLICATE KEY UPDATE client_name=<client name when updated>

同步代码
Logstash 的 pipeline 配置代码如下,它实现了前面描述的功能,从 MySQL 到 ElasticSearch 的数据同步。

input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => "<my username>"
    jdbc_password => "<my password>"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *",
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time desc"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output {
  # stdout { codec => "rubydebug" }
  elasticsearch {
    index => "rdbms_sync_idx"
    document_id => "%{[%metedata][_id]}"
  }
}

关于 Pipeline 配置的几点说明,如下:

tracking_column

此处配置为 "unix_ts_in_secs"。它被用于追踪最新的记录,并被保存在 .logstash_jdbc_last_run 文件中,下一次轮询将以这个边界位置为准进行记录获取。SELECT 语句中,可通过 :sql_last_value 访问该配置字段的值。

unix_ts_in_secs

由 SELECT 语句生成,是 "modification_time" 的 UNIX TIMESTAMP。它被前面讨论的 "track_column" 引用。使用 UNIX TIMESTAMP,而非其他时间形式,可以减少复杂性,防止时区导致的时间不一致问题。

sql_last_value

内建的配置参数,指定每次轮询的开始位置。在 input 配置中,可被 SELECT 语句引用。在每次轮询开始前,从 .logstash_jdbc_last_run 中读取,此案例中,即为 "unix_ts_in_secs" 的最近值。如此便可保证每次轮询只获取最新插入和更新的记录。

schedule

通过 cron 语法指定轮询的执行周期,例子中,"/5 " 表示每 5 秒轮询一次。

modification_time < NOW()

SELECT 语句查询条件的一部分,当前解释不清,具体情况待下面的章节再作介绍。

filter

该配置指定将 MySQL 中的 id 复制到 metadata 字段 _id 中,用以确保 ElasticSearch 中的文档写入正确的 _id。而之所以使用 metadata,因为它是临时的,不会使文档中产生新的字段。同时,我们也会把不希望写入 Elasticsearch 的字段 id 和 @version 移除。

output

在 output 输出段的配置,我们指定了文档应该被输出到 ElasticSearch,并且设置输出文档 _id 为 filter 段创建的 metadata 的 _id。如果需要调试,注释部分的 rubydebug 可以实现。
SELECT 语句的正确性分析
接下来,我们将开始解释为什么 SELECT 语句中包含 modification_time < NOW() 是非常重要的。为了解释这个问题,我们将引入两个反例演示说明,为什么下面介绍的两种最直观的方法是错误的。还有,为什么 modification_time < Now() 可以克服这些问题。
直观场景一
当 where 子句中仅仅包含 UNIX_TIMESTAMP(modification_time) > :sql_last_value,而没有 modification < Now() 的情况下,工作是否正常。这个场景下,SELECT 语句是如下形式:

statement => "SELECT *, UNIX_TIMESTAMP(modification_time)
AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >
:sql_last_value) ORDER BY modification_time ASC"

粗略一看,似乎没发现什么问题,应该可以正常工作。但其实,这里有一些边界情况,可能导致一些文档的丢失。举个例子,假设 MySQL 每秒插入两个文档,Logstash 每 5 秒执行一次。如下图所示,时间范围 T0 至 T10,数据记录 R1 至 R22。

Logstash 的第一次轮询发生在 T5 时刻,读取记录 R1 至 R11,即图中青色区域。此时,sql_last_value 即为 T5,这个时间是从 R11 中获取到的。
如果,当 Logstash 完成从 MySQL 读取数据后,同样在 T5 时刻,又有一条记录插入到 MySQL 中。 而下一次的轮询只会拉取到大于 T5 的记录,这意味着 R12 将会丢失。如图所示,青色和灰色区域分别表示当次和上次轮询获取到的记录。

注意,这类场景下的 R12 将永远不会再被写入到 ElasticSearch。
直观场景二
为了解决这个问题,或许有人会想,如果把 where 子句中的大于(>)改为大于等于(>=)是否可行。SELECT 语句如下

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >= :sql_last_value) ORDER BY modification_time ASC"

这种方式其实也不理想。这种情况下,某些文档可能会被两次读取,重复写入到 ElasticSearch 中。虽然这不影响结果的正确性,但却做了多余的工作。如下图所示,Logstash 的首次轮询和场景一相同,青色区域表示已经读取的记录。

Logstash 的第二次轮询将会读取所有大于等于 T5 的记录。如下图所示,注意 R11,即紫色区域,将会被再次发送到 ElasticSearch 中。

这两种场景的实现效果都不理想。场景一会导致数据丢失,这是无法容忍的。场景二,存在重复读取写入的问题,虽然对数据正确性没有影响,但执行了多余的 IO。
终极方案
前面的两场方案都不可行,我们需要继续寻找其他解决方案。其实也很简单,通过指定 (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()),我们就可以保证每条记录有且只发送一次。
如下图所示,Logstash 轮询发生在 T5 时刻。因为指定了 modification_time < NOW(),文档只会读取到 T4 时刻,并且 sql_last_value 的值也将会被设置为 T4。

开始下一次的轮询,当前时间 T10。
由于设置了 UNIX_TIMESTAMP(modification_time) > :sql_last_value,并且当前 sql_last_value 为 T4,因此,本次的轮询将从 T5 开始。而 modification_time < NOW() 决定了只有时间小于等于 T9 的记录才会被读取。最后,sql_last_value 也将被设置为 T9。

如此,MySQL 中的每个记录就可以做到都能被精确读取了一次,如此就可以避免每次轮询可能导致的当前时间间隔内数据丢失或重复读取的问题。
系统测试
简单的测试可以帮助我们验证配置是否如我们所愿。我们可以写入一些数据至数据库,如下:

INSERT INTO es_table (id, client_name) VALUES (1, ‘Jim Carrey‘);
INSERT INTO es_table (id, client_name) VALUES (2, ‘Mike Myers‘);
INSERT INTO es_table (id, client_name) VALUES (3, ‘Bryan Adams‘);

一旦 JDBC 输入插件触发执行,将会从 MySQL 中读取所有记录,并写入到 ElasticSearch 中。我们可以通过查询语句查看 ElasticSearch 中的文档。

`GET rdbms_sync_idx/_search`

执行结果如下:

"hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "rdbms_sync_idx",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "insertion_time" : "2019-06-18T12:58:56.000Z",
          "@timestamp" : "2019-06-18T13:04:27.436Z",
          "modification_time" : "2019-06-18T12:58:56.000Z",
          "client_name" : "Jim Carrey"
        }
      },
Etc …

更新 id=1 的文档,如下:

UPDATE es_table SET client_name = ‘Jimbo Kerry‘ WHERE id=1;

通过 _id = 1,可以实现文档的正确更新。通过执行如下命令查看文档:

GET rdbms_sync_idx/_doc/1

结果如下:

{
  "_index" : "rdbms_sync_idx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "insertion_time" : "2019-06-18T12:58:56.000Z",
    "@timestamp" : "2019-06-18T13:09:30.300Z",
    "modification_time" : "2019-06-18T13:09:28.000Z",
    "client_name" : "Jimbo Kerry"
  }
}

文档 _version 被设置为 2,并且 modification_time 和 insertion_time 已经不一样了,client_name 已经正确更新。而 @timestamp,不是我们需要关注的,它是 Logstash 默认添加的。
更新添加 upsert 执行语句如下:

INSERT INTO es_table (id, client_name) VALUES (4, ‘Bob is new‘) ON DUPLICATE KEY UPDATE client_name=‘Bob exists already‘;

复制代码和之前一样,我们可以通过查看 ElasticSearch 中相应文档,便可验证同步的正确性。
文档删除
不知道你是否已经发现,如果一个文档从 MySQL 中删除,并不会同步到 ElasticSearch 。关于这个问题,列举一些可供我们考虑的方案,如下:
MySQL 中的记录可通过包含 is_deleted 字段用以表明该条记录是否有效。一旦发生更新,is_deleted 也会同步更新到 ElasticSearch 中。如果通过这种方式,在执行 MySQL 或 ElasticSearch 查询时,我们需要重写查询语句来过滤掉 is_deleted 为 true 的记录。同时,需要一些后台进程将 MySQL 和 ElasticSearch 中的这些文档删除。
另一个可选方案,应用系统负责 MySQL 和 ElasticSearch 中数据的删除,即应用系统在删除 MySQL 中数据的同时,也要负责将 ElasticSearch 中相应的文档删除。
总结
本文介绍了如何通过 Logstash 进行关系型数据库和 ElasticSearch 之间的数据同步。文中以 MySQL 为例,但理论上,演示的方法和代码也应该同样适应于其他的关系型数据库。

原文地址:https://blog.51cto.com/14207399/2419450

时间: 07-11

ES 译文之如何使用 Logstash 实现关系型数据库与 ElasticSearch 之间的数据同的相关文章

mysql基本认识【关系型数据库和nosql、mysql操作流程和体系,库操作,表操作,数据的操作,字符集的操作,以及php作为client操作数据库】对连接本身没有疑问

1.关系型数据库永久性保存数据的仓库php的变量只是php脚本执行期间,临时性保存变量的空间[使用内存空间临时保存] 关系型数据库:利用二者的关系来描述实体的信息.[利用二维表字段名和字段值来进行描述][关系型数据库根本不是可以使用外键将两个表构建成关联的意思,而是实现描述实体的二维表的形式] nosql:not only sql[sql表示操作关系型数据的语言]所以nosql指的就是非关系型数据库[典型的是键值对型的数据(redis.memcache)][nosql可以视情况添加信息,不需要对

形象了解关系型数据库三大范式

接下来就对每一级范式进行一下解释,首先是第一范式(1NF).符合1NF的关系(你可以理解为数据表."关系"和"关系模式"的区别,类似于面向对象程序设计中"类"与"对象"的区别."关系"是"关系模式"的一个实例,你可以把"关系"理解为一张带数据的表,而"关系模式"是这张数据表的表结构.1NF的定义为:符合1NF的关系中的每个属性都不可再分.表1所示的

关系型数据库和文档型数据库的比较

关系型数据库 比较 nosql和关系型数据库比较? 关系型数据库与NOSQL数据库的区别 10个出色的NoSQL数据库 15个nosql数据库 走近NoSQL数据库的四大家族 深度解读

关系型数据库与非关系型数据库的区别

在关系型数据库中,导致性能欠佳的最主要因素是多表的关联查询,以及复杂的数据分析类型的复杂SQL报表查询.     为了保证数据库的ACID特性(ACID:● 安全存储合适的数据  ●快速检索合适的数据  ●支持多个并行的用户会话 ), 我们必须尽量按照其要求的范式进行设计,关系型数据库中的表都是存储一些格式化的数据结构,每个元组字段的组成都一样,即使不是每个元组都需要所有的字段,但数据库会为每个元组分配所有的字段,这样的结构可以便于表与表之间进行连接等操作,但从另一个角度来说它也是关系型数据库性

关系型数据库

最初人们都是以纸质文件保存数据,为了保存这些纸质文档,我们可能需要很多的文件夹和文件柜,而且需要对他们进行编号排列,而且如果需要进行查找,修改会极为的不方便,在计算机出现以后我们可以使用电子文件保存数据,大大减少了存储问题,但是同样的进行查找修改数据存在着诸多不便,因此提出数据库的概率,方便进行数据的集中控制. 关系型数据库是数据库的一种模式,采用关系模型,将世界看成实体和联系组成,实体通过其具备的属性来描述,将实体按照属性存放到关系数据库的核心单元--表中,表中的每一行代表一个实体对象,每一列

图数据库与关系型数据库的区别与联系

最近我在用图形数据库来完成对一个初创项目的支持.在使用过程中觉得这种图形数据库实际上挺有意思的.因此在这里给大家做一个简单的介绍. NoSQL数据库相信大家都听说过.它们常常可以用来处理传统的关系型数据库所难以解决的一系列问题.通常情况下,这些NoSQL数据库分为Graph,Document,Column Family以及Key-Value Store等四种.这四种类型的数据库分别使用了不同的数据结构来记录数据.因此它们所适用的场景也不尽相同. 其中最为特别的便是图形数据库了.可以说,它和其它的

&#8203;NOSQL与关系型数据库的区别

关系型数据库存在的瓶颈 1.高并发读写需求网站的用户并发性非常高,往往达到每秒上万次读写请求,对于传统关系型数据库来说,硬盘I/O是一个很大的瓶颈 2.海量数据的高效率读写网站每天产生的数据量是巨大的,对于关系型数据库来说,在一张包含海量数据的表中查询,效率是非常低的 3.高扩展性和可用性在基于web的结构当中,数据库是最难进行横向扩展的,当一个应用系统的用户量和访问量与日俱增的时候,数据库却没有办法像web server和app server那样简单的通过添加更多的硬件和服务节点来扩展性能和负

mysql系列之一关系型数据库基础理论

mysql-关系型数据库基础理论 关系模型:(结构化数据模型) 关系模型 实体- 关系模型 对象关系模型:基于对象的数据模型 半结构化数据模型 关系:关系代数运算 交集: 并集: 差集: 全集: 补集: SQL:结构化查询语言 DML:数据操作语言 insert delete select update DDL:数据定义语言 create drop alter DCL:数据控制语言 grant revoke 关系型数据库 表示层:表 逻辑层:存储引擎 物理层:数据文件 数据存储和查询 存储管理器

关系型数据库知识小结

一.基础术语 DML(data manipulation language): 如SELECT.UPDATE.INSERT.DELETE,主要用来对数据库里的数据进行操作的语言 DDL(data definition language): 主要的命令有CREATE.ALTER.DROP等,DDL主要是用在定义或改变表(TABLE)的结构,数据类型,表之间的链接和约束等初始化工作上,大多在建立表时使用. DCL(Data Control Language):数据库控制功能.是用来设置或更改数据库用

关系型数据库和非关系型数据库

目前的市面上的关系型数据有:Oracle.DB2.Microsoft SQL Server.Microsoft Access.MySQL等. 非关系型数据库有:NOSQL,Cloudant 非关系型数据库主流的还是NOSQL,  那么 NOSQL和关系型数据库的区别是: 优:1.成本:nosql数据库简单易部署,基本都是开源软件,不需要像使用oracle那样花费大量成本购买使用,相比关系型数据库价格便宜. 2.查询速度:nosql数据库将数据存储于缓存之中,关系型数据库将数据存储在硬盘中,自然查