Skip to content

Schema 校验

TypeBox 与 zod payload 校验对照演示,覆盖入站/出站校验模式,以及 phase: 'validation' 失败的 onError 处理。

```bash
bun run --cwd examples/schema-validation dev
```

源码

ts
/**
 * Standard Schema with **zod** (3.24+).
 *
 * zod 3.24 起,`z.*()` 返回的 schema 直接实现 Standard Schema 接口,
 * 可以原样作为 `topic({ schema })` 的参数;`ctx.body` 自动推断为 zod 静态类型。
 *
 * 唯一额外步骤:如果你还想让 `@mqttkit/asyncapi` 把这个 schema 输出到文档里,
 * 用 `jsonify(...)` 包一层 —— 它会在 schema 上挂 `~jsonSchema`,运行时校验
 * 仍走 zod 原生 Standard Schema。
 *
 * 如果你不需要 AsyncAPI 文档,直接 `schema: z.object(...)` 即可。
 */
import { router } from '@mqttkit/core'
import { createTestApp } from '@mqttkit/core/testing'
import { jsonify } from '@mqttkit/zod'
import { z } from 'zod'

// Plain zod object schema: `temperature` is required, `ts` is optional.
const readingShape = z.object({
  temperature: z.number(),
  ts: z.number().optional(),
})

// Wrapped with `jsonify` from `@mqttkit/zod` before being used as the topic schema.
const readingSchema = jsonify(readingShape)

// Test harness from `@mqttkit/core/testing`; `broker.dispatch` feeds messages to `app`.
const { app, broker } = createTestApp()

app
  .onError((payload) => {
    // Narrow at runtime instead of asserting `as Error`: a non-Error value
    // would otherwise be printed as "undefined".
    const reason = payload.error instanceof Error ? payload.error.message : String(payload.error)
    if (payload.phase === 'validation') {
      console.log(`[reject] ${payload.topic}: ${reason}`)
    } else {
      // Do not silently drop failures from other phases.
      console.error('[error]', payload.phase, payload.error)
    }
  })
  .use(
    router().topic('devices/:uid/readings', {
      schema: readingSchema,
      async onMessage(ctx) {
        // ctx.body is inferred as { temperature: number; ts?: number }
        console.log(`[accept] ${ctx.params.uid}: temp=${ctx.body.temperature}°C ts=${ctx.body.ts ?? 'n/a'}`)
      },
    }),
  )

await app.listen()

// [label, payload] pairs: one payload that should pass and three that should be rejected.
const scenarios: ReadonlyArray<readonly [label: string, payload: unknown]> = [
  ['合法', { temperature: 21.5, ts: Date.now() }],
  ['字段缺失', { ts: Date.now() }],
  ['字段类型错误', { temperature: 'hot', ts: 'now' }],
  ['非 JSON', 'plain text'],
]

// Strings are sent verbatim; everything else is serialized as JSON.
const toWire = (payload: unknown): string => {
  if (typeof payload === 'string') return payload
  return JSON.stringify(payload)
}

for (const [label, payload] of scenarios) {
  console.log(`\n--- ${label} ---`)
  await broker.dispatch({
    topic: 'devices/alpha/readings',
    payload: toWire(payload),
  })
}

// Print the JSON Schema attached under `~jsonSchema`, to show the full schema
// is available for @mqttkit/asyncapi to read.
type WithJsonSchema = { readonly '~jsonSchema': unknown }

console.log('\n--- ~jsonSchema attached ---')
const attachedJsonSchema = (readingSchema as unknown as WithJsonSchema)['~jsonSchema']
console.log(JSON.stringify(attachedJsonSchema, null, 2))

await app.stop()
ts
/**
 * Standard Schema with **TypeBox**.
 *
 * TypeBox 本身不实现 Standard Schema 接口,但 `@mqttkit/typebox` 提供了一个
 * `typeboxProvider`,注册一次后就可以把任意 `Type.X(...)` schema 直接传给
 * `topic({ schema })`,`ctx.body` 也会自动推断为 `Static<T>`。
 */
import { router } from '@mqttkit/core'
import { createTestApp } from '@mqttkit/core/testing'
import { typeboxProvider } from '@mqttkit/typebox'
import { Type } from '@sinclair/typebox'

// Property map for a reading: `temperature` is required, `ts` is optional.
const readingProperties = {
  temperature: Type.Number(),
  ts: Type.Optional(Type.Number()),
}

// TypeBox object schema used as the topic schema.
const readingSchema = Type.Object(readingProperties)

