package org.elasticsearch.xpack.ml.process;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;

/* loaded from: input_file:org/elasticsearch/xpack/ml/process/NativeController.class */
public class NativeController {
    private static final Logger LOGGER;
    private static final String CONTROLLER = "controller";
    private static final Duration CONTROLLER_CONNECT_TIMEOUT;
    private static final String START_COMMAND = "start";
    private static final String KILL_COMMAND = "kill";
    public static final Map<String, Object> UNKNOWN_NATIVE_CODE_INFO;
    private final String localNodeName;
    private final CppLogMessageHandler cppLogHandler;
    private final OutputStream commandStream;
    private final InputStream responseStream;
    private final NamedXContentRegistry xContentRegistry;
    private final Map<Integer, ResponseTracker> responseTrackers = new ConcurrentHashMap();
    private final SetOnce<Iterator<ControllerResponse>> responseIteratorHolder = new SetOnce<>();
    private int nextCommandId = 1;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/process/NativeController$ResponseTracker.class */
    public static class ResponseTracker {
        private final CountDownLatch latch;
        private final SetOnce<ControllerResponse> responseHolder;

        private ResponseTracker() {
            this.latch = new CountDownLatch(1);
            this.responseHolder = new SetOnce<>();
        }

        boolean hasResponded() {
            return this.latch.getCount() < 1;
        }

        void setResponse(ControllerResponse controllerResponse) {
            this.responseHolder.set(controllerResponse);
            this.latch.countDown();
        }

        ControllerResponse getResponse() throws InterruptedException {
            this.latch.await();
            return (ControllerResponse) this.responseHolder.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NativeController(String str, Environment environment, NamedPipeHelper namedPipeHelper, NamedXContentRegistry namedXContentRegistry) throws IOException {
        this.localNodeName = str;
        ProcessPipes processPipes = new ProcessPipes(environment, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, null, true, false, true, false, false);
        processPipes.connectLogStream();
        this.cppLogHandler = processPipes.getLogStreamHandler();
        tailLogsInThread(this.cppLogHandler);
        processPipes.connectOtherStreams();
        this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());
        this.responseStream = processPipes.getProcessOutStream().get();
        this.xContentRegistry = namedXContentRegistry;
    }

    static void tailLogsInThread(CppLogMessageHandler cppLogMessageHandler) {
        Thread thread = new Thread(() -> {
            try {
                try {
                    cppLogMessageHandler.tailStream();
                    if (cppLogMessageHandler != null) {
                        cppLogMessageHandler.close();
                    }
                } finally {
                }
            } catch (IOException e) {
                LOGGER.error("Error tailing C++ controller logs", e);
            }
            LOGGER.info("Native controller process has stopped - no new native processes can be started");
        }, "ml-cpp-log-tail-thread");
        thread.setDaemon(true);
        thread.start();
    }

    public long getPid() throws TimeoutException {
        return this.cppLogHandler.getPid(CONTROLLER_CONNECT_TIMEOUT);
    }

    public Map<String, Object> getNativeCodeInfo() throws TimeoutException {
        return this.cppLogHandler.getNativeCodeInfo(CONTROLLER_CONNECT_TIMEOUT);
    }

    public void startProcess(List<String> list) throws IOException, InterruptedException {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Cannot start process: no command supplied");
        }
        for (String str : list) {
            if (str.contains("\t")) {
                throw new IllegalArgumentException("argument contains a tab character: " + str + " in " + list);
            }
            if (str.contains("\n")) {
                throw new IllegalArgumentException("argument contains a newline character: " + str + " in " + list);
            }
        }
        if (this.cppLogHandler.hasLogStreamEnded()) {
            String str2 = "Cannot start process [" + list.get(0) + "]: native controller process has stopped on node [" + this.localNodeName + "]";
            LOGGER.error(str2);
            throw new ElasticsearchException(str2, new Object[0]);
        }
        int i = -1;
        try {
            synchronized (this.commandStream) {
                int i2 = this.nextCommandId;
                this.nextCommandId = i2 + 1;
                i = i2;
                setupResponseTracker(i);
                LOGGER.debug("Command [{}]: starting process with command {}", Integer.valueOf(i), list);
                this.commandStream.write(Integer.toString(i).getBytes(StandardCharsets.UTF_8));
                this.commandStream.write(9);
                this.commandStream.write("start".getBytes(StandardCharsets.UTF_8));
                for (String str3 : list) {
                    this.commandStream.write(9);
                    this.commandStream.write(str3.getBytes(StandardCharsets.UTF_8));
                }
                this.commandStream.write(10);
                this.commandStream.flush();
            }
            awaitCompletion(i);
            removeResponseTracker(i);
        } catch (Throwable th) {
            removeResponseTracker(i);
            throw th;
        }
    }

