vue3中如何使用mqtt数据传输

 更新时间:2024年11月01日 16:15:55   作者:宝拉不想努力了  
这篇文章主要为大家详细介绍了vue3中如何使用mqtt数据传输,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

使用版本

"mqtt": "^5.8.0",

安装指令

npm install mqtt --save
------
yarn add mqtt

配置

connection: {
  protocol: "ws",
  host: "broker.emqx.io",
  port: 8083,
  endpoint: "/mqtt",
  clean: true,
  connectTimeout: 30 * 1000, // ms
  reconnectPeriod: 4000, // ms
  clientId: "emqx_vue_" + Math.random().toString(16).substring(2, 8),
  // 随机数 每次不能重复
  username: "emqx_test",
  password: "emqx_test",
},

连接

import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)

client.on('connect', (e) => {
  // 订阅主题
  
})

订阅主题

client.subscribe(topic, { qos: 1 }, (err) => {
  if (!err) {
    console.log('订阅成功')
  } else {
    console.log('消息订阅失败!')
  }
})

消息发布

给后端发送格式,是和后端约定好的数据格式,一般为JSON传输。

client.publish(publishTopic, `{"messageType":1,"messageContent":""}`, { qos: 0 }, (err) => {
  if (!err) {
    console.log('发送成功')
    client.subscribe(topic, { qos: 1 }, (err) => {
      if (!err) {
        console.log('订阅成功')
      } else {
        console.log('消息订阅失败!')
      }
    })
  } else {
    console.log('消息发送失败!')
  }
})

取消订阅

client.unsubscribe(topicList, (error) => {
  console.log('主题为' + topicList + '取消订阅成功', error)
})

断开连接

export function unconnect() {
  client.end()
  client = null
  // Message.warning('服务器已断开连接!')
  console.log('服务器已断开连接!')
}

mqtt封装使用(ts版)

import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';

interface ClientOptions extends IClientOptions {
  clientId: string;
}

interface SubscribeOptions {
  topic: string;
  callback: (topic: string, message: string) => void;
  subscribeOption?: mqtt.IClientSubscribeOptions;
}

interface PublishOptions {
  topic: string;
  message: string;
}

class Mqtt {
  private static instance: Mqtt;
  private client: MqttClient | undefined;
  private subscribeMembers: Record<string, ((topic: string, message: string) => void) | undefined> = {};
  private pendingSubscriptions: SubscribeOptions[] = [];
  private pendingPublications: PublishOptions[] = [];
  private isConnected: boolean = false;

  private constructor(url?: string) {
    if (url) {
      this.connect(url);
    }
  }

  public static getInstance(url?: string): Mqtt {
    if (!Mqtt.instance) {
      Mqtt.instance = new Mqtt(url);
    } else if (url && !Mqtt.instance.client) {
      Mqtt.instance.connect(url);
    }
    return Mqtt.instance;
  }

  private connect(url: string): void {
    console.log(url, clientOptions);
    if (!this.client) {
      this.client = mqtt.connect(url, clientOptions);
      this.client.on('connect', this.onConnect);
      this.client.on('reconnect', this.onReconnect);
      this.client.on('error', this.onError);
      this.client.on('message', this.onMessage);
    }
  }

  public disconnect(): void {
    if (this.client) {
      this.client.end();
      this.client = undefined;
      this.subscribeMembers = {};
      this.isConnected = false;
      console.log(`服务器已断开连接!`);
    }
  }

