Serving online predictions from a trained Spark MLlib model

I've been thinking about a question lately: after training a model with Spark MLlib, how do you serve online predictions? After all, MLlib only provides offline training and prediction. I can think of roughly five approaches:

1. spark streaming + kafka

Load the trained model directly in Spark Streaming, read features from Kafka to run predictions, and write the results back to Kafka for clients to pick up (a sketch follows below).

2. spark + grpc

Wrap Spark MLlib's predict function as an RPC service; see:

https://scalapb.github.io/grpc.html

3. spark + spray

Expose Spark MLlib's predict function as a RESTful service.

4. spark + python flask

Call the trained Spark MLlib model from Python and expose an API with Flask.

5. spark + python grpc

Call the trained Spark MLlib model via Python gRPC, providing an RPC interface for other languages.

 

 

So far, approaches 1, 4, and 5 have proven workable; I have no hands-on experience with approaches 2 and 3 yet and still need to explore them.
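For approach 1, here is a minimal sketch of the idea, assuming spark-shell (so sc already exists), a LogisticRegressionModel saved offline, the direct Kafka stream API, and comma-separated feature strings; the model path, broker list, and topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))

// Load the model that was trained and saved offline (placeholder path)
val model = LogisticRegressionModel.load(sc, "hdfs:///models/lr")

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("features_topic"))

// Each Kafka message is assumed to hold comma-separated feature values
stream.map(_._2).foreachRDD(rdd => {
  val predictions = rdd.map(line => {
    val features = Vectors.dense(line.split(",").map(_.toDouble))
    line + "\t" + model.predict(features)
  })
  // A real service would write these back to a Kafka topic for clients
  predictions.take(10).foreach(println)
})

ssc.start()
ssc.awaitTermination()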

Error importing TensorFlow from Python

jerry@ubuntu:~/serving$ python
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import tensorflow
RuntimeError: module compiled against API version 0xa but this version of numpy is 0x9
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/dist-packages/tensorflow/__init__.py", line 23, in <module>
from tensorflow.python import *
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/__init__.py", line 49, in <module>
from tensorflow.python import pywrap_tensorflow
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/pywrap_tensorflow.py", line 28, in <module>
_pywrap_tensorflow = swig_import_helper()
File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/pywrap_tensorflow.py", line 24, in swig_import_helper
_mod = imp.load_module('_pywrap_tensorflow', fp, pathname, description)
ImportError: numpy.core.multiarray failed to import

 

Upgrade numpy to the latest version:

sudo pip install numpy --upgrade

sudo mv /usr/lib/python2.7/dist-packages/numpy /usr/lib/python2.7/dist-packages/numpy_old

 

Check the new numpy path:

import numpy

numpy.__path__

pip installs are too slow

Recently pip installs have become extremely slow, presumably thanks to the GFW. The only workaround is to use a domestic mirror.

vi ~/.pip/pip.conf

[global]
index-url = http://pypi.douban.com/simple

Or:

[global]
trusted-host = mirrors.aliyun.com
index-url = http://mirrors.aliyun.com/pypi/simple
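If editing pip.conf isn't convenient, the mirror can also be passed per invocation; a one-off example (assuming the same Aliyun mirror and a pip version new enough to support --trusted-host):

pip install numpy -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com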

Python date operations

 

from datetime import datetime

dayOfWeek = datetime.now().weekday()  # Monday is 0, Sunday is 6
print dayOfWeek

******************************************************************

import time
import datetime

# datetime -> unix timestamp
dtime = datetime.datetime.now()
ans_time = time.mktime(dtime.timetuple())

# unix timestamp -> datetime (renamed so it no longer shadows the time module)
unix_ts = 1439111214.0
dt = datetime.datetime.fromtimestamp(unix_ts)

Storing data from Spark Streaming via HTTP GET

Environment: Spark 1.6; the storage layer is a service driven by HTTP GET.

Add "org.apache.httpcomponents" % "httpclient" % "4.5.2" to build.sbt. Note that the first separator is %, not %%.

After several attempts, the final code looks like this:

agg_wd_business.foreach(d => {
  // Short connect/read timeouts so a slow endpoint cannot stall the job
  val httpParams = new BasicHttpParams()
  HttpConnectionParams.setConnectionTimeout(httpParams, 50)
  HttpConnectionParams.setSoTimeout(httpParams, 50)
  val client = new DefaultHttpClient(httpParams)
  val request = new HttpGet("http://xxx.xxx.xxx.xxx:9010/rt?" + URLEncoder.encode(d, "UTF-8"))
  request.addHeader("Connection", "close") // short-lived connection
  try {
    val response = client.execute(request)
    val handler = new BasicResponseHandler()
    handler.handleResponse(response).trim
  } catch {
    case ex: SocketTimeoutException => None
    case ex: Exception => None
  }
})

This sends an HTTP GET with a timeout over a short-lived connection, without guaranteeing that the request succeeds. Since each run produces roughly 300,000 records, the HTTP GET endpoint gets called that many times, and the nginx-fronted service cannot respond quickly enough.
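A possible refinement (my own sketch, not from the original job): create one client per partition with foreachPartition instead of one per record, which should cut connection churn when firing hundreds of thousands of requests:

agg_wd_business.foreachPartition(part => {
  val httpParams = new BasicHttpParams()
  HttpConnectionParams.setConnectionTimeout(httpParams, 50)
  HttpConnectionParams.setSoTimeout(httpParams, 50)
  // One client for the whole partition instead of one per record
  val client = new DefaultHttpClient(httpParams)
  val handler = new BasicResponseHandler()
  part.foreach(d => {
    val request = new HttpGet("http://xxx.xxx.xxx.xxx:9010/rt?" + URLEncoder.encode(d, "UTF-8"))
    try {
      handler.handleResponse(client.execute(request)) // consumes the response entity
    } catch {
      case _: Exception => // best effort: ignore failures
    } finally {
      request.releaseConnection()
    }
  })
  client.getConnectionManager.shutdown()
})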

Generating Scala protobuf code with ScalaPB

Environment: sbt, Scala 2.10.4

 

1.

vi project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.1")

libraryDependencies += "com.trueaccord.scalapb" %% "compilerplugin" % "0.5.43"

 

2.

vi build.sbt

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

// If you need scalapb/scalapb.proto or anything from google/protobuf/*.proto:
libraryDependencies += "com.trueaccord.scalapb" %% "scalapb-runtime" % com.trueaccord.scalapb.compiler.Version.scalapbVersion % "protobuf"

ScalaPB looks for protocol buffer files in src/main/protobuf, but this can be customized. Running the compile command in sbt will both generate Scala sources from your protos and compile them.

 

3.
mkdir src/main/protobuf

vi src/main/protobuf/hello.proto

syntax = "proto3";
package example;

message HelloRequest {
string name = 1;
}

sbt assembly

The generated Scala file ends up at target/scala-2.10/src_managed/main/example/hello/HelloRequest.scala.

4.

Usage:
import hello._

val h = HelloRequest().withName("hq")
val hba = h.toByteArray
println(hba) // serialized bytes
println(HelloRequest.parseFrom(hba).name) // deserialize

Reading protobuf data from Kafka with Spark Streaming

1. Generate the Java class from the proto file

vi test1.proto

syntax = "proto2";
package example;

message Hello{
required string name = 1;
required int32 id = 2;
}

Generate Test1.java:
protoc --java_out=pbdir test1.proto

 

2. Copy Test1.java into the src/main/java/example directory

 

3. Read the protobuf data from Kafka with Spark Streaming
import Test1._

createKafkaStream(ssc, pb_topic, kafkaParams1).map(r => r._2).map(r => {val p = Hello.parseFrom(r.getBytes); p.getId + "\t" + p.getName})
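One caveat (my addition): round-tripping the protobuf payload through a String and getBytes can corrupt binary data, because not every byte sequence survives UTF-8 decoding. A safer sketch keeps the value as raw bytes with DefaultDecoder, assuming the direct stream API from spark-streaming-kafka (the site-specific createKafkaStream helper above may already do something equivalent):

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.kafka.KafkaUtils

// Keep the Kafka value as Array[Byte] so the protobuf bytes arrive intact
val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams1, Set(pb_topic))

stream.map(_._2).map(bytes => {
  val p = Hello.parseFrom(bytes) // parse straight from the raw bytes
  p.getId + "\t" + p.getName
})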

sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

I added "org.apache.httpcomponents" %% "httpclient" % "4.5.2" to build.sbt.

Compilation then failed with:

sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

[error] (*:update) sbt.ResolveException: unresolved dependency: org.apache.httpcomponents#httpclient_2.10;4.5.2: not found

 

I found a fix on Stack Overflow:

Change the first %% to a single %. The double character version is for fetching cross-built libraries, and yours isn't.

Dropping one %, the line becomes:

"org.apache.httpcomponents" % "httpclient" % "4.5.2"
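For contrast, a minimal illustration of the rule (the versions are just the ones used elsewhere in these notes):

// Java library: the artifact name has no Scala version suffix, so use %
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.2"

// Cross-built Scala library: %% appends the Scala suffix (e.g. _2.10) to the artifact name
libraryDependencies += "com.trueaccord.scalapb" %% "scalapb-runtime" % "0.5.43"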

 

spark kafka.common.ConsumerRebalanceFailedException

Approach 1: adjust the ZooKeeper settings (in the Kafka consumer config)
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10

 

Approach 2: change the Kafka-reading code in Spark

val kafkaParams = Map(
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> "default",
  "auto.offset.reset" -> "largest",
  "zookeeper.session.timeout.ms" -> "6000",
  "zookeeper.connection.timeout.ms" -> "6000",
  "zookeeper.sync.time.ms" -> "2000",
  "rebalance.backoff.ms" -> "10000",
  "rebalance.max.retries" -> "20"
)

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic, StorageLevel.MEMORY_ONLY_SER).map(_._2)