feat: implement WebSocket streaming protocol for large payloads #2256 (#58)

Merged
iduartgomez merged 5 commits into main from feat/ws-message-streaming
Mar 8, 2026
Conversation

@netsirius (Contributor) commented Feb 28, 2026

Problem

WebSocket frame size limits cause silent failures or disconnections when transmitting large contract states (up to 50 MiB) between the Freenet node and clients. The entire payload had to fit in a single WebSocket frame, requiring oversized max_message_size limits and causing excessive memory pressure.

Solution

Add an application-level chunking and streaming protocol to freenet-stdlib, used by both the native (tokio-tungstenite) and browser (WASM) WebSocket clients. The protocol operates in two layers:

Layer 1 — Transparent chunking (all message types)

Any serialized payload exceeding CHUNK_THRESHOLD (512 KiB) is automatically split into 256 KiB StreamChunk frames. The receiver's ReassemblyBuffer reassembles them transparently before deserialization. This applies to all HostResponse and ClientRequest variants — no type-specific logic needed.

  • chunk_request(data, stream_id) → Vec<ClientRequest::StreamChunk>
  • chunk_response(data, stream_id) → Vec<HostResponse::StreamChunk>
  • ReassemblyBuffer::receive_chunk() → Option<Vec<u8>> when complete
  • Zero-copy fragmentation via Bytes::slice() — chunks share the original allocation
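The Layer-1 behavior can be sketched with a stdlib-only model (hypothetical names; the real chunk_request/ReassemblyBuffer in freenet-stdlib use Bytes::slice() for zero-copy chunks and carry fuller validation, while this sketch copies data):

```rust
const CHUNK_SIZE: usize = 256 * 1024; // 256 KiB per chunk, as in the protocol

// Simplified stand-in for the StreamChunk wire variant.
struct Chunk {
    stream_id: u32,
    index: u32,
    total: u32,
    data: Vec<u8>,
}

// Split a payload into fixed-size chunks tagged with index/total metadata.
fn chunk_payload(data: &[u8], stream_id: u32) -> Vec<Chunk> {
    let total = data.len().div_ceil(CHUNK_SIZE) as u32;
    data.chunks(CHUNK_SIZE)
        .enumerate()
        .map(|(i, c)| Chunk { stream_id, index: i as u32, total, data: c.to_vec() })
        .collect()
}

// Reassemble once all chunks are present (order-insensitive, like the buffer).
fn reassemble(mut chunks: Vec<Chunk>) -> Option<Vec<u8>> {
    let total = chunks.first()?.total as usize;
    if chunks.len() != total {
        return None; // stream incomplete
    }
    chunks.sort_by_key(|c| c.index);
    Some(chunks.into_iter().flat_map(|c| c.data).collect())
}
```

In the real protocol, chunking only triggers for payloads above CHUNK_THRESHOLD; smaller messages are sent as ordinary single frames.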

Layer 2 — Incremental streaming with StreamHeader (opt-in, Native encoding only)

For responses where the client benefits from knowing the payload size and content type upfront (e.g., large GetResponse with WASM contracts), the server can send a StreamHeader before the chunks:

HostResponse::StreamHeader {
    stream_id: u32,
    total_bytes: u64,
    content: StreamContent,
}

This enables two client consumption modes via WsStreamHandle:

  • handle.into_stream() — async Stream<Item = Bytes> for chunk-by-chunk processing (progress bars, incremental writes)
  • handle.assemble() — wait for all chunks and return the complete payload
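As a rough model of the chunk-by-chunk mode (not the actual WsStreamHandle API; an mpsc channel stands in for the async chunk stream, and consume_incrementally is an illustrative name):

```rust
use std::sync::mpsc::Receiver;

// `total_bytes` comes from the StreamHeader; the running percentage is what
// a progress bar or incremental writer would consume.
fn consume_incrementally(total_bytes: u64, rx: Receiver<Vec<u8>>) -> u64 {
    let mut received = 0u64;
    for chunk in rx {
        received += chunk.len() as u64;
        let _pct = received * 100 / total_bytes; // e.g. update a progress bar here
        if received >= total_bytes {
            break; // all declared bytes delivered
        }
    }
    received
}
```

The assemble() mode is the degenerate case of the same loop: buffer every chunk and return the concatenation once `received == total_bytes`.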

StreamContent currently has two variants:

  • GetResponse { key, includes_contract } — the primary use case
  • Raw — reserved for future extension

When recv() encounters a StreamHeader, it automatically calls assemble() and returns the complete HostResponse — callers that don't need incremental consumption see no difference. Only callers that explicitly use recv_stream() get the incremental API.

Client integration

Native client (regular.rs):

  • process_request() chunks outbound requests over the threshold
  • handle_response_payload() routes: StreamHeader → WsStreamHandle channel, StreamChunk with known sender → incremental delivery, StreamChunk without header → transparent ReassemblyBuffer reassembly
  • recv() auto-assembles streamed responses; recv_stream() returns the handle for incremental consumption

Browser client (browser.rs):

  • ReassemblyBuffer integrated into the onmessage callback
  • StreamChunk responses are reassembled before dispatching to the result handler
  • Outbound requests are chunked when over threshold
  • No StreamHeader support (Flatbuffers encoding only; header is bincode-specific)

Protocol constants

Constant                Value    Purpose
CHUNK_SIZE              256 KiB  Payload size per chunk
CHUNK_THRESHOLD         512 KiB  Minimum size to trigger chunking
MAX_TOTAL_CHUNKS        256      Cap per stream (~64 MiB, covers MAX_STATE_SIZE + overhead)
MAX_CONCURRENT_STREAMS  8        Concurrent streams per ReassemblyBuffer
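The size relationships implied by the table can be checked in a few lines (constant values from the table; max_stream_bytes is an illustrative helper, not part of the crate):

```rust
// Values from the protocol constants table above.
const CHUNK_SIZE: usize = 256 * 1024; // 256 KiB
const CHUNK_THRESHOLD: usize = 512 * 1024; // 512 KiB
const MAX_TOTAL_CHUNKS: usize = 256;

// Per-stream byte cap implied by the constants: 256 chunks of 256 KiB each,
// i.e. 64 MiB, comfortably above the 50 MiB contract-state ceiling.
fn max_stream_bytes() -> usize {
    MAX_TOTAL_CHUNKS * CHUNK_SIZE
}
```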

Wire format additions

New variants added to the Flatbuffers schemas and Rust enums:

ClientRequest::StreamChunk{ stream_id, index, total, data }
HostResponse::StreamChunk{ stream_id, index, total, data }
HostResponse::StreamHeader{ stream_id, total_bytes, content } (bincode only; returns error over Flatbuffers)

Breaking change justification (0.1.40 → 0.2.0)

This is a breaking change because:

  • New variants added to ClientRequest and HostResponse (both #[non_exhaustive], but the wire protocol changes)
  • WebApi::recv() behavior changes (now handles StreamHeader + StreamChunk internally via select!)
  • WebApi struct has a new stream_rx field
  • New bytes dependency added

Review feedback addressed

All blocking and important issues from the code review have been resolved:

Blocking fixes

  1. FlatBuffers union ordering (was: silent data corruption) — StreamChunk moved to the end of both HostResponseType and ClientRequestType unions, preserving existing variant indices. Old browser clients will correctly handle Ok/Error responses without misinterpreting them.

  2. TTL on ReassemblyBuffer entries (was: permanent slot exhaustion) — Added STREAM_TTL (60s) with created_at timestamps on each StreamState. Stale entries are evicted via evict_stale() before the concurrent stream limit is checked. WASM targets use cfg to skip the timestamp (no Instant available).

  3. Deadlock in recv() with 5+ concurrent streams (was: channel backpressure deadlock) — stream_tx capacity increased to MAX_CONCURRENT_STREAMS. The request handler now evicts the oldest sender when at capacity and cleans up the corresponding reassembly entry via reassembly.remove_stream(id), preventing both the deadlock and orphan reassembly slots.
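The TTL eviction described in fix 2 can be sketched as a stdlib-only model (hypothetical; the real StreamState also holds chunk data, and the WASM build cfg-skips the Instant timestamp):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const STREAM_TTL: Duration = Duration::from_secs(60);
const MAX_CONCURRENT_STREAMS: usize = 8;

struct StreamState {
    created_at: Instant,
    // chunk storage elided in this sketch
}

struct ReassemblyBuffer {
    streams: HashMap<u32, StreamState>,
}

impl ReassemblyBuffer {
    // Drop entries older than the TTL, per the AGENTS.md cleanup rule.
    fn evict_stale(&mut self, now: Instant) {
        self.streams
            .retain(|_, s| now.duration_since(s.created_at) < STREAM_TTL);
    }

    // Evict stale streams *before* checking the concurrent-stream cap, so
    // abandoned partial streams cannot permanently exhaust the slots.
    fn try_open(&mut self, id: u32, now: Instant) -> bool {
        self.evict_stale(now);
        if self.streams.len() >= MAX_CONCURRENT_STREAMS {
            return false;
        }
        self.streams.insert(id, StreamState { created_at: now });
        true
    }
}
```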

Important fixes

  1. assemble() overflow protection (was: unbounded allocation) — Two guards added: (a) reject total_bytes exceeding MAX_TOTAL_CHUNKS * CHUNK_SIZE before allocating, and (b) check buf.len() + chunk.len() > total_bytes inside the receive loop, returning StreamError::Overflow on violation.

  2. Evicted stream_senders orphan cleanup (was: permanent reassembly slot leak) — When a sender is evicted from stream_senders, reassembly.remove_stream(id) is called in the same operation, preventing orphan partial streams from permanently consuming reassembly slots.

  3. MAX_CHUNKS_PER_BATCH removed (was: dead constant) — The unused constant has been removed entirely. Flow control is handled by MAX_TOTAL_CHUNKS at the protocol level.
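The two assemble() guards from fix 1, as a minimal synchronous model (hypothetical StreamHandle over an mpsc channel; the real implementation is async and uses the crate's own error types):

```rust
use std::sync::mpsc::Receiver;

const CHUNK_SIZE: usize = 256 * 1024;
const MAX_TOTAL_CHUNKS: u64 = 256;

struct StreamHandle {
    total_bytes: u64,
    rx: Receiver<Vec<u8>>,
}

impl StreamHandle {
    fn assemble(self) -> Result<Vec<u8>, &'static str> {
        // Guard (a): reject an absurd declared size before allocating.
        if self.total_bytes > MAX_TOTAL_CHUNKS * CHUNK_SIZE as u64 {
            return Err("declared size exceeds protocol cap");
        }
        let mut buf = Vec::with_capacity(self.total_bytes as usize);
        for chunk in self.rx {
            // Guard (b): reject over-delivery inside the receive loop.
            if buf.len() as u64 + chunk.len() as u64 > self.total_bytes {
                return Err("overflow: more bytes than the header declared");
            }
            buf.extend_from_slice(&chunk);
            if buf.len() as u64 == self.total_bytes {
                return Ok(buf);
            }
        }
        Err("truncated: stream closed before total_bytes arrived")
    }
}
```

Note that the follow-up review loosened guard (b) to allow one chunk of slack (`total_bytes + CHUNK_SIZE`); this sketch shows the strict form for clarity.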

Testing

Unit tests (streaming.rs)

  • chunk_request_small / chunk_request_large_roundtrip / chunk_response_roundtrip — roundtrip chunking + reassembly
  • chunk_empty — empty payload handling
  • chunk_exact_boundary — payloads at exact chunk size
  • reassembly_reset_across_messages — buffer cleanup between streams
  • reassembly_error_truncated_header / error_unknown_type — malformed input rejection
  • reassembly_error_zero_total / reassembly_error_too_large — bounds validation
  • reassembly_error_mismatch / reassembly_error_duplicate — consistency checks
  • concurrent_streams — multiple interleaved streams
  • max_concurrent_streams_limit — stream count cap enforcement

Integration tests (regular.rs)

  • test_recv_chunked — server sends chunks without header, client recv() reassembles transparently
  • test_recv_stream_header — server sends header + chunks, client uses recv_stream() + assemble()
  • test_recv_transparent_stream_header — server sends header + chunks, client uses recv() which auto-assembles

Files changed

  • rust/Cargo.toml — Version bump to 0.2.0, add bytes dependency
  • rust/src/client_api.rs — Expose streaming module under net feature
  • rust/src/client_api/streaming.rs — New: core streaming protocol (chunking, reassembly, WsStreamHandle, WsStreamSender, TTL eviction)
  • rust/src/client_api/client_events.rs — Add StreamChunk, StreamHeader, StreamContent to enums + FBS serialization
  • rust/src/client_api/regular.rs — Integrate streaming into native client: chunked send, header/chunk receive routing, recv_stream() API, sender eviction with orphan cleanup
  • rust/src/client_api/browser.rs — Integrate ReassemblyBuffer into WASM client, chunk outbound requests
  • schemas/flatbuffers/client_request.fbs — Add StreamChunk table and union variant (appended at end)
  • schemas/flatbuffers/host_response.fbs — Add StreamChunk table and union variant (appended at end)
  • rust/src/generated/client_request_generated.rs — Regenerated from FBS schema
  • rust/src/generated/host_response_generated.rs — Regenerated from FBS schema

Downstream dependency

freenet-core#3346 depends on this PR. That PR integrates the server-side streaming into the node's WebSocket handler. This PR must be published to crates.io before the core PR can pass CI.

Fixes

Closes #2256

@netsirius (Contributor, Author) commented:

@claude review this pr

claude bot commented Feb 28, 2026

Claude encountered an error — View job

I'll analyze this and get back to you.

@iduartgomez (Contributor) commented:

Can we make this change backwards compatible instead?

@iduartgomez (Contributor) left a comment:

Review Summary

The code quality is high and the two-layer design (transparent reassembly + optional incremental streaming) is sound. With ?streaming=true gating in the companion PR (freenet-core#3346), old applications using the non-streaming protocol are protected from seeing unknown bincode variants. Most backward compatibility concerns are addressed by that opt-in mechanism.

However, there are a few issues that need attention before merging.


Must Fix (Blocking)

1. FlatBuffers HostResponseType union ordering breaks old browser clients

In schemas/flatbuffers/host_response.fbs, StreamChunk is inserted between GenerateRandData and Ok in the union:

union HostResponseType {
  ContractResponse,
  DelegateResponse,
  GenerateRandData,
  StreamChunk,      // <-- inserted here, shifts Ok and Error
  Ok,
  Error,
}

This shifts Ok from index 4→5 and Error from 5→6. FlatBuffers forward-compatibility only works for appended variants. Old browser clients (using FBS encoding) will misinterpret every Ok and Error response from the server, regardless of whether streaming is enabled. This is a silent data corruption bug, not a deserialization error.

Fix: Move StreamChunk to the end of the union (after Error). The client_request.fbs union is fine — StreamChunk is already appended after Authenticate.

2. No TTL/timeout on ReassemblyBuffer entries

If a stream starts (first chunk received) but never completes (sender crashes, network drops remaining chunks), the partial StreamState entry stays in the HashMap forever. After MAX_CONCURRENT_STREAMS (8) abandoned streams, all new streams are permanently rejected.

Per the project's AGENTS.md: "Cleanup exemptions MUST be time-bounded. Any condition that exempts an entry from garbage collection MUST either expire via TTL or be overridden by an absolute age threshold."

Fix: Add a creation timestamp to StreamState and evict entries older than a reasonable timeout (e.g., 30-60 seconds) when new chunks arrive or when the concurrent stream limit is hit.

3. Deadlock in recv() with 5+ concurrent streamed responses

stream_tx has capacity 4. The recv() method calls handle.assemble().await inline, blocking until all chunks arrive. During assembly, recv() is not draining stream_rx. If the server sends 5+ StreamHeaders rapidly:

  1. Handler sends handles 1-4 to stream_tx (fills capacity)
  2. Client's recv() picks up handle 1, starts assemble().await
  3. Handler tries to send handle 5 to stream_tx → blocks
  4. Handler is now stuck and can't forward chunks from the WebSocket
  5. Client's assemble() waits for chunks that will never arrive
  6. Deadlock

Fix options:

  • Increase stream_tx capacity significantly, or
  • Make recv() non-blocking: spawn assembly in a background task and return a future/handle, or
  • Buffer stream handles in request_handler instead of blocking on stream_tx.send().await

Should Fix (Important)

4. assemble() has no upper bound check on received bytes

The truncation check (buf.len() < total_bytes) catches under-delivery, but there is no check for over-delivery. A malicious server claiming total_bytes=100 could stream unlimited data into the unbounded channel. The buf.extend_from_slice loop has no cap.

Fix: Add if buf.len() as u64 > self.total_bytes { return Err(...) } inside the receive loop.

5. Evicted stream_senders entries cause orphan reassembly entries

When stream_senders evicts an entry at capacity, subsequent chunks for that evicted stream fall through to the ReassemblyBuffer (since the sender is no longer in the map), creating an orphan partial stream that will never complete — permanently consuming a reassembly slot. This compounds issue #2.

6. MAX_CHUNKS_PER_BATCH is defined but never used

The constant is declared and documented in the PR description as enabling "cooperative yielding," but no yielding logic exists in the code. Either implement the yield logic or remove the dead constant.


Consider (Suggestions)

  • recv() / recv_stream() interaction: Both consume from stream_rx. If a caller alternates between them, behavior is unpredictable. Worth documenting the mutual exclusivity.
  • Browser stream_id = 0 hardcoded: All browser chunked sends use the same stream ID. If concurrent large sends were possible, this would cause DuplicateChunk errors. Mitigated by &mut self, but worth a comment.
  • Test gaps: Out-of-order chunk delivery, exact CHUNK_SIZE boundaries, assemble() truncation error path, and mixed streamed + non-streamed response interleaving are all untested.
  • Test stability: Integration tests use 100ms timeouts and AtomicU16 port allocation — consider port 0 (OS-assigned) and longer timeouts for CI.
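The OS-assigned-port suggestion can be sketched as follows (illustrative only; the PR's eventual bind_free_port() helper may differ in detail):

```rust
use std::net::TcpListener;

// Binding to port 0 lets the OS pick a currently free port, avoiding the
// races inherent in manual counters like the AtomicU16 allocation the
// tests used. Keep the listener alive until the test server takes over,
// or another process may grab the port in between.
fn bind_free_port() -> (TcpListener, u16) {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let port = listener.local_addr().expect("local addr").port();
    (listener, port)
}
```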

What's Done Well

  • Thorough error handling in ReassemblyBuffer with specific error variants for every failure case
  • Zero-copy chunking via Bytes::slice()
  • Clean separation between transparent reassembly (Layer 1) and incremental streaming (Layer 2)
  • Comprehensive unit test suite for the core chunking/reassembly logic
  • Good integration tests covering both chunked-only and header+chunked paths
  • FlatBuffers StreamHeader correctly returns an error rather than silently failing

@iduartgomez (Contributor) left a comment:

Review: All blocking and important issues addressed ✅

Checked every issue from the previous review against the current diff:

Blocking issues — all fixed

  1. FlatBuffers union ordering — StreamChunk moved to end of both unions. Existing variant indices preserved. ✅
  2. TTL on ReassemblyBuffer — STREAM_TTL (60s) with created_at timestamps, evict_stale() before concurrent limit check. WASM correctly cfg-gated. ✅
  3. Deadlock with 5+ concurrent streams — stream_tx capacity = MAX_CONCURRENT_STREAMS. Full channel falls back to transparent ReassemblyBuffer reassembly instead of blocking. recv() uses tokio::select!, Stream impl uses FuturesUnordered. ✅

Important issues — all fixed

  1. assemble() overflow protection — Rejects total_bytes > MAX_TOTAL_CHUNKS * CHUNK_SIZE upfront, checks buf.len() + chunk.len() > total_bytes + CHUNK_SIZE per chunk. Test validates. ✅
  2. Evicted stream_senders orphan cleanup — reassembly.remove_stream(id) called on eviction. Full-channel fallback removes sender so chunks go through ReassemblyBuffer. ✅
  3. MAX_CHUNKS_PER_BATCH dead constant — Removed entirely. ✅

Suggestions — mostly addressed

  • recv()/recv_stream() mutual exclusivity documented with # Important sections ✅
  • Browser stream_id no longer hardcoded to 0 — uses next_stream_id with wrapping_add
  • Test gaps filled: out-of-order chunks, exact boundary, overflow, truncation ✅
  • Test stability: OS-assigned ports via bind_free_port(), timeouts increased to 5s ✅

Backwards compatibility

The change is backwards compatible despite the 0.2.0 bump:

  • FlatBuffers: New variants appended at end of unions — existing indices unchanged
  • Bincode (server→client): StreamChunk/StreamHeader only sent to opt-in streaming clients (gated by ?streaming=true in freenet-core#3346)
  • Bincode (client→server): StreamChunk only sent for payloads >512 KiB, which is virtually never for client requests
  • Rust enums: New variants added after last existing variant — bincode variant indices preserved for all existing types

The 0.2.0 version bump is fine as a signal of the scope of the change, but nothing here breaks existing clients or servers.

@iduartgomez merged commit 7000cba into main Mar 8, 2026 — 9 checks passed
@iduartgomez deleted the feat/ws-message-streaming branch March 8, 2026 22:08