Skip to content

Service Push

业务服务(例如计费的"发票已支付")触发 MQTT publish。演示 app.publish() 怎么接到正常请求/响应之外的服务事件流上。

bash
bun run --cwd examples/service-push dev

源码

ts
import { aedes } from '@mqttkit/aedes'
import mqtt from 'mqtt'
import { MqttApp, router } from '@mqttkit/core'

class BillingService {
  private handler?: (event: { uid: string; payload: Buffer }) => Promise<void>

  onInvoicePaid(handler: (event: { uid: string; payload: Buffer }) => Promise<void>): void {
    this.handler = handler
  }

  async simulateInvoicePaid(uid: string): Promise<void> {
    await this.handler?.({
      uid,
      payload: Buffer.from(JSON.stringify({
        type: 'invoice.paid',
        uid,
        paidAt: new Date().toISOString(),
      })),
    })
  }
}

const billing = new BillingService()

const app = new MqttApp()
  .use(aedes({ tcp: { port: 1886 } }))
  .use(
    router().topic('users/:uid/notifications', {
      subscribe: true,
      publish: false,
    }),
  )

billing.onInvoicePaid(async (event) => {
  await app.publish(`users/${event.uid}/notifications`, event.payload, { qos: 1 })
  console.log('service -> mqtt', {
    topic: `users/${event.uid}/notifications`,
    payload: event.payload.toString(),
  })
})

await app.listen()
console.log('mqttkit service push example listening on mqtt://localhost:1886')

const demoClient = mqtt.connect('mqtt://localhost:1886', {
  clientId: 'demo-notification-client',
  reconnectPeriod: 0,
})

demoClient.on('connect', () => {
  demoClient.subscribe('users/demo/notifications', { qos: 1 })
})

demoClient.on('message', (topic, payload) => {
  console.log('mqtt client received', {
    topic,
    payload: payload.toString(),
  })
})

setInterval(() => {
  void billing.simulateInvoicePaid('demo')
}, 3_000)

在 GitHub 查看 · Service Push 指南

基于 MIT 协议发布