// Test harness from `@mqttkit/core/testing`; `broker.dispatch` feeds messages to `app`.
const { app, broker } = createTestApp()

app
  // Register the TypeBox provider so `Type.*` schemas can be passed to `topic({ schema })`.
  .addSchemaProvider(typeboxProvider)
  .onError((payload) => {
    // Narrow at runtime instead of asserting `as Error`: a non-Error value
    // would otherwise be printed as "undefined".
    const reason = payload.error instanceof Error ? payload.error.message : String(payload.error)
    if (payload.phase === 'validation') {
      console.log(`[reject] ${payload.topic}: ${reason}`)
    } else {
      // Do not silently drop failures from other phases.
      console.error('[error]', payload.phase, payload.error)
    }
  })
  .use(
    router().topic('devices/:uid/readings', {
      schema: readingSchema,
      async onMessage(ctx) {
        // ctx.body is inferred as { temperature: number; ts?: number }
        console.log(`[accept] ${ctx.params.uid}: temp=${ctx.body.temperature}°C ts=${ctx.body.ts ?? 'n/a'}`)
      },
    }),
  )

await app.listen()

// [label, payload] pairs: one payload that should pass and three that should be rejected.
const scenarios: ReadonlyArray<readonly [label: string, payload: unknown]> = [
  ['合法', { temperature: 21.5, ts: Date.now() }],
  ['字段缺失', { ts: Date.now() }],
  ['字段类型错误', { temperature: 'hot', ts: 'now' }],
  ['非 JSON', 'plain text'],
]

// Strings are sent verbatim; everything else is serialized as JSON.
const toWire = (payload: unknown): string => {
  if (typeof payload === 'string') return payload
  return JSON.stringify(payload)
}

for (const [label, payload] of scenarios) {
  console.log(`\n--- ${label} ---`)
  await broker.dispatch({
    topic: 'devices/alpha/readings',
    payload: toWire(payload),
  })
}

await app.stop()
ts
/**
 * Standard Schema 运行时验证 + 错误处理钩子。
 *
 * 给 topic 加上 `schema` 字段就会在 PUBLISH 时自动验证 payload,验证后的
 * 结构化数据落到 `ctx.body`,类型自动推断。失败时通过 `onError` 集中处理。
 *
 * 这里手写一个最小的 Standard Schema 适配(zod / valibot / arktype /
 * typebox-validator 都内置 ~standard 字段,可以直接 `schema: myZodSchema`
 * 这样用)。
 */
import { router, type StandardSchemaV1 } from '@mqttkit/core'
import { createTestApp } from '@mqttkit/core/testing'

// ---- Minimal Standard Schema: requires { temperature: number, ts?: number } ----
type Reading = { temperature: number; ts?: number }

/**
 * Hand-written Standard Schema (v1) validator for `Reading`.
 *
 * - Non-object or `null` input yields a single "expected object" issue with an empty path.
 * - Otherwise `temperature` must be a number, and `ts` must be a number when present;
 *   each violation adds an "expected number" issue at the field's path.
 * - On success the input object itself is returned as the validated value.
 */
const readingSchema: StandardSchemaV1<unknown, Reading> = {
  '~standard': {
    version: 1,
    vendor: 'mqttkit-example',
    validate(input) {
      if (input === null || typeof input !== 'object') {
        return { issues: [{ message: 'expected object', path: [] }] }
      }

      const record = input as Record<string, unknown>
      const temperatureOk = typeof record.temperature === 'number'
      const tsOk = record.ts === undefined || typeof record.ts === 'number'

      const problems: StandardSchemaV1.Issue[] = []
      if (!temperatureOk) {
        problems.push({ message: 'expected number', path: ['temperature'] })
      }
      if (!tsOk) {
        problems.push({ message: 'expected number', path: ['ts'] })
      }

      return problems.length === 0 ? { value: record as Reading } : { issues: problems }
    },
    // Type-level carriers only; the runtime values are intentionally `undefined`.
    types: { input: undefined as unknown, output: undefined as unknown as Reading },
  },
}

// Test harness from `@mqttkit/core/testing`; `broker.dispatch` feeds messages to `app`.
const { app, broker } = createTestApp()

app
  .onError((payload) => {
    if (payload.phase === 'validation') {
      // Narrow at runtime instead of asserting `as Error`: a non-Error value
      // would otherwise be printed as "undefined".
      const reason = payload.error instanceof Error ? payload.error.message : String(payload.error)
      console.log(`[reject] ${payload.topic}: ${reason}`)
    } else {
      console.error('[error]', payload.phase, payload.error)
    }
  })
  .use(
    router().topic('devices/:uid/readings', {
      schema: readingSchema,
      async onMessage(ctx) {
        // ctx.body is inferred from the schema as Reading
        console.log(`[accept] ${ctx.params.uid}: temp=${ctx.body.temperature}°C ts=${ctx.body.ts ?? 'n/a'}`)
      },
    }),
  )

