Skip to content

AsyncAPI 独立服务

从注册的路由生成 AsyncAPI 3.0,独立 HTTP 服务上挂渲染好的浏览页面。一份 TypeBox schema 同时承担运行时校验、ctx.body 静态类型、AsyncAPI payload 文档三件事。

bash
bun run --cwd examples/asyncapi-docs dev
  • MQTT: mqtt://localhost:1883
  • AsyncAPI JSON: http://localhost:9000/asyncapi.json
  • AsyncAPI YAML: http://localhost:9000/asyncapi.yaml
  • 渲染文档: http://localhost:9000/docs

源码

ts
/**
 * mqttkit + TypeBox + AsyncAPI 一体化示例。
 *
 * **核心模式**:一份 TypeBox schema 同时承担三件事 ——
 *   1) 运行时校验(`app.addSchemaProvider(typeboxProvider)`)
 *   2) `ctx.body` 静态类型推断(`Static<T>`,零导出)
 *   3) AsyncAPI 文档 payload(TypeBox 本身就是 JSON Schema,无需转换)
 *
 * 这是 mqttkit 推荐的 schema 写法。如果你坚持用 zod 做运行时校验,
 * 需要额外把 schema 转 JSON Schema 才能进入 AsyncAPI 文档;参见
 * examples/schema-validation/src/zod.ts。
 *
 * 启动后:
 *   - MQTT:           mqtt://localhost:1883
 *   - AsyncAPI JSON:  http://localhost:9000/asyncapi.json
 *   - AsyncAPI YAML:  http://localhost:9000/asyncapi.yaml
 *   - 渲染文档:        http://localhost:9000/docs
 */
import { aedes } from '@mqttkit/aedes'
import { asyncapi } from '@mqttkit/asyncapi'
import { MqttApp, router } from '@mqttkit/core'
import { typeboxProvider } from '@mqttkit/typebox'
import { Type } from '@sinclair/typebox'

type Principal = { uid: string }
type State = { principal?: Principal }

const deviceEventSchema = Type.Object(
  {
    temperature: Type.Number({ description: 'Celsius reading' }),
    humidity: Type.Optional(Type.Number()),
    ts: Type.Optional(Type.Integer({ description: 'Unix ms' })),
  },
  { description: 'Device telemetry payload' },
)

const notificationSchema = Type.Object({
  kind: Type.Union([Type.Literal('invoice'), Type.Literal('system'), Type.Literal('chat')]),
  body: Type.String(),
})

const app = new MqttApp<State>()
  .addSchemaProvider(typeboxProvider)
  .use(
    aedes({
      tcp: { port: 1883 },
      authenticate: ({ username }) => (username ? { uid: username } : false),
    }),
  )
  .use(
    router<State>()
      .topic('devices/:uid/events', {
        publish: ({ params, principal }) => params.uid === principal?.uid,
        qos: 1,
        schema: deviceEventSchema,
        async onMessage(ctx) {
          // ctx.body 自动推断为 { temperature: number; humidity?: number; ts?: number }
          console.log(`[device ${ctx.params.uid}] ${ctx.body.temperature}°C`)
          await ctx.publish(`server/${ctx.params.uid}/echo`, ctx.payload, { qos: 0 })
        },
        meta: {
          summary: 'Device telemetry uplink',
          description: 'Device pushes sensor readings. Only the owning principal may publish.',
          tags: ['device', 'telemetry'],
          examples: [{ temperature: 22.5, humidity: 60, ts: Date.now() }],
        },
      })
      .topic('server/:uid/echo', {
        subscribe: ({ params, principal }) => params.uid === principal?.uid,
        qos: 0,
        meta: {
          summary: 'Server echo channel',
          description: 'Server echoes device events back to the owning client.',
          tags: ['device'],
        },
      })
      .topic('users/:uid/notifications', {
        subscribe: ({ params, principal }) => params.uid === principal?.uid,
        publish: false,
        qos: 1,
        retain: true,
        schema: notificationSchema,
        meta: {
          summary: 'User notifications',
          description: 'Server-pushed notifications. Clients subscribe only.',
          tags: ['notifications'],
        },
      }),
  )
  .use(
    asyncapi({
      info: {
        title: 'mqttkit demo',
        version: '0.0.1',
        description: 'AsyncAPI generated from mqttkit router metadata, schemas powered by TypeBox.',
      },
      servers: {
        tcp: { host: 'localhost:1883', protocol: 'mqtt', description: 'Aedes TCP broker' },
      },
      port: 9000,
    }),
  )

