How to import Kafka data into Elasticsearch

Three approaches come to mind:

1. logstash

2. kafka-connect-elasticsearch

3. elasticsearch-river-kafka-1.2.1-plugin

Option 1: simple; you only need to start a single agent process.

Option 2: tightly coupled to Confluent, and somewhat complicated.

Option 3: the code has not been updated in a long time, so ongoing support is poor.

 

Logstash is used as follows:

input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "apache_logs"
    consumer_threads => 16
  }
}
output {
  elasticsearch {
    document_id => "%{my_uuid}"
  }
}

https://www.elastic.co/blog/just-enough-kafka-for-the-elastic-stack-part2
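Logstash is the simplest route, but the same Kafka-to-Elasticsearch hop can also be sketched directly in Python. The helper below is a minimal sketch, not part of the original setup: it converts one Kafka message (JSON bytes) into a bulk-API action dict the way the Logstash output above does, including using the my_uuid field as the document id. The commented usage assumes the kafka-python and elasticsearch client libraries, and the broker/topic names are placeholders.

```python
import json

def to_bulk_action(raw_message, index="apache_logs", doc_type="logs"):
    """Convert one Kafka message (JSON bytes) into an Elasticsearch
    bulk-API action dict, mirroring the Logstash elasticsearch output."""
    doc = json.loads(raw_message)
    action = {
        "_index": index,
        "_type": doc_type,
        "_source": doc,
    }
    # Mirror document_id => "%{my_uuid}": use that field as the doc id if present
    if "my_uuid" in doc:
        action["_id"] = doc["my_uuid"]
    return action

# Usage sketch (requires kafka-python and elasticsearch, and a running cluster):
# from kafka import KafkaConsumer
# from elasticsearch import Elasticsearch, helpers
# consumer = KafkaConsumer("apache_logs", bootstrap_servers="kafka:9092")
# es = Elasticsearch()
# helpers.bulk(es, (to_bulk_action(m.value) for m in consumer))
```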

Integrating Spark with Elasticsearch

Environment: Spark 1.6, Elasticsearch 1.6.1, elasticsearch-hadoop

With elasticsearch-hadoop, data processed by Spark can be saved to Elasticsearch, which makes subsequent inspection and querying of the data very convenient.

https://db-blog.web.cern.ch/blog/prasanth-kothuri/2016-05-integrating-hadoop-and-elasticsearch-%E2%80%93-part-2-%E2%80%93-writing-and-querying

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

https://spark-packages.org/package/elastic/elasticsearch-hadoop
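From PySpark, elasticsearch-hadoop is driven by a set of es.* properties passed as a Hadoop output configuration. The helper below is a sketch of building that dict (es.nodes, es.resource, and es.mapping.id are real elasticsearch-hadoop settings); the commented saveAsNewAPIHadoopFile call follows the pattern in the ES-Hadoop docs, and the host/index names are placeholders.

```python
def es_hadoop_conf(nodes, resource, mapping_id=None):
    """Build the es.* configuration dict that elasticsearch-hadoop reads
    when Spark writes an RDD to Elasticsearch.

    nodes    -- comma-separated Elasticsearch hosts, e.g. "es-host:9200"
    resource -- target "index/type", e.g. "spark/docs"
    """
    conf = {
        "es.nodes": nodes,
        "es.resource": resource,
    }
    if mapping_id:
        # Use a field of each document as the Elasticsearch _id
        conf["es.mapping.id"] = mapping_id
    return conf

# Usage from PySpark (sketch; needs the elasticsearch-hadoop jar on the classpath):
# rdd.map(lambda doc: (None, doc)).saveAsNewAPIHadoopFile(
#     path="-",
#     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
#     keyClass="org.apache.hadoop.io.NullWritable",
#     valueClass="org.elasticsearch.hadoop.mr.LinkableMapWritable",
#     conf=es_hadoop_conf("es-host:9200", "spark/docs"))
```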

The faintest ink beats the best memory

Last time I solved a problem I forgot to take notes, so when the same problem came up again, I couldn't remember that I already had a solution. As I get older, more and more things need to be written down, or they slip away. From now on, for technical work like this, I'd better keep my own notes, using WordPress.

Installing lxml on Windows 7

Environment: Windows 7, Python 2.7

I needed lxml to parse web pages, which also required installing VCForPython27. During installation I ran into a series of problems: both

pip install lxml

easy_install lxml

fail with the same error, raised at compile time:

Could not find function xmlCheckVersion in library libxml2. Is libxml2 installed ?

In the end I downloaded the wheel directly from http://www.lfd.uci.edu/~gohlke/pythonlibs/dp2ng7en/lxml-3.6.4-cp27-cp27m-win_amd64.whl and installed it:

pip install lxml-3.6.4-cp27-cp27m-win_amd64.whl

Searching bash command history

Environment: logstash-2.4.0, elasticsearch-1.6.1, Kafka 0.8

I often need to look through my bash history, but the history file only keeps a limited number of commands, and sometimes I want to know when a command was executed. So I built a bash history search system with Logstash + Kafka + Elasticsearch.

The configuration files are as follows:

logstash.conf

input {
  file {
    path => "/home/adadmin/.bash_history"
    add_field => {"user" => "adadmin"}
  }
}
filter {
  ruby {
    code => "event['updatetime'] = event.timestamp.time.localtime.strftime('%Y-%m-%d %H:%M:%S.%L')"
  }
}
output {
  kafka {
    bootstrap_servers => "10.121.93.50:9092,10.121.93.51:9092,10.121.93.53:9092"
    topic_id => "bash-history"
  }
}
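The ruby filter in logstash.conf above stamps each event with a local-time updatetime field. As a minimal sketch of what that one-liner does, here is the equivalent formatting in Python (note Python's strftime has no %L, so milliseconds are derived from the microsecond field):

```python
from datetime import datetime

def add_updatetime(event, now=None):
    """Mimic the Logstash ruby filter: add a local-time 'updatetime'
    field formatted as %Y-%m-%d %H:%M:%S.%L (milliseconds)."""
    now = now or datetime.now()
    # %L (milliseconds) has no Python equivalent; keep the top 3 digits
    # of the microsecond component instead.
    millis = now.microsecond // 1000
    event["updatetime"] = now.strftime("%Y-%m-%d %H:%M:%S.") + "%03d" % millis
    return event
```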

elasticsearch:

curl -XPUT 'xxx.xxx.xxx.53:9200/_river/kafka-river/_meta' -d '
{
  "type" : "kafka",
  "kafka" : {
    "zookeeper.connect" : "xxx.xxx.xxx.50:2181,xxx.xxx.xxx.51:2181,xxx.xxx.xxx.53:2181",
    "zookeeper.connection.timeout.ms" : 10000,
    "topic" : "bash-history",
    "message.type" : "json"
  },
  "index" : {
    "index" : "kafka-index",
    "type" : "status",
    "bulk.size" : 3,
    "concurrent.requests" : 1,
    "action.type" : "index",
    "flush.interval" : "12h"
  }
}'
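The bulk.size setting above means the river indexes Kafka messages in groups of 3, with flush.interval covering whatever is left over. A minimal Python sketch of that batching behavior (the function name is mine, not part of the river plugin):

```python
def batches(messages, bulk_size=3):
    """Group messages into bulks of `bulk_size`, the way the river's
    bulk.size setting batches Kafka messages before indexing."""
    bulk = []
    for msg in messages:
        bulk.append(msg)
        if len(bulk) == bulk_size:
            yield bulk
            bulk = []
    if bulk:
        # A partial bulk remains; the river's flush.interval plays
        # this role, flushing leftovers on a timer.
        yield bulk
```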

 

Start Logstash:

bin/logstash -f logstash.conf

 

Run some commands in a terminal, and the data flows from Logstash to Kafka and on to Elasticsearch, where the command history can be searched directly.