await app.listen()

// Each step prints its heading, then builds and dispatches its payload.
// Payloads are built lazily so `Date.now()` is read right before each dispatch.
const steps: ReadonlyArray<{ heading: string; encode: () => string }> = [
  {
    heading: '--- 合法 payload ---',
    encode: () => JSON.stringify({ temperature: 21.5, ts: Date.now() }),
  },
  {
    heading: '\n--- 字段缺失 ---',
    encode: () => JSON.stringify({ ts: Date.now() }),
  },
  {
    heading: '\n--- 字段类型错误 ---',
    encode: () => JSON.stringify({ temperature: 'hot', ts: 'now' }),
  },
  {
    heading: '\n--- 不是 JSON ---',
    encode: () => 'plain text',
  },
]

for (const { heading, encode } of steps) {
  console.log(heading)
  await broker.dispatch({
    topic: 'devices/alpha/readings',
    payload: encode(),
  })
}

await app.stop()
ts
/**
 * 真实 zod + 真实 typebox 在同一个 MqttApp 内同时使用。
 *
 * - zod 3.24+ 是原生 Standard Schema (`~standard`),core 直接识别。
 * - typebox 不实现 Standard Schema,需要 `app.addSchemaProvider(typeboxProvider)`。
 * - 两者完全互不干扰:core 永远先匹配 Standard Schema,再轮询 providers。
 */
import { router } from '@mqttkit/core'
import { createTestApp } from '@mqttkit/core/testing'
import { typeboxProvider } from '@mqttkit/typebox'
import { jsonify } from '@mqttkit/zod'
import { Type } from '@sinclair/typebox'
import { z } from 'zod'

// zod schema for users/:id (registration route).
const userShape = z.object({
  name: z.string(),
  age: z.number().int().positive(),
})

// `jsonify` lets the zod schema appear in the AsyncAPI document.
const userSchema = jsonify(userShape)

// typebox schema for devices/:uid/readings (device telemetry).
const readingProperties = {
  temperature: Type.Number(),
  ts: Type.Optional(Type.Number()),
}
const readingSchema = Type.Object(readingProperties)

// Test harness from `@mqttkit/core/testing`; `broker.dispatch` feeds messages to `app`.
const { app, broker } = createTestApp()

app
  // Needed for the typebox-backed route; the zod schema is used without a provider.
  .addSchemaProvider(typeboxProvider)
  .onError((payload) => {
    // Narrow at runtime instead of asserting `as Error`: a non-Error value
    // would otherwise be printed as "undefined".
    const reason = payload.error instanceof Error ? payload.error.message : String(payload.error)
    if (payload.phase === 'validation') {
      console.log(`[reject] ${payload.topic}: ${reason}`)
    } else {
      // Do not silently drop failures from other phases.
      console.error('[error]', payload.phase, payload.error)
    }
  })
  .use(
    router()
      .topic('users/:id', {
        schema: userSchema,
        onMessage(ctx) {
          // ctx.body comes from zod's inferred output: { name: string; age: number }
          console.log(`[zod] user ${ctx.params.id}: ${ctx.body.name}, age ${ctx.body.age}`)
        },
      })
      .topic('devices/:uid/readings', {
        schema: readingSchema,
        onMessage(ctx) {
          // ctx.body comes from typebox's Static<T>: { temperature: number; ts?: number }
          console.log(`[typebox] device ${ctx.params.uid}: ${ctx.body.temperature}°C`)
        },
      }),
  )

await app.listen()

// Serialize `body` as JSON and dispatch it to `topic` through the test broker.
const publishJson = (topic: string, body: unknown) =>
  broker.dispatch({
    topic,
    payload: JSON.stringify(body),
  })

// zod-backed route: the first payload matches the schema, the second has a negative age.
console.log('--- zod 路径 ---')
await publishJson('users/u1', { name: 'alice', age: 30 })
await publishJson('users/u1', { name: 'bob', age: -1 })

// typebox-backed route: the first payload matches the schema, the second has a string temperature.
console.log('\n--- typebox 路径 ---')
await publishJson('devices/alpha/readings', { temperature: 21.5 })
await publishJson('devices/alpha/readings', { temperature: 'hot' })

await app.stop()

在 GitHub 查看 · Schema 指南

基于 MIT 协议发布