Typed Stream Multiplexing

Working with raw QuicByteStream buffers is fine for byte protocols, but most applications speak in messages. StreamMux<T> layers a Codec<T> over a QUIC connection so each stream sends and receives typed values — framing and buffer management happen for you.

The Entry Point

withQuicMux is withQuicConnection plus a codec. The block receives a StreamMux<T>:

suspend fun <T, R> withQuicMux(
    hostname: String,
    port: Int,
    quicOptions: QuicOptions,
    codec: Codec<T>,
    connectionOptions: ConnectionOptions = ConnectionOptions(),
    timeout: Duration = 15.seconds,
    block: suspend StreamMux<T>.() -> R,
): R

StreamMux<T> opens and accepts typed streams:

| Method | Returns | Use |
| --- | --- | --- |
| openBidirectional() | Connection<T> | Open a send+receive typed stream. |
| openUnidirectional() | Sender<T> | Open a send-only typed stream. |
| acceptBidirectional() | Connection<T> | Accept a peer-opened send+receive stream. |
| acceptUnidirectional() | Receiver<T> | Accept a peer-opened send-only stream. |

A Connection<T> exposes send(value), receive(): Flow<T>, close(), and an id (the QUIC stream id).

Defining a Codec

A Codec<T> encodes a value to a buffer, decodes one back, and — because streams are byte streams, not message streams — tells the framing layer how big the next frame is so it can be split out of the incoming bytes. Here is a length-prefixed UTF-8 string codec:

object StringCodec : Codec<String> {
    override fun encode(buffer: WriteBuffer, value: String, context: EncodeContext) {
        val bytes = value.encodeToByteArray()
        // A 2-byte prefix can only describe 0..65535 bytes; fail instead of truncating silently.
        require(bytes.size <= 0xFFFF) { "String is ${bytes.size} bytes; the limit is 65535" }
        buffer.writeShort(bytes.size.toShort()) // 2-byte length prefix
        buffer.writeBytes(bytes)
    }

    override fun decode(buffer: ReadBuffer, context: DecodeContext): String {
        // Mask to read the prefix as unsigned: lengths above 32767 arrive as negative Shorts.
        val length = buffer.readShort().toInt() and 0xFFFF
        return buffer.readString(length)
    }

    // The encoded size depends on the string, so it is not declared up front.
    override fun wireSize(value: String, context: EncodeContext): WireSize = WireSize.BackPatch

    override fun peekFrameSize(stream: StreamProcessor, baseOffset: Int): PeekResult {
        // Need at least the 2-byte length to know the frame size.
        if (stream.available() < baseOffset + 2) return PeekResult.NeedsMoreData
        val length = stream.peekShort(baseOffset).toInt() and 0xFFFF
        return PeekResult.Complete(2 + length) // prefix + payload
    }
}
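
The `and 0xFFFF` mask in decode and peekFrameSize matters because Kotlin's Short is signed. The sketch below shows the round trip in isolation using only the standard library; toLengthPrefix and fromLengthPrefix are hypothetical helper names for illustration, not part of the library.

```kotlin
// Hypothetical helpers (not library API) isolating the 2-byte length-prefix arithmetic.

// Converts a payload size to the Short written on the wire.
// Sizes above 32767 wrap to a negative Short; sizes above 65535 cannot be represented.
fun toLengthPrefix(size: Int): Short {
    require(size in 0..0xFFFF) { "length $size does not fit in 16 bits" }
    return size.toShort()
}

// Reads the Short back as an unsigned 16-bit value.
// Without the mask, toInt() would sign-extend and yield a negative length.
fun fromLengthPrefix(prefix: Short): Int = prefix.toInt() and 0xFFFF
```

For a 40000-byte payload, toLengthPrefix returns the Short -25536, and fromLengthPrefix recovers 40000 from it. Dropping the mask would hand a negative length to the reader.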

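peekFrameSize returns PeekResult.Complete(n), where n is the total frame size in bytes, or PeekResult.NeedsMoreData to wait for more bytes. In the codec above it reports Complete(2 + length) as soon as the 2-byte prefix is readable, and NeedsMoreData before that. wireSize lets the encoder pre-size its buffer (WireSize.Exact(n) for fixed sizes, WireSize.BackPatch for variable ones).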
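
To make the framing step concrete, here is a minimal sketch of how a framing layer could use a peekFrameSize-style function to split length-prefixed messages out of a byte stream. It uses plain ByteArray and the standard library only. Peek, encodeFrame, and splitFrames are hypothetical stand-ins rather than the library's types, and the big-endian prefix and the wait-for-full-payload loop are assumptions about behavior the text does not spell out.

```kotlin
// Hypothetical stand-in for PeekResult, limited to the two cases named in the text.
sealed interface Peek {
    data class Complete(val frameSize: Int) : Peek
    object NeedsMoreData : Peek
}

// Builds one frame: 2-byte length prefix (big-endian assumed) followed by the UTF-8 payload.
fun encodeFrame(text: String): ByteArray {
    val payload = text.encodeToByteArray()
    require(payload.size <= 0xFFFF) { "payload exceeds the 16-bit length prefix" }
    val frame = ByteArray(2 + payload.size)
    frame[0] = (payload.size shr 8).toByte()
    frame[1] = payload.size.toByte()
    payload.copyInto(frame, destinationOffset = 2)
    return frame
}

// Mirrors the codec's peekFrameSize: report the frame size once the prefix is readable.
fun peekFrameSize(bytes: ByteArray, baseOffset: Int): Peek {
    if (bytes.size < baseOffset + 2) return Peek.NeedsMoreData
    val high = bytes[baseOffset].toInt() and 0xFF
    val low = bytes[baseOffset + 1].toInt() and 0xFF
    return Peek.Complete(2 + ((high shl 8) or low))
}

// Decodes every complete frame in `bytes`.
// Returns the decoded messages and the number of bytes consumed; the rest stays buffered.
fun splitFrames(bytes: ByteArray): Pair<List<String>, Int> {
    val messages = mutableListOf<String>()
    var offset = 0
    while (true) {
        val peek = peekFrameSize(bytes, offset)
        if (peek !is Peek.Complete) break               // prefix not readable yet
        if (bytes.size < offset + peek.frameSize) break // payload still incomplete
        messages += bytes.decodeToString(offset + 2, offset + peek.frameSize)
        offset += peek.frameSize
    }
    return messages to offset
}
```

Given the bytes of encodeFrame("hello") followed by a second frame that is missing its last byte, splitFrames returns the single message "hello" with 7 bytes consumed. The partial second frame remains in the buffer until the rest arrives.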

Client and Server

The client uses withQuicMux. The server gets a raw QuicScope from withQuicServer, so it wraps it in a QuicStreamMux directly:

// Both snippets call Flow.first(), which needs: import kotlinx.coroutines.flow.first

// --- Server: accept a typed bidi stream and echo with a prefix ---
withQuicServer(port = 0, tlsConfig = tlsConfig, quicOptions = quicOptions) {
    connections {
        val mux = QuicStreamMux(this, StringCodec, ConnectionOptions())
        val conn = mux.acceptBidirectional()
        val msg = conn.receive().first()
        conn.send("echo: $msg")
        conn.close()
    }
}

// --- Client: open a typed bidi stream, send, receive ---
// `port` must be the port the server actually bound to; how it is obtained is not shown here.
val response = withQuicMux("localhost", port, quicOptions, StringCodec) {
    val conn = openBidirectional()
    conn.send("hello")
    val reply = conn.receive().first()
    conn.close()
    reply // last expression is the block's result
}
// response == "echo: hello"

receive() is a cold Flow<T> — collect it for a stream of messages, or take .first() for a single request/response exchange as above.

Next Steps