/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.client.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.client.transport.ServerParameters;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.util.Assert;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class StdioClientTransport
implements McpClientTransport {
    private static final Logger logger = LoggerFactory.getLogger(StdioClientTransport.class);
    private final Sinks.Many<McpSchema.JSONRPCMessage> inboundSink;
    private final Sinks.Many<McpSchema.JSONRPCMessage> outboundSink;
    private Process process;
    private ObjectMapper objectMapper;
    private Scheduler inboundScheduler;
    private Scheduler outboundScheduler;
    private Scheduler errorScheduler;
    private final ServerParameters params;
    private final Sinks.Many<String> errorSink;
    private volatile boolean isClosing = false;
    private Consumer<String> stdErrorHandler = error -> logger.info("STDERR Message received: {}", error);

    public StdioClientTransport(ServerParameters params) {
        this(params, new ObjectMapper());
    }

    public StdioClientTransport(ServerParameters params, ObjectMapper objectMapper) {
        Assert.notNull(params, "The params can not be null");
        Assert.notNull(objectMapper, "The ObjectMapper can not be null");
        this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
        this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
        this.params = params;
        this.objectMapper = objectMapper;
        this.errorSink = Sinks.many().unicast().onBackpressureBuffer();
        this.inboundScheduler = Schedulers.fromExecutorService((ExecutorService)Executors.newSingleThreadExecutor(), (String)"inbound");
        this.outboundScheduler = Schedulers.fromExecutorService((ExecutorService)Executors.newSingleThreadExecutor(), (String)"outbound");
        this.errorScheduler = Schedulers.fromExecutorService((ExecutorService)Executors.newSingleThreadExecutor(), (String)"error");
    }

    @Override
    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
        return Mono.fromRunnable(() -> {
            logger.info("MCP server starting.");
            this.handleIncomingMessages(handler);
            this.handleIncomingErrors();
            ArrayList<String> fullCommand = new ArrayList<String>();
            fullCommand.add(this.params.getCommand());
            fullCommand.addAll(this.params.getArgs());
            ProcessBuilder processBuilder = this.getProcessBuilder();
            processBuilder.command(fullCommand);
            processBuilder.environment().putAll(this.params.getEnv());
            try {
                this.process = processBuilder.start();
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to start process with command: " + String.valueOf(fullCommand), e);
            }
            if (this.process.getInputStream() == null || this.process.getOutputStream() == null) {
                this.process.destroy();
                throw new RuntimeException("Process input or output stream is null");
            }
            this.startInboundProcessing();
            this.startOutboundProcessing();
            this.startErrorProcessing();
            logger.info("MCP server started");
        }).subscribeOn(Schedulers.boundedElastic());
    }

    protected ProcessBuilder getProcessBuilder() {
        return new ProcessBuilder(new String[0]);
    }

    public void setStdErrorHandler(Consumer<String> errorHandler) {
        this.stdErrorHandler = errorHandler;
    }

    public void awaitForExit() {
        try {
            this.process.waitFor();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Process interrupted", e);
        }
    }

    private void startErrorProcessing() {
        this.errorScheduler.schedule(() -> {
            try (BufferedReader processErrorReader = new BufferedReader(new InputStreamReader(this.process.getErrorStream()));){
                String line;
                while (!this.isClosing && (line = processErrorReader.readLine()) != null) {
                    try {
                        if (this.errorSink.tryEmitNext((Object)line).isSuccess()) continue;
                        if (!this.isClosing) {
                            logger.error("Failed to emit error message");
                        }
                        break;
                    }
                    catch (Exception e) {
                        if (!this.isClosing) {
                            logger.error("Error processing error message", (Throwable)e);
                        }
                        break;
                    }
                }
            }
            catch (IOException e) {
                if (!this.isClosing) {
                    logger.error("Error reading from error stream", (Throwable)e);
                }
            }
            finally {
                this.isClosing = true;
                this.errorSink.tryEmitComplete();
            }
        });
    }

    private void handleIncomingMessages(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> inboundMessageHandler) {
        this.inboundSink.asFlux().flatMap(message -> Mono.just((Object)message).transform(inboundMessageHandler).contextWrite(ctx -> ctx.put((Object)"observation", (Object)"myObservation"))).subscribe();
    }

    private void handleIncomingErrors() {
        this.errorSink.asFlux().subscribe(e -> this.stdErrorHandler.accept((String)e));
    }

    @Override
    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
        if (this.outboundSink.tryEmitNext((Object)message).isSuccess()) {
            return Mono.empty();
        }
        return Mono.error((Throwable)new RuntimeException("Failed to enqueue message"));
    }

    private void startInboundProcessing() {
        this.inboundScheduler.schedule(() -> {
            try (BufferedReader processReader = new BufferedReader(new InputStreamReader(this.process.getInputStream()));){
                String line;
                while (!this.isClosing && (line = processReader.readLine()) != null) {
                    try {
                        McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, line);
                        if (this.inboundSink.tryEmitNext((Object)message).isSuccess()) continue;
                        if (!this.isClosing) {
                            logger.error("Failed to enqueue inbound message: {}", (Object)message);
                        }
                        break;
                    }
                    catch (Exception e) {
                        if (!this.isClosing) {
                            logger.error("Error processing inbound message for line: {}", (Object)line, (Object)e);
                        }
                        break;
                    }
                }
            }
            catch (IOException e) {
                if (!this.isClosing) {
                    logger.error("Error reading from input stream", (Throwable)e);
                }
            }
            finally {
                this.isClosing = true;
                this.inboundSink.tryEmitComplete();
            }
        });
    }

    private void startOutboundProcessing() {
        this.handleOutbound(messages -> messages.publishOn(this.outboundScheduler).handle((message, s) -> {
            if (message != null && !this.isClosing) {
                try {
                    OutputStream os;
                    String jsonMessage = this.objectMapper.writeValueAsString(message);
                    jsonMessage = jsonMessage.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n");
                    OutputStream outputStream = os = this.process.getOutputStream();
                    synchronized (outputStream) {
                        os.write(jsonMessage.getBytes(StandardCharsets.UTF_8));
                        os.write("\n".getBytes(StandardCharsets.UTF_8));
                        os.flush();
                    }
                    s.next(message);
                }
                catch (IOException e) {
                    s.error((Throwable)new RuntimeException(e));
                }
            }
        }));
    }

    protected void handleOutbound(Function<Flux<McpSchema.JSONRPCMessage>, Flux<McpSchema.JSONRPCMessage>> outboundConsumer) {
        outboundConsumer.apply((Flux<McpSchema.JSONRPCMessage>)this.outboundSink.asFlux()).doOnComplete(() -> {
            this.isClosing = true;
            this.outboundSink.tryEmitComplete();
        }).doOnError(e -> {
            if (!this.isClosing) {
                logger.error("Error in outbound processing", e);
                this.isClosing = true;
                this.outboundSink.tryEmitComplete();
            }
        }).subscribe();
    }

    @Override
    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(() -> {
            this.isClosing = true;
            logger.debug("Initiating graceful shutdown");
        }).then(Mono.defer(() -> {
            this.inboundSink.tryEmitComplete();
            this.outboundSink.tryEmitComplete();
            this.errorSink.tryEmitComplete();
            return Mono.delay((Duration)Duration.ofMillis(100L)).then();
        })).then(Mono.defer(() -> {
            logger.debug("Sending TERM to process");
            if (this.process != null) {
                this.process.destroy();
                return Mono.fromFuture(this.process.onExit());
            }
            logger.warn("Process not started");
            return Mono.empty();
        })).doOnNext(process -> {
            if (process.exitValue() != 0) {
                logger.warn("Process terminated with code {}", (Object)process.exitValue());
            } else {
                logger.info("MCP server process stopped");
            }
        }).then(Mono.fromRunnable(() -> {
            try {
                this.inboundScheduler.dispose();
                this.errorScheduler.dispose();
                this.outboundScheduler.dispose();
                logger.debug("Graceful shutdown completed");
            }
            catch (Exception e) {
                logger.error("Error during graceful shutdown", (Throwable)e);
            }
        })).then().subscribeOn(Schedulers.boundedElastic());
    }

    public Sinks.Many<String> getErrorSink() {
        return this.errorSink;
    }

    @Override
    public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
        return (T)this.objectMapper.convertValue(data, typeRef);
    }
}

