
Stream Processing

Handle data that arrives in chunks, like network packets or file reads.

The Problem

Network data doesn't arrive in neat message boundaries. Suppose you're parsing MQTT over TCP: the server sends a 300-byte message, but your socket returns it in two chunks of 256 bytes and then 44 bytes. Without StreamProcessor, you have to write manual accumulator code that tracks partial reads and reassembles messages. With it, you append() chunks, and peekInt()/readBuffer() work across chunk boundaries transparently.

(Diagram: a message split across multiple TCP reads)

Buffer's StreamProcessor handles this transparently.
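To see what the library saves you, here is a rough sketch of the manual accumulator you would otherwise write, in plain Kotlin with `ByteArrayOutputStream` standing in for the pooled buffers the library manages for you (illustrative only, not the library's implementation):

```kotlin
import java.io.ByteArrayOutputStream

// Manual reassembly: accumulate fragments until a full fixed-size message arrives.
class ManualAccumulator(private val messageSize: Int) {
    private val pending = ByteArrayOutputStream()

    // Returns the complete message once enough bytes have accumulated, else null.
    fun append(fragment: ByteArray): ByteArray? {
        pending.write(fragment)
        if (pending.size() < messageSize) return null
        val all = pending.toByteArray()
        val message = all.copyOfRange(0, messageSize)
        pending.reset()
        pending.write(all, messageSize, all.size - messageSize) // keep any overflow bytes
        return message
    }
}
```

With StreamProcessor, this entire class collapses to append() followed by readBuffer(300).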

BufferStream

Split a buffer into chunks for processing:

```kotlin
import com.ditchoom.buffer.PlatformBuffer
import com.ditchoom.buffer.stream.BufferStream

val largeBuffer = PlatformBuffer.allocate(1024 * 1024) // 1 MB
// ... fill buffer ...
largeBuffer.resetForRead()

val stream = BufferStream(largeBuffer, chunkSize = 8192)

stream.forEachChunk { chunk ->
    // Process each 8 KB chunk
    processChunk(chunk.buffer)

    if (chunk.isLast) {
        println("Processing complete at offset ${chunk.offset}")
    }
}
```
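For intuition, the chunk semantics (offset advances by chunkSize, isLast marks the final slice) can be sketched in plain Kotlin without any library types:

```kotlin
// Illustrative stand-ins for the library's chunk type and iteration, not its API.
data class Chunk(val bytes: ByteArray, val offset: Int, val isLast: Boolean)

// Split a byte array into fixed-size chunks, mirroring the offset/isLast
// semantics shown above; the final chunk may be shorter than chunkSize.
fun forEachChunk(data: ByteArray, chunkSize: Int, action: (Chunk) -> Unit) {
    var offset = 0
    while (offset < data.size) {
        val end = minOf(offset + chunkSize, data.size)
        action(Chunk(data.copyOfRange(offset, end), offset, isLast = end == data.size))
        offset = end
    }
}
```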

StreamProcessor

Parse protocols that span chunk boundaries:

```kotlin
import com.ditchoom.buffer.stream.StreamProcessor
import com.ditchoom.buffer.stream.builder
import com.ditchoom.buffer.pool.BufferPool

val pool = BufferPool(defaultBufferSize = 8192)

// Using the builder pattern
val processor = StreamProcessor.builder(pool).build()

// Or directly
val processor2 = StreamProcessor.create(pool)

// Simulate network data arriving in fragments
processor.append(networkPacket1)
processor.append(networkPacket2)

// Parse messages with a length-prefix protocol
while (processor.available() >= 4) {
    val length = processor.peekInt() // Peek without consuming

    if (processor.available() >= 4 + length) {
        processor.skip(4) // Skip the length header
        val payload = processor.readBuffer(length)
        handleMessage(payload)
    } else {
        break // Wait for more data
    }
}

processor.release()
```
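The loop's logic is easier to verify against a plain ByteArray. This sketch assumes big-endian length prefixes; decodeFrames and its return shape are illustrative, not part of the library:

```kotlin
// Decode complete length-prefixed frames from an accumulated byte array.
// Returns the payloads plus the number of bytes consumed; a partial trailing
// frame is left unconsumed, exactly like the "wait for more data" branch above.
fun decodeFrames(data: ByteArray): Pair<List<ByteArray>, Int> {
    val frames = mutableListOf<ByteArray>()
    var pos = 0
    while (data.size - pos >= 4) {
        // "peekInt": read the big-endian length without consuming it
        val length = ((data[pos].toInt() and 0xff) shl 24) or
            ((data[pos + 1].toInt() and 0xff) shl 16) or
            ((data[pos + 2].toInt() and 0xff) shl 8) or
            (data[pos + 3].toInt() and 0xff)
        if (data.size - pos < 4 + length) break // wait for more data
        pos += 4 // "skip(4)"
        frames.add(data.copyOfRange(pos, pos + length)) // "readBuffer(length)"
        pos += length
    }
    return frames to pos
}
```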

StreamProcessor Operations

Peek Operations (don't consume data)

All peek operations accept an optional offset parameter to look ahead without consuming:

```kotlin
// Peek at bytes without consuming
val firstByte = processor.peekByte()
val byteAt5 = processor.peekByte(offset = 5)

// Peek multi-byte values (all support an offset)
val shortValue = processor.peekShort()
val intValue = processor.peekInt()
val longValue = processor.peekLong()

// Peek at fixed offsets - useful for parsing headers
val messageType = processor.peekByte()              // byte 0: message type
val messageLength = processor.peekInt(offset = 1)   // bytes 1-4: length
val timestamp = processor.peekLong(offset = 5)      // bytes 5-12: timestamp
```
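To make the offsets concrete, this standalone sketch parses the same 13-byte header layout from a ByteArray using java.nio.ByteBuffer, which is big-endian by default (Header and parseHeader are illustrative names, not library API):

```kotlin
import java.nio.ByteBuffer

// The 13-byte header layout from the example above.
data class Header(val type: Byte, val length: Int, val timestamp: Long)

// Absolute (index-based) ByteBuffer reads mirror peeking at fixed offsets:
// nothing is consumed, each field is read directly at its position.
fun parseHeader(bytes: ByteArray): Header {
    require(bytes.size >= 13) { "need 13 bytes, have ${bytes.size}" }
    val buf = ByteBuffer.wrap(bytes) // big-endian by default
    return Header(type = buf.get(0), length = buf.getInt(1), timestamp = buf.getLong(5))
}
```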

Pattern Matching

```kotlin
// Check for magic bytes (e.g., gzip header 0x1f 0x8b)
val gzipMagic = PlatformBuffer.wrap(byteArrayOf(0x1f, 0x8b.toByte()))
if (processor.peekMatches(gzipMagic)) {
    // It's gzip compressed
}

// Find the first mismatch index (-1 if the pattern matches)
val mismatchIndex = processor.peekMismatch(expectedPattern)
```
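The mismatch semantics described above can be sketched over plain byte arrays (mismatchIndex here mirrors the documented behavior of peekMismatch; it is not the library's code):

```kotlin
// First index where data differs from pattern, or -1 if the full pattern matches.
// Running out of data before the pattern ends also counts as a mismatch.
fun mismatchIndex(data: ByteArray, pattern: ByteArray): Int {
    for (i in pattern.indices) {
        if (i >= data.size || data[i] != pattern[i]) return i
    }
    return -1
}

val GZIP_MAGIC = byteArrayOf(0x1f, 0x8b.toByte())

fun looksLikeGzip(data: ByteArray): Boolean = mismatchIndex(data, GZIP_MAGIC) == -1
```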

Read Operations (consume data)

```kotlin
val byte = processor.readByte()
val short = processor.readShort()
val int = processor.readInt()
val long = processor.readLong()

// Read into a buffer (zero-copy when possible)
val payload = processor.readBuffer(100)
```

Skip

```kotlin
// Skip bytes efficiently
processor.skip(4) // Skip the header
```

Processing Flow

(Diagram: stream processing flow)

Zero-Copy vs Copy

StreamProcessor optimizes for zero-copy: readBuffer() can return a slice of an appended buffer when the requested range fits inside a single chunk, and only copies when the data spans chunk boundaries.

(Diagram: zero-copy slice vs cross-boundary copy)

Example: HTTP Chunked Transfer

```kotlin
class HttpChunkedParser(private val pool: BufferPool) {
    private val processor = StreamProcessor.create(pool)

    fun append(data: ReadBuffer) {
        processor.append(data)
    }

    fun parseChunks(): List<ReadBuffer> {
        val chunks = mutableListOf<ReadBuffer>()

        while (true) {
            // Find the chunk-size line
            val sizeLine = readLine() ?: break
            val chunkSize = sizeLine.trim().toIntOrNull(16) ?: break

            if (chunkSize == 0) break // End of chunks

            if (processor.available() < chunkSize + 2) break // Need more data

            val chunk = processor.readBuffer(chunkSize)
            chunks.add(chunk)

            processor.skip(2) // Skip the trailing CRLF
        }

        return chunks
    }

    private fun readLine(): String? {
        // Scan for CRLF
        for (i in 0 until processor.available() - 1) {
            if (processor.peekByte(i) == '\r'.code.toByte() &&
                processor.peekByte(i + 1) == '\n'.code.toByte()
            ) {
                val line = processor.readBuffer(i)
                processor.skip(2) // Skip the CRLF
                return line.readString(i)
            }
        }
        return null
    }

    fun release() {
        processor.release()
    }
}
```
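To check the chunked-transfer logic in isolation, here is a self-contained decoder over an in-memory String (no library types; decodeChunked is illustrative):

```kotlin
// Decode an HTTP chunked-transfer body held completely in memory.
// Each chunk is: hex size line + CRLF, data, CRLF; a "0" size line terminates.
fun decodeChunked(body: String): List<String> {
    val chunks = mutableListOf<String>()
    var pos = 0
    while (true) {
        val lineEnd = body.indexOf("\r\n", pos)
        if (lineEnd < 0) break // size line not complete yet
        val size = body.substring(pos, lineEnd).trim().toIntOrNull(16) ?: break
        if (size == 0) break // end of chunks
        val dataStart = lineEnd + 2
        if (dataStart + size + 2 > body.length) break // need more data
        chunks.add(body.substring(dataStart, dataStart + size))
        pos = dataStart + size + 2 // skip the data and its trailing CRLF
    }
    return chunks
}
```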

Builder Pattern with Transforms

The StreamProcessor.builder() API allows composing transforms like decompression:

// With compression module
val processor = StreamProcessor.builder(pool)
.decompress(CompressionAlgorithm.Gzip) // From buffer-compression
.build()

// Append compressed data
processor.append(compressedChunk)
processor.finish() // Signal no more input

// Read decompressed data
val data = processor.readBuffer(processor.available())
processor.release()

See Compression for full details.
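Conceptually, once finish() signals end of input, the Gzip transform performs the equivalent of this JVM round trip with java.util.zip (a sketch of the underlying operation, not buffer-compression's actual code):

```kotlin
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

// Compress with gzip; close() plays the role of finish(), flushing the trailer.
fun gzip(data: ByteArray): ByteArray {
    val out = ByteArrayOutputStream()
    GZIPOutputStream(out).use { it.write(data) }
    return out.toByteArray()
}

// Decompress a complete gzip stream back to the original bytes.
fun gunzip(data: ByteArray): ByteArray =
    GZIPInputStream(data.inputStream()).use { it.readBytes() }
```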

Auto-Filling Stream Processor

For network protocols, you typically need a loop that reads from the socket and appends to the stream processor until enough data is available. AutoFillingSuspendingStreamProcessor eliminates this boilerplate by automatically calling a refill callback when peek/read operations need more data:

```kotlin
val processor = StreamProcessor.builder(pool)
    .buildSuspendingWithAutoFill { stream ->
        val buffer = pool.acquire(bufferSize)
        val bytesRead = socket.read(buffer, timeout)
        if (bytesRead <= 0) {
            buffer.freeIfNeeded()
            throw EndOfStreamException()
        }
        buffer.setLimit(buffer.position())
        buffer.position(0)
        stream.append(buffer)
    }

// Now peek/read operations automatically trigger socket reads!
val messageType = processor.peekByte()     // triggers a refill if empty
val length = processor.peekInt(offset = 1) // triggers a refill if fewer than 5 bytes are buffered
val payload = processor.readBuffer(length) // triggers a refill if needed
```
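The auto-fill mechanism itself is simple. This dependency-free sketch shows the core idea: every read first ensures enough bytes are buffered, invoking a refill callback until they are (AutoFillingReader and EndOfStream are illustrative names, not the library's types):

```kotlin
// Thrown when the refill source is exhausted, mirroring EndOfStreamException.
class EndOfStream : Exception()

// A reader that pulls from a refill callback whenever a read
// needs more bytes than are currently buffered.
class AutoFillingReader(private val refill: () -> ByteArray?) {
    private var buffered = ByteArray(0)
    private var pos = 0

    private fun ensure(needed: Int) {
        while (buffered.size - pos < needed) {
            val more = refill() ?: throw EndOfStream() // source exhausted
            buffered = buffered.copyOfRange(pos, buffered.size) + more
            pos = 0
        }
    }

    fun readBytes(count: Int): ByteArray {
        ensure(count) // refill happens here, invisibly to the caller
        val result = buffered.copyOfRange(pos, pos + count)
        pos += count
        return result
    }
}
```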

Before vs After

Before (manual ensureAvailable loop):

```kotlin
// This pattern is repeated everywhere data is consumed
while (stream.available() < neededBytes) {
    val buf = pool.acquire()
    val n = socket.read(buf, timeout)
    if (n <= 0) break
    buf.resetForRead()
    stream.append(buf)
}
val data = stream.readBuffer(neededBytes)
```

After (auto-filling):

```kotlin
// Just read - refill happens automatically
val data = processor.readBuffer(neededBytes)
```

EndOfStreamException

When the data source is exhausted (socket closed, file ended), the refill callback should throw EndOfStreamException. Callers can catch this to handle clean disconnection:

```kotlin
try {
    while (true) {
        val message = parseNextMessage(processor)
        handleMessage(message)
    }
} catch (e: EndOfStreamException) {
    // Clean shutdown - the peer closed the connection
}
```

Best Practices

  1. Use peek before read - check data availability first
  2. Use pools - StreamProcessor works best with pooled buffers
  3. Call release() - clean up when done
  4. Handle fragmentation - always check available() before reading
  5. Prefer peekMatches - for magic byte detection
  6. Call finish() for transforms - signals end of input for decompression etc.