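Schema Validation
A side-by-side demo of payload validation with TypeBox and zod. It covers inbound and outbound validation modes, and how onError handles failures reported with phase: 'validation'.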
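To make the phase: 'validation' flow concrete, here is a minimal, self-contained sketch of how an inbound payload could be parsed, checked against a Standard Schema-style object, and turned into an error payload on failure. The names `MiniSchema`, `ValidationFailure`, `Outcome`, and `checkInbound` are hypothetical and exist only for this illustration. They are not part of `@mqttkit/core`, whose actual pipeline may differ (for example in how it treats non-JSON payloads).

```typescript
// Hypothetical, simplified shapes for illustration; not the @mqttkit/core types.
type Issue = { message: string; path?: ReadonlyArray<PropertyKey> }
type Result<T> = { value: T; issues?: undefined } | { issues: ReadonlyArray<Issue> }
interface MiniSchema<T> {
  '~standard': { version: 1; vendor: string; validate(value: unknown): Result<T> }
}
type ValidationFailure = { phase: 'validation'; topic: string; error: Error }
type Outcome<T> = { ok: true; body: T } | { ok: false; failure: ValidationFailure }

function checkInbound<T>(topic: string, raw: string, schema: MiniSchema<T>): Outcome<T> {
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    // Assumption: a non-JSON payload is handed to the schema as the raw string.
    parsed = raw
  }
  const result = schema['~standard'].validate(parsed)
  if (result.issues) {
    // Collapse all issues into one message, prefixed by the failing path.
    const message = result.issues
      .map((i) => `${(i.path ?? []).map(String).join('.') || '(root)'}: ${i.message}`)
      .join('; ')
    return { ok: false, failure: { phase: 'validation', topic, error: new Error(message) } }
  }
  return { ok: true, body: result.value }
}

// A toy schema that only requires a numeric `temperature`.
const sketchSchema: MiniSchema<{ temperature: number }> = {
  '~standard': {
    version: 1,
    vendor: 'sketch',
    validate(value) {
      if (typeof value !== 'object' || value === null) {
        return { issues: [{ message: 'expected object' }] }
      }
      const temperature = (value as Record<string, unknown>).temperature
      if (typeof temperature !== 'number') {
        return { issues: [{ message: 'expected number', path: ['temperature'] }] }
      }
      return { value: { temperature } }
    },
  },
}

const accepted = checkInbound('devices/alpha/readings', '{"temperature":21.5}', sketchSchema)
const rejected = checkInbound('devices/alpha/readings', '{"temperature":"hot"}', sketchSchema)
const notJson = checkInbound('devices/alpha/readings', 'plain text', sketchSchema)
console.log(accepted, rejected, notJson)
```

The sketch keeps validation synchronous for brevity; the Standard Schema interface also allows `validate` to return a Promise, which a real pipeline would need to await.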
bash
bun run --cwd examples/schema-validation dev
Source
ts
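/**
 * Standard Schema with **zod** (3.24+).
 *
 * Since zod 3.24, the schemas returned by `z.*()` implement the Standard Schema
 * interface directly, so they can be passed as-is to `topic({ schema })`;
 * `ctx.body` is inferred as the zod static type.
 *
 * The only extra step: if you also want `@mqttkit/asyncapi` to emit this schema
 * into the generated docs, wrap it in `jsonify(...)`. That attaches a
 * `~jsonSchema` property to the schema, while runtime validation still goes
 * through zod's native Standard Schema implementation.
 *
 * If you don't need AsyncAPI docs, plain `schema: z.object(...)` is enough.
 */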
import { router } from '@mqttkit/core'
import { createTestApp } from '@mqttkit/core/testing'
import { jsonify } from '@mqttkit/zod'
import { z } from 'zod'
const readingSchema = jsonify(
  z.object({
    temperature: z.number(),
    ts: z.number().optional(),
  }),
)

const { app, broker } = createTestApp()

app
  .onError((payload) => {
    if (payload.phase === 'validation') {
      console.log(`[reject] ${payload.topic}: ${(payload.error as Error).message}`)
    }
  })
  .use(
    router().topic('devices/:uid/readings', {
      schema: readingSchema,
      async onMessage(ctx) {
        // ctx.body is inferred as { temperature: number; ts?: number }
        console.log(`[accept] ${ctx.params.uid}: temp=${ctx.body.temperature}°C ts=${ctx.body.ts ?? 'n/a'}`)
      },
    }),
  )

await app.listen()

const cases = [
  { label: 'valid', payload: { temperature: 21.5, ts: Date.now() } },
  { label: 'missing field', payload: { ts: Date.now() } },
  { label: 'wrong field type', payload: { temperature: 'hot', ts: 'now' } },
  { label: 'not JSON', payload: 'plain text' as unknown },
]

for (const c of cases) {
  console.log(`\n--- ${c.label} ---`)
  await broker.dispatch({
    topic: 'devices/alpha/readings',
    payload: typeof c.payload === 'string' ? c.payload : JSON.stringify(c.payload),
  })
}

// Also print the attached JSON Schema to show that @mqttkit/asyncapi can read the complete schema
console.log('\n--- ~jsonSchema attached ---')
console.log(JSON.stringify((readingSchema as unknown as { '~jsonSchema': unknown })['~jsonSchema'], null, 2))

await app.stop()
ts
/**
 * Standard Schema with **TypeBox**.
 *
 * TypeBox itself does not implement the Standard Schema interface, but
 * `@mqttkit/typebox` provides a `typeboxProvider`. Register it once and any
 * `Type.X(...)` schema can be passed directly to `topic({ schema })`;
 * `ctx.body` is then inferred as `Static<T>`.
 */
import { router } from '@mqttkit/core'
import { createTestApp } from '@mqttkit/core/testing'
import { typeboxProvider } from '@mqttkit/typebox'
import { Type } from '@sinclair/typebox'
const readingSchema = Type.Object({
  temperature: Type.Number(),
  ts: Type.Optional(Type.Number()),
})

const { app, broker } = createTestApp()

app
  .addSchemaProvider(typeboxProvider)
  .onError((payload) => {
    if (payload.phase === 'validation') {
      console.log(`[reject] ${payload.topic}: ${(payload.error as Error).message}`)
    }
  })
  .use(
    router().topic('devices/:uid/readings', {
      schema: readingSchema,
      async onMessage(ctx) {
        // ctx.body is inferred as { temperature: number; ts?: number }
        console.log(`[accept] ${ctx.params.uid}: temp=${ctx.body.temperature}°C ts=${ctx.body.ts ?? 'n/a'}`)
      },
    }),
  )

await app.listen()

const cases = [
  { label: 'valid', payload: { temperature: 21.5, ts: Date.now() } },
  { label: 'missing field', payload: { ts: Date.now() } },
  { label: 'wrong field type', payload: { temperature: 'hot', ts: 'now' } },
  { label: 'not JSON', payload: 'plain text' as unknown },
]

for (const c of cases) {
  console.log(`\n--- ${c.label} ---`)
  await broker.dispatch({
    topic: 'devices/alpha/readings',
    payload: typeof c.payload === 'string' ? c.payload : JSON.stringify(c.payload),
  })
}

await app.stop()
ts
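/**
 * Standard Schema runtime validation plus the error-handling hook.
 *
 * Adding a `schema` field to a topic validates the payload automatically on
 * PUBLISH. The validated, structured data lands in `ctx.body` with its type
 * inferred. Failures are handled centrally through `onError`.
 *
 * This file hand-writes a minimal Standard Schema adapter (zod / valibot /
 * arktype / typebox-validator all ship a built-in `~standard` field, so with
 * those you can write `schema: myZodSchema` directly).
 */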
import { router, type StandardSchemaV1 } from '@mqttkit/core'
import { createTestApp } from '@mqttkit/core/testing'
// ---- Minimal Standard Schema: requires { temperature: number, ts?: number } ----
type Reading = { temperature: number; ts?: number }

const readingSchema: StandardSchemaV1<unknown, Reading> = {
  '~standard': {
    version: 1,
    vendor: 'mqttkit-example',
    validate(value) {
      const issues: StandardSchemaV1.Issue[] = []
      if (typeof value !== 'object' || value === null) {
        return { issues: [{ message: 'expected object', path: [] }] }
      }
      const obj = value as Record<string, unknown>
      if (typeof obj.temperature !== 'number') {
        issues.push({ message: 'expected number', path: ['temperature'] })
      }
      if (obj.ts !== undefined && typeof obj.ts !== 'number') {
        issues.push({ message: 'expected number', path: ['ts'] })
      }
      if (issues.length > 0) return { issues }
      return { value: obj as Reading }
    },
    types: { input: undefined as unknown, output: undefined as unknown as Reading },
  },
}

const { app, broker } = createTestApp()

app
  .onError((payload) => {
    if (payload.phase === 'validation') {
      console.log(`[reject] ${payload.topic}: ${(payload.error as Error).message}`)
    } else {
      console.error('[error]', payload.phase, payload.error)
    }
  })
  .use(
    router().topic('devices/:uid/readings', {
      schema: readingSchema,
      async onMessage(ctx) {
        // The type of ctx.body is inferred from the schema as Reading
        console.log(`[accept] ${ctx.params.uid}: temp=${ctx.body.temperature}°C ts=${ctx.body.ts ?? 'n/a'}`)
      },
    }),
  )

await app.listen()

console.log('--- valid payload ---')
await broker.dispatch({
  topic: 'devices/alpha/readings',
  payload: JSON.stringify({ temperature: 21.5, ts: Date.now() }),
})

console.log('\n--- missing field ---')
await broker.dispatch({
  topic: 'devices/alpha/readings',
  payload: JSON.stringify({ ts: Date.now() }),
})

console.log('\n--- wrong field type ---')
await broker.dispatch({
  topic: 'devices/alpha/readings',
  payload: JSON.stringify({ temperature: 'hot', ts: 'now' }),
})

console.log('\n--- not JSON ---')
await broker.dispatch({
  topic: 'devices/alpha/readings',
  payload: 'plain text',
})

await app.stop()
ts
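/**
 * Real zod and real typebox used side by side in the same MqttApp.
 *
 * - zod 3.24+ is a native Standard Schema (`~standard`), which core recognizes directly.
 * - typebox does not implement Standard Schema, so it needs `app.addSchemaProvider(typeboxProvider)`.
 * - The two do not interfere with each other: core always matches Standard Schema first, then polls the providers.
 */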
import { router } from '@mqttkit/core'
import { createTestApp } from '@mqttkit/core/testing'
import { typeboxProvider } from '@mqttkit/typebox'
import { jsonify } from '@mqttkit/zod'
import { Type } from '@sinclair/typebox'
import { z } from 'zod'
// zod schema, used for users/:id (registration route); jsonify lets it appear in the AsyncAPI docs
const userSchema = jsonify(
  z.object({
    name: z.string(),
    age: z.number().int().positive(),
  }),
)

// typebox schema, used for devices/:uid/readings (device telemetry)
const readingSchema = Type.Object({
  temperature: Type.Number(),
  ts: Type.Optional(Type.Number()),
})

const { app, broker } = createTestApp()

app
  .addSchemaProvider(typeboxProvider)
  .onError((payload) => {
    if (payload.phase === 'validation') {
      console.log(`[reject] ${payload.topic}: ${(payload.error as Error).message}`)
    }
  })
  .use(
    router()
      .topic('users/:id', {
        schema: userSchema,
        onMessage(ctx) {
          // ctx.body comes from zod's InferOutput: { name: string; age: number }
          console.log(`[zod] user ${ctx.params.id}: ${ctx.body.name}, age ${ctx.body.age}`)
        },
      })
      .topic('devices/:uid/readings', {
        schema: readingSchema,
        onMessage(ctx) {
          // ctx.body comes from typebox's Static<T>: { temperature: number; ts?: number }
          console.log(`[typebox] device ${ctx.params.uid}: ${ctx.body.temperature}°C`)
        },
      }),
  )

await app.listen()

console.log('--- zod path ---')
await broker.dispatch({
  topic: 'users/u1',
  payload: JSON.stringify({ name: 'alice', age: 30 }),
})
await broker.dispatch({
  topic: 'users/u1',
  payload: JSON.stringify({ name: 'bob', age: -1 }),
})

console.log('\n--- typebox path ---')
await broker.dispatch({
  topic: 'devices/alpha/readings',
  payload: JSON.stringify({ temperature: 21.5 }),
})
await broker.dispatch({
  topic: 'devices/alpha/readings',
  payload: JSON.stringify({ temperature: 'hot' }),
})

await app.stop()
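The ordering described in the last example (Standard Schema first, then registered providers) can be sketched as a small resolver. This is a hypothetical illustration only: `SchemaProvider`, its `match` and `validate` methods, `StdLike`, and `resolveValidator` are names invented for this sketch and should not be read as the actual `@mqttkit/core` or `@mqttkit/typebox` API.

```typescript
// Hypothetical shapes for illustration only.
type CheckResult = { ok: true; value: unknown } | { ok: false; message: string }
type Check = (value: unknown) => CheckResult

type StdLike = {
  '~standard': {
    validate(value: unknown): { value?: unknown; issues?: ReadonlyArray<{ message: string }> }
  }
}

interface SchemaProvider {
  name: string
  match(schema: unknown): boolean
  validate(schema: unknown, value: unknown): CheckResult
}

function isStandardSchema(schema: unknown): schema is StdLike {
  // Standard Schema objects may be plain objects or callable (function) values.
  if (schema === null || (typeof schema !== 'object' && typeof schema !== 'function')) return false
  return '~standard' in (schema as object)
}

function resolveValidator(schema: unknown, providers: SchemaProvider[]): { via: string; check: Check } {
  // 1. A schema exposing `~standard` always wins.
  if (isStandardSchema(schema)) {
    const std = schema['~standard']
    return {
      via: 'standard',
      check(value) {
        const r = std.validate(value)
        return r.issues
          ? { ok: false, message: r.issues.map((i) => i.message).join('; ') }
          : { ok: true, value: r.value }
      },
    }
  }
  // 2. Otherwise ask the providers in registration order.
  for (const p of providers) {
    if (p.match(schema)) return { via: p.name, check: (value) => p.validate(schema, value) }
  }
  throw new Error('no validator found for schema')
}

// A toy provider that understands `{ type: 'number' }` descriptors.
const toyProvider: SchemaProvider = {
  name: 'toy-provider',
  match: (schema) =>
    typeof schema === 'object' && schema !== null && (schema as { type?: unknown }).type === 'number',
  validate: (_schema, value) =>
    typeof value === 'number' ? { ok: true, value } : { ok: false, message: 'expected number' },
}

// A toy Standard Schema-style object that accepts strings.
const standardString: StdLike = {
  '~standard': {
    validate: (value) =>
      typeof value === 'string' ? { value } : { issues: [{ message: 'expected string' }] },
  },
}

const viaStandard = resolveValidator(standardString, [toyProvider])
const viaProvider = resolveValidator({ type: 'number' }, [toyProvider])
// This object would match the provider too, but `~standard` takes priority.
const viaBoth = resolveValidator({ ...standardString, type: 'number' }, [toyProvider])
console.log(viaStandard.via, viaProvider.via, viaBoth.via)
```

Checking for `~standard` before consulting providers means a provider can never shadow a schema that already validates itself, which is one way to get the "no interference" property the example describes.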