由于最近的项目中使用到kafka,项目使用nodejs语言,遇到一些问题
nvm版本: v6.17.1
kafka-node版本:5.0.0
在项目中遇到报错:kafka.KafkaClient is not a constructor
解决办法:由于我之前安装的kafka-node版本较低,在安装kafka-node的高版本时没有覆盖掉导致,将kafka-node低版本完全卸载后重新安装5.0.0的kafka-node版本后该问题解决。
最后,记录一下我使用kafka-node生产者的代码,该代码还在测试中,后面修改会同步:
var kafka = require("kafka-node");
var config = require('appConfig');//我的配置文件
var Client = kafka.KafkaClient;
var Producer = kafka.Producer;
var client = null;
var payloads = null;
exports.sendKafkaMessage = function (topic,message,operation,callback) {
try {
console.log("sendKafkaMessage,host:"+config.kafka_host);
if (this.client == null ) {
this.client = new Client({kafkaHost: config.kafka_host});
log.app.info("连接kafka中");
}
var producer = new Producer(this.client,{ requireAcks: 1 });
var payloads = [{
topic: topic,
messages: JSON.stringify(message),
partition: 0, // default 0
// attributes: 2, // default: 0
}];
console.log("send message:["+JSON.stringify(message)+"],to topic:"+topic);
producer.on('ready', function () {
try {
producer.send(payloads, function (err, data) {
if (err){
console.log("send message:["+JSON.stringify(message)+"] error,topic:["+topic+"],data:"+JSON.stringify(data));
callback(err);
} else {
console.log("send message:["+JSON.stringify(message)+"] success,topic:"+topic+"],data:"+JSON.stringify(data));
// callback(null);
}
producer.close();
})
}catch (e) {
console.log(e);
}
});
producer.on('error', function (err) {
console.log('error:', err);
callback(err);
});
}catch (e) {
console.log(e);
}
callback(null);
}