import { Injectable } from '@angular/core';
import { Paho } from 'ng2-mqtt/mqttws31';
import { Observable, ReplaySubject } from 'rxjs';
/**
 * Describes a single MQTT topic subscription request.
 *
 * Instances are consumed by `MqttService.subscribeTopic`, which passes `name`
 * and `option` straight through to the underlying client's `subscribe` call.
 */
export class TopicDefinition {
/** Topic name/filter; used as the first argument of the client's `subscribe` call and remembered for later unsubscription. */
name: string;
/** Optional second argument of the client's `subscribe` call (presumably Paho subscribe options such as qos — confirm against callers). */
option?: any;
}
/**
 * Application-wide MQTT client built on the Paho JavaScript client.
 *
 * The service owns one Paho client, remembers which topics were subscribed
 * through it, and republishes connection state and incoming messages as
 * replaying observables (the latest value is delivered to late subscribers).
 */
@Injectable({
  providedIn: 'root'
})
export class MqttService {
  /** Console `%c` style used for informational banners. */
  private static readonly INFO_STYLE = 'background: #000; color: #5FBA7D; line-height: 26px;';
  /** Console `%c` style used for the connection-lost banner. */
  private static readonly ERROR_STYLE = 'background: #000; color: #E43935; line-height: 26px;';

  /** Underlying Paho client; undefined until `initMqttClient` has been called. */
  private client: Paho.MQTT.Client | undefined;
  /** Names of the topics subscribed through this service (no duplicates). */
  private topics: string[];
  /** Replays the most recent incoming message to new subscribers. */
  private readonly payload: ReplaySubject<Paho.MQTT.Message>;
  /** Replays the most recent connection state to new subscribers. */
  private readonly connection: ReplaySubject<boolean>;
  /** `true` while the client is connected, `false` otherwise. */
  public status: boolean;
  /** Reserved; not read or written anywhere in this class. */
  private config: any;

  constructor() {
    this.topics = [];
    this.payload = new ReplaySubject<Paho.MQTT.Message>(1);
    this.connection = new ReplaySubject<boolean>(1);
    this.status = false;
  }

  /**
   * Creates the Paho client and opens the connection.
   *
   * The client is always built with the four-argument constructor
   * `(host, port, path, clientId)` using an empty path.
   *
   * @param host Broker host name.
   * @param port Broker WebSocket port.
   * @param clientID Client identifier; a random UUID-shaped id is generated when omitted or empty.
   * @returns A promise that resolves with `true` once connected and rejects
   *          with an `Error` when the connection attempt fails.
   */
  public initMqttClient(host: string, port: number, clientID?: string): Promise<boolean> {
    const id = clientID || this.generateUUID();
    const client = new Paho.MQTT.Client(host, port, '', id);
    // Register the handlers before connecting so that no event can be missed.
    client.onConnectionLost = (responseObject: object) => this.handleConnectionLost(responseObject);
    client.onMessageArrived = this.onMessageArrived.bind(this);
    this.client = client;
    return this.openConnection();
  }

  /**
   * Underlying Paho client.
   *
   * @returns The client, or `undefined` when `initMqttClient` has not been called yet.
   */
  public get mqttClient(): Paho.MQTT.Client | undefined {
    return this.client;
  }

  /**
   * Observes the connection state.
   *
   * @returns An observable emitting `true` on connect and `false` on disconnect or failure.
   */
  public listenConnection(): Observable<boolean> {
    return this.connection.asObservable();
  }

  /**
   * Re-opens the connection of the existing client.
   *
   * @returns A promise that resolves with `true` once connected and rejects
   *          with an `Error` when the attempt fails or no client exists yet.
   */
  public reconnect(): Promise<boolean> {
    return this.openConnection();
  }

  /**
   * Misspelled historical name of {@link MqttService.reconnect}; kept so
   * existing callers continue to work.
   *
   * @deprecated Use `reconnect()` instead.
   */
  public reonnect(): Promise<boolean> {
    return this.reconnect();
  }

  /**
   * Closes the connection. The resulting connection-lost notification carries
   * error code 0 and therefore does not trigger an automatic reconnect.
   *
   * @throws Error when `initMqttClient` has not been called yet.
   */
  public disconnect(): void {
    this.requireClient().disconnect();
  }

  /**
   * Subscribes to the given topics and remembers their names so that
   * `unsubscribeAllTopics` can release them later.
   *
   * @param topics Topics to subscribe to.
   * @throws Error when a topic is given but `initMqttClient` has not been called yet.
   */
  public subscribeTopic(topics: TopicDefinition[]): void {
    console.log('Subscribe topic:', topics);
    console.log('%c ?? Subscribe MQTT Topics ', MqttService.INFO_STYLE);
    topics.forEach(topic => {
      const client = this.requireClient();
      // Track each name once, so a repeated subscription is released by a single unsubscribe.
      if (this.topics.indexOf(topic.name) === -1) {
        this.topics.push(topic.name);
      }
      client.subscribe(topic.name, topic.option);
    });
  }

