package org.elasticsearch.xpack.ml.process.logging;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Deque;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

/* loaded from: input_file:org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.class */
public class CppLogMessageHandler implements Closeable {
    private static final Logger LOGGER = LogManager.getLogger(CppLogMessageHandler.class);
    private static final int DEFAULT_READBUF_SIZE = 1024;
    private static final int DEFAULT_ERROR_STORE_SIZE = 5;
    private static final long MAX_MESSAGE_INTERVAL_SECONDS = 10;
    private final String jobId;
    private final InputStream inputStream;
    private final int readBufSize;
    private final int errorStoreSize;
    private final Deque<String> errorStore;
    private final CountDownLatch pidLatch;
    private final CountDownLatch cppCopyrightLatch;
    private final CountDownLatch logStreamClosedLatch;
    private MessageSummary lastMessageSummary;
    private volatile boolean seenFatalError;
    private volatile long pid;
    private volatile String cppCopyright;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler$MessageSummary.class */
    public static class MessageSummary {
        Instant timestamp = Instant.EPOCH;
        CppLogMessage message = null;
        int count = 0;
        Level level = Level.OFF;

        MessageSummary() {
        }

        void reset(Instant instant, CppLogMessage cppLogMessage, Level level) {
            this.timestamp = instant;
            this.message = cppLogMessage;
            this.count = 0;
            this.level = level;
        }
    }

    public CppLogMessageHandler(String str, InputStream inputStream) {
        this(inputStream, str, DEFAULT_READBUF_SIZE, DEFAULT_ERROR_STORE_SIZE);
    }

    CppLogMessageHandler(InputStream inputStream, String str, int i, int i2) {
        this.lastMessageSummary = new MessageSummary();
        this.jobId = str;
        this.inputStream = (InputStream) Objects.requireNonNull(inputStream);
        this.readBufSize = i;
        this.errorStoreSize = i2;
        this.errorStore = ConcurrentCollections.newDeque();
        this.pidLatch = new CountDownLatch(1);
        this.cppCopyrightLatch = new CountDownLatch(1);
        this.logStreamClosedLatch = new CountDownLatch(1);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.inputStream.close();
    }

    public void tailStream() throws IOException {
        XContent xContent = XContentFactory.xContent(XContentType.JSON);
        BytesReference bytesReference = null;
        try {
            byte[] bArr = new byte[this.readBufSize];
            int read = this.inputStream.read(bArr);
            while (read != -1) {
                bytesReference = parseMessages(xContent, bytesReference == null ? new BytesArray(bArr, 0, read) : CompositeBytesReference.of(new BytesReference[]{bytesReference, new BytesArray(bArr, 0, read)}));
                bArr = new byte[this.readBufSize];
                read = this.inputStream.read(bArr);
            }
        } finally {
            this.logStreamClosedLatch.countDown();
            if (this.lastMessageSummary.count > 0) {
                logSummarizedMessage();
            }
            if (bytesReference != null) {
                parseMessage(xContent, bytesReference);
            }
        }
    }

    public boolean hasLogStreamEnded() {
        return this.logStreamClosedLatch.getCount() == 0;
    }

    public boolean seenFatalError() {
        return this.seenFatalError;
    }

