CentOS 7
MySQL: MySQL 5.7, installed and configured
Logstash: version logstash-7.1.1, downloaded
ES: installed and configured
This walkthrough uses the logstash-input-jdbc approach, which only supports inserting and updating data, not deletes: the plugin polls MySQL with a SELECT, so rows deleted from the table simply stop appearing in the results and the corresponding ES documents are left behind.

Preparation

Install logstash-input-jdbc

[root@localhost logstash-7.1.1]# bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful

Download mysql-connector-java-5.1.46.zip and unzip it:
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
unzip mysql-connector-java-5.1.46.zip

[root@localhost logstash-7.1.1]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
--2019-05-30 19:48:30--  https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
Resolving cdn.mysql.com (cdn.mysql.com)... 104.127.195.16
Connecting to cdn.mysql.com (cdn.mysql.com)|104.127.195.16|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4792694 (4.6M) [application/zip]
Saving to: ‘mysql-connector-java-5.1.46.zip’
100%[================================================================================================================================================>] 4,792,694    963KB/s   in 5.2s   

2019-05-30 19:48:37 (902 KB/s) - ‘mysql-connector-java-5.1.46.zip’ saved [4792694/4792694]
[root@localhost logstash-7.1.1]# unzip mysql-connector-java-5.1.46.zip 
[root@localhost logstash-7.1.1]# ls
bin     CONTRIBUTORS  Gemfile       lib          logstash-core             modules                      mysql-connector-java-5.1.46.zip  tools   x-pack
config  data          Gemfile.lock  LICENSE.txt  logstash-core-plugin-api  mysql-connector-java-5.1.46  NOTICE.TXT                       vendor

Prepare a data table in the database (test table table1)
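
A minimal table definition consistent with the INSERT statements below; the original doesn't show the schema, so this is an assumed sketch (MySQL 5.7):

CREATE TABLE `test_datax_1`.`table1` (
  `id` INT NOT NULL PRIMARY KEY,  -- primary key, later used as the ES document id
  `name` VARCHAR(64)              -- sample text column
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Seed it with a few rows: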

INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (1, '测试123');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (2, '测试234');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (3, '测试345');
INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (4, '测试4');

Create jdbc.sql in the logstash/config directory; it will be used later.

Contents:
select * from table1

Configure Logstash

  • Go to logstash/config and make a copy of logstash-sample.conf

    cp logstash-sample.conf mylogstash.conf
  • Edit it:

    # Sample Logstash configuration for creating a simple
    # Beats -> Logstash -> Elasticsearch pipeline.

    input {
      jdbc {
        # MySQL connection string
        # jdbc_connection_string => "jdbc:mysql:192.168.177.128:3306/test_datax_1?characterEncoding=utf8"
        jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/test_datax_1?useUnicode=true&characterEncoding=utf-8&useSSL=false"
        # Username and password
        jdbc_user => "root"
        jdbc_password => "root123132*"
        # JDBC driver jar
        jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
        # Driver class name
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        jdbc_default_timezone => "Asia/Shanghai"
        # SQL file; the statement can also be written inline instead, e.g.:
        # statement => "select * from t_order where update_time >= :sql_last_value;"
        statement_filepath => "./config/jdbc.sql"
        # Cron-like schedule; here the sync runs once a minute (minute hour day-of-month month day-of-week)
        schedule => "* * * * *"
        type => "jdbc"
        # Whether to record the last run; if true, the last value of the tracking_column is saved to the file given by last_run_metadata_path
        #record_last_run => true
        # Whether to track a specific column's value; when record_last_run is true, set this to true to track a custom column, otherwise the time of the last run is tracked
        # use_column_value => true
        # The column to track when use_column_value is true; it must be monotonically increasing, typically the MySQL primary key or an update timestamp
        # tracking_column => "update_time"
        # tracking_column_type => "timestamp"
        # last_run_metadata_path => "./logstash_capital_bill_last_id"
        # Whether to clear the last_run_metadata_path record; if true, every run starts from scratch and queries all rows again
        # clean_run => false
        # Whether to lowercase column names
        lowercase_column_names => false
      }
    }

    output {
     # elasticsearch {
     #   hosts => ["http://localhost:9200"]
     #   index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
     #   #user => "elastic"
     #   #password => "changeme"
     # }
     #
      stdout {
        codec => json_lines
      }
    }
  • Run it (here results are only printed to stdout)

    [root@localhost logstash-7.1.1]# bin/logstash -f config/mylogstash.conf 

    On success it prints results like:

    {"@timestamp":"2019-05-31T04:02:00.280Z","id":1,"name":"测试123","@version":"1","type":"jdbc"}
    {"@timestamp":"2019-05-31T04:02:00.281Z","id":2,"name":"测试234","@version":"1","type":"jdbc"}
    {"@timestamp":"2019-05-31T04:02:00.281Z","id":3,"name":"测试345","@version":"1","type":"jdbc"}
    {"@timestamp":"2019-05-31T04:02:00.281Z","id":4,"name":"测试4","@version":"1","type":"jdbc"}
    [2019-05-30T21:03:00,188][INFO ][logstash.inputs.jdbc     ] (0.006211s) SELECT version()
    [2019-05-30T21:03:00,201][INFO ][logstash.inputs.jdbc     ] (0.010732s) SELECT version()
    [2019-05-30T21:03:00,214][INFO ][logstash.inputs.jdbc     ] (0.008067s) SELECT count(*) AS `count` FROM (select * from table1
    ) AS `t1` LIMIT 1
    [2019-05-30T21:03:00,232][INFO ][logstash.inputs.jdbc     ] (0.006079s) SELECT * FROM (select * from table1
    ) AS `t1` LIMIT 50000 OFFSET 0
    {"@timestamp":"2019-05-31T04:03:00.233Z","id":1,"name":"测试123","@version":"1","type":"jdbc"}
    {"@timestamp":"2019-05-31T04:03:00.234Z","id":2,"name":"测试234","@version":"1","type":"jdbc"}
    {"@timestamp":"2019-05-31T04:03:00.234Z","id":3,"name":"测试345","@version":"1","type":"jdbc"}
    {"@timestamp":"2019-05-31T04:03:00.234Z","id":4,"name":"测试4","@version":"1","type":"jdbc"}
    ^C[2019-05-30T21:03:45,379][ERROR][org.logstash.Logstash    ] org.jruby.exceptions.ThreadKill

Add a tracking column for updates. To have changes in the database propagate to the corresponding documents in ES, a common approach is to use an update_time column as the tracking field. At this point we can also configure the Elasticsearch output directly.
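
On each scheduled run, Logstash substitutes the value saved in last_run_metadata_path for :sql_last_value (for a timestamp tracking column it starts at 1970-01-01, so the first run matches every row). With the jdbc.sql shown below, the query effectively executed looks like this; the timestamp value is illustrative:

SELECT * FROM table1
WHERE update_time > '2019-05-31 07:32:00';  -- substituted for :sql_last_value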

  • Modify the configuration:
    input {
      jdbc {
        # MySQL connection string
        # jdbc_connection_string => "jdbc:mysql:192.168.177.128:3306/test_datax_1?characterEncoding=utf8"
        jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/test_datax_1?useUnicode=true&characterEncoding=utf-8&useSSL=false"
        # Username and password
        jdbc_user => "root"
        jdbc_password => "admin2019*"
        # JDBC driver jar
        jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
        # Driver class name
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        jdbc_default_timezone => "Asia/Shanghai"
        # SQL file; the statement can also be written inline instead, e.g.:
        # statement => "select * from t_order where update_time >= :sql_last_value;"
        statement_filepath => "./config/jdbc.sql"
        # Cron-like schedule; here the sync runs once a minute (minute hour day-of-month month day-of-week)
        schedule => "* * * * *"
        type => "jdbc"
        # Record the last run: the last value of the tracking_column is saved to the file given by last_run_metadata_path
        record_last_run => true
        # Track a specific column's value instead of the time of the last run
        use_column_value => true
        # The column to track; it must be monotonically increasing, typically the primary key or an update timestamp
        tracking_column => "update_time"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "./logstash_capital_bill_last_id"
        # Whether to clear the last_run_metadata_path record; if true, every run starts from scratch and queries all rows again
        # clean_run => false
        # Whether to lowercase column names
        lowercase_column_names => false
      }
    }

    output {
      elasticsearch {
        hosts => "http://192.168.177.128:9200"
        #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        index => "tabel_test"
        # Use the table id as the ES document id to avoid duplicate documents; this costs some indexing throughput, so it's a double-edged sword
        document_id => "%{id}"
        user => "elastic"
        password => "123456"
      }
      # Print events to stdout; not needed in production
      stdout {
        codec => json_lines
      }
    }
  • Modify config/jdbc.sql (vim config/jdbc.sql):

    select * from table1 WHERE update_time > :sql_last_value

    Modify table table1 in the database to add an update_time column, as sketched below.
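
    A possible DDL for this (an assumption, not shown in the original; ON UPDATE CURRENT_TIMESTAMP makes MySQL bump the column automatically whenever a row is modified, which is what drives the incremental sync):

    ALTER TABLE `test_datax_1`.`table1`
      ADD COLUMN `update_time` TIMESTAMP NOT NULL
      DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;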

  • Run

    [root@localhost logstash-7.1.1]# bin/logstash -f config/mylogstash.conf

    Change a row's update_time, or insert new rows, and you will see the sync output printed in the console; see the example below.
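
    Hypothetical statements to trigger the sync (the UPDATE relies on the ON UPDATE CURRENT_TIMESTAMP definition above to bump update_time automatically):

    UPDATE `test_datax_1`.`table1` SET `name` = '修改测试1' WHERE `id` = 1;
    INSERT INTO `test_datax_1`.`table1`(`id`, `name`) VALUES (5, '测试5');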

  • Query in ES (Kibana is used here)
    Query: GET /tabel_test/_search

    Response:
    {
      "took" : 0,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 8,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "tabel_test",
            "_type" : "_doc",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "update_time" : "2019-05-31T14:00:00.000Z",
              "@version" : "1",
              "@timestamp" : "2019-05-31T07:32:00.891Z",
              "name" : "修改测试1",
              "id" : 1,
              "type" : "jdbc"
            }
          },
    .....

    This concludes the walkthrough.

Reference

Elasticsearch - Logstash实现mysql同步数据到elasticsearch
