asyncio-mqtt - 异步 MQTT 客户端

1. 项目简介

项目信息
项目地址PyPI在新窗口打开 | GitHub在新窗口打开
官方文档sbtinstruments.github.io在新窗口打开
开源协议BSD-3-Clause
Python 版本Python 3.7 ~ 3.11
标签asyncio

asyncio-mqtt 提供了基于 paho-mqtt在新窗口打开 的现代的、异步的接口。

  • 没有回调
  • 没有更多的返回代码(使用 MqttError 就够了)
  • 优雅的断开连接
  • 与异步代码兼容
  • 完全的类型提示
  • 整个系统只有不到 900 行的代码。

asyncio-mqtt 安装后同样也会安装 paho.mqtt,部分配置可能用到 paho.mqtt 库中的内容。

安装:

pip install asyncio-mqtt

直接安装 GitHub 上的最新版本:

pip install git+https://github.com/sbtinstruments/asyncio-mqtt

Windows 使用注意

从 Python 3.8 开始,asyncio 的默认事件循环是 ProactorEventLoop。该循环不支持 asyncio-mqtt 需要的 add_reader 方法。请切换到一个支持 add_reader 方法的事件循环,比如内置的 SelectorEventLoop

# 修改为 Windows 的 "Selector" 事件循环
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# 正常使用即可
asyncio.run(main())

2. 快速开始

发布:

async with Client("test.mosquitto.org") as client:
    await client.publish("humidity/outside", payload=0.38)

订阅:

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        async for message in messages:
            print(message.payload)

payload 可以是 intfloatstrbytesbytearrayNone

数字被默认转换为字符串,如果需要数字原始值,可以使用 struct.pack()None 代表零长度负载。

3. 客户端配置

在初始化客户端时,可参考下面的配置,下面的值均为默认值。关于各个参数的更多信息,请看 paho-mqtt 的文档在新窗口打开

import asyncio_mqtt as aiomqtt
import paho.mqtt as mqtt


aiomqtt.Client(
    hostname="test.mosquitto.org",  # 唯一的必须参数
    port=1883,
    username=None,
    password=None,
    logger=None,
    client_id=None,
    tls_context=None,
    tls_params=None,
    proxy=None,
    protocol=None,
    will=None,
    clean_session=None,
    transport="tcp",
    keepalive=60,
    bind_address="",
    bind_port=0,
    clean_start=mqtt.client.MQTT_CLEAN_START_FIRST_ONLY,
    properties=None,
    message_retry_set=20,
    socket_options=(),
    max_concurrent_outgoing_calls=None,
    websocket_path=None,
    websocket_headers=None,
)

4. 最佳实战

4.1 重新连接

import asyncio
import asyncio_mqtt as aiomqtt

async def main():
    reconnect_interval = 5  # In seconds
    while True:
        try:
            async with aiomqtt.Client("test.mosquitto.org") as client:
                async with client.messages() as messages:
                    await client.subscribe("humidity/#")
                    async for message in messages:
                        print(message.payload.decode())
        except aiomqtt.MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
            await asyncio.sleep(reconnect_interval)

asyncio.run(main())

4.2 取消任务

3.11+ Python 3.11 新增了 asyncio.timeout,可用监听一段时间的消息。

import asyncio
import asyncio_mqtt as aiomqtt

async def listen():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        async with client.messages() as messages:
            await client.subscribe("humidity/#")
            async for message in messages:
                print(message.payload)

async def main():
    try:
        # 5 秒后取消请求
        async with asyncio.timeout(5):
            await listen()
    # 忽略 TimeoutError
    except asyncio.TimeoutError:
        pass

asyncio.run(main())

Python 3.11 之前的版本可用 task.cancel 来取消:

import asyncio
import asyncio_mqtt as aiomqtt

async def listen():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        async with client.messages() as messages:
            await client.subscribe("humidity/#")
            async for message in messages:
                print(message.payload)

async def main():
    loop = asyncio.get_event_loop()
    # 创建任务
    task = loop.create_task(listen())
    # 等待特定的任务
    await asyncio.sleep(5)
    # 取消任务
    task.cancel()
    # 等待任务取消
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

4.3 分享客户端

import asyncio
import asyncio_mqtt as aiomqtt

async def publish_humidity(client):
    await client.publish("humidity/outside", payload=0.38)

async def publish_temperature(client):
    await client.publish("temperature/outside", payload=28.3)

async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        await publish_humidity(client)
        await publish_temperature(client)

asyncio.run(main())

4.4 过滤请求

import asyncio
import asyncio_mqtt as aiomqtt

async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        async with client.messages() as messages:
            await client.subscribe("#")
            async for message in messages:
                if message.topic.matches("humidity/outside"):
                    print(f"[humidity/outside] {message.payload}")
                if message.topic.matches("+/inside"):
                    print(f"[+/inside] {message.payload}")
                if message.topic.matches("temperature/#"):
                    print(f"[temperature/#] {message.payload}")

asyncio.run(main())

4.5 不阻塞运行

使用 asyncio.TaskGroup(或 asyncio.gather,要求 Python < 3.11)来运行多个任务:

import asyncio
import asyncio_mqtt as aiomqtt

async def sleep(seconds):
    await asyncio.sleep(seconds)
    print(f"Slept for {seconds} seconds!")

async def listen():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        async with client.messages() as messages:
            await client.subscribe("humidity/#")
            async for message in messages:
                print(message.payload)

async def main():
    async with asyncio.TaskGroup() as group:
        group.create_task(sleep(2))
        group.create_task(listen())  # 此处开始监听
        group.create_task(sleep(3))
        group.create_task(sleep(1))

asyncio.run(main())

如果需要和其他异步 Web 框架等结合,我们有下面的方案:

import asyncio
import asyncio_mqtt as aiomqtt

async def listen():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        async with client.messages() as messages:
            await client.subscribe("humidity/#")
            async for message in messages:
                print(message.payload)

background_tasks = set()

async def main():
    loop = asyncio.get_event_loop()
    # 创建任务
    task = loop.create_task(listen())
    # 创建引用以避免被垃圾回收
    background_tasks.add(task)
    task.add_done_callback(background_tasks.remove)

    # 持续运行别的任务
    while True:
        await asyncio.sleep(2)

asyncio.run(main())

4.6 TLS

import asyncio
import asyncio_mqtt as aiomqtt
import ssl

tls_params = aiomqtt.TLSParameters(
    ca_certs=None,
    certfile=None,
    keyfile=None,
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLS,
    ciphers=None,
)

async def main():
    async with aiomqtt.Client("test.mosquitto.org", tls_params=tls_params) as client:
        await client.publish("humidity/outside", payload=0.38)

asyncio.run(main())

4.7 代理

import asyncio
import asyncio_mqtt as aiomqtt
import socks

proxy_params = aiomqtt.ProxySettings(
    proxy_type=socks.HTTP,
    proxy_addr="www.example.com",
    proxy_rdns=True,
    proxy_username=None,
    proxy_password=None,
)

async def main():
    async with aiomqtt.Client("test.mosquitto.org", proxy=proxy_params) as client:
        await client.publish("humidity/outside", payload=0.38)

asyncio.run(main())