  public subscribe({ topic, callback }: SubscribeOptions): void {
    if (this.isConnected) {
      this.client?.subscribe(topic, { qos: 1 }, error => {
        if (error) {
          console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `, error);
        } else {
          console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`);
        }
      });
      this.subscribeMembers[topic] = callback;
    } else {
      this.pendingSubscriptions.push({ topic, callback });
    }
  }

  public unsubscribe(topic: string): void {
    if (!this.client) {
      return;
    }
    this.client.unsubscribe(topic, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `, error);
      } else {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`);
      }
    });
    this.subscribeMembers[topic] = undefined;
  }

  public publish({ topic, message }: PublishOptions): void {
    if (this.isConnected) {
      this.client?.publish(topic, message, { qos: 1 }, e => {
        if (e) {
          console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e);
        }
      });
    } else {
      this.pendingPublications.push({ topic, message });
    }
  }

  private onConnect = (e: any): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接服务器成功:`, e);
    this.isConnected = true;
    this.processPendingSubscriptions();
    this.processPendingPublications();
  };

  private onReconnect = (): void => {
    console.log(`客户端: ${clientOptions.clientId}, 正在重连:`);
    this.isConnected = false;
  };

  private onError = (error: Error): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error);
    this.isConnected = false;
  };

  private onMessage = (topic: string, message: Buffer): void => {
    // console.log(
    //   `客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `,
    //   message.toString(),
    // );
    const callback = this.subscribeMembers?.[topic];
    callback?.(topic, message.toString());
  };

  private processPendingSubscriptions(): void {
    while (this.pendingSubscriptions.length > 0) {
      const { topic, callback, subscribeOption } = this.pendingSubscriptions.shift()!;
      this.subscribe({ topic, callback, subscribeOption });
    }
  }

  private processPendingPublications(): void {
    while (this.pendingPublications.length > 0) {
      const { topic, message } = this.pendingPublications.shift()!;
      this.publish({ topic, message });
    }
  }
}

const clientOptions: ClientOptions = {
  clean: true,
  connectTimeout: 500,
  protocolVersion: 5,
  rejectUnauthorized: false,
  username: 'admin',
  password: 'Anjian-emqx',
  clientId: `client-${Date.now()}`
};

// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
const { protocol, host } = window.location;
export default Mqtt.getInstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);

注意:

1.环境配置

.env.test
VITE_OTHER_SERVICE_BASE_URL= `{
  "mqtt": "ws://192.168.11.14:8083/mqtt"
}`

2.qos设置 前后端统一为1

以上就是vue3中如何使用mqtt数据传输的详细内容,更多关于vue3 mqtt数据传输的资料请关注脚本之家其它相关文章!

相关文章

  • vue导出word纯前端的实现方式

    vue导出word纯前端的实现方式

    这篇文章主要介绍了vue导出word纯前端的实现方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-04-04
  • vue实现页面渲染时候执行某需求的示例代码

    vue实现页面渲染时候执行某需求的示例代码

    本文主要介绍了vue实现页面渲染时候执行某需求,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-05-05
  • 安装vue-cli的简易过程

    安装vue-cli的简易过程

    安装vue-cli的前提是你已经安装了npm,安装npm你可以直接下载node的安装包进行安装。接下来通过本文给大家介绍安装vue-cli的简易过程,感兴趣的朋友跟随脚本之家小编一起学习吧
    2018-05-05
  • 基于vue+ bootstrap实现图片上传图片展示功能

    基于vue+ bootstrap实现图片上传图片展示功能

    这篇文章主要介绍了基于vue+ bootstrap实现图片上传图片展示功能,需要的朋友可以参考下
    2017-05-05
  • 关于vue-router的beforeEach无限循环的问题解决

    关于vue-router的beforeEach无限循环的问题解决

    本篇文章主要介绍了关于vue-router的beforeEach无限循环的问题解决,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-09-09
  • vue中如何解除数据之间的双向绑定

    vue中如何解除数据之间的双向绑定

    这篇文章主要介绍了vue中如何解除数据之间的双向绑定,具有很好的参考价值,希望对
    2022-09-09
  • Vite多环境配置项目高定制化能力详解

    Vite多环境配置项目高定制化能力详解

    这篇文章主要为大家介绍了Vite多环境配置项目高定制化能力详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-07-07
  • vue实现虚拟滚动的示例详解

    vue实现虚拟滚动的示例详解

    虚拟滚动或者移动是指禁止原生滚动,之后通过监听浏览器的相关事件实现模拟滚动,下面小编就来和大家详细介绍一下vue实现虚拟滚动的示例代码,需要的可以参考下
    2023-10-10
  • element-ui中使用upload组件获取上传文件信息

    element-ui中使用upload组件获取上传文件信息

    这篇文章主要介绍了element-ui中使用upload组件获取上传文件信息方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • Vue3 中的  shallowReactive 详解

    Vue3 中的  shallowReactive 详解

    本文深入探讨了Vue3中新特性shallowReactive的使用和原理,shallowReactive是创建浅响应式对象的API,只对对象的第一层属性进行响应式转换,不对嵌套对象处理,有助于性能优化和状态管理,同时也指出了shallowReactive在使用中的注意事项
    2024-10-10

最新评论