Compression

The buffer-compression module provides cross-platform compression and decompression using Gzip, Deflate, and Raw formats.

Installation

dependencies {
    implementation("com.ditchoom:buffer:<latest-version>")
    implementation("com.ditchoom:buffer-compression:<latest-version>")
}

Quick Start (All Platforms)

These examples work on every supported platform, including browser JavaScript.

Compress and Decompress

import com.ditchoom.buffer.compression.*

// Compress
val data = "Hello, World!".toReadBuffer()
val compressed = compressAsync(data, CompressionAlgorithm.Gzip)

// Decompress
val decompressed = decompressAsync(compressed, CompressionAlgorithm.Gzip)
val text = decompressed.readString(decompressed.remaining())

Round-Trip Example

suspend fun roundTrip(input: String): String {
    val compressed = compressAsync(input.toReadBuffer())
    val decompressed = decompressAsync(compressed)
    return decompressed.readString(decompressed.remaining())
}

Streaming API

For large data or network scenarios where data arrives in chunks:

Compress and Send Over Network

suspend fun sendCompressed(socket: Socket, data: String) {
    SuspendingStreamingCompressor.create(CompressionAlgorithm.Gzip).use { compressor ->
        compressor.compress(data.toReadBuffer()).forEach { socket.write(it) }
        compressor.finish().forEach { socket.write(it) }
    }
}

Flush vs Finish

The streaming compressor provides two ways to emit output:

  • flush() - Produces output that can be immediately decompressed, but keeps the stream open for more data. The output ends with a sync marker (00 00 FF FF).
  • finish() - Finalizes the stream and produces the final output. The stream cannot accept more data after this.

Use flush() when you need independently decompressible messages within a single compression context:

val compressor = StreamingCompressor.create(algorithm = CompressionAlgorithm.Raw)
try {
    // First message - can be decompressed immediately after the flush.
    // Forward any output emitted during compress() so no bytes are dropped.
    compressor.compress("message1".toReadBuffer()) { socket.write(it) }
    compressor.flush { socket.write(it) }

    // Second message - still using same compression context
    compressor.compress("message2".toReadBuffer()) { socket.write(it) }
    compressor.flush { socket.write(it) }

    // When done, call finish
    compressor.finish { socket.write(it) }
} finally {
    compressor.close()
}
Note: flush() is not supported in browser JavaScript. Use finish() instead, or check supportsSyncCompression before using flush().
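
To make the sync marker concrete, here is a minimal JVM-only sketch that bypasses this module and calls java.util.zip.Deflater directly. The helper names syncFlush and endsWithSyncMarker are made up for the illustration, and the sketch assumes small messages so that one deflate() call with a generously sized output array drains the whole flush.

```kotlin
import java.util.zip.Deflater

// Hypothetical helper: compress one small message and sync-flush it.
// Assumes the output array is large enough for a single deflate() call.
fun syncFlush(deflater: Deflater, message: ByteArray): ByteArray {
    deflater.setInput(message)
    val out = ByteArray(message.size + 64)
    val written = deflater.deflate(out, 0, out.size, Deflater.SYNC_FLUSH)
    return out.copyOf(written)
}

// True when the bytes end with the deflate sync marker 00 00 FF FF.
fun endsWithSyncMarker(bytes: ByteArray): Boolean {
    val marker = byteArrayOf(0x00, 0x00, 0xFF.toByte(), 0xFF.toByte())
    return bytes.size >= 4 &&
        bytes.copyOfRange(bytes.size - 4, bytes.size).contentEquals(marker)
}

fun main() {
    // nowrap = true selects raw deflate (no zlib header or trailer).
    val deflater = Deflater(Deflater.DEFAULT_COMPRESSION, true)
    try {
        val first = syncFlush(deflater, "message1".toByteArray())
        val second = syncFlush(deflater, "message2".toByteArray())
        println(endsWithSyncMarker(first))
        println(endsWithSyncMarker(second))
    } finally {
        deflater.end() // release native zlib state
    }
}
```

Both messages share one Deflater, so the second can reference data from the first, yet each flushed chunk ends on a byte boundary and can be handed to a decompressor as soon as it arrives.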

Receive and Decompress from Network

suspend fun receiveDecompressed(socket: Socket): PlatformBuffer {
    val output = mutableListOf<ReadBuffer>()
    SuspendingStreamingDecompressor.create(CompressionAlgorithm.Gzip).use { decompressor ->
        while (socket.hasData()) {
            output += decompressor.decompress(socket.read())
        }
        output += decompressor.finish()
    }
    // Combine chunks
    val totalSize = output.sumOf { it.remaining() }
    val result = BufferFactory.Default.allocate(totalSize)
    output.forEach { result.write(it) }
    result.resetForRead()
    return result
}

Compression Algorithms

// Gzip - most common for HTTP, includes headers and CRC
CompressionAlgorithm.Gzip

// Deflate - zlib format with header/trailer
CompressionAlgorithm.Deflate

// Raw - no headers, for custom protocols or when you manage framing yourself
CompressionAlgorithm.Raw
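
The framing differences between the three formats can be observed with plain java.util.zip on the JVM. The sketch below does not use this module; it assumes that Gzip, Deflate, and Raw correspond to the gzip wrapper, the zlib wrapper, and bare deflate respectively, as the comments above describe. The function names deflate and gzip are local to the example.

```kotlin
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater
import java.util.zip.GZIPOutputStream

// nowrap = true gives raw deflate; nowrap = false adds the zlib wrapper
// (2-byte header plus 4-byte Adler-32 trailer).
fun deflate(input: ByteArray, nowrap: Boolean): ByteArray {
    val deflater = Deflater(Deflater.DEFAULT_COMPRESSION, nowrap)
    try {
        deflater.setInput(input)
        deflater.finish()
        val sink = ByteArrayOutputStream()
        val chunk = ByteArray(1024)
        while (!deflater.finished()) {
            val n = deflater.deflate(chunk)
            sink.write(chunk, 0, n)
        }
        return sink.toByteArray()
    } finally {
        deflater.end()
    }
}

// gzip wrapper: 10-byte header plus CRC-32 and length trailer (8 bytes).
fun gzip(input: ByteArray): ByteArray {
    val sink = ByteArrayOutputStream()
    GZIPOutputStream(sink).use { it.write(input) }
    return sink.toByteArray()
}

fun main() {
    val input = "Hello, World!".toByteArray()
    val raw = deflate(input, nowrap = true)
    val zlib = deflate(input, nowrap = false)
    val gz = gzip(input)
    println("raw=${raw.size} zlib=${zlib.size} gzip=${gz.size}")
}
```

For short payloads the wrapper overhead is a noticeable share of the output, which is one reason a protocol that already does its own framing may prefer the raw format.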

Sync Flush Marker Handling

Some protocols (like WebSocket permessage-deflate) require stripping or appending the deflate sync marker (00 00 FF FF). The buffer module provides utilities for this:

Convenience Functions

For most use cases, use the high-level functions:

// Compress with sync flush and strip the marker
val compressed = compressWithSyncFlush(data.toReadBuffer())

// Decompress data that had the marker stripped
val decompressed = decompressWithSyncFlush(compressed)

Low-Level Utilities

For more control, use the extension functions directly:

// Strip the sync marker from the end of compressed data
val stripped = compressedBuffer.stripSyncFlushMarker()

Manual Marker Handling

When decompressing data that had the sync marker stripped, feed the marker as a separate buffer to avoid copying the compressed data:

val decompressor = SuspendingStreamingDecompressor.create(CompressionAlgorithm.Raw)
try {
    val output = mutableListOf<ReadBuffer>()
    output += decompressor.decompress(compressedData)

    // Feed the sync marker separately (no copy of compressed data)
    val marker = BufferFactory.Default.allocate(4)
    marker.writeInt(DeflateFormat.SYNC_FLUSH_MARKER)
    marker.resetForRead()
    output += decompressor.decompress(marker)
    output += decompressor.finish()

    // Combine output chunks...
} finally {
    decompressor.close()
}
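
The same strip-then-restore idea can be tried end to end with java.util.zip alone. This is only a JVM sketch under stated assumptions, not this module's implementation: compressAndStrip, feed, and roundTrip are hypothetical names, and the message is assumed small enough for one deflate() call.

```kotlin
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater
import java.util.zip.Inflater

val SYNC_MARKER = byteArrayOf(0x00, 0x00, 0xFF.toByte(), 0xFF.toByte())

// Compress one small message as raw deflate, sync-flush, and drop the trailing marker.
fun compressAndStrip(deflater: Deflater, message: ByteArray): ByteArray {
    deflater.setInput(message)
    val out = ByteArray(message.size + 64)
    val written = deflater.deflate(out, 0, out.size, Deflater.SYNC_FLUSH)
    return out.copyOf(written - SYNC_MARKER.size)
}

// Feed one chunk to the inflater and append everything it produces to sink.
fun feed(inflater: Inflater, chunk: ByteArray, sink: ByteArrayOutputStream) {
    inflater.setInput(chunk)
    val buffer = ByteArray(256)
    while (true) {
        val n = inflater.inflate(buffer)
        if (n == 0) break // inflater wants more input
        sink.write(buffer, 0, n)
    }
}

fun roundTrip(text: String): String {
    val deflater = Deflater(Deflater.DEFAULT_COMPRESSION, true) // raw deflate
    val inflater = Inflater(true)                               // raw inflate
    try {
        val stripped = compressAndStrip(deflater, text.toByteArray())
        val sink = ByteArrayOutputStream()
        feed(inflater, stripped, sink)    // payload without the marker
        feed(inflater, SYNC_MARKER, sink) // marker supplied as its own chunk
        return String(sink.toByteArray(), Charsets.UTF_8)
    } finally {
        deflater.end()
        inflater.end()
    }
}

fun main() {
    println(roundTrip("message1"))
}
```

Supplying the marker as a second input chunk means the stripped payload never has to be copied into a larger array just to append four bytes.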

The Sync Marker Constant

The marker value is available as a constant:

DeflateFormat.SYNC_FLUSH_MARKER  // 0x0000FFFF (00 00 FF FF)
Note: These utilities work with raw deflate (CompressionAlgorithm.Raw). The sync marker is a deflate format concept, not specific to any protocol.

Compression Levels

Control the speed/size tradeoff:

// Fastest compression
compressAsync(data, level = CompressionLevel.BestSpeed)

// Smallest output
compressAsync(data, level = CompressionLevel.BestCompression)

// Balanced (default)
compressAsync(data, level = CompressionLevel.Default)

// Custom level 0-9
compressAsync(data, level = CompressionLevel.Custom(4))
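
To get a feel for the tradeoff, the sketch below measures raw-deflate output sizes at several zlib levels using java.util.zip.Deflater on the JVM. It assumes the custom 0-9 range maps onto zlib's levels of the same number; the mapping of the named levels is not shown in this page and is not assumed here. compressedSize is a hypothetical helper.

```kotlin
import java.util.zip.Deflater

// Size of the raw-deflate output for the given zlib level (0-9).
fun compressedSize(input: ByteArray, level: Int): Int {
    val deflater = Deflater(level, true)
    try {
        deflater.setInput(input)
        deflater.finish()
        val chunk = ByteArray(4096)
        var total = 0
        while (!deflater.finished()) {
            total += deflater.deflate(chunk)
        }
        return total
    } finally {
        deflater.end()
    }
}

fun main() {
    val input = "abcdefgh".repeat(4096).toByteArray() // 32 KB of repetitive text
    for (level in listOf(0, 1, 4, 9)) {
        println("level $level -> ${compressedSize(input, level)} bytes")
    }
}
```

Level 0 stores the data uncompressed with a few bytes of block overhead, so its output is slightly larger than the input; the gap between the other levels depends heavily on the data, so it is worth measuring with a representative payload.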

Handling Large Files

For large files, process in chunks to avoid loading everything into memory:

suspend fun compressFile(input: FileChannel, output: FileChannel) {
    val chunkSize = 64 * 1024 // 64KB chunks

    SuspendingStreamingCompressor.create(CompressionAlgorithm.Gzip).use { compressor ->
        val buffer = BufferFactory.Default.allocate(chunkSize)

        while (input.read(buffer) > 0) {
            buffer.resetForRead()
            compressor.compress(buffer).forEach { chunk ->
                output.write(chunk)
            }
            buffer.clear()
        }

        compressor.finish().forEach { chunk ->
            output.write(chunk)
        }
    }
}
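
For comparison, the same read-compress-write loop can be written against standard JVM streams. This sketch uses java.util.zip.GZIPOutputStream rather than this module, and gzipInChunks is a name invented for the example; it shows only the chunking pattern, with memory use bounded by the chunk size.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

// Copy input to output through gzip, holding at most one chunk in memory.
fun gzipInChunks(input: InputStream, output: OutputStream, chunkSize: Int = 64 * 1024) {
    // use {} closes the gzip stream, which writes the trailer and closes output.
    GZIPOutputStream(output, chunkSize).use { gz ->
        val buffer = ByteArray(chunkSize)
        while (true) {
            val read = input.read(buffer)
            if (read < 0) break // end of input
            gz.write(buffer, 0, read)
        }
    }
}

fun main() {
    val data = ByteArray(200_000) { (it % 251).toByte() }
    val sink = ByteArrayOutputStream()
    gzipInChunks(ByteArrayInputStream(data), sink)

    val restored = GZIPInputStream(ByteArrayInputStream(sink.toByteArray())).readBytes()
    println(restored.contentEquals(data))
}
```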

HTTP Response Decompression

Common pattern for handling gzipped HTTP responses:

suspend fun fetchAndDecompress(url: String): String {
    val response = httpClient.get(url) {
        header("Accept-Encoding", "gzip")
    }

    val contentEncoding = response.headers["Content-Encoding"]
    val body = response.body

    return if (contentEncoding == "gzip") {
        val decompressed = decompressAsync(body, CompressionAlgorithm.Gzip)
        decompressed.readString(decompressed.remaining())
    } else {
        body.readString(body.remaining())
    }
}

Platform Optimizations

The examples above use the async API, which works on every supported platform. On platforms that also support synchronous compression, you can use the more efficient sync APIs described below.

Platform Support Matrix

Platform        Sync API   Async API   Engine
JVM             Yes        Yes         java.util.zip
Android         Yes        Yes         java.util.zip
iOS/macOS       Yes        Yes         zlib
Linux Native    Yes        Yes         zlib
JS (Node.js)    Yes        Yes         zlib module
JS (Browser)    No         Yes         CompressionStream
WasmJS          No         No          Not supported

Check Platform Support at Runtime

if (supportsSyncCompression) {
    // Use faster sync API
} else {
    // Fall back to async API (browser JS)
}

One-Shot API (Non-Browser Only)

For simple cases where you have all data upfront:

// Only available when supportsSyncCompression == true
val original = "Hello, World!".toReadBuffer()
val compressed = compress(original, CompressionAlgorithm.Gzip).getOrThrow()
val decompressed = decompress(compressed, CompressionAlgorithm.Gzip).getOrThrow()

Handle errors explicitly:

when (val result = compress(buffer, CompressionAlgorithm.Gzip)) {
    is CompressionResult.Success -> println("Compressed: ${result.buffer.remaining()} bytes")
    is CompressionResult.Failure -> println("Failed: ${result.message}")
}

// Or use convenience methods
val required = compress(data, algorithm).getOrThrow() // throws on failure
val optional = compress(data, algorithm).getOrNull() // null on failure

Synchronous Streaming API (Non-Browser Only)

The sync API avoids coroutine overhead and is slightly faster:

// Compression with callback
StreamingCompressor.create(CompressionAlgorithm.Gzip).use(
    onOutput = { compressedChunk ->
        socket.write(compressedChunk)
    }
) { compress ->
    compress("First chunk".toReadBuffer())
    compress("Second chunk".toReadBuffer())
    compress("Third chunk".toReadBuffer())
}

// Decompression with callback
StreamingDecompressor.create(CompressionAlgorithm.Gzip).use(
    onOutput = { decompressedChunk ->
        processData(decompressedChunk)
    }
) { decompress ->
    decompress(compressedChunk1)
    decompress(compressedChunk2)
}

Sync Compression with Suspending I/O

When reading from a network socket (suspending) but using sync compression:

StreamingCompressor.create(CompressionAlgorithm.Gzip).useSuspending(
    onOutput = { chunk -> channel.send(chunk) }
) { compress ->
    while (socket.hasData()) {
        val data = socket.read() // suspend call is OK here
        compress(data) // sync compression
    }
}

Choosing the Right API

suspend fun compressEfficiently(data: ReadBuffer): List<ReadBuffer> {
    return if (supportsSyncCompression) {
        // Platform supports sync - use one-shot for simplicity
        listOf(compress(data, CompressionAlgorithm.Gzip).getOrThrow())
    } else {
        // Browser - must use async
        val output = mutableListOf<ReadBuffer>()
        SuspendingStreamingCompressor.create(CompressionAlgorithm.Gzip).use { compressor ->
            output += compressor.compress(data)
            output += compressor.finish()
        }
        output
    }
}

StreamProcessor Integration

For protocol parsing scenarios where you receive compressed data and need to parse structured content:

withPool { pool ->
    val processor = StreamProcessor.builder(pool)
        .decompress(CompressionAlgorithm.Gzip)
        .build()

    try {
        // Append compressed chunks as they arrive
        processor.append(compressedChunk1)
        processor.append(compressedChunk2)
        processor.finish() // Signal no more input

        // Parse the decompressed data
        val messageType = processor.peekByte()
        val length = processor.peekInt(offset = 1)
        processor.skip(5) // Skip header
        val payload = processor.readBuffer(length)
    } finally {
        processor.release()
    }
}

With Network I/O

withPool { pool ->
    val processor = StreamProcessor.builder(pool)
        .decompress(CompressionAlgorithm.Gzip)
        .build()

    try {
        // Read from socket and decompress on the fly
        while (socket.hasData()) {
            processor.append(socket.read()) // socket.read() suspends
        }
        processor.finish()

        // Parse multiple messages from the stream
        while (processor.available() >= 4) {
            val length = processor.peekInt()
            if (processor.available() < 4 + length) break

            processor.skip(4)
            val message = processor.readBuffer(length)
            handleMessage(message)
        }
    } finally {
        processor.release()
    }
}
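
The peek, check, skip, read loop above is ordinary length-prefixed framing. The sketch below reproduces it on a plain java.nio.ByteBuffer so it can be run without this module. It assumes a 4-byte big-endian length prefix; parseFrames is a hypothetical helper, not part of StreamProcessor.

```kotlin
import java.nio.ByteBuffer

// Extract complete [4-byte big-endian length][payload] frames from buf.
// Leaves the position at the start of the first incomplete frame.
fun parseFrames(buf: ByteBuffer): List<ByteArray> {
    val frames = mutableListOf<ByteArray>()
    while (buf.remaining() >= 4) {
        val length = buf.getInt(buf.position()) // peek: absolute read, position unchanged
        require(length >= 0) { "negative frame length: $length" }
        if (buf.remaining() - 4 < length) break // payload not fully available yet
        buf.position(buf.position() + 4)        // skip the length prefix
        val payload = ByteArray(length)
        buf.get(payload)
        frames += payload
    }
    return frames
}

fun main() {
    val buf = ByteBuffer.allocate(64)
    buf.putInt(2).put("hi".toByteArray())
    buf.putInt(5).put("hello".toByteArray())
    buf.putInt(10).put("part".toByteArray()) // incomplete third frame
    buf.flip()

    val frames = parseFrames(buf)
    println(frames.map { String(it) }) // prints [hi, hello]
    println(buf.remaining())           // prints 8 (prefix + 4 payload bytes still pending)
}
```

Peeking before consuming matters: if the length prefix were consumed first and the payload turned out to be incomplete, the parser would lose its place in the stream.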

Browser JavaScript (Async-Only)

For browser JS, use buildSuspending():

val processor = StreamProcessor.builder(pool)
    .decompress(CompressionAlgorithm.Gzip)
    .buildSuspending() // Returns SuspendingStreamProcessor

processor.append(chunk) // suspend function
processor.finish()
val data = processor.readBuffer(processor.available())
processor.release()

Memory Management

Choosing a Buffer Factory

Compressor and decompressor output buffers are allocated through a BufferFactory. Pass bufferFactory to control the allocation strategy — every create function accepts it, including the suspending variants:

// Direct/native memory (default) - best for I/O
SuspendingStreamingCompressor.create(
    algorithm = CompressionAlgorithm.Gzip,
    bufferFactory = BufferFactory.Default,
)

// Heap memory - faster allocation, no native cleanup
SuspendingStreamingCompressor.create(
    algorithm = CompressionAlgorithm.Gzip,
    bufferFactory = BufferFactory.managed(),
)

Output Buffer Size

The synchronous StreamingCompressor / StreamingDecompressor let you tune the output chunk size (the suspending variants do not expose this):

// Smaller chunks - lower latency, more overhead
StreamingCompressor.create(
    algorithm = CompressionAlgorithm.Gzip,
    outputBufferSize = 4096,
)

// Larger chunks - higher throughput, more memory
StreamingCompressor.create(
    algorithm = CompressionAlgorithm.Gzip,
    outputBufferSize = 65536,
)
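
How the output buffer size shapes the chunk stream can be approximated on the JVM with java.util.zip.Deflater. This is a sketch of the general effect, not of this module's internals: deflateChunks is a hypothetical helper that emits one chunk per deflate() call, and the input is seeded pseudo-random data so the output stays large.

```kotlin
import java.util.Random
import java.util.zip.Deflater

// Compress input as raw deflate, collecting one chunk per deflate() call.
fun deflateChunks(input: ByteArray, outputBufferSize: Int): List<ByteArray> {
    val deflater = Deflater(Deflater.DEFAULT_COMPRESSION, true)
    try {
        deflater.setInput(input)
        deflater.finish()
        val chunks = mutableListOf<ByteArray>()
        val buffer = ByteArray(outputBufferSize)
        while (!deflater.finished()) {
            val n = deflater.deflate(buffer)
            if (n > 0) chunks += buffer.copyOf(n)
        }
        return chunks
    } finally {
        deflater.end()
    }
}

fun main() {
    // Incompressible input, so the compressed size is close to 256 KB.
    val input = ByteArray(256 * 1024).also { Random(42).nextBytes(it) }
    println("4 KB buffer:  ${deflateChunks(input, 4096).size} chunks")
    println("64 KB buffer: ${deflateChunks(input, 65536).size} chunks")
}
```

A small buffer hands each chunk to the consumer sooner but costs more calls per byte; a large buffer does the opposite.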

Window Size (WindowBits)

StreamingCompressor.create accepts a windowBits argument that sets the log2 size of zlib's LZ77 sliding window. A smaller window uses less memory at some cost to the compression ratio.

// Algorithm default - 15-bit / 32 KB window
StreamingCompressor.create(algorithm = CompressionAlgorithm.Raw)

// Smaller 512-byte window (9 bits) - lower memory
StreamingCompressor.create(
    algorithm = CompressionAlgorithm.Raw,
    windowBits = WindowBits(9),
)

WindowBits is range-checked: only WindowBits.Default or WindowBits(9..15) are representable (WindowBits.Min is 9, WindowBits.Max is 15). zlib rejects a window size of 8, so it is unrepresentable by construction.
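
The relationship between windowBits and the window size is a power of two. The arithmetic can be sketched as follows; windowSizeBytes is a hypothetical helper written for this example and is not part of the library, though it applies the same 9..15 range check described above.

```kotlin
// Window size in bytes for a given windowBits value (illustrative helper only).
fun windowSizeBytes(windowBits: Int): Int {
    require(windowBits in 9..15) { "windowBits must be in 9..15, got $windowBits" }
    return 1 shl windowBits // 2^windowBits
}

fun main() {
    for (bits in 9..15) {
        println("windowBits=$bits -> ${windowSizeBytes(bits)} bytes")
    }
}
```

The range therefore spans 512 bytes (9 bits) up to 32 KB (15 bits), doubling with each additional bit.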

Platform support

Only Linux native currently honors a custom windowBits. java.util.zip.Deflater (JVM/Android) has no window-size knob, and Apple/JS/Wasm don't yet wire it through — on those platforms the argument is silently ignored and the algorithm default is used. Round-trips still succeed everywhere because the decompressor also falls back to the default. Check supportsCustomWindowBits before relying on a specific window size:

if (supportsCustomWindowBits) {
    StreamingCompressor.create(
        algorithm = CompressionAlgorithm.Raw,
        windowBits = WindowBits(12),
    )
}

Best Practices

  1. Start with the async API - Works everywhere, including browser JS
  2. Prefer the use extension functions - They call finish() and close() automatically
  3. Use Gzip for HTTP - Standard compression for web
  4. Check supportsSyncCompression - Before using sync APIs
  5. Process in chunks - For large files, stream data through
  6. Use StreamProcessor for protocols - Combines decompression with parsing

Unsupported Platforms

WasmJS doesn't currently support compression because it would require bundling a WASM-compiled zlib library.

Contributions to add support are welcome!