不想听我啰嗦的,可以直接看最终配置
写完一步一个脚印实现logstash同步MySQL数据到ES后,想试试多表保存到ES里,原本以为只要多复制几个JDBC加个type就可以搞定,结果悲剧的发现都是问题,在这里记录一下需要注意的问题

需要注意的配置

tracking_column

报错信息 lastmodifiedTime一定要在你查询的结果集里面 tracking_column => "lastmodifiedTime"
[2019-06-03T00:37:00,342][INFO ][logstash.inputs.jdbc     ] (0.004409s) SELECT version()
[2019-06-03T00:37:00,369][INFO ][logstash.inputs.jdbc     ] (0.002446s) SELECT count(*) AS `count` FROM (SELECT id, nickname FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00'
) AS `t1` LIMIT 1
[2019-06-03T00:37:00,401][INFO ][logstash.inputs.jdbc     ] (0.008020s) SELECT * FROM (SELECT id, nickname FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00'
) AS `t1` LIMIT 50000 OFFSET 0
[2019-06-03T00:37:00,402][WARN ][logstash.inputs.jdbc     ] tracking_column not found in dataset. {:tracking_column=>"lastmodified_time"}
{ 3362 rufus-scheduler intercepted an error:
  3362   job:
  3362     Rufus::Scheduler::CronJob "* * * * *" {}
  3362   error:
  3362     3362
  3362     TypeError
  3362     no implicit conversion of NilClass into String
  3362       uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date/format.rb:335:in `_parse'
  3362       uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date.rb:734:in `parse'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/value_tracking.rb:73:in `set_value'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/jdbc.rb:237:in `execute_statement'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:277:in `execute_query'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:258:in `block in run'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:234:in `do_call'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:258:in `do_trigger'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:300:in `block in start_work_thread'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:299:in `block in start_work_thread'
  3362       org/jruby/RubyKernel.java:1425:in `loop'
  3362       /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:289:in `block in start_work_thread'
  3362   tz:
  3362     ENV['TZ']: 
  3362     Time.now: 2019-06-03 00:37:00 -0700

**提示: tracking_column not found in dataset. {:tracking_column=>"lastmodified_time"}
在slq里面加入lastmodified_time**

SELECT id, nickname,lastmodified_time FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00' 

输出的index,不支持驼峰的格式,开始设置成sysUser,报错

报错信息
  [2019-06-03T01:55:00,698][ERROR][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"13", :_index=>"sysUser", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x3e36d2a5>], :response=>{"index"=>{"_index"=>"sysUser", "_type"=>"_doc", "_id"=>"13", "status"=>400, "error"=>{"type"=>"invalid_index_name_exception", "reason"=>"Invalid index name [sysUser], must be lowercase", "index_uuid"=>"_na_", "index"=>"sysUser"}}}}

提示:Could not index event to Elasticsearch.

修改index名称后正常

      elasticsearch {
         hosts => "http://192.168.177.128:9200"
         index => "user"
         document_id => "%{id}"
         #document_id => "%{userId}"
         user => "elastic"
         password => "123456"
         }

在打印信息时,两个配置,报错

output {
  if[type] =="sys_user"{
      stdout {
     #打印信息的时候 不同配置的id 不能一样
        id=>"%{id}"
           }
      }
  if[type] == "article"{
      stdout {
      #打印信息的时候 不同配置的id 不能一样
       # id=>"%{id}"
          }
      }
}
改正后,不要忘了将userId和articleId 做为别名放到搜索集里。我本以为的第一次尝试的时候,第二个条件的type索引一直无法导入到ES的错误是这个原因,后来测试发现不是,这个问题莫名其妙的正常了,怀疑是SHELL格式的问题或者是驼峰命名索引导致的

output {
  if[type] =="sys_user"{
      stdout {
       #打印信息的时候 不同配置的id 不能一样
          id=>"%{userId}"
          }
      
      }
  if[type] == "article"{
      stdout {
     #打印信息的时候 不同配置的id 不能一样
     id=>"%{articleId}"
        }
      }
}

最终配置

  • 运行配置文件 myconf.conf

    # Sample Logstash configuration for creating a simple
    # Beats -> Logstash -> Elasticsearch pipeline.
    
    input {
    jdbc{
      # mysql 数据库链接
      jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/vue_article?useUnicode=true&characterEncoding=utf-8&useSSL=false"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "123456*"
      #驱动
      jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      jdbc_default_timezone =>"Asia/Shanghai"
      # mysql文件, 也可以直接写SQL语句在此处,如下:
      # statement => "SELECT id as articleId,author,title FROM article"
      statement_filepath => "./config/vue_article.sql"
      # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
      schedule => "* * * * *"
      type => "article"
      # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
      record_last_run => true
      # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
      use_column_value => true
      # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
      tracking_column => "lastmodifiedTime"
      tracking_column_type => "timestamp"
      last_run_metadata_path => "./last_record/logstash_article_last_time"
      # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
      clean_run => false
      # 是否将 字段(column) 名称转小写
      lowercase_column_names => false
    }
    jdbc{
      # mysql 数据库链接
      jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/vue_authentication?useUnicode=true&characterEncoding=utf-8&useSSL=false"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "123456*"
      #驱动
      jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      jdbc_default_timezone =>"Asia/Shanghai"
      # mysql文件, 也可以直接写SQL语句在此处,如下:
      # statement => "SELECT id,nickname FROM sys_user"
      statement_filepath => "./config/vue_authentication_sys_user.sql"
      # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
      schedule => "* * * * *"
      type => "sys_user"
      # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
      record_last_run => true
      # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
      use_column_value => true
      # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键, 
      # lastmodifiedTime一定要在你查询的结果集里面
      tracking_column => "lastmodifiedTime"
      tracking_column_type => "timestamp"
      last_run_metadata_path => "./last_record/logstash_sys_user_last_time"
      # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
      clean_run => false
      # 是否将 字段(column) 名称转小写
      lowercase_column_names => false
    }
    }
    
    output {
    if[type] =="sys_user"{
        # stdout {
       #  #打印信息的时候 不同配置的id 不能一样
       #  # id=>"%{id}"
        #    id=>"%{userId}"
        #     }
        #
        elasticsearch {
           hosts => "http://192.168.177.128:9200"
           index => "user"
           document_id => "%{id}"
           #document_id => "%{userId}"
           user => "elastic"
           password => "123456"
           }
        }
    if[type] == "article"{
       # stdout {
       #  #打印信息的时候 不同配置的id 不能一样
       #  # id=>"%{id}"
       #      id=>"%{articleId}"
       #     }
       #
       elasticsearch {
          hosts => "http://192.168.177.128:9200"
          index => "article"
          document_id => "%{id}"
      # 生产最好不同配置的id 
          #document_id => "%{articleId}"
          user => "elastic"
          password => "123456"
          } 
        }
    }
    
  • vue_authentication_sys_user.sql
SELECT id,avatar,avatar_large as avatarLarge, create_date as createDate ,lastmodified_time as lastmodifiedTime, nickname,username,role_id as roleId,city,cover,gender,motto,rank FROM sys_user WHERE lastmodified_time >= :sql_last_value  ORDER BY lastmodified_time
  • vue_article.sql

    SELECT id, author,author_id as authorId, avatar,coment_times as comentTimes, content, create_date as createDate, des,image_url as imageUrl, lastmodified_time as lastmodifiedTime, likes,read_times as readTimes, status, tag, tag_id as tagId, title, user_id as userId FROM article WHERE lastmodified_time >= :sql_last_value ORDER BY lastmodified_time
    补充说明
    1、这两个sql为什么需要用ORDER BY lastmodified_time,如果有1000条数据,logstash按照时间,取100条,最后一条的时间是剩下900条时间最小的么;我原本以为是,事实上不是;问了一下DB,自行了解myisam和innodb的区别;我们的数据库主要是innodb的,一般索引默认是id,正常情况下创建时间create_date会跟id排序一致,但是更新数据的时间lastmodified_time就不一定了,所以这里需要进行ORDER BY;
    2、 >= 为什么要用大于等于,如果说有1000条,101条数据时间一致,取100条就会导致你丢掉一条数据,所以需要大于等于,这样会产生同样时间的数据即使没有更新,还是会导入ES;所以 设置jdbc_page_size的大小取决于你的业务
2019年6月15日00:40:38
在部署另外一套数据同步时,发现一个大坑;之前说的莫名其妙的问题又正常了,不是index的原因;
打印日志的时候,发现怎么都进不了第二个sys_user,排查发现数据库里面有个type字段,logstash通过正则表达式来过滤,所以不会进入这个条件判断;为了验证,我新增了一个条件,是数据库type值,发现确实打印了该条信息....无语,这里可以通过查询全字段使用别名的方式解决,如果牵扯业务不多的话可以考虑修改数据字段
最后修改:2020 年 02 月 13 日
如果觉得我的文章对你有用,请随意赞赏