A problem writing to Kafka from Spark

Environment: Spark 1.6, Kafka 0.8

Failed to send producer request with correlation id
java.io.IOException: Connection reset by peer
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

The job uses Spark to read from Kafka and write results back to Kafka, and it kept failing with the errors above. For a long time I assumed it was a performance-tuning problem and kept adjusting different parameters.

On the producer side:

producerConf.put("retry.backoff.ms", "8000");
producerConf.put("message.send.max.retries", "10");
producerConf.put("topic.metadata.refresh.interval.ms", "0");
producerConf.put("fetch.message.max.bytes", "5252880");
producerConf.put("request.required.acks", "0");
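
For context, here is a minimal sketch of how these settings end up being used from a Spark job, assuming Java 8 lambdas and the old Kafka 0.8 producer API (kafka.javaapi.producer.Producer); the broker list, topic name, serializer class, and sample data below are placeholders, not values from the original job.

import java.util.Arrays;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaWriteSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("kafka-write-sketch");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Placeholder data; in the original job this RDD comes from reading Kafka.
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("msg-1", "msg-2", "msg-3"));

        // Producer settings from the post; broker list and serializer class are assumptions.
        Properties producerConf = new Properties();
        producerConf.put("metadata.broker.list", "broker1:9092,broker2:9092");
        producerConf.put("serializer.class", "kafka.serializer.StringEncoder");
        producerConf.put("retry.backoff.ms", "8000");
        producerConf.put("message.send.max.retries", "10");
        producerConf.put("request.required.acks", "0");

        // Create one producer per partition so the non-serializable producer
        // object never has to cross the driver/executor boundary.
        lines.foreachPartition(records -> {
            Producer<String, String> producer =
                new Producer<>(new ProducerConfig(producerConf));
            while (records.hasNext()) {
                producer.send(new KeyedMessage<>("my-topic", records.next()));
            }
            producer.close();
        });

        sc.stop();
    }
}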

On the broker side, in server.properties:

message.max.bytes=5252880
replica.fetch.max.bytes=5252880
request.timeout.ms=600000

None of this solved the problem. It turned out that the 0.8 producer's default write mode is synchronous, so every send blocks on a round trip to the broker; under the bursty write pattern of a Spark job this makes timeouts, connection resets, and retry exhaustion far more likely. The whole issue came down to this single parameter:

producerConf.put("producer.type", "async");
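
With producer.type set to async, the 0.8 producer buffers messages in a background thread and sends them in batches instead of blocking on every request. A hedged sketch of the related async knobs of the 0.8 producer (the values shown are illustrative, not tuning advice from the original post):

producerConf.put("producer.type", "async");
producerConf.put("queue.buffering.max.ms", "5000");        // how long to buffer before flushing a batch
producerConf.put("queue.buffering.max.messages", "10000"); // max messages held in the async queue
producerConf.put("batch.num.messages", "200");             // messages sent per batch in async mode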

Author: hqiang1984

Quantified self, minimalism