package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.JobSnapshotUpgraderResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.snapshot.upgrader.SnapshotUpgradeTask;
import org.elasticsearch.xpack.ml.process.NativeStorageProvider;

/* loaded from: input_file:org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.class */
public final class JobModelSnapshotUpgrader {
    private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1);
    private static final Logger logger = LogManager.getLogger(JobModelSnapshotUpgrader.class);
    private final SnapshotUpgradeTask task;
    private final Job job;
    private final String jobId;
    private final String snapshotId;
    private final AutodetectParams params;
    private final Client client;
    private final Consumer<Exception> onFinish;
    private final Supplier<Boolean> continueRunning;
    private final ThreadPool threadPool;
    private final AutodetectProcessFactory autodetectProcessFactory;
    private final JobResultsPersister jobResultsPersister;
    private final NativeStorageProvider nativeStorageProvider;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader$Executor.class */
    public class Executor {
        private final StateStreamer stateStreamer;
        private final JobSnapshotUpgraderResultProcessor processor;
        private final ExecutorService autodetectWorkerExecutor;
        private final AutodetectProcess process;

        Executor(StateStreamer stateStreamer, JobSnapshotUpgraderResultProcessor jobSnapshotUpgraderResultProcessor, ExecutorService executorService, AutodetectProcess autodetectProcess) {
            this.stateStreamer = stateStreamer;
            this.processor = jobSnapshotUpgraderResultProcessor;
            this.autodetectWorkerExecutor = executorService;
            this.process = autodetectProcess;
        }

        void execute() {
            restoreState();
        }

        protected final Map<String, Integer> outputFieldIndexes() {
            HashMap hashMap = new HashMap();
            hashMap.put(JobModelSnapshotUpgrader.this.job.getDataDescription().getTimeField(), 0);
            int i = 1;
            for (String str : JobModelSnapshotUpgrader.this.job.getAnalysisConfig().analysisFields()) {
                if (!"mlcategory".equals(str)) {
                    int i2 = i;
                    i++;
                    hashMap.put(str, Integer.valueOf(i2));
                }
            }
            if (JobModelSnapshotUpgrader.this.job.getAnalysisConfig().getCategorizationFieldName() != null) {
                int i3 = i;
                i++;
                hashMap.put("...", Integer.valueOf(i3));
            }
            int i4 = i;
            int i5 = i + 1;
            hashMap.put(".", Integer.valueOf(i4));
            return hashMap;
        }

        void writeHeader() throws IOException {
            Map<String, Integer> outputFieldIndexes = outputFieldIndexes();
            String[] strArr = new String[outputFieldIndexes.size()];
            for (Map.Entry<String, Integer> entry : outputFieldIndexes.entrySet()) {
                strArr[entry.getValue().intValue()] = entry.getKey();
            }
            this.process.writeRecord(strArr);
        }

        FlushAcknowledgement waitFlushToCompletion(String str) throws Exception {
            JobModelSnapshotUpgrader.logger.debug(() -> {
                return new ParameterizedMessage("[{}] [{}] waiting for flush [{}]", new Object[]{JobModelSnapshotUpgrader.this.jobId, JobModelSnapshotUpgrader.this.snapshotId, str});
            });
            try {
                FlushAcknowledgement waitForFlushAcknowledgement = this.processor.waitForFlushAcknowledgement(str, JobModelSnapshotUpgrader.FLUSH_PROCESS_CHECK_FREQUENCY);
                while (waitForFlushAcknowledgement == null) {
                    checkProcessIsAlive();
                    checkResultsProcessorIsAlive();
                    waitForFlushAcknowledgement = this.processor.waitForFlushAcknowledgement(str, JobModelSnapshotUpgrader.FLUSH_PROCESS_CHECK_FREQUENCY);
                }
                JobModelSnapshotUpgrader.logger.debug(() -> {
                    return new ParameterizedMessage("[{}] [{}] flush completed [{}]", new Object[]{JobModelSnapshotUpgrader.this.jobId, JobModelSnapshotUpgrader.this.snapshotId, str});
                });
                return waitForFlushAcknowledgement;
            } finally {
                this.processor.clearAwaitingFlush(str);
            }
        }

        void restoreState() {
            try {
                this.process.restoreState(this.stateStreamer, JobModelSnapshotUpgrader.this.params.modelSnapshot());
                submitOperation(() -> {
                    writeHeader();
                    return waitFlushToCompletion(this.process.flushJob(FlushJobParams.builder().waitForNormalization(false).build()));
                }, (flushAcknowledgement, exc) -> {
                    Runnable runnable;
                    if (exc != null) {
                        JobModelSnapshotUpgrader.logger.error(() -> {
                            return new ParameterizedMessage("[{}] [{}] failed to flush after writing old state", JobModelSnapshotUpgrader.this.jobId, JobModelSnapshotUpgrader.this.snapshotId);
                        }, exc);
                        runnable = () -> {
                            JobModelSnapshotUpgrader.this.setTaskToFailed("Failed to flush after writing old state due to: " + exc.getMessage(), ActionListener.wrap(persistentTask -> {
                                shutdown(exc);
                            }, exc -> {
                                shutdown(exc);
                            }));
                        };
                    } else {
                        JobModelSnapshotUpgrader.logger.debug(() -> {
                            return new ParameterizedMessage("[{}] [{}] flush [{}] acknowledged requesting state write", new Object[]{JobModelSnapshotUpgrader.this.jobId, JobModelSnapshotUpgrader.this.snapshotId, flushAcknowledgement.getId()});
                        });
                        runnable = this::requestStateWrite;
                    }
                    JobModelSnapshotUpgrader.this.threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(runnable);
                });
            } catch (Exception e) {
                JobModelSnapshotUpgrader.logger.error(() -> {
                    return new ParameterizedMessage("[{}] [{}] failed to write old state", JobModelSnapshotUpgrader.this.jobId, JobModelSnapshotUpgrader.this.snapshotId);
                }, e);
                JobModelSnapshotUpgrader.this.setTaskToFailed("Failed to write old state due to: " + e.getMessage(), ActionListener.wrap(persistentTask -> {
                    shutdown(e);
                }, exc2 -> {
                    shutdown(e);
                }));
            }
        }

        private void requestStateWrite() {
            JobModelSnapshotUpgrader.this.task.updatePersistentTaskState(new SnapshotUpgradeTaskState(SnapshotUpgradeState.SAVING_NEW_STATE, JobModelSnapshotUpgrader.this.task.getAllocationId(), ""), ActionListener.wrap(persistentTask -> {
                if (!((Boolean) JobModelSnapshotUpgrader.this.continueRunning.get()).booleanValue()) {
                    shutdown(null);
                } else {
                    submitOperation(() -> {
                        this.process.persistState(JobModelSnapshotUpgrader.this.params.modelSnapshot().getTimestamp().getTime(), JobModelSnapshotUpgrader.this.params.modelSnapshot().getSnapshotId(), JobModelSnapshotUpgrader.this.params.modelSnapshot().getDescription());
                        return null;
                    }, (obj, exc) -> {
                        JobModelSnapshotUpgrader.this.threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
                            shutdown(exc);
                        });
                    });
                    JobModelSnapshotUpgrader.logger.info("asked for state to be persisted");
                }
            }, exc -> {
                JobModelSnapshotUpgrader.logger.error(() -> {
                    return new ParameterizedMessage("[{}] [{}] failed to update snapshot upgrader task to started", JobModelSnapshotUpgrader.this.jobId, JobModelSnapshotUpgrader.this.snapshotId);
                }, exc);
                shutdown(new ElasticsearchStatusException("Failed to start snapshot upgrade [{}] for job [{}]", RestStatus.INTERNAL_SERVER_ERROR, exc, new Object[]{JobModelSnapshotUpgrader.this.snapshotId, JobModelSnapshotUpgrader.this.jobId}));
            }));
        }

        private <T> void submitOperation(final CheckedSupplier<T, Exception> checkedSupplier, final BiConsumer<T, Exception> biConsumer) {
            this.autodetectWorkerExecutor.execute(new AbstractRunnable() { // from class: org.elasticsearch.xpack.ml.job.process.autodetect.JobModelSnapshotUpgrader.Executor.1
                public void onFailure(Exception exc) {
                    if (!((Boolean) JobModelSnapshotUpgrader.this.continueRunning.get()).booleanValue()) {
                        biConsumer.accept(null, ExceptionsHelper.conflictStatusException("[{}] Could not submit operation to process as it has been killed", new Object[]{JobModelSnapshotUpgrader.this.job.getId()}));
                    } else {
                        JobModelSnapshotUpgrader.logger.error(new ParameterizedMessage("[{}] Unexpected exception writing to process", JobModelSnapshotUpgrader.this.job.getId()), exc);
                        biConsumer.accept(null, exc);
                    }
                }

                protected void doRun() throws Exception {
                    if (!((Boolean) JobModelSnapshotUpgrader.this.continueRunning.get()).booleanValue()) {
                        biConsumer.accept(null, ExceptionsHelper.conflictStatusException("[{}] Could not submit operation to process as it has been killed", new Object[]{JobModelSnapshotUpgrader.this.job.getId()}));
                    } else {
                        Executor.this.checkProcessIsAlive();
                        biConsumer.accept(checkedSupplier.get(), null);
                    }
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void checkProcessIsAlive() {
            if (!this.process.isProcessAlive()) {
                throw new ElasticsearchException("[{}] Unexpected death of autodetect: {}", new Object[]{JobModelSnapshotUpgrader.this.job.getId(), this.process.readError()});
            }
        }

        private void checkResultsProcessorIsAlive() {
            if (this.processor.isFailed()) {
                throw new ElasticsearchException("[{}] Unexpected death of the result processor", new Object[]{JobModelSnapshotUpgrader.this.job.getId()});
            }
        }

        void shutdown(Exception exc) {
            if (this.process.isProcessAlive()) {
                this.autodetectWorkerExecutor.execute(() -> {
                    try {
                        if (this.process.isReady()) {
                            this.process.close();
                        } else {
                            this.processor.setProcessKilled();
                            this.process.kill(true);
                            this.processor.awaitCompletion();
                        }
                    } catch (IOException | TimeoutException e) {
                        JobModelSnapshotUpgrader.logger.warn(() -> {
                            return new ParameterizedMessage("[{}] [{}] failed to shutdown process", JobModelSnapshotUpgrader.this.jobId, JobModelSnapshotUpgrader.this.snapshotId);
                        }, e);
                    } finally {
                        JobModelSnapshotUpgrader.this.onFinish.accept(exc);
                    }
                });
                this.autodetectWorkerExecutor.shutdown();
                this.stateStreamer.cancel();
            } else {
                JobModelSnapshotUpgrader.this.onFinish.accept(exc);
                this.autodetectWorkerExecutor.shutdown();
                this.stateStreamer.cancel();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JobModelSnapshotUpgrader(SnapshotUpgradeTask snapshotUpgradeTask, Job job, AutodetectParams autodetectParams, ThreadPool threadPool, AutodetectProcessFactory autodetectProcessFactory, JobResultsPersister jobResultsPersister, Client client, NativeStorageProvider nativeStorageProvider, Consumer<Exception> consumer, Supplier<Boolean> supplier) {
        this.task = (SnapshotUpgradeTask) Objects.requireNonNull(snapshotUpgradeTask);
        this.job = (Job) Objects.requireNonNull(job);
        this.params = (AutodetectParams) Objects.requireNonNull(autodetectParams);
        this.threadPool = (ThreadPool) Objects.requireNonNull(threadPool);
        this.autodetectProcessFactory = (AutodetectProcessFactory) Objects.requireNonNull(autodetectProcessFactory);
        this.jobResultsPersister = (JobResultsPersister) Objects.requireNonNull(jobResultsPersister);
        this.nativeStorageProvider = (NativeStorageProvider) Objects.requireNonNull(nativeStorageProvider);
        this.client = (Client) Objects.requireNonNull(client);
        this.onFinish = (Consumer) Objects.requireNonNull(consumer);
        this.continueRunning = (Supplier) Objects.requireNonNull(supplier);
        this.jobId = snapshotUpgradeTask.getJobId();
        this.snapshotId = snapshotUpgradeTask.getSnapshotId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        ExecutorService executor = this.threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
        AutodetectProcess createAutodetectProcess = this.autodetectProcessFactory.createAutodetectProcess(this.jobId + "-" + this.snapshotId, this.job, this.params, executor, str -> {
            setTaskToFailed(str, ActionListener.wrap(persistentTask -> {
            }, exc -> {
            }));
            try {
                this.nativeStorageProvider.cleanupLocalTmpStorage(this.task.getDescription());
            } catch (IOException e) {
                logger.error(new ParameterizedMessage("[{}] [{}] failed to delete temporary files snapshot upgrade", this.jobId, this.snapshotId), e);
            }
        });
        JobSnapshotUpgraderResultProcessor jobSnapshotUpgraderResultProcessor = new JobSnapshotUpgraderResultProcessor(this.jobId, this.snapshotId, this.jobResultsPersister, createAutodetectProcess);
        try {
            ThreadContext.StoredContext stashContext = this.threadPool.getThreadContext().stashContext();
            try {
                AutodetectWorkerExecutorService autodetectWorkerExecutorService = new AutodetectWorkerExecutorService(this.threadPool.getThreadContext());
                Objects.requireNonNull(autodetectWorkerExecutorService);
                executor.submit(autodetectWorkerExecutorService::start);
                Objects.requireNonNull(jobSnapshotUpgraderResultProcessor);
                executor.submit(jobSnapshotUpgraderResultProcessor::process);
                if (stashContext != null) {
                    stashContext.close();
                }
                Executor executor2 = new Executor(new StateStreamer(this.client), jobSnapshotUpgraderResultProcessor, autodetectWorkerExecutorService, createAutodetectProcess);
                if (this.continueRunning.get().booleanValue()) {
                    executor2.execute();
                } else {
                    this.onFinish.accept(null);
                }
            } finally {
            }
        } catch (EsRejectedExecutionException e) {
            try {
                IOUtils.close(createAutodetectProcess);
            } catch (IOException e2) {
                logger.error("Can't close autodetect", e2);
            }
            this.onFinish.accept(e);
        }
    }

    void setTaskToFailed(String str, ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> actionListener) {
        PersistentTaskState snapshotUpgradeTaskState = new SnapshotUpgradeTaskState(SnapshotUpgradeState.FAILED, this.task.getAllocationId(), str);
        SnapshotUpgradeTask snapshotUpgradeTask = this.task;
        Objects.requireNonNull(actionListener);
        snapshotUpgradeTask.updatePersistentTaskState(snapshotUpgradeTaskState, ActionListener.wrap((v1) -> {
            r2.onResponse(v1);
        }, exc -> {
            logger.warn(() -> {
                return new ParameterizedMessage("[{}] [{}] failed to set task to failed", this.task.getJobId(), this.task.getSnapshotId());
            }, exc);
            actionListener.onFailure(exc);
        }));
    }
}
