环境:CentOS 6.3, Kafka 8.1, Flume 1.6, elasticsearch-1.4.4
配置文件如下:
[adadmin@s9 apache-flume-1.6.0-bin]$ vi conf/flume.conf
#define source, sink, channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/adadmin/.bash_history
# Describe the sink
#only test
#a1.sinks.k1.type = logger
#load to Kafka
#a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.k1.batchSize = 5
#a1.sinks.k1.brokerList = xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092
#a1.sinks.k1.topic = flume_topic1
#load to ElasticSearch
a1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
a1.sinks.k1.hostNames = xxx.xxx.xxx.xxx:9300
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 100
a1.sinks.k1.indexName = logstash
a1.sinks.k1.ttl = 5
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启用Flume agent
[adadmin@s9 apache-flume-1.6.0-bin]$ bin/flume-ng agent -c /home/adadmin/apache-flume-1.6.0-bin/conf -f /home/adadmin/apache-flume-1.6.0-bin/conf/flume.conf -n a1 -Dflume.root.logger=INFO,console
(注:在导入ElasticSearch时需要把此文件的lib导入到flume的库目录下,操作如下:
[adadmin@s9 apache-flume-1.6.0-bin]$ mkdir -p plugins.d/elasticsearch/libext
[adadmin@s9 apache-flume-1.6.0-bin]$cp /home/adadmin/elasticsearch-1.4.4/lib/*.jar plugins.d/elasticsearch/libext
)