How to import Kafka data into Elasticsearch

Three approaches come to mind:

1. logstash

2. kafka-connect-elasticsearch

3. elasticsearch-river-kafka-1.2.1-plugin

Approach 1: simple; only requires running a single agent process

Approach 2: tightly coupled to Confluent and somewhat more complex to set up (a minimal connector config is sketched after this list)

Approach 3: the code has not been updated for a long time, so ongoing support is poor
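For reference, here is a minimal sink-connector properties sketch for approach 2, assuming the Confluent kafka-connect-elasticsearch connector and an Elasticsearch node at http://localhost:9200; all values are placeholders to adapt:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# topic to read from (reusing the topic name from the Logstash example below)
topics=apache_logs
# where to write the documents
connection.url=http://localhost:9200
type.name=kafka-connect
# with key.ignore=true the connector derives document IDs from topic+partition+offset
key.ignore=true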

 

Logstash usage is as follows:

input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "apache_logs"
    consumer_threads => 16
  }
}

output {
  elasticsearch {
    document_id => "%{my_uuid}"
  }
}
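To try it, save the config above to a file (the file name below is just a placeholder) and start Logstash with -f:

bin/logstash -f kafka_to_es.conf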

https://www.elastic.co/blog/just-enough-kafka-for-the-elastic-stack-part2

Reading protobuf-formatted data from Kafka with Spark Streaming

1. Generate the Java source from the .proto file

vi test1.proto

syntax = "proto2";
package example;

message Hello {
  required string name = 1;
  required int32 id = 2;
}

Generate Test1.java:

protoc --java_out=pbdir test1.proto
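With --java_out=pbdir and package example, protoc writes the generated source into a directory named after the package, so the file to copy in the next step should be:

pbdir/example/Test1.java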

 

2. Copy Test1.java into the src/main/java/example directory
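Compiling the copied Test1.java also needs the protobuf Java runtime on the classpath; a minimal sbt sketch, assuming an sbt build and protobuf-java 2.5.0 (pick the 2.x version that matches your protoc):

libraryDependencies += "com.google.protobuf" % "protobuf-java" % "2.5.0"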

 

3. Read the protobuf data from Kafka with Spark Streaming
import Test1._

createKafkaStream(ssc, pb_topic, kafkaParams1)
  .map(r => r._2)
  .map { r =>
    val p = Hello.parseFrom(r.getBytes)
    p.getId + "\t" + p.getName
  }
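Note that createKafkaStream and kafkaParams1 above are helpers from the surrounding project. Decoding the Kafka value as a String and then calling getBytes can mangle binary protobuf payloads, so a safer sketch (my assumption, not the original code) reads the value as raw bytes with DefaultDecoder:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import example.Test1.Hello

// read the message value as Array[Byte] so the protobuf bytes never pass through a String
val pbStream = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParams1, Map(pb_topic -> 1), StorageLevel.MEMORY_ONLY_SER)
  .map(_._2)
  .map { bytes =>
    val p = Hello.parseFrom(bytes) // parse directly from the raw bytes
    p.getId + "\t" + p.getName
  }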

Fixing kafka.common.ConsumerRebalanceFailedException in Spark Streaming

Fix 1: adjust the ZooKeeper-related settings in the Kafka consumer configuration:
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10

 

Fix 2: change the Kafka parameters in the Spark code that reads from Kafka:

// A rebalance backoff longer than the ZooKeeper session timeout, plus more retries,
// gives the consumer group time to settle instead of failing the rebalance
val kafkaParams = Map(
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> "default",
  "auto.offset.reset" -> "largest",
  "zookeeper.session.timeout.ms" -> "6000",
  "zookeeper.connection.timeout.ms" -> "6000",
  "zookeeper.sync.time.ms" -> "2000",
  "rebalance.backoff.ms" -> "10000",
  "rebalance.max.retries" -> "20"
)

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic, StorageLevel.MEMORY_ONLY_SER).map(_._2)