将kafka的数据导入至ElasticSearch

环境:ElasticSearch 1.4.4, elasticsearch-river-kafka-1.2.1-plugin, kafka 0.8.1

安装ElasticSearch的kafka插件
.bin/plugin -install kafka-river -url https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/download/v1.2.1/elasticsearch-river-kafka-1.2.1-plugin.zip

增加元数据
curl -XPUT ‘localhost:9200/_river/kafka-river/_meta’ -d ‘
{
“type” : “kafka”,
“kafka” : {
“zookeeper.connect” : “xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181”,
“zookeeper.connection.timeout.ms” : 10000,
“topic” : “flume-topic1”,
“message.type” : “json”
},
“index” : {
“index” : “kafka-index”,
“type” : “status”,
“bulk.size” : 3,
“concurrent.requests” : 1,
“action.type” : “index”,
“flush.interval” : “12h”
}
}’

重启ElasticSearch的服务

查看元数据状态
curl -XGET ‘http://localhost:9200/_river/kafka-river/_search?pretty’
curl -XGET ‘http://localhost:9200/_river/kafka-index/_search?pretty’
curl -XDELETE ‘localhost:9200/_river/kafka-river/’

在kafka生成json数据
bin/kafka-console-producer.sh –topic flume-topic1 –broker-list xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092
{“id”:”123″, “name”:”hq”}
{“id”:”123″, “name”:”hq”}
{“id”:”123″, “name”:”hq”}
{“id”:”123″, “name”:”hq”}

查看最终数据
curl -XGET ‘http://localhost:9200/kafka-index/_search?pretty’

作者: hqiang1984

量化自我,极简主义