Reading protobuf-encoded data from Kafka with Spark Streaming

1. Generate the Java class from the proto file

vi test1.proto

syntax = "proto2";
package example;

message Hello{
required string name = 1;
required int32 id = 2;
}

Generate Test1.java:
protoc --java_out=pbdir test1.proto
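With the proto package example and no java_package option, protoc writes the generated class to pbdir/example/Test1.java (the pbdir output directory must already exist before running the command).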

 

2. Copy the generated Test1.java into the project's src/main/java/example directory
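Note (assumption about the build): compiling Test1.java also requires the protobuf-java runtime library on the classpath, with a version matching the protoc used above; with sbt that would be something along the lines of

libraryDependencies += "com.google.protobuf" % "protobuf-java" % "<protoc version>"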

 

3. Read the protobuf data from Kafka in Spark Streaming
import example.Test1._

createKafkaStream(ssc, pb_topic, kafkaParams1)
  .map(_._2)
  .map { r =>
    val p = Hello.parseFrom(r.getBytes)
    p.getId + "\t" + p.getName
  }
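For reference, a minimal self-contained sketch of the same job. Assumptions: createKafkaStream above is presumably a project-local wrapper around the receiver-based KafkaUtils.createStream from spark-streaming-kafka; the ZooKeeper quorum, group id and topic name below are placeholders. The sketch keeps the message value as raw bytes with DefaultDecoder instead of round-tripping through a String, which avoids charset damage to the protobuf payload:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import example.Test1.Hello

val conf = new SparkConf().setAppName("pb-from-kafka")
val ssc = new StreamingContext(conf, Seconds(10))

// placeholder connection settings
val kafkaParams = Map("zookeeper.connect" -> "zk1:2181,zk2:2181", "group.id" -> "pb-reader")
// topic name -> number of receiver threads
val topics = Map("pb_topic" -> 1)

val lines = KafkaUtils
  .createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_SER)
  .map(_._2)                           // keep only the message value (raw protobuf bytes)
  .map { bytes =>
    val p = Hello.parseFrom(bytes)     // deserialize with the generated class
    p.getId + "\t" + p.getName
  }

lines.print()
ssc.start()
ssc.awaitTermination()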

Spark: fixing kafka.common.ConsumerRebalanceFailedException

Fix 1: adjust the ZooKeeper-related settings in the Kafka consumer configuration
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10

 

Fix 2: set the same parameters in the Spark code that reads from Kafka

val kafkaParams = Map(
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> "default",
  "auto.offset.reset" -> "largest",
  "zookeeper.session.timeout.ms" -> "6000",
  "zookeeper.connection.timeout.ms" -> "6000",
  "zookeeper.sync.time.ms" -> "2000",
  "rebalance.backoff.ms" -> "10000",
  "rebalance.max.retries" -> "20"
)

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic, StorageLevel.MEMORY_ONLY_SER).map(_._2)
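Here topic is the Map[String, Int] of topic name to receiver thread count that this createStream overload expects, and StringDecoder comes from kafka.serializer; for example (placeholder topic name):

import kafka.serializer.StringDecoder
val topic = Map("my_topic" -> 1)   // one receiver thread for the topic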

Normalizing case, full-width characters, and traditional Chinese when matching search queries against bidwords in ad data with Spark

Environment: Spark 1.4.1, jpinyin-1.1.3.jar

spark-shell --executor-memory 10G --total-executor-cores 10 --jars ~/huangqiang/jpinyin-1.1.3.jar

import com.github.stuxuhai.jpinyin.ChineseHelper

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)

val hql1 = hiveCtx.sql("select logdate, query, custid, groupid, bidword, quality, price from search_join_log where logdate <= '2016-01-07' and logdate >= '2016-01-01' and adtype = '2' and adid > 0 and channel in ('16','40','51','52','78','80','72','73','81')")

// Convert full-width (quanjiao) characters to half-width (banjiao):
// full-width spaces (U+3000) are dropped, and characters in the full-width
// ASCII range are shifted down by 65248 (0xFEE0) to their half-width forms.
def qj2Bj(str: String): String = {
  str.flatMap { c =>
    if (c == '\u3000') ""
    else if (c > '\uFF00' && c < '\uFF5F') (c - 65248).toChar.toString
    else c.toString
  }
}
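A quick sanity check of the conversion in spark-shell (hypothetical input):

qj2Bj("ＡＢＣ　１２３")   // returns "ABC123": full-width letters and digits become half-width, the full-width space is dropped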

val norm_search_join_log = hql1.map { r =>
  val q_bc = qj2Bj(r(1).toString.toLowerCase())
  val b_bc = qj2Bj(r(4).toString.toLowerCase())
  val q_sim = ChineseHelper.convertToSimplifiedChinese(q_bc)
  val b_sim = ChineseHelper.convertToSimplifiedChinese(b_bc)
  (r(0).toString, q_sim, r(2).toString, r(3).toString, b_sim, r(5).toString, r(6).toString)
}
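As a hedged example of using the normalized RDD (a hypothetical follow-up, not part of the original job), one could count how many records have an exact query/bidword match after normalization:

val exactMatches = norm_search_join_log.filter { case (_, q, _, _, b, _, _) => q == b }.count()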

 

 

Controlling the CPU and memory used by an application in Spark standalone mode

Environment: Spark 1.3.0

Because the cluster runs in standalone mode, the memory and CPU cores an application uses should be constrained either by the environment variables in spark-env.sh or by the application-level parameters spark.executor.memory and spark.cores.max; otherwise one application grabs all available cores and other applications cannot get any. In this setup, the spark-submit options (total-executor-cores, executor-memory) did not take effect.

Environment variables (set in spark-env.sh on each worker):

SPARK_WORKER_MEMORY="50g"
SPARK_WORKER_CORES=22

In the application:

new SparkConf().set("spark.executor.memory", "5g").set("spark.cores.max", "5")
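Put together, a minimal sketch of an application that caps its own resource usage (the application name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("capped-app")
  .set("spark.executor.memory", "5g")   // heap per executor
  .set("spark.cores.max", "5")          // total cores this application may take from the cluster
val sc = new SparkContext(conf)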