Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,7 @@
import io.modelcontextprotocol.json.McpJsonDefaults;
import io.modelcontextprotocol.json.McpJsonMapper;
import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportException;
import io.modelcontextprotocol.spec.McpTransportSession;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.spec.McpTransportStream;
import io.modelcontextprotocol.spec.ProtocolVersions;
import io.modelcontextprotocol.spec.*;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - we typically don't do star imports.

import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -189,14 +179,6 @@ private McpTransportSession<Disposable> createTransportSession() {
return new DefaultMcpTransportSession(onClose);
}

private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
var existingSessionId = Optional.ofNullable(existingSession)
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
.flatMap(McpTransportSession::sessionId)
.orElse(null);
return new ClosedMcpTransportSession<>(existingSessionId);
}

private Publisher<Void> createDelete(String sessionId) {

var uri = Utils.resolveUri(this.baseUri, this.endpoint);
Expand Down Expand Up @@ -240,7 +222,8 @@ private void handleException(Throwable t) {
public Mono<Void> closeGracefully() {
return Mono.defer(() -> {
logger.debug("Graceful close triggered");
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
McpTransportSession<Disposable> currentSession = this.activeSession
.getAndSet(ClosedMcpTransportSession.INSTANCE);
if (currentSession != null) {
return Mono.from(currentSession.closeGracefully());
}
Expand All @@ -250,6 +233,19 @@ public Mono<Void> closeGracefully() {

private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
return Mono.deferContextual(ctx -> {
var rh = this.handler.get();
if (rh == null) {
logger.warn("Transport has no request handler registered. Remember to call connect!");
}

final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> requestHandler = rh != null
? rh : msg -> Mono.error(new IllegalStateException("No request handler"));

final McpTransportSession<Disposable> transportSession = this.activeSession.get();

if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) {
throw new McpTransportSessionClosedException();
}

if (stream != null) {
logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId());
Expand All @@ -259,7 +255,7 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
}

final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
final McpTransportSession<Disposable> transportSession = this.activeSession.get();

var uri = Utils.resolveUri(this.baseUri, this.endpoint);

Disposable connection = Mono.deferContextual(connectionCtx -> {
Expand Down Expand Up @@ -389,18 +385,18 @@ else if (statusCode == BAD_REQUEST) {
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
})
.retryWhen(authorizationErrorRetrySpec())
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
.flatMap(jsonrpcMessage -> requestHandler.apply(Mono.just(jsonrpcMessage)))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this leak?

.onErrorMap(CompletionException.class, t -> t.getCause())
.onErrorComplete(t -> {
this.handleException(t);
return true;
})
.doFinally(s -> {
Disposable ref = disposableRef.getAndSet(null);
if (ref != null) {
transportSession.removeConnection(ref);
}
}))
.onErrorComplete(t -> {
this.handleException(t);
return true;
})
.contextWrite(ctx)
.subscribe();

Expand Down Expand Up @@ -467,10 +463,23 @@ public String toString(McpSchema.JSONRPCMessage message) {

public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
return Mono.create(deliveredSink -> {
var rh = this.handler.get();
if (rh == null) {
logger.warn("Transport has no request handler registered. Remember to call connect!");
}

final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> requestHandler = rh != null
? rh : msg -> Mono.error(new IllegalStateException("No request handler"));

var transportSession = this.activeSession.get();

if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) {
throw new McpTransportSessionClosedException();
}

logger.debug("Sending message {}", sentMessage);

final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
final McpTransportSession<Disposable> transportSession = this.activeSession.get();

var uri = Utils.resolveUri(this.baseUri, this.endpoint);
String jsonBody = this.toString(sentMessage);
Expand Down Expand Up @@ -643,22 +652,26 @@ else if (statusCode == BAD_REQUEST) {
new RuntimeException("Failed to send message: " + responseEvent));
})
.retryWhen(authorizationErrorRetrySpec())
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
.flatMap(jsonRpcMessage -> requestHandler.apply(Mono.just(jsonRpcMessage)))
.onErrorMap(CompletionException.class, t -> t.getCause())
.onErrorComplete(t -> {
// handle the error first
this.handleException(t);
// inform the caller of sendMessage
deliveredSink.error(t);
return true;
})
.doFinally(s -> {
logger.debug("SendMessage finally: {}", s);
Disposable ref = disposableRef.getAndSet(null);
if (ref != null) {
transportSession.removeConnection(ref);
}
})).contextWrite(deliveredSink.contextView()).subscribe();
})).onErrorComplete(t -> {
// handle the error first
try {
this.handleException(t);
}
catch (Exception e) {
logger.error("Error handling exception {}", t.getMessage(), e);
}
// inform the caller of sendMessage
deliveredSink.error(t);
return true;
}).contextWrite(deliveredSink.contextView()).subscribe();

disposableRef.set(connection);
transportSession.addConnection(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,41 @@
import java.util.Optional;

import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* Represents a closed MCP session, which may not be reused. All calls will throw a
* {@link McpTransportSessionClosedException}.
* Represents a closed MCP session, which may not be reused.
*
* @param <CONNECTION> the resource representing the connection that the transport
* manages.
* @author Daniel Garnier-Moiroux
* @author Dariusz Jędrzejczyk
*/
public class ClosedMcpTransportSession<CONNECTION> implements McpTransportSession<CONNECTION> {
public final class ClosedMcpTransportSession implements McpTransportSession<Disposable> {

private final String sessionId;
public static final ClosedMcpTransportSession INSTANCE = new ClosedMcpTransportSession();

public ClosedMcpTransportSession(@Nullable String sessionId) {
this.sessionId = sessionId;
private ClosedMcpTransportSession() {
}

@Override
public Optional<String> sessionId() {
throw new McpTransportSessionClosedException(sessionId);
return Optional.empty();
}

@Override
public boolean markInitialized(String sessionId) {
throw new McpTransportSessionClosedException(sessionId);
throw new IllegalStateException("MCP Session is already closed");
}

@Override
public void addConnection(CONNECTION connection) {
throw new McpTransportSessionClosedException(sessionId);
public void addConnection(Disposable connection) {
throw new IllegalStateException("MCP Session is already closed");
}

@Override
public void removeConnection(CONNECTION connection) {
throw new McpTransportSessionClosedException(sessionId);
public void removeConnection(Disposable connection) {
throw new IllegalStateException("MCP Session is already closed");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
this.requestHandlers.putAll(requestHandlers);
this.notificationHandlers.putAll(notificationHandlers);

this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe();
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe(ignored -> {
}, error -> logger.warn("Client failed during connect", error));
}

private void dismissPendingResponses() {
Expand Down Expand Up @@ -160,7 +161,15 @@ else if (message instanceof McpSchema.JSONRPCRequest request) {
var errorResponse = McpSchema.JSONRPCResponse.error(request.id(), jsonRpcError);
return Mono.just(errorResponse);
}).flatMap(this.transport::sendMessage).onErrorComplete(t -> {
logger.warn("Issue sending response to the client, ", t);
if (t instanceof McpTransportSessionClosedException) {
logger.debug("Can't send response to request {} when the transport is closed", request.id());
}
else if (McpTransport.isPeerClosed(t)) {
logger.debug("Can't send response to request {}: connection closed by peer", request.id(), t);
}
else {
logger.warn("Failed to send response to the server", t);
}
return true;
}).subscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
import io.modelcontextprotocol.json.TypeRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -39,6 +41,8 @@
*/
public interface McpTransport {

Logger logger = LoggerFactory.getLogger(McpTransport.class);

/**
* Closes the transport connection and releases any associated resources.
*
Expand All @@ -48,7 +52,24 @@ public interface McpTransport {
* </p>
*/
default void close() {
this.closeGracefully().subscribe();
this.closeGracefully().subscribe(ignored -> {
}, error -> {
if (isPeerClosed(error)) {
logger.debug("Error during asynchronous close", error);
}
else {
logger.warn("Error during asynchronous close", error);
}
});
}

static boolean isPeerClosed(Throwable t) {
for (Throwable c = t; c != null; c = c.getCause()) {
if (c instanceof java.io.EOFException) {
return true;
}
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
* @see ClosedMcpTransportSession
* @author Daniel Garnier-Moiroux
*/

public class McpTransportSessionClosedException extends RuntimeException {

public McpTransportSessionClosedException() {
super("Transport has already been closed.");
}

@Deprecated(forRemoval = true)
public McpTransportSessionClosedException(@Nullable String sessionId) {
super(sessionId != null ? "MCP session with ID %s has been closed".formatted(sessionId)
: "MCP session has been closed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ void testRootsListChanged() {
void testInitializeWithRootsListProviders() {
withClient(createMcpTransport(),
builder -> builder.roots(Root.builder("file:///test/path").name("test-root").build()), client -> {
StepVerifier.create(client.initialize().then(client.closeGracefully())).verifyComplete();
StepVerifier.create(client.initialize()).expectNextCount(1).verifyComplete();
});
}

Expand Down Expand Up @@ -725,8 +725,6 @@ void testLoggingConsumer() {
builder -> builder.loggingConsumer(notification -> Mono.fromRunnable(() -> logReceived.set(true))),
client -> {
StepVerifier.create(client.initialize()).expectNextMatches(Objects::nonNull).verifyComplete();
StepVerifier.create(client.closeGracefully()).verifyComplete();

});

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
import io.modelcontextprotocol.spec.ProtocolVersions;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -139,13 +142,14 @@ void testCloseUninitialized() {
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.METHOD_INITIALIZE, "test-id", initializeRequest);

StepVerifier.create(transport.sendMessage(testMessage))
.expectErrorMessage("MCP session has been closed")
.expectErrorMessage("Transport has already been closed.")
.verify();
}

@Test
void testCloseInitialized() {
var transport = HttpClientStreamableHttpTransport.builder(host).build();
// transport.connect(Function.identity()).block();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we don't need to call "connect" anymore?
If we are not "connect"-ed, shouldn't sendMessage error rather than complete?


var initializeRequest = McpSchema.InitializeRequest
.builder(ProtocolVersions.MCP_2025_11_25, McpSchema.ClientCapabilities.builder().roots(true).build(),
Expand All @@ -157,7 +161,8 @@ void testCloseInitialized() {
StepVerifier.create(transport.closeGracefully()).verifyComplete();

StepVerifier.create(transport.sendMessage(testMessage))
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
.expectErrorMatches(err -> err instanceof McpTransportSessionClosedException
&& err.getMessage().contains("Transport has already been closed"))
.verify();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void setUp() {

@AfterEach
void tearDown() {
requestCustomizer.reset();
mcpClient.close();
}

Expand Down
Loading