spark streaming读取kafka上的protobuf格式的数据

1. 通过proto文件生成java文件夹

vi test1.proto

syntax = “proto2”;
package example;

message Hello{
required string name = 1;
required int32 id = 2;
}

生成Test1.java
protoc –java_out=pbdir test1.proto

 

2. 将Test1.java拷贝到src/main/java/example目录下

 

3. 通过spark streaming读取kafka上的pb数据
import Test1._

createKafkaStream(ssc, pb_topic, kafkaParams1).map(r => r._2).map(r => {val p = Hello.parseFrom(r.getBytes); p.getId + “\\t” + p.getName})

sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

在build.sbt中添加”org.apache.httpcomponents” %% “httpclient” % “4.5.2”

编译的时候出现报错:

sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

[error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

 

在stackoverflow找到一个解决方法

Change the first %% to a single %. The double character version is for fetching cross-built libraries, and yours isn’t.

去掉一个%,修改如下:

“org.apache.httpcomponents” % “httpclient” % “4.5.2”

 

spark kafka.common.ConsumerRebalanceFailedException

方法1.配置zk问题(kafka的consumer配置)
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10

 

方法2. 在spark读取kafka的代码修改

val kafkaParams = Map(
“zookeeper.connect” -> zkQuorum,
“group.id” -> “default”,
“auto.offset.reset” -> “largest”,
“zookeeper.session.timeout.ms” -> “6000”,
“zookeeper.connection.timeout.ms” -> “6000”,
“zookeeper.sync.time.ms” -> “2000”,
“rebalance.backoff.ms” -> “10000”,
“rebalance.max.retries” -> “20”
)

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic, StorageLevel.MEMORY_ONLY_SER).map(_._2)

hadoop streaming对数据进行排序

环境:  hadoop 1.2, python

有这样的数据,两列,中间是用\\t分隔的,需要按第二列从大到小的顺序排列。

02款雅阁 0.00611111111111
04款奥德赛 0.00813131313131
06ms201 0.000866666666667
06ms201图集 0.00704678362573
06雅阁 0.0145098039216
07常服大衣 0.00915032679739
08年本田思域 0.00111111111111
1.5d弯头 0.0211538461538
1.5匹空调 0.00929292929293
1.5米衣柜设计图 0.01640625

hadoop streaming的python程序写法如下:

hadoop streaming -input datain  -output dataout -mapper cat -reducer cat -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -jobconf stream.num.map.output.key.fieds=2 -jobconf stream.map.output.field.separator=”\\t” -jobconf mapred.text.key.comparator.options=”-k2,2nr”  -jobconf mapred.reduce.tasks=1

其中map, reduce都是操作系统命令cat,org.apache.hadoop.mapred.lib.KeyFieldBasedComparator 来自定义使用key中的部分字段做比较,stream.map.output.field.separator指定map出来的数据按\\t来分隔,stream.num.map.output.key.fieds是指map出来的数据的key/value分隔符在哪,这里将第一列和第二列都作为key部分,mapred.text.key.comparator.options指key中对比的方式, -k2, 2nr是指从第二列到第二列按数值反转排序。

 

Windows 7安装lxml

环境:Windows 7,  python 2.7

需要使用lxml来解析网页, 还得安装VCForPython27, 安装过程中发现一系统的问题:

pip install lxml

easy_install lxml

都有这个报错,是编译时出现的。

Could not find function xmlCheckVersion in library libxml2. Is libxml2 installed ?

最后直接从http://www.lfd.uci.edu/~gohlke/pythonlibs/dp2ng7en/lxml-3.6.4-cp27-cp27m-win_amd64.whl下载

pip install lxml-3.6.4-cp27-cp27m-win_amd64.whl

python打包成exe执行文件

环境:windows 7,  python 2.7

写python文件格式.py的程序需要将其打包成可执行的文件形式,可以使用PyInstaller来打包。