  /**
   * Unsubscribes from every topic subscribed through this service.
   * Does nothing when no topic is tracked.
   */
  public unsubscribeAllTopics(): void {
    console.log('%c ?? Unsubscribe MQTT All Topics ', MqttService.INFO_STYLE);
    if (!this.topics || this.topics.length === 0) {
      return;
    }
    const client = this.requireClient();
    this.topics.forEach(topic => client.unsubscribe(topic, {}));
    this.topics = [];
  }

  /**
   * Unsubscribes from one topic.
   *
   * The request is always forwarded to the client, even for a topic this
   * service does not track, because the connection is opened with
   * `cleanSession: false`.
   *
   * @param topic Topic to unsubscribe from.
   * @throws Error when `initMqttClient` has not been called yet.
   */
  public unsubscribeTopic(topic: string): void {
    console.log(`%c ?? Unsubscribe MQTT Topics: ${topic}`, MqttService.INFO_STYLE);
    const topicIndex = this.topics.indexOf(topic);
    // indexOf yields -1 for an unknown topic; splice(-1, 1) would drop the last tracked topic.
    if (topicIndex !== -1) {
      this.topics.splice(topicIndex, 1);
    }
    this.requireClient().unsubscribe(topic, {});
  }

  /**
   * Observes incoming messages.
   *
   * @returns An observable emitting every message received on a subscribed topic.
   */
  public listenMessage(): Observable<Paho.MQTT.Message> {
    return this.payload.asObservable();
  }

  /**
   * Publishes a value to a topic.
   *
   * @param topic Destination topic.
   * @param value Payload handed to the Paho `Message` constructor.
   * @throws Error when `initMqttClient` has not been called yet.
   */
  public send(topic: string, value: any): void {
    const message = new Paho.MQTT.Message(value);
    message.destinationName = topic;
    this.requireClient().send(message);
  }

  /**
   * Builds a random identifier in the 8-4-4-4-12 hexadecimal layout of a UUID.
   * The value comes from `Math.random` and carries no UUID version bits.
   *
   * @returns The generated identifier.
   */
  public generateUUID(): string {
    const s4 = () => this.generateS4();
    return `${s4()}${s4()}-${s4()}-${s4()}-${s4()}-${s4()}${s4()}${s4()}`;
  }

  /**
   * Decodes UTF-8 bytes into a string.
   *
   * Bytes that do not form a complete, well-formed sequence (stray
   * continuation bytes, truncated input, invalid lead bytes) are emitted one
   * by one as the character with that code instead of raising an error.
   *
   * @param arr A string (returned unchanged), an `ArrayBuffer`, or an array-like of byte values.
   * @returns The decoded text.
   */
  public byteToString(arr: string | ArrayBuffer | ArrayLike<number>): string {
    if (typeof arr === 'string') {
      return arr;
    }
    const bytes: ArrayLike<number> = arr instanceof ArrayBuffer ? new Uint8Array(arr) : arr;
    let text = '';
    let i = 0;
    while (i < bytes.length) {
      const lead = bytes[i];
      const width = MqttService.utf8SequenceLength(lead);
      let codePoint = -1;
      if (width > 1 && i + width <= bytes.length) {
        // The lead byte of an n-byte sequence carries its payload in the low (7 - n) bits.
        codePoint = lead & (0xff >> (width + 1));
        for (let k = 1; k < width; k++) {
          const next = bytes[i + k];
          if ((next & 0xc0) !== 0x80) {
            codePoint = -1;
            break;
          }
          codePoint = (codePoint << 6) | (next & 0x3f);
        }
      }
      if (codePoint >= 0 && codePoint <= 0x10ffff) {
        // fromCodePoint (unlike fromCharCode) handles code points above U+FFFF.
        text += String.fromCodePoint(codePoint);
        i += width;
      } else {
        text += String.fromCharCode(lead);
        i += 1;
      }
    }
    return text;
  }

  /**
   * Extracts the event type and value from a JSON message.
   *
   * @param msg Message whose `payloadString` holds JSON with `evt_tp` and `evt_data.value`.
   * @returns The extracted fields, or `undefined` (after logging the error)
   *          when the payload cannot be parsed or lacks `evt_data`.
   */
  public InitMessage(msg: { payloadString: string }): { evt_tp: any, value: any } | undefined {
    try {
      const rawMsg = JSON.parse(msg.payloadString);
      return { evt_tp: rawMsg.evt_tp, value: rawMsg.evt_data.value };
    } catch (e) {
      console.log(e);
      return undefined;
    }
  }