    public boolean waitForLogStreamClose(Duration duration) {
        try {
            return this.logStreamClosedLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public long getPid(Duration duration) throws TimeoutException {
        if (this.pid == 0) {
            try {
                this.pidLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.pid == 0) {
                throw new TimeoutException("Timed out waiting for C++ process PID");
            }
        }
        return this.pid;
    }

    public long tryGetPid() {
        if (this.pid == 0) {
            return -1L;
        }
        return this.pid;
    }

    public String getCppCopyright(Duration duration) throws TimeoutException {
        if (this.cppCopyright == null) {
            try {
                this.cppCopyrightLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.cppCopyright == null) {
                throw new TimeoutException("Timed out waiting for C++ process copyright");
            }
        }
        return this.cppCopyright;
    }

    public Map<String, Object> getNativeCodeInfo(Duration duration) throws TimeoutException {
        String cppCopyright = getCppCopyright(duration);
        Matcher matcher = Pattern.compile("Version (.+) \\(Build ([^)]+)\\) Copyright ").matcher(cppCopyright);
        if (!matcher.find()) {
            String str = "Unexpected native process copyright format: " + cppCopyright;
            LOGGER.error(str);
            throw new ElasticsearchException(str, new Object[0]);
        }
        HashMap hashMap = new HashMap(2);
        hashMap.put("version", matcher.group(1));
        hashMap.put("build_hash", matcher.group(2));
        return hashMap;
    }

    public String getErrors() {
        String[] strArr = (String[]) this.errorStore.toArray(new String[0]);
        StringBuilder sb = new StringBuilder();
        for (String str : strArr) {
            sb.append(str).append('\n');
        }
        return sb.toString();
    }

    private BytesReference parseMessages(XContent xContent, BytesReference bytesReference) {
        byte streamSeparator = xContent.streamSeparator();
        int i = 0;
        while (true) {
            int findNextMarker = findNextMarker(streamSeparator, bytesReference, i);
            if (findNextMarker == -1) {
                break;
            }
            if (findNextMarker > i) {
                parseMessage(xContent, bytesReference.slice(i, findNextMarker - i));
            }
            i = findNextMarker + 1;
            if (i < bytesReference.length() && bytesReference.get(i) == 0) {
                i++;
            }
        }
        if (i >= bytesReference.length()) {
            return null;
        }
        return bytesReference.slice(i, bytesReference.length() - i);
    }

    private void parseMessage(XContent xContent, BytesReference bytesReference) {
        try {
            StreamInput streamInput = bytesReference.streamInput();
            try {
                XContentParser createParser = xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, streamInput);
                try {
                    CppLogMessage cppLogMessage = (CppLogMessage) CppLogMessage.PARSER.apply(createParser, (Object) null);
                    Level level = Level.getLevel(cppLogMessage.getLevel());
                    if (level == null) {
                        level = Level.WARN;
                    } else if (level.isMoreSpecificThan(Level.ERROR)) {
                        storeError(cppLogMessage.getMessage());
                        if (level.isMoreSpecificThan(Level.FATAL)) {
                            this.seenFatalError = true;
                        }
                    }
                    long pid = cppLogMessage.getPid();
                    if (this.pid != pid) {
                        this.pid = pid;
                        this.pidLatch.countDown();
                    }
                    String message = cppLogMessage.getMessage();
                    if (this.cppCopyright == null && message.contains("Copyright")) {
                        this.cppCopyright = message;
                        this.cppCopyrightLatch.countDown();
                    }
                    if (!LOGGER.isEnabled(level)) {
                        if (createParser != null) {
                            createParser.close();
                        }
                        if (streamInput != null) {
                            streamInput.close();
                            return;
                        }
                        return;
                    }
                    if (!LOGGER.isDebugEnabled()) {
                        if (cppLogMessage.isSimilarTo(this.lastMessageSummary.message) && this.lastMessageSummary.timestamp.until(cppLogMessage.getTimestamp(), ChronoUnit.SECONDS) < MAX_MESSAGE_INTERVAL_SECONDS) {
                            this.lastMessageSummary.count++;
                            this.lastMessageSummary.message = cppLogMessage;
                            if (createParser != null) {
                                createParser.close();
                            }
                            if (streamInput != null) {
                                streamInput.close();
                                return;
                            }
                            return;
                        }
                        if (this.lastMessageSummary.count > 0) {
                            logSummarizedMessage();
                        }
                        this.lastMessageSummary.reset(cppLogMessage.getTimestamp(), cppLogMessage, level);
                    }
                    if (this.jobId != null) {
                        LOGGER.log(level, "[{}] [{}/{}] [{}@{}] {}", this.jobId, cppLogMessage.getLogger(), Long.valueOf(pid), cppLogMessage.getFile(), Long.valueOf(cppLogMessage.getLine()), message);
                    } else {
                        LOGGER.log(level, "[{}/{}] [{}@{}] {}", cppLogMessage.getLogger(), Long.valueOf(pid), cppLogMessage.getFile(), Long.valueOf(cppLogMessage.getLine()), message);
                    }
                    if (createParser != null) {
                        createParser.close();
                    }
                    if (streamInput != null) {
                        streamInput.close();
                    }
                } catch (Throwable th) {
                    if (createParser != null) {
                        try {
                            createParser.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (streamInput != null) {
                    try {
                        streamInput.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (IOException e) {
            if (this.jobId != null) {
                LOGGER.warn(new ParameterizedMessage("[{}] IO failure receiving C++ log message: {}", new Object[]{this.jobId, bytesReference.utf8ToString()}), e);
            } else {
                LOGGER.warn(new ParameterizedMessage("IO failure receiving C++ log message: {}", new Object[]{bytesReference.utf8ToString()}), e);
            }
        } catch (XContentParseException e2) {
            String str = "Fatal error: '" + bytesReference.utf8ToString() + "'";
            if (str.contains("bad_alloc")) {
                str = str + ", process ran out of memory";
            }
            String str2 = str + ", version: ";
            try {
                Map<String, Object> nativeCodeInfo = getNativeCodeInfo(Duration.ofMillis(MAX_MESSAGE_INTERVAL_SECONDS));
                str2 = str2 + String.format(Locale.ROOT, "%s (build %s)", nativeCodeInfo.get("version"), nativeCodeInfo.get("build_hash"));
            } catch (TimeoutException e3) {
                str2 = str2 + "failed to retrieve";
            }
            storeError(str2);
            this.seenFatalError = true;
        }
    }

    private void logSummarizedMessage() {
        if (this.lastMessageSummary.count > 1) {
            if (this.jobId != null) {
                LOGGER.log(this.lastMessageSummary.level, "[{}] [{}/{}] [{}@{}] {} | repeated [{}]", this.jobId, this.lastMessageSummary.message.getLogger(), Long.valueOf(this.lastMessageSummary.message.getPid()), this.lastMessageSummary.message.getFile(), Long.valueOf(this.lastMessageSummary.message.getLine()), this.lastMessageSummary.message.getMessage(), Integer.valueOf(this.lastMessageSummary.count));
                return;
            } else {
                LOGGER.log(this.lastMessageSummary.level, "[{}/{}] [{}@{}] {} | repeated [{}]", this.lastMessageSummary.message.getLogger(), Long.valueOf(this.lastMessageSummary.message.getPid()), this.lastMessageSummary.message.getFile(), Long.valueOf(this.lastMessageSummary.message.getLine()), this.lastMessageSummary.message.getMessage(), Integer.valueOf(this.lastMessageSummary.count));
                return;
            }
        }
        if (this.jobId != null) {
            LOGGER.log(this.lastMessageSummary.level, "[{}] [{}/{}] [{}@{}] {}", this.jobId, this.lastMessageSummary.message.getLogger(), Long.valueOf(this.lastMessageSummary.message.getPid()), this.lastMessageSummary.message.getFile(), Long.valueOf(this.lastMessageSummary.message.getLine()), this.lastMessageSummary.message.getMessage());
        } else {
            LOGGER.log(this.lastMessageSummary.level, "[{}/{}] [{}@{}] {}", this.lastMessageSummary.message.getLogger(), Long.valueOf(this.lastMessageSummary.message.getPid()), this.lastMessageSummary.message.getFile(), Long.valueOf(this.lastMessageSummary.message.getLine()), this.lastMessageSummary.message.getMessage());
        }
    }

    private void storeError(String str) {
        if (Strings.isNullOrEmpty(str) || this.errorStoreSize <= 0) {
            return;
        }
        if (this.errorStore.size() >= this.errorStoreSize) {
            this.errorStore.removeFirst();
        }
        this.errorStore.offerLast(str);
    }

    private static int findNextMarker(byte b, BytesReference bytesReference, int i) {
        for (int i2 = i; i2 < bytesReference.length(); i2++) {
            if (bytesReference.get(i2) == b) {
                return i2;
            }
        }
        return -1;
    }
}