    public void killProcess(long j, boolean z) throws TimeoutException, IOException, InterruptedException {
        if (j <= 0) {
            throw new IllegalArgumentException("invalid PID to kill: " + j);
        }
        if (j == getPid()) {
            throw new IllegalArgumentException("native controller will not kill self: " + j);
        }
        if (this.cppLogHandler.hasLogStreamEnded()) {
            String str = "Cannot kill process with PID [" + j + "]: native controller process has stopped on node [" + this.localNodeName + "]";
            LOGGER.error(str);
            throw new ElasticsearchException(str, new Object[0]);
        }
        int i = -1;
        try {
            synchronized (this.commandStream) {
                int i2 = this.nextCommandId;
                this.nextCommandId = i2 + 1;
                i = i2;
                if (z) {
                    setupResponseTracker(i);
                }
                LOGGER.debug("Command [{}]: killing process with PID [{}]", Integer.valueOf(i), Long.valueOf(j));
                this.commandStream.write(Integer.toString(i).getBytes(StandardCharsets.UTF_8));
                this.commandStream.write(9);
                this.commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8));
                this.commandStream.write(9);
                this.commandStream.write(Long.toString(j).getBytes(StandardCharsets.UTF_8));
                this.commandStream.write(10);
                this.commandStream.flush();
            }
            if (z) {
                awaitCompletion(i);
            }
            if (z) {
                removeResponseTracker(i);
            }
        } catch (Throwable th) {
            if (z) {
                removeResponseTracker(i);
            }
            throw th;
        }
    }

    public void stop() throws IOException {
        this.commandStream.close();
    }

    private void setupResponseTracker(int i) {
        ResponseTracker put = this.responseTrackers.put(Integer.valueOf(i), new ResponseTracker());
        if (!$assertionsDisabled && put != null) {
            throw new AssertionError();
        }
    }

    private void removeResponseTracker(int i) {
        this.responseTrackers.remove(Integer.valueOf(i));
    }

    private void awaitCompletion(int i) throws IOException, InterruptedException {
        ResponseTracker responseTracker = this.responseTrackers.get(Integer.valueOf(i));
        if (!$assertionsDisabled && responseTracker == null) {
            throw new AssertionError();
        }
        if (!responseTracker.hasResponded()) {
            synchronized (this.responseIteratorHolder) {
                Iterator it = (Iterator) this.responseIteratorHolder.get();
                if (it == null) {
                    it = new ProcessResultsParser(ControllerResponse.PARSER, this.xContentRegistry).parseResults(this.responseStream);
                    this.responseIteratorHolder.set(it);
                }
                while (!responseTracker.hasResponded()) {
                    if (!it.hasNext()) {
                        throw new IOException("ML controller response stream ended while awaiting response for command [" + i + "]");
                    }
                    ControllerResponse controllerResponse = (ControllerResponse) it.next();
                    ResponseTracker responseTracker2 = this.responseTrackers.get(Integer.valueOf(controllerResponse.getCommandId()));
                    if (responseTracker2 != null) {
                        responseTracker2.setResponse(controllerResponse);
                    }
                }
            }
        }
        ControllerResponse response = responseTracker.getResponse();
        if (!$assertionsDisabled && response.getCommandId() != i) {
            throw new AssertionError();
        }
        if (!response.isSuccess()) {
            throw new IOException("ML controller failed to execute command [" + i + "]: [" + response.getReason() + "]");
        }
        LOGGER.debug("ML controller successfully executed command [" + i + "]: [" + response.getReason() + "]");
    }

    static {
        $assertionsDisabled = !NativeController.class.desiredAssertionStatus();
        LOGGER = LogManager.getLogger(NativeController.class);
        CONTROLLER_CONNECT_TIMEOUT = Duration.ofSeconds(10L);
        HashMap hashMap = new HashMap(2);
        hashMap.put("version", "N/A");
        hashMap.put("build_hash", "N/A");
        UNKNOWN_NATIVE_CODE_INFO = Collections.unmodifiableMap(hashMap);
    }
}