  /**
   * Extracts line name, station name and the first value from a JSON message.
   *
   * @param msg Message whose `payloadString` holds JSON with `evt_data.line`,
   *            `evt_data.station` and an indexable `evt_data.value`.
   * @returns The extracted fields, or `undefined` (after logging the error)
   *          when the payload cannot be parsed or lacks the expected structure.
   */
  public InitMessageforlist(
    msg: { payloadString: string }
  ): { linename: any, stationname: any, value: any } | undefined {
    try {
      const rawMsg = JSON.parse(msg.payloadString);
      return {
        linename: rawMsg.evt_data['line'],
        stationname: rawMsg.evt_data['station'],
        value: rawMsg.evt_data.value[0]
      };
    } catch (e) {
      console.log(e);
      return undefined;
    }
  }

  /** Returns the client or throws a descriptive error when it has not been created yet. */
  private requireClient(): Paho.MQTT.Client {
    if (!this.client) {
      throw new Error('MQTT client is not initialised; call initMqttClient() first.');
    }
    return this.client;
  }

  /** Starts one connection attempt and reports its outcome through the returned promise. */
  private openConnection(): Promise<boolean> {
    return new Promise<boolean>((resolve, reject) => {
      this.requireClient().connect({
        cleanSession: false,
        onSuccess: () => {
          console.log('%c ?? MQTT Connection Success ', MqttService.INFO_STYLE);
          this.status = true;
          this.connection.next(true);
          resolve(true);
        },
        // Without a failure callback the promise would stay pending forever.
        onFailure: (failure: object) => {
          this.status = false;
          this.connection.next(false);
          reject(new Error(`MQTT connection failed: ${MqttService.describeFailure(failure)}`));
        }
      });
    });
  }

  /** Publishes the disconnected state and reconnects unless the disconnect was requested. */
  private handleConnectionLost(responseObject: object): void {
    console.log('%c ? MQTT Connection Lost ', MqttService.ERROR_STYLE);
    this.status = false;
    this.connection.next(false);
    const details = (responseObject || {}) as { errorCode?: number };
    // Error code 0 marks a normal, requested disconnect; reconnecting would undo disconnect().
    if (details.errorCode === 0) {
      return;
    }
    this.reconnect().catch(error => {
      console.error('MQTT reconnect failed:', error);
    });
  }

  /** Wraps an incoming Paho message in a plain object with a decoded payload string and publishes it. */
  private onMessageArrived(payload: Paho.MQTT.Message): void {
    const payloads: Paho.MQTT.Message = {
      destinationName: payload.destinationName,
      payloadBytes: payload.payloadBytes,
      payloadString: this.byteToString(payload.payloadBytes),
      duplicate: payload.duplicate,
      retained: payload.retained,
      qos: payload.qos,
    };
    this.payload.next(payloads);
  }

  /**
   * Converts byte values to a string one character per byte (no UTF-8 decoding).
   * Currently not called from within this class.
   */
  private decodePayloads(bytes: number[]): string {
    return String.fromCharCode(...bytes);
  }

  /** Returns four random hexadecimal digits. */
  private generateS4(): string {
    return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);
  }

  /** Length in bytes of the UTF-8 sequence introduced by `lead`; 1 for ASCII and for invalid lead bytes. */
  private static utf8SequenceLength(lead: number): number {
    if ((lead & 0xe0) === 0xc0) {
      return 2;
    }
    if ((lead & 0xf0) === 0xe0) {
      return 3;
    }
    if ((lead & 0xf8) === 0xf0) {
      return 4;
    }
    return 1;
  }

  /** Formats the error object Paho passes to a failure callback for use in an error message. */
  private static describeFailure(failure: object): string {
    const details = (failure || {}) as { errorCode?: number; errorMessage?: string };
    const message = details.errorMessage || 'unknown error';
    return details.errorCode === undefined ? message : `${message} (code ${details.errorCode})`;
  }
}
MQTT services
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 问题 早上使用SC命令部署Service,启动服务一直报The **** service on Local Com...
- composer 自己的 git 仓库出现问题: composer update "hdll/services:d...
- 协议就是通信双方的一个约定,即,表示第1位传输的什么、第2位传输的什么……。在MQTT协议中,一个MQTT数据包由...
- 如果不了解MQTT的可以看这篇文章http://www.cnblogs.com/yangfengwu/p/7764...
- 在项目开发过程中,由于需要数据的实时推送,但是在实际过程中,用到了MQtt的方法,搜遍网上 那个的讲解,...