Skip to content

AsyncAPI + Elysia

一个 node:http server 同时挂 MQTT-over-WebSocket(aedes 处理)和 Elysia HTTP app(serve AsyncAPI 文档)。适合不想开两个端口的场景。

bash
bun run --cwd examples/asyncapi-elysia dev

所有东西都在 :3300——MQTT WS 在 /mqtt,AsyncAPI HTTP 路由并列。

源码

ts
import { createServer } from 'node:http'
import type { IncomingMessage, ServerResponse } from 'node:http'
import { Readable } from 'node:stream'
import { aedes } from '@mqttkit/aedes'
import { createAsyncApiHandlers } from '@mqttkit/asyncapi'
import { MqttApp, router } from '@mqttkit/core'
import { Elysia } from 'elysia'

type Principal = { uid: string }
type State = { principal?: Principal }

const PORT = 3300
const WS_PATH = '/mqtt'

// 1. A Node http.Server that both aedes (for MQTT-over-WS) and Elysia share.
const httpServer = createServer()

// 2. mqttkit app. aedes attaches MQTT-over-WS to `httpServer` on /mqtt
//    instead of starting its own ws server.
const mqttApp = new MqttApp<State>()
  .use(
    aedes({
      tcp: { port: 1883 },
      ws: { server: httpServer, path: WS_PATH },
      authenticate: ({ username }) => (username ? { uid: username } : false),
    }),
  )
  .use(
    router<State>()
      .topic('devices/:uid/events', {
        publish: ({ params, principal }) => params.uid === principal?.uid,
        qos: 1,
        schema: {
          type: 'object',
          required: ['temperature'],
          properties: {
            temperature: { type: 'number', description: 'Celsius' },
            ts: { type: 'integer' },
          },
        },
        async onMessage(ctx) {
          await ctx.publish(`server/${ctx.params.uid}/echo`, ctx.payload)
        },
        meta: { summary: 'Device telemetry uplink', tags: ['device'] },
      })
      .topic('server/:uid/echo', {
        subscribe: ({ params, principal }) => params.uid === principal?.uid,
        qos: 0,
        meta: { summary: 'Server echo channel', tags: ['device'] },
      })
      .topic('users/:uid/notifications', {
        subscribe: ({ params, principal }) => params.uid === principal?.uid,
        publish: false,
        qos: 1,
        retain: true,
        meta: { summary: 'User notifications', tags: ['notifications'] },
      }),
  )

await mqttApp.listen()

// 3. AsyncAPI handlers wired into Elysia routes.
const docs = createAsyncApiHandlers(mqttApp, {
  info: { title: 'mqttkit + Elysia', version: '0.1.0' },
  servers: {
    tcp: { host: `localhost:1883`, protocol: 'mqtt' },
    ws: { host: `localhost:${PORT}`, protocol: 'ws', pathname: WS_PATH },
  },
  prefix: '/docs/mqtt',
})

const elysia = new Elysia()
  .get('/', () => 'mqttkit + Elysia: HTTP + MQTT-over-WS on a single port')
  .get(docs.paths.json, ({ set }) => {
    set.headers['content-type'] = 'application/json; charset=utf-8'
    return docs.json()
  })
  .get(docs.paths.yaml, ({ set }) => {
    set.headers['content-type'] = 'application/yaml; charset=utf-8'
    return docs.yaml()
  })
  .get(docs.paths.docs, ({ set }) => {
    set.headers['content-type'] = 'text/html; charset=utf-8'
    return docs.html()
  })

// 4. Bridge Node http requests to Elysia.handle(Request) → Response.
httpServer.on('request', async (req: IncomingMessage, res: ServerResponse) => {
  try {
    const response = await elysia.handle(toFetchRequest(req))
    sendFetchResponse(res, response)
  } catch (error) {
    res.statusCode = 500
    res.end(String(error))
  }
})

await new Promise<void>((resolve, reject) => {
  httpServer.once('error', reject)
  httpServer.listen(PORT, () => {
    httpServer.off('error', reject)
    resolve()
  })
})

console.log(`mqtt://localhost:1883             (TCP)`)
console.log(`ws://localhost:${PORT}${WS_PATH}              (MQTT-over-WS, same port as docs)`)
console.log(`http://localhost:${PORT}${docs.paths.docs}    (docs page)`)
console.log(`http://localhost:${PORT}${docs.paths.json}    (AsyncAPI JSON)`)

function toFetchRequest(req: IncomingMessage): Request {
  const host = req.headers.host ?? `localhost:${PORT}`
  const url = `http://${host}${req.url ?? '/'}`
  const init: RequestInit & { duplex?: 'half' } = {
    method: req.method ?? 'GET',
    headers: req.headers as Record<string, string>,
  }
  if (req.method && req.method !== 'GET' && req.method !== 'HEAD') {
    init.body = Readable.toWeb(req) as unknown as ReadableStream
    init.duplex = 'half'
  }
  return new Request(url, init as RequestInit)
}

async function sendFetchResponse(res: ServerResponse, response: Response): Promise<void> {
  res.statusCode = response.status
  response.headers.forEach((value, key) => res.setHeader(key, value))
  if (!response.body) {
    res.end()
    return
  }
  const reader = response.body.getReader()
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    res.write(value)
  }
  res.end()
}

在 GitHub 查看

基于 MIT 协议发布