await app.listen()
console.log('mqtt://localhost:1883 | docs: http://localhost:9000/docs')
ts
/**
 * **zod + AsyncAPI 集成示例。**
 *
 * zod 3.24+ 是原生 Standard Schema,运行时校验零配置;要让 schema 进入
 * AsyncAPI 文档,用 `jsonify(...)` 包一层 —— 它在 schema 上挂 `~jsonSchema`,
 * asyncapi builder 会优先读它。
 *
 * 对比 src/index.ts (typebox 版本),可以看到两种方案的对称性:
 *
 *   typebox: `app.addSchemaProvider(typeboxProvider)` + `schema: Type.Object(...)`
 *   zod:     `schema: jsonify(z.object(...))`
 *
 * 启动后:
 *   - MQTT:           mqtt://localhost:1885
 *   - AsyncAPI JSON:  http://localhost:9002/asyncapi.json
 *   - 渲染文档:        http://localhost:9002/docs
 */
import { aedes } from '@mqttkit/aedes'
import { asyncapi } from '@mqttkit/asyncapi'
import { MqttApp, router } from '@mqttkit/core'
import { jsonify } from '@mqttkit/zod'
import { z } from 'zod'

type Principal = { uid: string }
type State = { principal?: Principal }

const deviceEventSchema = jsonify(
  z.object({
    temperature: z.number().describe('Celsius reading'),
    humidity: z.number().optional(),
    ts: z.number().int().optional().describe('Unix ms'),
  }),
)

const notificationSchema = jsonify(
  z.object({
    kind: z.enum(['invoice', 'system', 'chat']),
    body: z.string(),
  }),
)

const app = new MqttApp<State>()
  .use(
    aedes({
      tcp: { port: 1885 },
      authenticate: ({ username }) => (username ? { uid: username } : false),
    }),
  )
  .use(
    router<State>()
      .topic('devices/:uid/events', {
        publish: ({ params, principal }) => params.uid === principal?.uid,
        qos: 1,
        schema: deviceEventSchema,
        async onMessage(ctx) {
          // ctx.body 推断为 { temperature: number; humidity?: number; ts?: number }
          console.log(`[device ${ctx.params.uid}] ${ctx.body.temperature}°C`)
        },
        meta: {
          summary: 'Device telemetry uplink',
          description: 'zod 做运行时校验,jsonify 让 schema 进 AsyncAPI 文档',
          tags: ['device', 'telemetry'],
        },
      })
      .topic('users/:uid/notifications', {
        subscribe: ({ params, principal }) => params.uid === principal?.uid,
        publish: false,
        qos: 1,
        retain: true,
        schema: notificationSchema,
        meta: {
          summary: 'User notifications',
          tags: ['notifications'],
        },
      }),
  )
  .use(
    asyncapi({
      info: {
        title: 'mqttkit + zod demo',
        version: '0.0.1',
        description: 'zod 做运行时校验 + jsonify 输出 JSON Schema 到 AsyncAPI 文档。',
      },
      servers: {
        tcp: { host: 'localhost:1885', protocol: 'mqtt', description: 'Aedes TCP broker' },
      },
      port: 9002,
    }),
  )

await app.listen()
console.log('mqtt://localhost:1885 | docs: http://localhost:9002/docs')

在 GitHub 查看

基于 MIT 协议发布