Skip to content

Conversation

@ok2c
Copy link
Member

@ok2c ok2c commented Feb 9, 2026

I have added four convenience classes: AsyncClientPipeline, AsyncServerPipeline, AsyncJsonClientPipeline, andAsyncJsonServerPipeline that help put together message processing pipelines using a very simple, fluent DSL.

The pipeline assembly DSL makes quite a big difference for JSON processing especially.

I am open to ideas of better naming or other improvements. If I hear no objections I will commit the changes in a couple of days. There is still plenty of time to polish or even revert.

AsyncJsonClientPipeline.assemble(objectMapper)
        .request()
        .post(uri)
        .asObject(new BasicNameValuePair("name", "value"))
        .response()
        .asObject(RequestData.class)
        .result(new FutureCallback<Message<HttpResponse, RequestData>>() {

            @Override
            public void completed(final Message<HttpResponse, RequestData> m) {
                System.out.println("Response status: " + m.head().getCode());
                System.out.println(m.body());
                latch.countDown();
            }

            @Override
            public void failed(final Exception ex) {
                ex.printStackTrace(System.out);
                latch.countDown();
            }

            @Override
            public void cancelled() {
                latch.countDown();
            }

        })
        .create(),
AsyncJsonServerPipeline.assemble(objectMapper)
        // Read GET / HEAD requests by consuming content stream as JSON nodes
        .request(Method.GET, Method.HEAD, Method.POST, Method.PUT, Method.PATCH)
        .asJsonNode()
        // Write out responses by streaming out content of JSON object
        .response()
        .asObject(RequestData.class)
        // Map exceptions to a response message
        .errorMessage(Throwable::getMessage)
        // Generate a response to a request
        .handle((m, context) -> {
            final HttpRequest request = m.head();
            final RequestData rd = new RequestData();
            try {
                rd.setUrl(request.getUri());
            } catch (final URISyntaxException ex) {
                throw new ProtocolException("Invalid request URI");
            }
            rd.generateHeaders(request.getHeaders());
            rd.setJson(m.body());
            rd.setData(Objects.toString(m.error()));

            final HttpCoreContext coreContext = HttpCoreContext.cast(context);
            final EndpointDetails endpointDetails = coreContext.getEndpointDetails();

            final InetSocketAddress remoteAddress = (InetSocketAddress) endpointDetails.getRemoteAddress();
            rd.setOrigin(Objects.toString(remoteAddress.getAddress()));

            return Message.of(new BasicHttpResponse(HttpStatus.SC_OK), rd);
        })
        .supplier();

I am aware of some code duplication. The choice was between a very ugly coupling between classes with lots unnecessary details spilling into the public API and loose coupling with some code duplication. I opted for the latter.

@ok2c ok2c force-pushed the async_pipeline_assembly branch from fe4c7aa to e182b6f Compare February 9, 2026 15:13
@ok2c ok2c force-pushed the async_pipeline_assembly branch from e182b6f to 06a75b3 Compare February 9, 2026 18:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant