package org.elasticsearch.xpack.ml.dataframe.process;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
import org.elasticsearch.xpack.ml.dataframe.stats.DataCountsTracker;
import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
import org.elasticsearch.xpack.ml.dataframe.stats.StatsPersister;
import org.elasticsearch.xpack.ml.dataframe.steps.StepResponse;
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

/* loaded from: input_file:org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.class */
public class AnalyticsProcessManager {
    private static final Logger LOGGER = LogManager.getLogger(AnalyticsProcessManager.class);
    private final Settings settings;
    private final Client client;
    private final ExecutorService executorServiceForJob;
    private final ExecutorService executorServiceForProcess;
    private final AnalyticsProcessFactory<AnalyticsResult> processFactory;
    private final ConcurrentMap<Long, ProcessContext> processContextByAllocation;
    private final DataFrameAnalyticsAuditor auditor;
    private final TrainedModelProvider trainedModelProvider;
    private final ResultsPersisterService resultsPersisterService;
    private final int numAllocatedProcessors;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager$ProcessContext.class */
    public class ProcessContext {
        private final DataFrameAnalyticsConfig config;
        private final SetOnce<AnalyticsProcess<AnalyticsResult>> process = new SetOnce<>();
        private final SetOnce<DataFrameDataExtractor> dataExtractor = new SetOnce<>();
        private final SetOnce<AnalyticsResultProcessor> resultProcessor = new SetOnce<>();
        private final SetOnce<String> failureReason = new SetOnce<>();

        ProcessContext(DataFrameAnalyticsConfig dataFrameAnalyticsConfig) {
            this.config = (DataFrameAnalyticsConfig) Objects.requireNonNull(dataFrameAnalyticsConfig);
        }

        String getFailureReason() {
            return (String) this.failureReason.get();
        }

        void setFailureReason(String str) {
            if (str == null) {
                return;
            }
            this.failureReason.trySet(str);
        }

        synchronized void stop() {
            AnalyticsProcessManager.LOGGER.debug("[{}] Stopping process", this.config.getId());
            if (this.dataExtractor.get() != null) {
                ((DataFrameDataExtractor) this.dataExtractor.get()).cancel();
            }
            if (this.resultProcessor.get() != null) {
                ((AnalyticsResultProcessor) this.resultProcessor.get()).cancel();
            }
            if (this.process.get() != null) {
                try {
                    ((AnalyticsProcess) this.process.get()).kill(true);
                } catch (IOException e) {
                    AnalyticsProcessManager.LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", this.config.getId()), e);
                }
            }
        }

        synchronized boolean startProcess(DataFrameDataExtractorFactory dataFrameDataExtractorFactory, DataFrameAnalyticsTask dataFrameAnalyticsTask, boolean z) {
            if (dataFrameAnalyticsTask.isStopping()) {
                return false;
            }
            this.dataExtractor.set(dataFrameDataExtractorFactory.newExtractor(false));
            AnalyticsProcessConfig createProcessConfig = createProcessConfig((DataFrameDataExtractor) this.dataExtractor.get(), dataFrameDataExtractorFactory.getExtractedFields());
            AnalyticsProcessManager.LOGGER.debug("[{}] creating analytics process with config [{}]", this.config.getId(), Strings.toString(createProcessConfig));
            if (createProcessConfig.rows() == 0) {
                AnalyticsProcessManager.LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", this.config.getId());
                return false;
            }
            this.process.set(AnalyticsProcessManager.this.createProcess(dataFrameAnalyticsTask, this.config, createProcessConfig, z));
            this.resultProcessor.set(createResultProcessor(dataFrameAnalyticsTask, dataFrameDataExtractorFactory));
            return true;
        }

        private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataFrameDataExtractor, ExtractedFields extractedFields) {
            DataFrameDataExtractor.DataSummary collectDataSummary = dataFrameDataExtractor.collectDataSummary();
            Set<String> categoricalFields = dataFrameDataExtractor.getCategoricalFields(this.config.getAnalysis());
            return new AnalyticsProcessConfig(this.config.getId(), collectDataSummary.rows, collectDataSummary.cols, this.config.getModelMemoryLimit(), Math.min(this.config.getMaxNumThreads().intValue(), AnalyticsProcessManager.this.numAllocatedProcessors), this.config.getDest().getResultsField(), categoricalFields, this.config.getAnalysis(), extractedFields);
        }

