不想听我啰嗦的,可以直接看最终配置
写完一步一个脚印实现logstash同步MySQL数据到ES后,想试试多表保存到ES里,原本以为只要多复制几个JDBC加个type就可以搞定,结果悲剧的发现都是问题,在这里记录一下需要注意的问题
需要注意的配置
tracking_column
报错信息 lastmodifiedTime一定要在你查询的结果集里面 tracking_column => "lastmodifiedTime"
[2019-06-03T00:37:00,342][INFO ][logstash.inputs.jdbc ] (0.004409s) SELECT version()
[2019-06-03T00:37:00,369][INFO ][logstash.inputs.jdbc ] (0.002446s) SELECT count(*) AS `count` FROM (SELECT id, nickname FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00'
) AS `t1` LIMIT 1
[2019-06-03T00:37:00,401][INFO ][logstash.inputs.jdbc ] (0.008020s) SELECT * FROM (SELECT id, nickname FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00'
) AS `t1` LIMIT 50000 OFFSET 0
[2019-06-03T00:37:00,402][WARN ][logstash.inputs.jdbc ] tracking_column not found in dataset. {:tracking_column=>"lastmodified_time"}
{ 3362 rufus-scheduler intercepted an error:
3362 job:
3362 Rufus::Scheduler::CronJob "* * * * *" {}
3362 error:
3362 3362
3362 TypeError
3362 no implicit conversion of NilClass into String
3362 uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date/format.rb:335:in `_parse'
3362 uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/date.rb:734:in `parse'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/value_tracking.rb:73:in `set_value'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/jdbc.rb:237:in `execute_statement'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:277:in `execute_query'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:258:in `block in run'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:234:in `do_call'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:258:in `do_trigger'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:300:in `block in start_work_thread'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:299:in `block in start_work_thread'
3362 org/jruby/RubyKernel.java:1425:in `loop'
3362 /home/elastic/logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:289:in `block in start_work_thread'
3362 tz:
3362 ENV['TZ']:
3362 Time.now: 2019-06-03 00:37:00 -0700
**提示: tracking_column not found in dataset. {:tracking_column=>"lastmodified_time"}
在slq里面加入lastmodified_time**
SELECT id, nickname,lastmodified_time FROM sys_user WHERE lastmodified_time > '1970-01-01 08:00:00'
输出的index,不支持驼峰的格式,开始设置成sysUser,报错
报错信息
[2019-06-03T01:55:00,698][ERROR][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"13", :_index=>"sysUser", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x3e36d2a5>], :response=>{"index"=>{"_index"=>"sysUser", "_type"=>"_doc", "_id"=>"13", "status"=>400, "error"=>{"type"=>"invalid_index_name_exception", "reason"=>"Invalid index name [sysUser], must be lowercase", "index_uuid"=>"_na_", "index"=>"sysUser"}}}}
提示:Could not index event to Elasticsearch.
修改index名称后正常
elasticsearch {
hosts => "http://192.168.177.128:9200"
index => "user"
document_id => "%{id}"
#document_id => "%{userId}"
user => "elastic"
password => "123456"
}
在打印信息时,两个配置,报错
output {
if[type] =="sys_user"{
stdout {
#打印信息的时候 不同配置的id 不能一样
id=>"%{id}"
}
}
if[type] == "article"{
stdout {
#打印信息的时候 不同配置的id 不能一样
# id=>"%{id}"
}
}
}
改正后,不要忘了将userId和articleId 做为别名放到搜索集里。我本以为的第一次尝试的时候,第二个条件的type索引一直无法导入到ES的错误是这个原因,后来测试发现不是,这个问题莫名其妙的正常了,怀疑是SHELL格式的问题或者是驼峰命名索引导致的
output {
if[type] =="sys_user"{
stdout {
#打印信息的时候 不同配置的id 不能一样
id=>"%{userId}"
}
}
if[type] == "article"{
stdout {
#打印信息的时候 不同配置的id 不能一样
id=>"%{articleId}"
}
}
}
最终配置
运行配置文件 myconf.conf
# Sample Logstash configuration for creating a simple # Beats -> Logstash -> Elasticsearch pipeline. input { jdbc{ # mysql 数据库链接 jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/vue_article?useUnicode=true&characterEncoding=utf-8&useSSL=false" # 用户名和密码 jdbc_user => "root" jdbc_password => "123456*" #驱动 jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" jdbc_default_timezone =>"Asia/Shanghai" # mysql文件, 也可以直接写SQL语句在此处,如下: # statement => "SELECT id as articleId,author,title FROM article" statement_filepath => "./config/vue_article.sql" # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年) schedule => "* * * * *" type => "article" # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中 record_last_run => true # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值. use_column_value => true # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键 tracking_column => "lastmodifiedTime" tracking_column_type => "timestamp" last_run_metadata_path => "./last_record/logstash_article_last_time" # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录 clean_run => false # 是否将 字段(column) 名称转小写 lowercase_column_names => false } jdbc{ # mysql 数据库链接 jdbc_connection_string => "jdbc:mysql://192.168.177.128:3306/vue_authentication?useUnicode=true&characterEncoding=utf-8&useSSL=false" # 用户名和密码 jdbc_user => "root" jdbc_password => "123456*" #驱动 jdbc_driver_library => "/home/elastic/logstash-7.1.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" jdbc_default_timezone =>"Asia/Shanghai" # mysql文件, 也可以直接写SQL语句在此处,如下: # statement => "SELECT id,nickname FROM sys_user" statement_filepath => "./config/vue_authentication_sys_user.sql" # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年) schedule => "* * * * *" type => "sys_user" # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中 record_last_run => true # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值. use_column_value => true # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键, # lastmodifiedTime一定要在你查询的结果集里面 tracking_column => "lastmodifiedTime" tracking_column_type => "timestamp" last_run_metadata_path => "./last_record/logstash_sys_user_last_time" # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录 clean_run => false # 是否将 字段(column) 名称转小写 lowercase_column_names => false } } output { if[type] =="sys_user"{ # stdout { # #打印信息的时候 不同配置的id 不能一样 # # id=>"%{id}" # id=>"%{userId}" # } # elasticsearch { hosts => "http://192.168.177.128:9200" index => "user" document_id => "%{id}" #document_id => "%{userId}" user => "elastic" password => "123456" } } if[type] == "article"{ # stdout { # #打印信息的时候 不同配置的id 不能一样 # # id=>"%{id}" # id=>"%{articleId}" # } # elasticsearch { hosts => "http://192.168.177.128:9200" index => "article" document_id => "%{id}" # 生产最好不同配置的id #document_id => "%{articleId}" user => "elastic" password => "123456" } } }
- vue_authentication_sys_user.sql
SELECT id,avatar,avatar_large as avatarLarge, create_date as createDate ,lastmodified_time as lastmodifiedTime, nickname,username,role_id as roleId,city,cover,gender,motto,rank FROM sys_user WHERE lastmodified_time >= :sql_last_value ORDER BY lastmodified_time
vue_article.sql
SELECT id, author,author_id as authorId, avatar,coment_times as comentTimes, content, create_date as createDate, des,image_url as imageUrl, lastmodified_time as lastmodifiedTime, likes,read_times as readTimes, status, tag, tag_id as tagId, title, user_id as userId FROM article WHERE lastmodified_time >= :sql_last_value ORDER BY lastmodified_time
补充说明
1、这两个sql为什么需要用ORDER BY lastmodified_time,如果有1000条数据,logstash按照时间,取100条,最后一条的时间是剩下900条时间最小的么;我原本以为是,事实上不是;问了一下DB,自行了解myisam和innodb的区别;我们的数据库主要是innodb的,一般索引默认是id,正常情况下创建时间create_date会跟id排序一致,但是更新数据的时间lastmodified_time就不一定了,所以这里需要进行ORDER BY;
2、 >= 为什么要用大于等于,如果说有1000条,101条数据时间一致,取100条就会导致你丢掉一条数据,所以需要大于等于,这样会产生同样时间的数据即使没有更新,还是会导入ES;所以 设置jdbc_page_size的大小取决于你的业务
2019年6月15日00:40:38
在部署另外一套数据同步时,发现一个大坑;之前说的莫名其妙的问题又正常了,不是index的原因;
打印日志的时候,发现怎么都进不了第二个sys_user,排查发现数据库里面有个type字段,logstash通过正则表达式来过滤,所以不会进入这个条件判断;为了验证,我新增了一个条件,是数据库type值,发现确实打印了该条信息....无语,这里可以通过查询全字段使用别名的方式解决,如果牵扯业务不多的话可以考虑修改数据字段