下载PyInstaller-3.1文件,使用打包命令如下:

D:\\program\\PyInstaller-3.1>pyinstaller.py -F ../../qs123/s3test.py –upx-dir upx391w

此命令将其打包成一个可执行文件并进行压缩。

 

参数:

-F 指定打包后只生成一个exe格式的文件

-D –onedir 创建一个目录,包含exe文件,但会依赖很多文件(默认选项)

-c –console, –nowindowed 使用控制台,无界面(默认)

-w –windowed, –noconsole 使用窗口,无控制台

-p 添加搜索路径,让其找到对应的库。

-i 改变生成程序的icon图标

 

 

bash history历史命令查询

环境:logstash-2.4.0, elasticsearch-1.6.1, kafka 0.8

经常需要查看bash历史,而这个文件一般存储一定量的命令,有时需要查看什么时候执行过。因而使用logstash + kafka + elasticsearch来搭建bash历史命令检索系统。

配置文件如下:

logstash.conf

input {
file {
path => “/home/adadmin/.bash_history”
add_field => {“user” => “adadmin”}
}
}
filter {
ruby {
code => “event[‘updatetime’] = event.timestamp.time.localtime.strftime(‘%Y-%m-%d %H:%M:%S.%L’)”
}
}
output {
kafka {
bootstrap_servers => “10.121.93.50:9092,10.121.93.51:9092,10.121.93.53:9092”
topic_id => “bash-history”
}
}

elasticsearch:

curl -XPUT ‘xxx.xxx.xxx.53:9200/_river/kafka-river/_meta’ -d ‘
{
“type” : “kafka”,
“kafka” : {
“zookeeper.connect” : “xxx.xxx.xxx.50:2181,xxx.xxx.xxx.51:2181,xxx.xxx.xxx.53:2181”,
“zookeeper.connection.timeout.ms” : 10000,
“topic” : “bash-history”,
“message.type” : “json”
},
“index” : {
“index” : “kafka-index”,
“type” : “status”,
“bulk.size” : 3,
“concurrent.requests” : 1,
“action.type” : “index”,
“flush.interval” : “12h”
}
}’

 

启动logstash

bin/logstash -f logstash.conf

 

在terminal上执行一些命令,数据就由logstash传到kafka,再传到elasticSearch上,可以在上面直接查看历史命令。

Confluent的schema-registry的使用

git clone https://github.com/confluentinc/schema-registry.git

cd schema-registry
git checkout tags/v2.0.0
mvn clean package -DskipTests

vi config/schema-registry.properties
设置kafkastore.connection.url为zookeeper的连接地址

nohup ./bin/schema-registry-start ./config/schema-registry.properties &

查看schema-registry进程
[adadmin@s11 ~]$ jps
26995 NodeManager
74580 Kafka
61079 SchemaRegistryMain
62615 Jps
126392 Worker
26843 DataNode
118141 QuorumPeerMain

#producer 注意:输入一条数据才enter一次,退出使用ctrl + C
./bin/kafka-avro-console-producer –broker-list 10.121.93.50:9092 –topic test –property value.schema='{“type”:”record”,”name”:”myrecord”,”fields”:[{“name”:”f1″,”type”:”string”}]}’
{“f1”: “value1”}
{“f1”: “value2”}
{“f1”: “value3”}

./bin/kafka-avro-console-consumer –broker-list 10.121.93.50:9092 –topic test-avro –from-beginning

Linux history的详解

经常使用linux内置的history命令查看历史,在当前用户下面有个.bash_history文件用于保存历史命令,另外$HISTSIZE环境变量是保存最大条数。在当前shell环境中,命令放在内存中,退出时将最近$HISTSIZE条命令保存在.bash_history上,也可以使用history -a(从登录起到现在的命令)手工保存,另外history -w将当前命令保存下来。如果想命令立刻保存下来,可以在.bashrc中设置环境变量export PROMPT_COMMAND=’history -a’