Stream Processing
Handle data that arrives in chunks, like network packets or file reads.
The Problem
Network data doesn't arrive on neat message boundaries. A server sends a 300-byte message, but your socket returns it in two chunks: 256 bytes, then 44 bytes. Without StreamProcessor, you need manual accumulator code to track partial reads and reassemble messages. With StreamProcessor, you append() chunks as they arrive, and peekInt()/readBuffer() work across chunk boundaries transparently.
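A minimal sketch of that flow (pool, packet1, and packet2 are assumed stand-ins; the APIs are introduced below):
val processor = StreamProcessor.create(pool)
processor.append(packet1) // first 256 bytes arrive
processor.append(packet2) // remaining 44 bytes arrive
// Reads the full 300-byte message across the chunk boundary
val message = processor.readBuffer(300)
processor.release()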
BufferStream
Split a buffer into chunks for processing:
import com.ditchoom.buffer.stream.BufferStream
val largeBuffer = BufferFactory.Default.allocate(1024 * 1024) // 1MB
// ... fill buffer ...
largeBuffer.resetForRead()
val stream = BufferStream(largeBuffer, chunkSize = 8192)
stream.forEachChunk { chunk ->
    // Process each 8KB chunk
    processChunk(chunk.buffer)
    if (chunk.isLast) {
        println("Processing complete at offset ${chunk.offset}")
    }
}
StreamProcessor
Parse protocols that span chunk boundaries:
import com.ditchoom.buffer.stream.StreamProcessor
import com.ditchoom.buffer.stream.builder
import com.ditchoom.buffer.pool.BufferPool
val pool = BufferPool(defaultBufferSize = 8192)
// Using the builder pattern
val processor = StreamProcessor.builder(pool).build()
// Or directly
val processor2 = StreamProcessor.create(pool)
// Simulate network data arriving
processor.append(networkPacket1)
processor.append(networkPacket2)
// Parse messages with length-prefix protocol
while (processor.available() >= 4) {
    val length = processor.peekInt() // Peek without consuming
    if (processor.available() >= 4 + length) {
        processor.skip(4) // Skip length header
        val payload = processor.readBuffer(length)
        handleMessage(payload)
    } else {
        break // Wait for more data
    }
}
processor.release()
StreamProcessor Operations
Peek Operations (don't consume data)
All peek operations accept an optional offset parameter to look ahead without consuming:
// Peek at bytes without consuming
val firstByte = processor.peekByte()
val byteAt5 = processor.peekByte(offset = 5)
// Peek multi-byte values (all support offset)
val shortValue = processor.peekShort()
val intValue = processor.peekInt()
val longValue = processor.peekLong()
// Peek at offset - useful for parsing headers
val messageType = processor.peekByte() // byte 0: message type
val messageLength = processor.peekInt(offset = 1) // bytes 1-4: length
val timestamp = processor.peekLong(offset = 5) // bytes 5-12: timestamp
Pattern Matching
// Check for magic bytes (e.g., gzip header 0x1f 0x8b)
val gzipMagic = BufferFactory.Default.wrap(byteArrayOf(0x1f, 0x8b.toByte()))
if (processor.peekMatches(gzipMagic)) {
    // It's gzip compressed
}
// Find first mismatch index (-1 if matches)
val mismatchIndex = processor.peekMismatch(expectedPattern)
Read Operations (consume data)
val byte = processor.readByte()
val short = processor.readShort()
val int = processor.readInt()
val long = processor.readLong()
// Read into a buffer (zero-copy when possible)
val payload = processor.readBuffer(100)
Skip
// Skip bytes efficiently
processor.skip(4) // Skip header
Zero-Copy vs Copy
StreamProcessor optimizes for zero-copy: when a requested read lies entirely within a single appended buffer, it can be returned as a slice of that buffer; only reads that span chunk boundaries require copying into a fresh buffer.
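A sketch of the two cases (bufferA and bufferB stand in for two 100-byte appended chunks):
processor.append(bufferA) // 100 bytes
processor.append(bufferB) // 100 bytes
// Lies entirely within bufferA: can be served as a slice, no copy
val head = processor.readBuffer(50)
// Spans the bufferA/bufferB boundary: the remaining bytes must be
// copied into a fresh buffer to appear contiguous
val spanning = processor.readBuffer(100)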
Example: HTTP Chunked Transfer
class HttpChunkedParser(private val pool: BufferPool) {
    private val processor = StreamProcessor.create(pool)
    // Chunk size parsed from a header whose body hasn't fully arrived yet
    private var pendingChunkSize = -1

    fun append(data: ReadBuffer) {
        processor.append(data)
    }

    fun parseChunks(): List<ReadBuffer> {
        val chunks = mutableListOf<ReadBuffer>()
        while (true) {
            if (pendingChunkSize < 0) {
                // Find the chunk size line (hex-encoded length)
                val sizeLine = readLine() ?: break
                val chunkSize = sizeLine.trim().toIntOrNull(16) ?: break
                if (chunkSize == 0) break // End of chunks
                // Remember the size so the consumed header isn't lost
                // if the body hasn't arrived yet and we bail out below
                pendingChunkSize = chunkSize
            }
            if (processor.available() < pendingChunkSize + 2) break // Need more data
            chunks.add(processor.readBuffer(pendingChunkSize))
            processor.skip(2) // Skip trailing CRLF
            pendingChunkSize = -1
        }
        return chunks
    }

    private fun readLine(): String? {
        // Scan for CRLF
        for (i in 0 until processor.available() - 1) {
            if (processor.peekByte(i) == '\r'.code.toByte() &&
                processor.peekByte(i + 1) == '\n'.code.toByte()
            ) {
                val line = processor.readBuffer(i)
                processor.skip(2) // Skip CRLF
                return line.readString(i)
            }
        }
        return null
    }

    fun release() {
        processor.release()
    }
}
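Driving the parser is then a matter of appending whatever the socket returns and draining complete chunks; handleChunk is a hypothetical consumer:
val parser = HttpChunkedParser(pool)
parser.append(socketData) // whatever the socket returned this time
for (chunk in parser.parseChunks()) {
    handleChunk(chunk) // hypothetical consumer
}
parser.release()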
Builder Pattern with Transforms
The StreamProcessor.builder() API allows composing transforms like decompression:
// With compression module
val processor = StreamProcessor.builder(pool)
    .decompress(CompressionAlgorithm.Gzip) // From buffer-compression
    .build()
// Append compressed data
processor.append(compressedChunk)
processor.finish() // Signal no more input
// Read decompressed data
val data = processor.readBuffer(processor.available())
processor.release()
See Compression for full details.
Auto-Filling Stream Processor
For network protocols, you typically need a loop that reads from the socket and appends to the stream processor until enough data is available. AutoFillingSuspendingStreamProcessor eliminates this boilerplate by automatically calling a refill callback when peek/read operations need more data:
val processor = StreamProcessor.builder(pool)
    .buildSuspendingWithAutoFill { stream ->
        val buffer = pool.acquire(bufferSize)
        val bytesRead = socket.read(buffer, timeout)
        if (bytesRead <= 0) {
            buffer.freeIfNeeded()
            throw EndOfStreamException()
        }
        buffer.setLimit(buffer.position())
        buffer.position(0)
        stream.append(buffer)
    }
// Now peek/read operations automatically trigger socket reads!
val messageType = processor.peekByte() // triggers refill if empty
val length = processor.peekInt(offset = 1) // triggers refill if < 5 bytes
val payload = processor.readBuffer(length) // triggers refill if needed
Before vs After
Before (manual ensureAvailable loop):
// This pattern is repeated everywhere data is consumed
while (stream.available() < neededBytes) {
    val buf = pool.acquire()
    val n = socket.read(buf, timeout)
    if (n <= 0) break
    buf.resetForRead()
    stream.append(buf)
}
val data = stream.readBuffer(neededBytes)
After (auto-filling):
// Just read — refill happens automatically
val data = processor.readBuffer(neededBytes)
EndOfStreamException
When the data source is exhausted (socket closed, file ended), the refill callback should throw EndOfStreamException. Callers can catch this to handle clean disconnection:
try {
    while (true) {
        val message = parseNextMessage(processor)
        handleMessage(message)
    }
} catch (e: EndOfStreamException) {
    // Clean shutdown - peer closed connection
}
Best Practices
- Use peek before read - check data availability first
- Use pools - StreamProcessor works best with pooled buffers
- Call release() - clean up when done (see the try/finally sketch after this list)
- Handle fragmentation - always check available() before reading
- Prefer peekMatches - for magic byte detection
- Call finish() for transforms - signals end of input for decompression etc.
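For the release() rule in particular, wrapping the processor in try/finally keeps pooled buffers from leaking when parsing throws (a sketch, not a library requirement):
val processor = StreamProcessor.create(pool)
try {
    processor.append(incoming)
    // ... parse messages ...
} finally {
    processor.release() // returns pooled buffers even on error
}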