        private AnalyticsResultProcessor createResultProcessor(DataFrameAnalyticsTask dataFrameAnalyticsTask, DataFrameDataExtractorFactory dataFrameDataExtractorFactory) {
            return new AnalyticsResultProcessor(this.config, new DataFrameRowsJoiner(this.config.getId(), AnalyticsProcessManager.this.settings, dataFrameAnalyticsTask.getParentTaskId(), dataFrameDataExtractorFactory.newExtractor(true), AnalyticsProcessManager.this.resultsPersisterService), dataFrameAnalyticsTask.getStatsHolder(), AnalyticsProcessManager.this.trainedModelProvider, AnalyticsProcessManager.this.auditor, new StatsPersister(this.config.getId(), AnalyticsProcessManager.this.resultsPersisterService, AnalyticsProcessManager.this.auditor), ((DataFrameDataExtractor) this.dataExtractor.get()).getExtractedFields());
        }
    }

    /**
     * Creates a manager whose executors are taken from the given thread pool: the ML utility
     * pool is passed as the first executor and the job comms pool as the second.
     */
    public AnalyticsProcessManager(Settings settings, Client client, ThreadPool threadPool, AnalyticsProcessFactory<AnalyticsResult> analyticsProcessFactory, DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor, TrainedModelProvider trainedModelProvider, ResultsPersisterService resultsPersisterService, int i) {
        this(
            settings,
            client,
            threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME),
            threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME),
            analyticsProcessFactory,
            dataFrameAnalyticsAuditor,
            trainedModelProvider,
            resultsPersisterService,
            i
        );
    }

    /**
     * Creates a manager with explicit executors.
     *
     * @param executorService  executor on which {@code runJob} performs its set-up work
     * @param executorService2 executor handed to the process factory and used to run the result
     *                         processing and data loading tasks
     * @param i                number of allocated processors; caps the threads configured for a process
     * @throws NullPointerException if any reference argument is {@code null}
     */
    public AnalyticsProcessManager(Settings settings, Client client, ExecutorService executorService, ExecutorService executorService2, AnalyticsProcessFactory<AnalyticsResult> analyticsProcessFactory, DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor, TrainedModelProvider trainedModelProvider, ResultsPersisterService resultsPersisterService, int i) {
        processContextByAllocation = new ConcurrentHashMap<>();
        this.settings = Objects.requireNonNull(settings);
        this.client = Objects.requireNonNull(client);
        executorServiceForJob = Objects.requireNonNull(executorService);
        executorServiceForProcess = Objects.requireNonNull(executorService2);
        processFactory = Objects.requireNonNull(analyticsProcessFactory);
        auditor = Objects.requireNonNull(dataFrameAnalyticsAuditor);
        this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider);
        this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
        numAllocatedProcessors = i;
    }

    /**
     * Asynchronously starts the analysis for the given task on the job executor.
     * <p>
     * A process context is registered under the task's allocation id, the presence of persisted
     * model state is looked up and the native process is started. When the process starts, two
     * tasks are submitted to the process executor: one consuming results and one loading data
     * (which eventually notifies the listener). When no process is started the listener is
     * completed immediately with {@code new StepResponse(true)}.
     *
     * @param dataFrameAnalyticsTask        the task the analysis runs for
     * @param dataFrameAnalyticsConfig      the analytics job configuration
     * @param dataFrameDataExtractorFactory factory for the extractors feeding the process
     * @param actionListener                notified when the step finishes or fails
     */
    public void runJob(DataFrameAnalyticsTask dataFrameAnalyticsTask, DataFrameAnalyticsConfig dataFrameAnalyticsConfig, DataFrameDataExtractorFactory dataFrameDataExtractorFactory, ActionListener<StepResponse> actionListener) {
        this.executorServiceForJob.execute(() -> {
            final ProcessContext processContext = new ProcessContext(dataFrameAnalyticsConfig);

            // The map monitor only needs to make the "is the task stopping?" check and the
            // registration of the context atomic with respect to stop(). It is released before
            // the blocking search and the process start-up so that stop() and other jobs are
            // not stalled behind them; from here on the context's own monitor does the guarding.
            synchronized (this.processContextByAllocation) {
                if (dataFrameAnalyticsTask.isStopping()) {
                    LOGGER.debug("[{}] task is stopping. Marking as complete before creating process context.", dataFrameAnalyticsTask.getParams().getId());
                    this.auditor.info(dataFrameAnalyticsConfig.getId(), "Finished analysis");
                    actionListener.onResponse(new StepResponse(true));
                    return;
                }
                if (this.processContextByAllocation.putIfAbsent(Long.valueOf(dataFrameAnalyticsTask.getAllocationId()), processContext) != null) {
                    actionListener.onFailure(ExceptionsHelper.serverError("[" + dataFrameAnalyticsConfig.getId() + "] Could not create process as one already exists"));
                    return;
                }
            }

            try {
                // Looked up inside the try so that a failed search also unregisters the context
                // and reaches the listener instead of leaving the task hanging.
                final boolean hasState = hasModelState(dataFrameAnalyticsConfig);
                if (processContext.startProcess(dataFrameDataExtractorFactory, dataFrameAnalyticsTask, hasState)) {
                    this.executorServiceForProcess.execute(
                        () -> processContext.resultProcessor.get().process(processContext.process.get())
                    );
                    this.executorServiceForProcess.execute(
                        () -> processData(dataFrameAnalyticsTask, processContext, hasState, actionListener)
                    );
                } else {
                    this.processContextByAllocation.remove(Long.valueOf(dataFrameAnalyticsTask.getAllocationId()));
                    this.auditor.info(dataFrameAnalyticsConfig.getId(), "Finished analysis");
                    actionListener.onResponse(new StepResponse(true));
                }
            } catch (Exception e) {
                processContext.stop();
                this.processContextByAllocation.remove(Long.valueOf(dataFrameAnalyticsTask.getAllocationId()));
                // Prefer a failure reason recorded on the context (e.g. by the crash handler).
                actionListener.onFailure(processContext.getFailureReason() == null ? e : ExceptionsHelper.serverError(processContext.getFailureReason()));
            }
        });
    }

    /**
     * Checks whether persisted model state exists for the given job.
     * <p>
     * Returns {@code false} straight away when the analysis does not persist state. Otherwise
     * searches the job state index pattern, under the {@code "ml"} origin, for the document
     * whose id is the job's state doc id prefix followed by {@code "1"}.
     *
     * @return {@code true} when exactly one matching document is found
     */
    private boolean hasModelState(DataFrameAnalyticsConfig dataFrameAnalyticsConfig) {
        if (!dataFrameAnalyticsConfig.getAnalysis().persistsState()) {
            return false;
        }
        try (ThreadContext.StoredContext ignored = this.client.threadPool().getThreadContext().stashWithOrigin("ml")) {
            final String firstStateDocId =
                dataFrameAnalyticsConfig.getAnalysis().getStateDocIdPrefix(dataFrameAnalyticsConfig.getId()) + "1";
            final int hitCount = this.client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                .setSize(1)
                .setFetchSource(false)
                .setQuery(QueryBuilders.idsQuery().addIds(firstStateDocId))
                .get()
                .getHits()
                .getHits()
                .length;
            return hitCount == 1;
        }
    }

    /**
     * Feeds the data frame to the native process, waits for the result processor to finish and
     * then closes the process and notifies the listener.
     * <p>
     * Steps: write the header record, write the data rows, send the end-of-data message, flush,
     * restore model state (when applicable) and await the result processor. An exception raised
     * along the way becomes the context's failure reason unless the task is stopping, in which
     * case it is only logged at debug level.
     * <p>
     * The clean-up (closing the process, unregistering the context and notifying the listener)
     * lives in a single {@code finally} block, so it is attempted exactly once whichever way the
     * steps above terminate.
     *
     * @param dataFrameAnalyticsTask the task being run
     * @param processContext         the context holding the process, extractor and result processor
     * @param z                      whether previous model state exists and should be restored
     * @param actionListener         receives {@code new StepResponse(false)} on success, or a
     *                               server error carrying the failure reason
     */
    private void processData(DataFrameAnalyticsTask dataFrameAnalyticsTask, ProcessContext processContext, boolean z, ActionListener<StepResponse> actionListener) {
        LOGGER.info("[{}] Started loading data", processContext.config.getId());
        this.auditor.info(processContext.config.getId(), Messages.getMessage("Started loading data"));
        final DataFrameAnalyticsConfig dataFrameAnalyticsConfig = processContext.config;
        final DataFrameDataExtractor dataFrameDataExtractor = processContext.dataExtractor.get();
        final AnalyticsProcess<AnalyticsResult> analyticsProcess = processContext.process.get();
        final AnalyticsResultProcessor analyticsResultProcessor = processContext.resultProcessor.get();
        try {
            writeHeaderRecord(dataFrameDataExtractor, analyticsProcess, dataFrameAnalyticsTask);
            writeDataRows(dataFrameDataExtractor, analyticsProcess, dataFrameAnalyticsTask);
            analyticsProcess.writeEndOfDataMessage();
            analyticsProcess.flushStream();
            restoreState(dataFrameAnalyticsConfig, analyticsProcess, z);
            LOGGER.info("[{}] Started analyzing", processContext.config.getId());
            this.auditor.info(processContext.config.getId(), Messages.getMessage("Started analyzing"));
            LOGGER.info("[{}] Waiting for result processor to complete", dataFrameAnalyticsConfig.getId());
            analyticsResultProcessor.awaitForCompletion();
            processContext.setFailureReason(analyticsResultProcessor.getFailure());
            LOGGER.info("[{}] Result processor has completed", dataFrameAnalyticsConfig.getId());
        } catch (Exception e) {
            if (dataFrameAnalyticsTask.isStopping()) {
                // Errors are expected while the process is being torn down by a stop request.
                LOGGER.debug(new ParameterizedMessage("[{}] Error while processing data [{}]; task is stopping", dataFrameAnalyticsConfig.getId(), e.getMessage()).getFormattedMessage(), e);
            } else {
                String formattedMessage = new ParameterizedMessage("[{}] Error while processing data [{}]", dataFrameAnalyticsConfig.getId(), e.getMessage()).getFormattedMessage();
                LOGGER.error(formattedMessage, e);
                processContext.setFailureReason(formattedMessage);
            }
        } finally {
            // Close before unregistering: closeProcess looks the context up in the map.
            closeProcess(dataFrameAnalyticsTask);
            this.processContextByAllocation.remove(Long.valueOf(dataFrameAnalyticsTask.getAllocationId()));
            LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", dataFrameAnalyticsConfig.getId(), Integer.valueOf(this.processContextByAllocation.size()));
            if (processContext.getFailureReason() == null) {
                this.auditor.info(dataFrameAnalyticsConfig.getId(), "Finished analysis");
                actionListener.onResponse(new StepResponse(false));
            } else {
                LOGGER.error("[{}] Marking task failed; {}", dataFrameAnalyticsConfig.getId(), processContext.getFailureReason());
                actionListener.onFailure(ExceptionsHelper.serverError(processContext.getFailureReason()));
            }
        }
    }

    /**
     * Streams the extractor's rows to the process and keeps the data counts and the
     * loading-data progress up to date.
     * <p>
     * Each record holds the row's values followed by two extra slots: the row checksum and a
     * last slot that always contains the empty string. Rows flagged to be skipped are only
     * counted; of the remaining rows only training rows are counted and written.
     *
     * @throws IOException if writing a record to the process fails
     */
    private void writeDataRows(DataFrameDataExtractor dataFrameDataExtractor, AnalyticsProcess<AnalyticsResult> analyticsProcess, DataFrameAnalyticsTask dataFrameAnalyticsTask) throws IOException {
        final ProgressTracker progressTracker = dataFrameAnalyticsTask.getStatsHolder().getProgressTracker();
        final DataCountsTracker dataCountsTracker = dataFrameAnalyticsTask.getStatsHolder().getDataCountsTracker();

        // One reusable buffer: field values, then the checksum slot, then the trailing empty slot.
        final String[] record = new String[dataFrameDataExtractor.getFieldNames().size() + 2];
        final int checksumSlot = record.length - 2;
        record[record.length - 1] = "";

        final long totalRows = analyticsProcess.getConfig().rows();
        long rowsProcessed = 0;

        while (dataFrameDataExtractor.hasNext()) {
            final Optional<List<DataFrameDataExtractor.Row>> batch = dataFrameDataExtractor.next();
            if (!batch.isPresent()) {
                continue;
            }
            final List<DataFrameDataExtractor.Row> rows = batch.get();
            for (DataFrameDataExtractor.Row row : rows) {
                if (row.shouldSkip()) {
                    dataCountsTracker.incrementSkippedDocsCount();
                    continue;
                }
                final String[] values = row.getValues();
                System.arraycopy(values, 0, record, 0, values.length);
                record[checksumSlot] = String.valueOf(row.getChecksum());
                if (row.isTraining()) {
                    dataCountsTracker.incrementTrainingDocsCount();
                    analyticsProcess.writeRecord(record);
                }
            }
            rowsProcessed += rows.size();

            final int percentLoaded;
            if (rowsProcessed >= totalRows) {
                percentLoaded = 100;
            } else {
                percentLoaded = (int) ((rowsProcessed * 100.0d) / totalRows);
            }
            progressTracker.updateLoadingDataProgress(percentLoaded);
        }
    }

    /**
     * Writes the header record to the process: the extractor's field names followed by two
     * extra columns that are both named {@code "."}.
     *
     * @throws IOException if writing the record to the process fails
     */
    private void writeHeaderRecord(DataFrameDataExtractor dataFrameDataExtractor, AnalyticsProcess<AnalyticsResult> analyticsProcess, DataFrameAnalyticsTask dataFrameAnalyticsTask) throws IOException {
        final List<String> fieldNames = dataFrameDataExtractor.getFieldNames();
        LOGGER.debug(() -> new ParameterizedMessage("[{}] header row fields {}", dataFrameAnalyticsTask.getParams().getId(), fieldNames));

        final String[] header = new String[fieldNames.size() + 2];
        int column = 0;
        for (String fieldName : fieldNames) {
            header[column++] = fieldName;
        }
        // Names of the two extra trailing columns.
        header[header.length - 2] = ".";
        header[header.length - 1] = ".";

        analyticsProcess.writeRecord(header);
    }

    /**
     * Restores previously persisted model state into the process.
     * <p>
     * Nothing is done when the analysis does not persist state or when {@code z} is
     * {@code false}. Otherwise the restore is audited and performed under the {@code "ml"}
     * origin; the stashed thread context is released on both the success and the failure path.
     *
     * @param dataFrameAnalyticsConfig the job configuration
     * @param analyticsProcess         the process to restore state into
     * @param z                        whether previous model state exists
     * @throws org.elasticsearch.ElasticsearchException (via {@code ExceptionsHelper.serverError})
     *         if restoring the state fails
     */
    private void restoreState(DataFrameAnalyticsConfig dataFrameAnalyticsConfig, AnalyticsProcess<AnalyticsResult> analyticsProcess, boolean z) {
        if (!dataFrameAnalyticsConfig.getAnalysis().persistsState()) {
            LOGGER.debug("[{}] Analysis does not support state", dataFrameAnalyticsConfig.getId());
            return;
        }
        if (!z) {
            LOGGER.debug("[{}] No model state available to restore", dataFrameAnalyticsConfig.getId());
            return;
        }
        LOGGER.debug("[{}] Restoring from previous model state", dataFrameAnalyticsConfig.getId());
        this.auditor.info(dataFrameAnalyticsConfig.getId(), "Restoring from previous model state");
        // try-with-resources guarantees the original thread context comes back even when the
        // restore throws; closing it only after a successful restore would leak the stashed
        // context on this thread.
        try (ThreadContext.StoredContext ignored = this.client.threadPool().getThreadContext().stashWithOrigin("ml")) {
            analyticsProcess.restoreState(this.client, dataFrameAnalyticsConfig.getAnalysis().getStateDocIdPrefix(dataFrameAnalyticsConfig.getId()));
        } catch (Exception e) {
            LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", analyticsProcess.getConfig().jobId()), e);
            throw ExceptionsHelper.serverError("Failed to restore state: " + e.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the native analytics process through the process factory, wiring in the crash
     * handler for the given task.
     *
     * @return the newly created process
     * @throws org.elasticsearch.ElasticsearchException (via {@code ExceptionsHelper.serverError})
     *         if the created process is not alive
     */
    public AnalyticsProcess<AnalyticsResult> createProcess(DataFrameAnalyticsTask dataFrameAnalyticsTask, DataFrameAnalyticsConfig dataFrameAnalyticsConfig, AnalyticsProcessConfig analyticsProcessConfig, boolean z) {
        final Consumer<String> crashHandler = onProcessCrash(dataFrameAnalyticsTask);
        final AnalyticsProcess<AnalyticsResult> newProcess = this.processFactory.createAnalyticsProcess(
            dataFrameAnalyticsConfig,
            analyticsProcessConfig,
            z,
            this.executorServiceForProcess,
            crashHandler
        );
        if (!newProcess.isProcessAlive()) {
            throw ExceptionsHelper.serverError("Failed to start data frame analytics process");
        }
        return newProcess;
    }

    /**
     * Builds the handler invoked with the crash reason when the process crashes. It looks up
     * the task's context at call time; when one is registered it records the reason on it and
     * stops it, otherwise it does nothing.
     */
    private Consumer<String> onProcessCrash(DataFrameAnalyticsTask dataFrameAnalyticsTask) {
        return reason -> {
            final ProcessContext context =
                this.processContextByAllocation.get(Long.valueOf(dataFrameAnalyticsTask.getAllocationId()));
            if (context == null) {
                return;
            }
            context.setFailureReason(reason);
            context.stop();
        };
    }

    /**
     * Closes the native process registered for the given task.
     * <p>
     * When no context is registered for the task's allocation id there is nothing to close and
     * the method returns after logging at debug level. A failure while closing is recorded as
     * the context's failure reason, unless the task is stopping, in which case it is logged at
     * debug level and the process is treated as closed.
     *
     * @param dataFrameAnalyticsTask the task whose process should be closed
     */
    private void closeProcess(DataFrameAnalyticsTask dataFrameAnalyticsTask) {
        String id = dataFrameAnalyticsTask.getParams().getId();
        LOGGER.info("[{}] Closing process", id);
        ProcessContext processContext = this.processContextByAllocation.get(Long.valueOf(dataFrameAnalyticsTask.getAllocationId()));
        if (processContext == null) {
            // The context has already been unregistered; dereferencing it here would raise a
            // NullPointerException, and a second one while trying to record the failure.
            LOGGER.debug("[{}] No process context found; nothing to close", id);
            return;
        }
        try {
            processContext.process.get().close();
            LOGGER.info("[{}] Closed process", id);
        } catch (Exception e) {
            if (dataFrameAnalyticsTask.isStopping()) {
                LOGGER.debug(() -> new ParameterizedMessage("[{}] Process closing was interrupted by kill request due to the task being stopped", id), e);
                LOGGER.info("[{}] Closed process", id);
            } else {
                LOGGER.error("[" + id + "] Error closing data frame analyzer process", e);
                processContext.setFailureReason(new ParameterizedMessage("[{}] Error closing data frame analyzer process [{}]", id, e.getMessage()).getFormattedMessage());
            }
        }
    }

    /**
     * Stops the process context registered for the given task, if there is one.
     * The lookup is done while holding the context map's monitor; the stop itself is not.
     *
     * @param dataFrameAnalyticsTask the task whose process should be stopped
     */
    public void stop(DataFrameAnalyticsTask dataFrameAnalyticsTask) {
        final ProcessContext context;
        synchronized (this.processContextByAllocation) {
            context = this.processContextByAllocation.get(Long.valueOf(dataFrameAnalyticsTask.getAllocationId()));
        }
        if (context != null) {
            LOGGER.debug("[{}] Stopping process", dataFrameAnalyticsTask.getParams().getId());
            context.stop();
            return;
        }
        LOGGER.debug("[{}] No process context to stop", dataFrameAnalyticsTask.getParams().getId());
    }

    /** Returns the number of process contexts currently registered. */
    int getProcessContextCount() {
        final int registeredContexts = processContextByAllocation.size();
        return registeredContexts;
    }
}
