package org.elasticsearch.xpack.ml.dataframe.process;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;
import org.elasticsearch.xpack.ml.utils.persistence.LimitAwareBulkIndexer;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.class */
public class DataFrameRowsJoiner implements AutoCloseable {
    private static final Logger LOGGER = LogManager.getLogger(DataFrameRowsJoiner.class);
    private static final int RESULTS_BATCH_SIZE = 1000;
    private final String analyticsId;
    private final Settings settings;
    private final TaskId parentTaskId;
    private final DataFrameDataExtractor dataExtractor;
    private final ResultsPersisterService resultsPersisterService;
    private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator = new ResultMatchingDataFrameRows();
    private LinkedList<RowResults> currentResults = new LinkedList<>();
    private volatile String failure;
    private volatile boolean isCancelled;

    /* loaded from: input_file:org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner$ResultMatchingDataFrameRows.class */
    private class ResultMatchingDataFrameRows implements Iterator<DataFrameDataExtractor.Row> {
        private List<DataFrameDataExtractor.Row> currentDataFrameRows;
        private int currentDataFrameRowsIndex;

        private ResultMatchingDataFrameRows() {
            this.currentDataFrameRows = Collections.emptyList();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return DataFrameRowsJoiner.this.dataExtractor.hasNext() || this.currentDataFrameRowsIndex < this.currentDataFrameRows.size();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public DataFrameDataExtractor.Row next() {
            DataFrameDataExtractor.Row row;
            DataFrameDataExtractor.Row row2 = null;
            while (true) {
                row = row2;
                if (!hasNoMatch(row) || !hasNext()) {
                    break;
                }
                advanceToNextBatchIfNecessary();
                List<DataFrameDataExtractor.Row> list = this.currentDataFrameRows;
                int i = this.currentDataFrameRowsIndex;
                this.currentDataFrameRowsIndex = i + 1;
                row2 = list.get(i);
            }
            if (hasNoMatch(row)) {
                throw ExceptionsHelper.serverError("no more data frame rows could be found while joining results");
            }
            return row;
        }

        private boolean hasNoMatch(DataFrameDataExtractor.Row row) {
            return row == null || row.shouldSkip() || !row.isTraining();
        }

        private void advanceToNextBatchIfNecessary() {
            if (this.currentDataFrameRowsIndex >= this.currentDataFrameRows.size()) {
                this.currentDataFrameRows = getNextDataRowsBatch().orElse(Collections.emptyList());
                this.currentDataFrameRowsIndex = 0;
            }
        }

        private Optional<List<DataFrameDataExtractor.Row>> getNextDataRowsBatch() {
            try {
                return DataFrameRowsJoiner.this.dataExtractor.next();
            } catch (IOException e) {
                throw ExceptionsHelper.serverError("error reading next batch of data frame rows [" + e.getMessage() + "]");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DataFrameRowsJoiner(String str, Settings settings, TaskId taskId, DataFrameDataExtractor dataFrameDataExtractor, ResultsPersisterService resultsPersisterService) {
        this.analyticsId = (String) Objects.requireNonNull(str);
        this.settings = (Settings) Objects.requireNonNull(settings);
        this.parentTaskId = (TaskId) Objects.requireNonNull(taskId);
        this.dataExtractor = (DataFrameDataExtractor) Objects.requireNonNull(dataFrameDataExtractor);
        this.resultsPersisterService = (ResultsPersisterService) Objects.requireNonNull(resultsPersisterService);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public String getFailure() {
        return this.failure;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processRowResults(RowResults rowResults) {
        if (this.failure != null) {
            return;
        }
        try {
            addResultAndJoinIfEndOfBatch(rowResults);
        } catch (Exception e) {
            LOGGER.error(new ParameterizedMessage("[{}] Failed to join results ", this.analyticsId), e);
            this.failure = "[" + this.analyticsId + "] Failed to join results: " + e.getMessage();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancel() {
        this.isCancelled = true;
    }

    private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
        this.currentResults.add(rowResults);
        if (this.currentResults.size() == 1000) {
            joinCurrentResults();
        }
    }

    private void joinCurrentResults() {
        LimitAwareBulkIndexer limitAwareBulkIndexer = new LimitAwareBulkIndexer(this.settings, (Consumer<BulkRequest>) this::executeBulkRequest);
        while (!this.currentResults.isEmpty()) {
            try {
                RowResults pop = this.currentResults.pop();
                DataFrameDataExtractor.Row next = this.dataFrameRowsIterator.next();
                checkChecksumsMatch(next, pop);
                limitAwareBulkIndexer.addAndExecuteIfNeeded(createIndexRequest(pop, next.getHit()));
            } catch (Throwable th) {
                try {
                    limitAwareBulkIndexer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        limitAwareBulkIndexer.close();
        this.currentResults = new LinkedList<>();
    }

    private void executeBulkRequest(BulkRequest bulkRequest) {
        bulkRequest.setParentTask(this.parentTaskId);
        this.resultsPersisterService.bulkIndexWithHeadersWithRetry(this.dataExtractor.getHeaders(), bulkRequest, this.analyticsId, () -> {
            return Boolean.valueOf(!this.isCancelled);
        }, str -> {
        });
    }

    private void checkChecksumsMatch(DataFrameDataExtractor.Row row, RowResults rowResults) {
        if (row.getChecksum() != rowResults.getChecksum()) {
            throw ExceptionsHelper.serverError(((("Detected checksum mismatch for document with id [" + row.getHit().getId() + "]; ") + "expected [" + row.getChecksum() + "] but result had [" + rowResults.getChecksum() + "]; ") + "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. ") + "We rely on this index being immutable during a running analysis and so the results will be unreliable.");
        }
    }

    private IndexRequest createIndexRequest(RowResults rowResults, SearchHit searchHit) {
        LinkedHashMap linkedHashMap = new LinkedHashMap(searchHit.getSourceAsMap());
        linkedHashMap.putAll(rowResults.getResults());
        IndexRequest indexRequest = new IndexRequest(searchHit.getIndex());
        indexRequest.id(searchHit.getId());
        indexRequest.source(linkedHashMap);
        indexRequest.opType(DocWriteRequest.OpType.INDEX);
        indexRequest.setParentTask(this.parentTaskId);
        return indexRequest;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            try {
                joinCurrentResults();
            } catch (Exception e) {
                LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", this.analyticsId), e);
                this.failure = "[" + this.analyticsId + "] Failed to join results: " + e.getMessage();
                try {
                    consumeDataExtractor();
                } catch (Exception e2) {
                    LOGGER.error(new ParameterizedMessage("[{}] Failed to consume data extractor", this.analyticsId), e2);
                }
            }
        } finally {
            try {
                consumeDataExtractor();
            } catch (Exception e3) {
                LOGGER.error(new ParameterizedMessage("[{}] Failed to consume data extractor", this.analyticsId), e3);
            }
        }
    }

    private void consumeDataExtractor() throws IOException {
        this.dataExtractor.cancel();
        while (this.dataExtractor.hasNext()) {
            this.dataExtractor.next();
        }
    }
}
