Client Socket
The ClientSocket interface is the primary API for connecting to remote servers. It extends Reader, Writer, and SuspendCloseable.
Connecting
Use the ClientSocket.connect() extension function:
val socket = ClientSocket.connect(
port = 80,
hostname = "example.com", // defaults to localhost
timeout = 15.seconds, // connection timeout
socketOptions = SocketOptions(), // TCP and TLS configuration
)
Reading
// Read raw bytes as a ReadBuffer
val buffer = socket.read()
buffer.resetForRead()
// Read as a string (UTF-8 default)
val text = socket.readString()
// Stream reads as a Flow
socket.readFlow().collect { buffer ->
buffer.resetForRead()
// process buffer
}
// Stream string reads
socket.readFlowString().collect { value ->
// process string
}
// Stream complete lines (handles \n and \r\n, splits across chunks)
socket.readFlowLines().collect { line ->
// process line
}
Streaming Patterns
Persistent Streaming
Keep a connection open and process data as it arrives:
ClientSocket.connect(8883, hostname = "broker.example.com", socketOptions = SocketOptions.tlsDefault()) { socket ->
socket.writeString("SUBSCRIBE events\n")
socket.readFlowLines().collect { line ->
println(line)
}
}
Returning a Flow
Wrap a connection in a cold Flow — the socket opens when collection starts and closes when done:
fun streamEvents(host: String): Flow<String> = flow {
ClientSocket.connect(8883, hostname = host, socketOptions = SocketOptions.tlsDefault()) { socket ->
socket.writeString("SUBSCRIBE events\n")
emitAll(socket.readFlowLines())
}
}
// Compose with Flow operators
streamEvents("broker.example.com")
.filter { "critical" in it }
.take(100) // auto-closes socket after 100 lines
.collect { alert(it) }
Streaming with Compression
Compose mapBuffer, asStringFlow, and lines from buffer-flow:
socket.readFlow()
.mapBuffer { decompress(it, Gzip).getOrThrow() }
.asStringFlow()
.lines()
.collect { line -> process(line) }
Large Data with Constant Memory
Process millions of records without accumulating them in memory. Backpressure is built into Flow — a slow collector suspends read():
ClientSocket.connect(port, hostname = host) { socket ->
socket.readFlowLines()
.take(1_000_000) // stop after N records, socket auto-closes
.collect { line ->
db.insert(parseLine(line))
}
}
Writing
// Write a buffer
val bytesWritten = socket.write(buffer)
// Write a string (UTF-8 default)
val bytesWritten = socket.writeString("Hello, server!")
Connection State
val isOpen: Boolean = socket.isOpen()
val localPort: Int = socket.localPort()
val remotePort: Int = socket.remotePort()
Closing
socket.close()
Or use the lambda variant for automatic cleanup:
val result = ClientSocket.connect(port, hostname) { socket ->
socket.writeString("request")
socket.readString()
} // socket closed when lambda returns
SocketConnection (Pool + Stream)
For protocol implementations that need a buffer pool and stream processor, use SocketConnection:
SocketConnection.connect(
hostname = "example.com",
port = 443,
options = ConnectionOptions(
socketOptions = SocketOptions.tlsDefault(),
maxPoolSize = 64,
readTimeout = 10.seconds,
),
) { conn ->
// Use the buffer pool
conn.withBuffer { buffer ->
buffer.writeString("GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n")
buffer.resetForRead()
conn.write(buffer)
}
// Read into the stream processor
conn.readIntoStream()
}
Compression
For compression over socket connections, add the optional buffer-compression module:
dependencies {
implementation("com.ditchoom:buffer-compression:<version>")
}
It supports Gzip and Deflate and works with any ReadBuffer:
import com.ditchoom.buffer.compression.*
// Compress before writing
val payload = "Hello, World!".toReadBuffer()
val compressed = compress(payload, CompressionAlgorithm.Gzip).getOrThrow()
socket.write(compressed)
// Decompress after reading
val received = socket.read()
received.resetForRead()
val decompressed = decompress(received, CompressionAlgorithm.Gzip).getOrThrow()
This works with any socket configuration (plaintext, TLS, pooled connections).
Allocation
For more control, allocate a socket manually and then open it:
val socket = ClientSocket.allocate()
socket.open(port = 80, timeout = 15.seconds, hostname = "example.com")
// ... use socket ...
socket.close()