
Recipe: Building a Protocol Client

This guide walks through building a real protocol client using the full DitchOoM stack. Each layer adds a capability — start with the simplest approach and add complexity only when needed.

The Stack

┌──────────────────────────────────┐
│ Your Protocol (MQTT, WS, ...)    │
├──────────────────────────────────┤
│ buffer-compression (optional)    │  compress()/decompress() on ReadBuffer
├──────────────────────────────────┤
│ buffer-flow                      │  mapBuffer(), asStringFlow(), lines()
├──────────────────────────────────┤
│ SocketConnection                 │  socket + pool + stream processor
│   ├─ BufferPool                  │  ← reuse buffers, no GC pressure
│   ├─ StreamProcessor             │  ← accumulate partial reads for framing
│   └─ ClientSocket                │  ← platform-native I/O
├──────────────────────────────────┤
│ ReadBuffer / WriteBuffer         │  same types everywhere
└──────────────────────────────────┘

Dependencies

// build.gradle.kts
dependencies {
    implementation("com.ditchoom:socket:<version>")
    implementation("com.ditchoom:buffer:<version>")
    // Optional: streaming transforms
    implementation("com.ditchoom:buffer-flow:<version>")
    // Optional: compression
    implementation("com.ditchoom:buffer-compression:<version>")
}

Layer 1: Request/Response

The simplest use — connect, write, read, close. Lambda scope handles cleanup:

val response = ClientSocket.connect(
    port = 443,
    hostname = "api.example.com",
    socketOptions = SocketOptions.tlsDefault(),
) { socket ->
    socket.writeString("GET /health HTTP/1.1\r\nHost: api.example.com\r\nConnection: close\r\n\r\n")
    socket.readString()
}
// Socket closed automatically. Same code on JVM, iOS, Linux, Node.js.

What the library handles for you:

  • TLS handshake (SSLEngine on JVM, Network.framework on Apple, OpenSSL on Linux)
  • SNI (Server Name Indication) for virtual hosting
  • Certificate validation against system trust stores
  • Coroutine-friendly suspend I/O (no callbacks, no thread pools)
  • Resource cleanup on lambda return
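
The last bullet is worth spelling out: the lambda-scoped connect pattern is, conceptually, the familiar try/finally guarantee. A minimal sketch of that cleanup contract (`useScoped` and `SimpleCloseable` are illustrative names for this sketch, not library APIs):

```kotlin
// Illustrative sketch of the guarantee behind lambda-scoped connect:
// the block runs, and close() fires on every exit path, including exceptions.
interface SimpleCloseable { fun close() }

fun <S : SimpleCloseable, T> useScoped(resource: S, block: (S) -> T): T {
    try {
        return block(resource)
    } finally {
        resource.close() // runs on normal return, early return, and exception
    }
}
```

The real API adds suspension and TLS teardown on top, but the shape of the guarantee is the same: you never write the finally block yourself.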

Layer 2: Persistent Streaming

For connections that stay open — event streams, pub/sub, log tailing — use readFlowLines():

ClientSocket.connect(8883, hostname = "broker.example.com", socketOptions = SocketOptions.tlsDefault()) { socket ->
    socket.writeString("SUBSCRIBE events\n")
    socket.readFlowLines().collect { line ->
        println(line)
    }
}

readFlowLines() handles \n and \r\n line endings, splits lines correctly across TCP chunk boundaries, and emits one line at a time in constant memory.
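
Conceptually, the chunk-boundary handling amounts to carrying the bytes of an unfinished line between reads. A simplified sketch of that accumulator idea (`splitLines` is an illustrative stand-in operating on strings, not the library's implementation):

```kotlin
// Sketch: chunks arrive at arbitrary TCP boundaries; a carry buffer holds
// the partial line until its terminator shows up in a later chunk.
fun splitLines(chunks: List<String>): List<String> {
    val lines = mutableListOf<String>()
    val carry = StringBuilder()
    for (chunk in chunks) {
        for (ch in chunk) {
            if (ch == '\n') {
                // Handles both \n and \r\n by trimming a trailing \r.
                lines += carry.toString().removeSuffix("\r")
                carry.clear()
            } else {
                carry.append(ch)
            }
        }
    }
    return lines
}
```

The real operator does this over `ReadBuffer` chunks and emits each line as soon as it completes, which is why memory stays constant regardless of stream length.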

Layer 3: Return a Flow

Wrap the connection in a cold Flow. The socket opens when collection starts and closes when done:

fun streamEvents(host: String): Flow<String> = flow {
    ClientSocket.connect(8883, hostname = host, socketOptions = SocketOptions.tlsDefault()) { socket ->
        socket.writeString("SUBSCRIBE events\n")
        emitAll(socket.readFlowLines())
    }
}

Now you get the full power of Flow composition:

// Filter, limit, compose
streamEvents("broker.example.com")
    .filter { "critical" in it }
    .take(100) // auto-closes socket after 100 lines
    .collect { alert(it) }

// Launch in a scope
streamEvents("broker.example.com")
    .onEach { process(it) }
    .launchIn(scope)

// Combine multiple streams
merge(
    streamEvents("broker-1.example.com"),
    streamEvents("broker-2.example.com"),
).collect { event -> handle(event) }

What the streaming API gives you:

| Concern | How |
| --- | --- |
| Backpressure | Slow collector suspends read() — no unbounded buffering |
| Cancellation | .take(N), scope cancel, or exception closes the socket |
| Constant memory | Lines emitted one at a time, never accumulated |
| Composition | filter, map, take, zip, combine — all Flow operators |
| Lazy connection | Cold Flow — socket only opens when collection starts |
| Cleanup | Lambda-scoped connect closes socket on any exit path |

Layer 4: Bidirectional Streaming

Read and write simultaneously using coroutines:

ClientSocket.connect(port, hostname = host, socketOptions = SocketOptions.tlsDefault()) { socket ->
    coroutineScope {
        // Writer: send commands
        launch {
            for (command in commandChannel) {
                socket.writeString("$command\n")
            }
        }

        // Reader: process responses
        socket.readFlowLines().collect { line ->
            handleResponse(line)
        }
    }
}

Layer 5: Full Stack — Buffer Pool + Stream Processor + Compression

For protocols that exchange many messages (MQTT, WebSocket, custom binary protocols), SocketConnection bundles a socket with a reusable BufferPool and StreamProcessor:

SocketConnection.connect(
    hostname = "telemetry.example.com",
    port = 8883,
    options = ConnectionOptions(
        socketOptions = SocketOptions.tlsDefault(),
        maxPoolSize = 64,
    ),
) { conn ->
    // Write compressed frames with pooled buffers
    sensorReadings.collect { reading ->
        conn.withBuffer { buf ->
            val payload = reading.toJson().toReadBuffer()
            val compressed = compress(payload, CompressionAlgorithm.Gzip).getOrThrow()
            buf.writeInt(compressed.remaining())
            buf.write(compressed)
            buf.resetForRead()
            conn.write(buf)
        }
    }
}

Read with decompression using buffer-flow:

socket.readFlow()
    .mapBuffer { decompress(it, Gzip).getOrThrow() }
    .asStringFlow()
    .lines()
    .collect { line -> process(line) }

Threading Mode

If your protocol reads and writes from different coroutine contexts (e.g., a read loop on Dispatchers.Default while writes come from the caller's context), use ThreadingMode.MultiThreaded:

SocketConnection.connect(
    hostname = "broker.example.com",
    port = 8883,
    options = ConnectionOptions(
        threadingMode = ThreadingMode.MultiThreaded, // concurrent read + write
    ),
) { conn ->
    // Read loop in one coroutine
    launch { conn.readIntoStream(timeout) }
    // Write from another
    conn.write(buffer)
}

The default is SingleThreaded, which avoids synchronization overhead when reads and writes happen sequentially.

What the buffer pool gives you:

  • Buffers are allocated once and reused — no GC pressure on JVM, no malloc churn on native
  • withBuffer automatically returns the buffer to the pool
  • StreamProcessor accumulates partial reads for protocols with framed messages
  • maxPoolSize caps memory usage
  • readIntoStream() uses zero-copy pooled reads: acquires a buffer from the pool, reads directly into it from the socket, and transfers ownership to the stream processor
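
To make the StreamProcessor bullet concrete: framed protocols can deliver a header that promises more bytes than the current read returned, so a complete frame may span several reads. A simplified sketch of the accumulation problem it solves, assuming a 4-byte big-endian length prefix as the wire format (`FrameAccumulator` is illustrative, not the library API):

```kotlin
// Sketch: buffer incoming bytes until at least one complete
// length-prefixed frame is available, then emit it.
class FrameAccumulator {
    private var pending = ByteArray(0)

    /** Feed one chunk of bytes; returns every complete frame payload. */
    fun feed(chunk: ByteArray): List<ByteArray> {
        pending += chunk
        val frames = mutableListOf<ByteArray>()
        while (pending.size >= 4) {
            // 4-byte big-endian length prefix (assumed wire format).
            val len = ((pending[0].toInt() and 0xFF) shl 24) or
                ((pending[1].toInt() and 0xFF) shl 16) or
                ((pending[2].toInt() and 0xFF) shl 8) or
                (pending[3].toInt() and 0xFF)
            if (pending.size < 4 + len) break // partial frame: wait for more bytes
            frames += pending.copyOfRange(4, 4 + len)
            pending = pending.copyOfRange(4 + len, pending.size)
        }
        return frames
    }
}
```

The built-in StreamProcessor does this with pooled `ReadBuffer`s instead of copied arrays, so accumulation does not allocate per read.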

Layer 6: Large Data with Constant Memory

Process millions of records with backpressure. A slow collector suspends read() — no unbounded buffering:

ClientSocket.connect(port, hostname = host) { socket ->
    socket.readFlowLines()
        .take(1_000_000) // stop after N records, socket auto-closes
        .collect { line ->
            db.insert(parseLine(line))
        }
}

Server Side: Accept + Echo

The same APIs work for servers:

val server = ServerSocket.allocate()
val clients = server.bind(port = 9000)

clients.collect { client ->
    launch {
        client.readFlowLines().collect { line ->
            client.writeString("Echo: $line\n")
        }
        client.close()
    }
}

What You Get for Free

The key value of this stack is what you don't have to write:

| Concern | Without this library | With socket + buffer |
| --- | --- | --- |
| Platform I/O | Separate implementations for NIO, NWConnection, io_uring, net.Socket | One ClientSocket.connect() call |
| TLS | Configure SSLEngine, SecureTransport, OpenSSL, and Node tls module separately | SocketOptions.tlsDefault() |
| Buffer management | Platform-specific ByteBuffer / NSData / Uint8Array | ReadBuffer / WriteBuffer everywhere |
| Memory | Manual pool management or GC pressure | BufferPool with withBuffer |
| Stream parsing | Roll your own accumulator for partial reads | StreamProcessor built in |
| Compression | Platform-specific zlib bindings | compress() / decompress() |
| Line splitting | Manual StringBuilder accumulator | readFlowLines() |
| Streaming transforms | Manual resetForRead + map boilerplate | mapBuffer(), asStringFlow() |
| Backpressure | Manual flow control | Built into Flow |
| Coroutines | Wrap callbacks in suspendCancellableCoroutine | Native suspend functions |
| Resource cleanup | Try-finally everywhere | Lambda-scoped connections, SuspendCloseable |

Platform-Native Performance

Each platform uses its fastest available I/O primitive — no abstraction penalty:

  • Linux: io_uring for kernel-level async I/O with zero-copy buffer submission. OpenSSL 3.0 statically linked for glibc compatibility.
  • Apple: NWConnection via Network.framework — the same API that Safari and system services use. Zero-copy NSData buffer integration.
  • JVM/Android: NIO2 AsynchronousSocketChannel with NIO SocketChannel fallback. Direct ByteBuffer allocation for zero-copy transfers.
  • Node.js: Native net.Socket and tls module with proper backpressure handling.