package org.apache.camel.component.file;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.regex.Pattern;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.support.EmptyAsyncCallback;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.StringHelper;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/camel-core-2.25.0.jar:org/apache/camel/component/file/GenericFileConsumer.class */
/**
 * Base class for scheduled batch consumers that poll a {@link GenericFileEndpoint} for
 * {@link GenericFile}s, wrap each file in an {@link Exchange} and route it.
 * <p>
 * Files accepted by {@link #isValidFile(GenericFile, boolean, List)} are recorded in the endpoint's
 * in-progress repository; most of the error handling in this class exists to make sure those
 * entries are removed again whenever a file is not going to be processed after all.
 *
 * @param <T> the underlying file type handled by the {@link GenericFileOperations}
 */
public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
    /** Logger named after the concrete consumer class. */
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected GenericFileEndpoint<T> endpoint;
    protected GenericFileOperations<T> operations;
    protected GenericFileProcessStrategy<T> processStrategy;
    /** Cached result of the endpoint's file name expression; reset at the start of every poll. */
    protected String fileExpressionResult;
    protected volatile ShutdownRunningTask shutdownRunningTask;
    /** Number of exchanges of the current batch that have not been handed to processing yet. */
    protected volatile int pendingExchanges;
    /** Optional processor that replaces the regular begin/retrieve/route handling of a file. */
    protected Processor customProcessor;
    protected boolean eagerLimitMaxMessagesPerPoll = true;
    /** Whether {@code prepareOnStartup} of the process strategy has run since the last stop. */
    protected volatile boolean prepareOnStartup;
    private final Pattern includePattern;
    private final Pattern excludePattern;

    /**
     * Creates the consumer and captures the include/exclude patterns configured on the endpoint.
     *
     * @param genericFileEndpoint the endpoint being consumed from
     * @param processor the processor that receives the created exchanges
     * @param genericFileOperations the operations used to access the files
     * @param genericFileProcessStrategy the strategy invoked around the processing of each file
     */
    public GenericFileConsumer(GenericFileEndpoint<T> genericFileEndpoint, Processor processor, GenericFileOperations<T> genericFileOperations, GenericFileProcessStrategy<T> genericFileProcessStrategy) {
        super(genericFileEndpoint, processor);
        this.endpoint = genericFileEndpoint;
        this.operations = genericFileOperations;
        this.processStrategy = genericFileProcessStrategy;
        this.includePattern = genericFileEndpoint.getIncludePattern();
        this.excludePattern = genericFileEndpoint.getExcludePattern();
    }

    /** @return the custom processor, or {@code null} when regular processing is used */
    public Processor getCustomProcessor() {
        return customProcessor;
    }

    /** @param processor the custom processor to use instead of regular processing; may be {@code null} */
    public void setCustomProcessor(Processor processor) {
        this.customProcessor = processor;
    }

    /** @return whether the max-messages-per-poll limit is applied eagerly (see {@link #canPollMoreFiles(List)}) */
    public boolean isEagerLimitMaxMessagesPerPoll() {
        return eagerLimitMaxMessagesPerPoll;
    }

    /** @param z whether to apply the max-messages-per-poll limit eagerly */
    public void setEagerLimitMaxMessagesPerPoll(boolean z) {
        this.eagerLimitMaxMessagesPerPoll = z;
    }

    /**
     * Polls the configured directory, turns the gathered files into exchanges and processes them as
     * one batch.
     *
     * @return the value returned by {@link #processBatch(Queue)}, or {@code 0} when
     *         {@link #prePollCheck()} vetoes the poll
     * @throws Exception any failure raised while polling; the in-progress markers of all gathered
     *         files are removed before the exception is rethrown
     */
    @Override
    public int poll() throws Exception {
        // the process strategy is prepared lazily on the first poll after a (re)start
        if (!prepareOnStartup) {
            processStrategy.prepareOnStartup(operations, endpoint);
            prepareOnStartup = true;
        }

        // per-poll state must start clean
        fileExpressionResult = null;
        shutdownRunningTask = null;
        pendingExchanges = 0;

        if (!prePollCheck()) {
            log.debug("Skipping poll as pre poll check returned false");
            return 0;
        }

        List<GenericFile<T>> files = new ArrayList<>();
        String directory = endpoint.getConfiguration().getDirectory();
        StopWatch watch = new StopWatch();
        // NOTE: this try spans the whole poll (not only pollDirectory), so any exception raised
        // below releases the in-progress markers of every gathered file.
        try {
            boolean limitHit = !pollDirectory(directory, files, 0);
            long taken = watch.taken();
            if (log.isDebugEnabled()) {
                log.debug("Took {} to poll: {}", TimeUtils.printDuration(taken), directory);
            }
            if (limitHit) {
                log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll);
            }

            // first sort on file level ...
            if (endpoint.getSorter() != null) {
                files.sort(endpoint.getSorter());
            }

            // ... then build the exchanges; a linked list so it can be sorted, shuffled and dequeued
            LinkedList<Exchange> exchanges = new LinkedList<>();
            for (GenericFile<T> file : files) {
                Exchange exchange = endpoint.createExchange(file);
                endpoint.configureExchange(exchange);
                endpoint.configureMessage(file, exchange.getIn());
                exchanges.add(exchange);
            }
            if (endpoint.getSortBy() != null) {
                exchanges.sort(endpoint.getSortBy());
            }
            if (endpoint.isShuffle()) {
                Collections.shuffle(exchanges);
            }

            // typed as Deque to select the Deque overload of CastUtils.cast below
            Deque<Exchange> pending = exchanges;

            // without eager limiting all files were gathered, so cut the surplus here
            if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0 && files.size() > maxMessagesPerPoll) {
                log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll);
                removeExcessiveInProgressFiles(pending, maxMessagesPerPoll);
            }

            int total = exchanges.size();
            if (total > 0) {
                log.debug("Total {} files to consume", total);
            }

            int polledMessages = processBatch(CastUtils.cast(pending));
            postPollCheck(polledMessages);
            return polledMessages;
        } catch (Exception e) {
            log.debug("Error occurred during poll directory: {} due {}. Removing {} files marked as in-progress.", directory, e.getMessage(), files.size());
            removeExcessiveInProgressFiles(files);
            throw e;
        }
    }

    /**
     * Processes the queued exchanges one by one, up to {@code maxMessagesPerPoll} when that limit is
     * positive, and for as long as {@code isBatchAllowed()} holds. Exchanges left in the queue
     * afterwards have their files removed from the in-progress repository.
     *
     * @param queue the exchanges to process; entries are removed from it
     * @return the initial queue size minus the number of exchanges whose processing did not start
     */
    @Override
    public int processBatch(Queue<Object> queue) {
        int total = queue.size();
        int answer = total;
        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
            log.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total);
            total = maxMessagesPerPoll;
        }

        for (int index = 0; index < total && isBatchAllowed(); index++) {
            Exchange exchange = (Exchange) queue.poll();
            exchange.setProperty(Exchange.BATCH_INDEX, index);
            exchange.setProperty(Exchange.BATCH_SIZE, total);
            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
            pendingExchanges = total - index - 1;

            boolean started = customProcessor != null
                    ? customProcessExchange(exchange, customProcessor)
                    : processExchange(exchange);
            if (!started) {
                answer--;
            }
        }

        // whatever is still queued will not be processed in this poll: release its markers
        if (queue instanceof Deque) {
            removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) queue, Exchange.class), 0);
        } else {
            // a plain Queue cannot be drained from the tail; drain it from the head instead
            Object leftover;
            while ((leftover = queue.poll()) != null) {
                releaseInProgress((Exchange) leftover);
            }
        }
        return answer;
    }

    /**
     * Removes exchanges from the tail of the deque until at most {@code i} remain, releasing the
     * in-progress marker of each removed exchange's file.
     *
     * @param deque the exchanges to trim
     * @param i the number of exchanges to keep
     */
    protected void removeExcessiveInProgressFiles(Deque<Exchange> deque, int i) {
        while (deque.size() > i) {
            releaseInProgress(deque.removeLast());
        }
    }

    /**
     * Releases the in-progress marker of every given file.
     *
     * @param list the files to remove from the in-progress repository
     */
    protected void removeExcessiveInProgressFiles(List<GenericFile<T>> list) {
        for (GenericFile<T> file : list) {
            endpoint.getInProgressRepository().remove(file.getAbsoluteFilePath());
        }
    }

    /**
     * Removes the file attached to the exchange from the in-progress repository. An exchange
     * without a file property is skipped so that one odd exchange cannot abort a drain and leave
     * the remaining files marked as in progress.
     */
    private void releaseInProgress(Exchange exchange) {
        GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
        if (file != null) {
            endpoint.getInProgressRepository().remove(file.getAbsoluteFilePath());
        }
    }

    /**
     * Tells whether more files may be gathered during the current poll.
     *
     * @param list the files gathered so far
     * @return {@code false} only when eager limiting is enabled, {@code maxMessagesPerPoll} is
     *         positive and the list has reached that size
     */
    public boolean canPollMoreFiles(List<?> list) {
        return !eagerLimitMaxMessagesPerPoll || maxMessagesPerPoll <= 0 || list.size() < maxMessagesPerPoll;
    }

    /**
     * Hook invoked before a poll; returning {@code false} skips the poll.
     *
     * @return {@code true} in this base implementation
     * @throws Exception if the check itself fails
     */
    protected boolean prePollCheck() throws Exception {
        return true;
    }

    /**
     * Hook invoked after a poll with the result of {@link #processBatch(Queue)}; no-op here.
     *
     * @param i the number of polled messages
     */
    protected void postPollCheck(int i) {
    }

    /**
     * Gathers the files to consume from the given directory.
     *
     * @param str the directory to poll
     * @param list the list the accepted files are added to
     * @param i {@link #poll()} passes {@code 0}; presumably the recursion depth, to be confirmed
     *        against the implementations
     * @return {@code false} is treated by {@link #poll()} as "the message limit was hit"
     */
    protected abstract boolean pollDirectory(String str, List<GenericFile<T>> list, int i);

    /** @param genericFileOperations the operations to use from now on */
    public void setOperations(GenericFileOperations<T> genericFileOperations) {
        this.operations = genericFileOperations;
    }

    /**
     * Decides whether a failed file retrieval should be silently ignored.
     *
     * @param str the absolute path of the file that could not be retrieved
     * @param exchange the exchange of the file
     * @param exc the exception raised by the retrieval, or {@code null} if it merely returned false
     * @return {@code false} in this base implementation
     */
    protected boolean ignoreCannotRetrieveFile(String str, Exchange exchange, Exception exc) {
        return false;
    }

    /**
     * Runs the regular handling of one file: begin via the process strategy, optionally retrieve
     * the content, register the completion callback and route the exchange.
     *
     * @param exchange the exchange carrying the file as property
     * @return {@code false} when the strategy could not begin or the retrieval failure was ignored;
     *         {@code true} otherwise, including when an error was passed to the exception handler
     */
    protected boolean processExchange(Exchange exchange) {
        GenericFile<T> file = getExchangeFileProperty(exchange);
        log.trace("Processing file: {}", file);
        // captured up front: this is the key the file is registered under as in progress
        String absoluteFileName = file.getAbsoluteFilePath();

        Exception beginCause = null;
        boolean begin = false;
        try {
            begin = processStrategy.begin(operations, endpoint, exchange, file);
        } catch (Exception e) {
            beginCause = e;
        }

        if (!begin) {
            log.debug("{} cannot begin processing file: {}", endpoint, file);
            Exception abortCause = null;
            try {
                processStrategy.abort(operations, endpoint, exchange, file);
            } catch (Exception e) {
                abortCause = e;
            } finally {
                // the file is not going to be processed, whatever abort did
                endpoint.getInProgressRepository().remove(absoluteFileName);
            }
            if (beginCause != null) {
                handleException(endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage(), beginCause);
            }
            if (abortCause != null) {
                handleException(endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage(), abortCause);
            }
            return false;
        }

        // re-read the property after begin and work with that instance from here on
        GenericFile<T> target = getExchangeFileProperty(exchange);
        updateFileHeaders(target, exchange.getIn());
        String name = target.getAbsoluteFilePath();
        try {
            if (isRetrieveFile()) {
                log.trace("Retrieving file: {} from: {}", name, endpoint);
                boolean retrieved;
                Exception cause = null;
                try {
                    retrieved = operations.retrieveFile(name, exchange, target.getFileLength());
                } catch (Exception e) {
                    retrieved = false;
                    cause = e;
                }
                if (!retrieved) {
                    if (ignoreCannotRetrieveFile(name, exchange, cause)) {
                        log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name);
                        endpoint.getInProgressRepository().remove(absoluteFileName);
                        return false;
                    }
                    if (cause instanceof GenericFileOperationFailedException) {
                        throw cause;
                    }
                    throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause);
                }
                log.trace("Retrieved file: {} from: {}", name, endpoint);
            } else {
                log.trace("Skipped retrieval of file: {} from: {}", name, endpoint);
                exchange.getIn().setBody(null);
            }

            // from here on the completion callback owns the file
            exchange.addOnCompletion(new GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, absoluteFileName));
            log.debug("About to process file: {} using exchange: {}", target, exchange);
            if (endpoint.isSynchronous()) {
                getProcessor().process(exchange);
            } else {
                getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
            }
            return true;
        } catch (Exception e) {
            endpoint.getInProgressRepository().remove(absoluteFileName);
            handleException("Error processing file " + file + " due to " + e.getMessage(), e);
            return true;
        }
    }

    /**
     * Updates the headers of the message for the given file; invoked after the process strategy
     * has begun on the file and before the file is retrieved.
     *
     * @param genericFile the file about to be processed
     * @param message the in-message of the file's exchange
     */
    protected abstract void updateFileHeaders(GenericFile<T> genericFile, Message message);

    /**
     * @return whether the file is retrieved through the operations before routing; {@code true}
     *         here. When {@code false} the message body is set to {@code null} instead.
     */
    protected boolean isRetrieveFile() {
        return true;
    }

    /**
     * Hands the exchange to the custom processor instead of the regular handling. Failures are
     * passed to the exception handler, and the file is always removed from the in-progress
     * repository afterwards.
     *
     * @param exchange the exchange carrying the file as property
     * @param processor the custom processor to invoke
     * @return always {@code true}
     */
    protected boolean customProcessExchange(Exchange exchange, Processor processor) {
        GenericFile<T> file = getExchangeFileProperty(exchange);
        log.trace("Custom processing file: {}", file);
        String absoluteFileName = file.getAbsoluteFilePath();
        try {
            processor.process(exchange);
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
            }
            handleException(e);
        } finally {
            endpoint.getInProgressRepository().remove(absoluteFileName);
        }
        return true;
    }

    /**
     * Decides whether the given entry should be picked up. An entry is rejected when it is not
     * matched, is already in progress, or (for idempotent endpoints) its idempotent key is already
     * in the idempotent repository. An accepted file is added to the in-progress repository as a
     * side effect.
     *
     * @param genericFile the entry to check
     * @param z when {@code true} the entry is accepted as soon as it is matched, without touching
     *        the repositories; presumably marks a directory (see {@link #isMatched(GenericFile, boolean, List)})
     * @param list passed through to the matching
     * @return {@code true} if the entry is valid
     */
    public boolean isValidFile(GenericFile<T> genericFile, boolean z, List<T> list) {
        String absoluteFilePath = genericFile.getAbsoluteFilePath();
        if (!isMatched(genericFile, z, list)) {
            log.trace("File did not match. Will skip this file: {}", genericFile);
            return false;
        }
        if (z) {
            return true;
        }

        if (endpoint.getInProgressRepository().contains(absoluteFilePath)) {
            if (log.isTraceEnabled()) {
                log.trace("Skipping as file is already in progress: {}", genericFile.getFileName());
            }
            return false;
        }

        // null-safe on the boxed flag
        if (Boolean.TRUE.equals(endpoint.isIdempotent())) {
            // the absolute path is the key unless a key expression is configured
            String key = genericFile.getAbsoluteFilePath();
            if (endpoint.getIdempotentKey() != null) {
                Exchange dummy = endpoint.createExchange(genericFile);
                key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
            }
            if (key != null && endpoint.getIdempotentRepository().contains(key)) {
                log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, genericFile);
                return false;
            }
        }

        // the result of add decides; presumably false when another consumer added the file first
        return endpoint.getInProgressRepository().add(absoluteFilePath);
    }

    /**
     * Applies the endpoint's filters to the entry, in this order: names starting with a dot or
     * ending with the lock file postfix, filter, ant filter, directory filter (only when {@code z}),
     * then, for non-directories only, exclude pattern, include pattern, file name expression, file
     * filter and finally the done-file check.
     *
     * @param genericFile the entry to check
     * @param z when {@code true} the directory filter is applied and all file-name based checks are
     *        skipped; presumably whether the entry is a directory
     * @param list passed on to {@link #isMatched(GenericFile, String, List)}
     * @return {@code true} if the entry passes all applicable checks
     */
    protected boolean isMatched(GenericFile<T> genericFile, boolean z, List<T> list) {
        String name = genericFile.getFileNameOnly();
        if (name.startsWith(".") || name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
            return false;
        }
        if (endpoint.getFilter() != null && !endpoint.getFilter().accept(genericFile)) {
            return false;
        }
        if (endpoint.getAntFilter() != null && !endpoint.getAntFilter().accept(genericFile)) {
            return false;
        }
        if (z && endpoint.getFilterDirectory() != null) {
            Exchange dummy = endpoint.createExchange(genericFile);
            if (!endpoint.getFilterDirectory().matches(dummy)) {
                return false;
            }
        }
        if (z) {
            // the remaining checks only apply to files
            return true;
        }

        if (excludePattern != null && excludePattern.matcher(name).matches()) {
            return false;
        }
        if (includePattern != null && !includePattern.matcher(name).matches()) {
            return false;
        }

        if (endpoint.getFileName() != null) {
            fileExpressionResult = evaluateFileExpression();
            if (fileExpressionResult != null && !name.equals(fileExpressionResult)) {
                return false;
            }
        }

        if (endpoint.getFilterFile() != null) {
            Exchange dummy = endpoint.createExchange(genericFile);
            if (!endpoint.getFilterFile().matches(dummy)) {
                return false;
            }
        }

        if (endpoint.getDoneFileName() == null) {
            return true;
        }
        String doneFileName = endpoint.createDoneFileName(genericFile.getAbsoluteFilePath());
        StringHelper.notEmpty(doneFileName, "doneFileName", endpoint);
        if (endpoint.isDoneFile(genericFile.getFileNameOnly())) {
            // done files themselves are never consumed
            log.trace("Skipping done file: {}", genericFile);
            return false;
        }
        return isMatched(genericFile, doneFileName, list);
    }

    /**
     * Final matching step used when a done file name is configured on the endpoint.
     *
     * @param genericFile the file to check
     * @param str the done file name created by the endpoint for this file
     * @param list the list handed to {@link #isMatched(GenericFile, boolean, List)}
     * @return whether the file is matched; presumably whether its done file exists, to be confirmed
     *         against the implementations
     */
    protected abstract boolean isMatched(GenericFile<T> genericFile, String str, List<T> list);

    /**
     * Note that this is not a pure query: it <em>adds</em> the file to the in-progress repository
     * and returns the negated result of that add.
     *
     * @param genericFile the file to check
     * @return {@code true} if the add to the in-progress repository returned {@code false}
     */
    @Deprecated
    protected boolean isInProgress(GenericFile<T> genericFile) {
        return !endpoint.getInProgressRepository().add(genericFile.getAbsoluteFilePath());
    }

    /**
     * Evaluates the endpoint's file name expression on a fresh exchange, unless a result is already
     * cached in {@link #fileExpressionResult}.
     *
     * @return the cached or freshly evaluated file name; {@code null} if no expression is configured
     *         or it evaluated to {@code null}
     */
    protected String evaluateFileExpression() {
        if (fileExpressionResult == null && endpoint.getFileName() != null) {
            Exchange dummy = endpoint.createExchange();
            fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
            if (dummy.getException() != null) {
                throw ObjectHelper.wrapRuntimeCamelException(dummy.getException());
            }
        }
        return fileExpressionResult;
    }

    /** Reads the file stored as exchange property under {@code FileComponent.FILE_EXCHANGE_FILE}. */
    @SuppressWarnings("unchecked") // the property is untyped; this consumer's exchanges carry GenericFile<T>
    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
        return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
    }

    /** Injects the Camel context into the process strategy if it wants one, starts it, then starts the consumer. */
    @Override
    protected void doStart() throws Exception {
        if (processStrategy instanceof CamelContextAware) {
            ((CamelContextAware) processStrategy).setCamelContext(getEndpoint().getCamelContext());
        }
        ServiceHelper.startService(processStrategy);
        super.doStart();
    }

    /** Stops the consumer and then the process strategy; the next poll prepares the strategy again. */
    @Override
    protected void doStop() throws Exception {
        prepareOnStartup = false;
        super.doStop();
        ServiceHelper.stopService(processStrategy);
    }

    /** No-op. */
    @Override
    public void onInit() throws Exception {
    }

    /**
     * No-op.
     *
     * @param j the timeout
     * @return the given timeout unchanged
     */
    @Override
    public long beforePoll(long j) throws Exception {
        return j;
    }

    /** No-op. */
    @Override
    public void afterPoll() throws Exception {
    }
}
