diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 8e0e06cd4..03675c320 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -30,17 +30,7 @@ import io.modelcontextprotocol.json.McpJsonDefaults; import io.modelcontextprotocol.json.McpJsonMapper; import io.modelcontextprotocol.json.TypeRef; -import io.modelcontextprotocol.spec.ClosedMcpTransportSession; -import io.modelcontextprotocol.spec.DefaultMcpTransportSession; -import io.modelcontextprotocol.spec.DefaultMcpTransportStream; -import io.modelcontextprotocol.spec.HttpHeaders; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpTransportException; -import io.modelcontextprotocol.spec.McpTransportSession; -import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException; -import io.modelcontextprotocol.spec.McpTransportStream; -import io.modelcontextprotocol.spec.ProtocolVersions; +import io.modelcontextprotocol.spec.*; import io.modelcontextprotocol.util.Assert; import io.modelcontextprotocol.util.Utils; import org.reactivestreams.Publisher; @@ -189,14 +179,6 @@ private McpTransportSession createTransportSession() { return new DefaultMcpTransportSession(onClose); } - private McpTransportSession createClosedSession(McpTransportSession existingSession) { - var existingSessionId = Optional.ofNullable(existingSession) - .filter(session -> !(session instanceof ClosedMcpTransportSession)) - .flatMap(McpTransportSession::sessionId) - .orElse(null); - return new ClosedMcpTransportSession<>(existingSessionId); - } - private Publisher createDelete(String sessionId) { var uri = Utils.resolveUri(this.baseUri, this.endpoint); @@ -240,7 +222,8 @@ private void handleException(Throwable t) { public Mono closeGracefully() { return Mono.defer(() -> { logger.debug("Graceful close triggered"); - McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession); + McpTransportSession currentSession = this.activeSession + .getAndSet(ClosedMcpTransportSession.INSTANCE); if (currentSession != null) { return Mono.from(currentSession.closeGracefully()); } @@ -250,6 +233,19 @@ public Mono closeGracefully() { private Mono reconnect(McpTransportStream stream) { return Mono.deferContextual(ctx -> { + var rh = this.handler.get(); + if (rh == null) { + logger.warn("Transport has no request handler registered. Remember to call connect!"); + } + + final Function, Mono> requestHandler = rh != null + ? rh : msg -> Mono.error(new IllegalStateException("No request handler")); + + final McpTransportSession transportSession = this.activeSession.get(); + + if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) { + throw new McpTransportSessionClosedException(); + } if (stream != null) { logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId()); @@ -259,7 +255,7 @@ private Mono reconnect(McpTransportStream stream) { } final AtomicReference disposableRef = new AtomicReference<>(); - final McpTransportSession transportSession = this.activeSession.get(); + var uri = Utils.resolveUri(this.baseUri, this.endpoint); Disposable connection = Mono.deferContextual(connectionCtx -> { @@ -389,18 +385,18 @@ else if (statusCode == BAD_REQUEST) { "Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event())); }) .retryWhen(authorizationErrorRetrySpec()) - .flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage))) + .flatMap(jsonrpcMessage -> requestHandler.apply(Mono.just(jsonrpcMessage))) .onErrorMap(CompletionException.class, t -> t.getCause()) - .onErrorComplete(t -> { - this.handleException(t); - return true; - }) .doFinally(s -> { Disposable ref = disposableRef.getAndSet(null); if (ref != null) { transportSession.removeConnection(ref); } })) + .onErrorComplete(t -> { + this.handleException(t); + return true; + }) .contextWrite(ctx) .subscribe(); @@ -467,10 +463,23 @@ public String toString(McpSchema.JSONRPCMessage message) { public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { return Mono.create(deliveredSink -> { + var rh = this.handler.get(); + if (rh == null) { + logger.warn("Transport has no request handler registered. Remember to call connect!"); + } + + final Function, Mono> requestHandler = rh != null + ? rh : msg -> Mono.error(new IllegalStateException("No request handler")); + + var transportSession = this.activeSession.get(); + + if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) { + throw new McpTransportSessionClosedException(); + } + logger.debug("Sending message {}", sentMessage); final AtomicReference disposableRef = new AtomicReference<>(); - final McpTransportSession transportSession = this.activeSession.get(); var uri = Utils.resolveUri(this.baseUri, this.endpoint); String jsonBody = this.toString(sentMessage); @@ -643,22 +652,26 @@ else if (statusCode == BAD_REQUEST) { new RuntimeException("Failed to send message: " + responseEvent)); }) .retryWhen(authorizationErrorRetrySpec()) - .flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))) + .flatMap(jsonRpcMessage -> requestHandler.apply(Mono.just(jsonRpcMessage))) .onErrorMap(CompletionException.class, t -> t.getCause()) - .onErrorComplete(t -> { - // handle the error first - this.handleException(t); - // inform the caller of sendMessage - deliveredSink.error(t); - return true; - }) .doFinally(s -> { logger.debug("SendMessage finally: {}", s); Disposable ref = disposableRef.getAndSet(null); if (ref != null) { transportSession.removeConnection(ref); } - })).contextWrite(deliveredSink.contextView()).subscribe(); + })).onErrorComplete(t -> { + // handle the error first + try { + this.handleException(t); + } + catch (Exception e) { + logger.error("Error handling exception {}", t.getMessage(), e); + } + // inform the caller of sendMessage + deliveredSink.error(t); + return true; + }).contextWrite(deliveredSink.contextView()).subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/ClosedMcpTransportSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/ClosedMcpTransportSession.java index b18364abb..6ed01dee3 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/ClosedMcpTransportSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/ClosedMcpTransportSession.java @@ -6,43 +6,41 @@ import java.util.Optional; import org.reactivestreams.Publisher; +import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.util.annotation.Nullable; /** - * Represents a closed MCP session, which may not be reused. All calls will throw a - * {@link McpTransportSessionClosedException}. + * Represents a closed MCP session, which may not be reused. * - * @param the resource representing the connection that the transport - * manages. * @author Daniel Garnier-Moiroux + * @author Dariusz Jędrzejczyk */ -public class ClosedMcpTransportSession implements McpTransportSession { +public final class ClosedMcpTransportSession implements McpTransportSession { - private final String sessionId; + public static final ClosedMcpTransportSession INSTANCE = new ClosedMcpTransportSession(); - public ClosedMcpTransportSession(@Nullable String sessionId) { - this.sessionId = sessionId; + private ClosedMcpTransportSession() { } @Override public Optional sessionId() { - throw new McpTransportSessionClosedException(sessionId); + return Optional.empty(); } @Override public boolean markInitialized(String sessionId) { - throw new McpTransportSessionClosedException(sessionId); + throw new IllegalStateException("MCP Session is already closed"); } @Override - public void addConnection(CONNECTION connection) { - throw new McpTransportSessionClosedException(sessionId); + public void addConnection(Disposable connection) { + throw new IllegalStateException("MCP Session is already closed"); } @Override - public void removeConnection(CONNECTION connection) { - throw new McpTransportSessionClosedException(sessionId); + public void removeConnection(Disposable connection) { + throw new IllegalStateException("MCP Session is already closed"); } @Override diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java index d6f6c0083..3d7154278 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java @@ -119,7 +119,8 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport, this.requestHandlers.putAll(requestHandlers); this.notificationHandlers.putAll(notificationHandlers); - this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe(); + this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe(ignored -> { + }, error -> logger.warn("Client failed during connect", error)); } private void dismissPendingResponses() { @@ -160,7 +161,15 @@ else if (message instanceof McpSchema.JSONRPCRequest request) { var errorResponse = McpSchema.JSONRPCResponse.error(request.id(), jsonRpcError); return Mono.just(errorResponse); }).flatMap(this.transport::sendMessage).onErrorComplete(t -> { - logger.warn("Issue sending response to the client, ", t); + if (t instanceof McpTransportSessionClosedException) { + logger.debug("Can't send response to request {} when the transport is closed", request.id()); + } + else if (McpTransport.isPeerClosed(t)) { + logger.debug("Can't send response to request {}: connection closed by peer", request.id(), t); + } + else { + logger.warn("Failed to send response to the server", t); + } return true; }).subscribe(); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransport.java index 0a732bab6..658dc6af5 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransport.java @@ -8,6 +8,8 @@ import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage; import io.modelcontextprotocol.json.TypeRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** @@ -39,6 +41,8 @@ */ public interface McpTransport { + Logger logger = LoggerFactory.getLogger(McpTransport.class); + /** * Closes the transport connection and releases any associated resources. * @@ -48,7 +52,24 @@ public interface McpTransport { *

*/ default void close() { - this.closeGracefully().subscribe(); + this.closeGracefully().subscribe(ignored -> { + }, error -> { + if (isPeerClosed(error)) { + logger.debug("Error during asynchronous close", error); + } + else { + logger.warn("Error during asynchronous close", error); + } + }); + } + + static boolean isPeerClosed(Throwable t) { + for (Throwable c = t; c != null; c = c.getCause()) { + if (c instanceof java.io.EOFException) { + return true; + } + } + return false; } /** diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionClosedException.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionClosedException.java index 60e2850b9..9e9e4616b 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionClosedException.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionClosedException.java @@ -13,8 +13,14 @@ * @see ClosedMcpTransportSession * @author Daniel Garnier-Moiroux */ + public class McpTransportSessionClosedException extends RuntimeException { + public McpTransportSessionClosedException() { + super("Transport has already been closed."); + } + + @Deprecated(forRemoval = true) public McpTransportSessionClosedException(@Nullable String sessionId) { super(sessionId != null ? "MCP session with ID %s has been closed".formatted(sessionId) : "MCP session has been closed"); diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index 09c32ecbf..678cd9a0e 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -463,7 +463,7 @@ void testRootsListChanged() { void testInitializeWithRootsListProviders() { withClient(createMcpTransport(), builder -> builder.roots(Root.builder("file:///test/path").name("test-root").build()), client -> { - StepVerifier.create(client.initialize().then(client.closeGracefully())).verifyComplete(); + StepVerifier.create(client.initialize()).expectNextCount(1).verifyComplete(); }); } @@ -725,8 +725,6 @@ void testLoggingConsumer() { builder -> builder.loggingConsumer(notification -> Mono.fromRunnable(() -> logReceived.set(true))), client -> { StepVerifier.create(client.initialize()).expectNextMatches(Objects::nonNull).verifyComplete(); - StepVerifier.create(client.closeGracefully()).verifyComplete(); - }); } diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportTest.java b/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportTest.java index c3e85814c..4e35301bf 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportTest.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportTest.java @@ -8,11 +8,14 @@ import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; import io.modelcontextprotocol.common.McpTransportContext; import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpTransportSessionClosedException; import io.modelcontextprotocol.spec.ProtocolVersions; import java.net.URI; import java.net.URISyntaxException; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Function; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -139,13 +142,14 @@ void testCloseUninitialized() { var testMessage = new McpSchema.JSONRPCRequest(McpSchema.METHOD_INITIALIZE, "test-id", initializeRequest); StepVerifier.create(transport.sendMessage(testMessage)) - .expectErrorMessage("MCP session has been closed") + .expectErrorMessage("Transport has already been closed.") .verify(); } @Test void testCloseInitialized() { var transport = HttpClientStreamableHttpTransport.builder(host).build(); + // transport.connect(Function.identity()).block(); var initializeRequest = McpSchema.InitializeRequest .builder(ProtocolVersions.MCP_2025_11_25, McpSchema.ClientCapabilities.builder().roots(true).build(), @@ -157,7 +161,8 @@ void testCloseInitialized() { StepVerifier.create(transport.closeGracefully()).verifyComplete(); StepVerifier.create(transport.sendMessage(testMessage)) - .expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed")) + .expectErrorMatches(err -> err instanceof McpTransportSessionClosedException + && err.getMessage().contains("Transport has already been closed")) .verify(); } diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/server/transport/ServerTransportSecurityIntegrationTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/server/transport/ServerTransportSecurityIntegrationTests.java index 10bb30568..c1dcc7c19 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/server/transport/ServerTransportSecurityIntegrationTests.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/server/transport/ServerTransportSecurityIntegrationTests.java @@ -81,6 +81,7 @@ void setUp() { @AfterEach void tearDown() { + requestCustomizer.reset(); mcpClient.close(); }