package io.siddhi.core.aggregation.persistedaggregation;

import io.siddhi.core.aggregation.Executor;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.util.IncrementalTimeConverterUtil;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.query.api.aggregation.TimePeriod;
import java.sql.SQLException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

/* loaded from: classes3.dex */
public class PersistedIncrementalExecutor implements Executor {
    private static final Logger log = Logger.getLogger(PersistedIncrementalExecutor.class);
    private Processor cudStreamProcessor;
    private TimePeriod.Duration duration;
    private boolean isProcessingExecutor;
    private Executor next;
    private final StateHolder<ExecutorState> stateHolder;
    private StreamEventFactory streamEventFactory;
    private String timeZone;
    private final ExpressionExecutor timestampExpressionExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes3.dex */
    public class ExecutorState extends State {
        private long nextEmitTime = -1;
        private long startTimeOfAggregates = -1;
        private boolean timerStarted = false;
        private boolean canDestroy = false;

        ExecutorState() {
        }

        @Override // io.siddhi.core.util.snapshot.state.State
        public boolean canDestroy() {
            return this.canDestroy;
        }

        @Override // io.siddhi.core.util.snapshot.state.State
        public void restore(Map<String, Object> map) {
            this.nextEmitTime = ((Long) map.get("NextEmitTime")).longValue();
            this.startTimeOfAggregates = ((Long) map.get("StartTimeOfAggregates")).longValue();
            this.timerStarted = ((Boolean) map.get("TimerStarted")).booleanValue();
        }

        public void setCanDestroy(boolean z) {
            this.canDestroy = z;
        }

        @Override // io.siddhi.core.util.snapshot.state.State
        public Map<String, Object> snapshot() {
            HashMap hashMap = new HashMap();
            hashMap.put("NextEmitTime", Long.valueOf(this.nextEmitTime));
            hashMap.put("StartTimeOfAggregates", Long.valueOf(this.startTimeOfAggregates));
            hashMap.put("TimerStarted", Boolean.valueOf(this.timerStarted));
            return hashMap;
        }
    }

    public PersistedIncrementalExecutor(String str, TimePeriod.Duration duration, List<ExpressionExecutor> list, Executor executor, SiddhiQueryContext siddhiQueryContext, MetaStreamEvent metaStreamEvent, String str2, Processor processor) {
        this.timeZone = str2;
        this.duration = duration;
        this.next = executor;
        this.cudStreamProcessor = processor;
        this.timestampExpressionExecutor = list.remove(0);
        this.streamEventFactory = new StreamEventFactory(metaStreamEvent);
        setNextExecutor(executor);
        this.stateHolder = siddhiQueryContext.generateStateHolder(str + "-" + getClass().getName(), false, new StateFactory() { // from class: io.siddhi.core.aggregation.persistedaggregation.-$$Lambda$PersistedIncrementalExecutor$5amkrN7gdilptQLvjAbTvuivtho
            @Override // io.siddhi.core.util.snapshot.state.StateFactory
            public final State createNewState() {
                return PersistedIncrementalExecutor.this.lambda$new$0$PersistedIncrementalExecutor();
            }
        });
        this.isProcessingExecutor = false;
    }

    private void dispatchAggregateEvents(long j, long j2, String str) {
        if (j2 != -1) {
            dispatchEvent(j, j2, str);
        }
    }

    private void dispatchEvent(long j, long j2, String str) {
        ZonedDateTime ofInstant = ZonedDateTime.ofInstant(Instant.ofEpochMilli(j), ZoneId.of(str));
        ZonedDateTime ofInstant2 = ZonedDateTime.ofInstant(Instant.ofEpochMilli(j2), ZoneId.of(str));
        log.info("Aggregation event dispatched for the duration " + this.duration + " to aggregate data from " + ofInstant.toString() + " to " + ofInstant2.toString() + StringUtils.SPACE);
        ComplexEventChunk complexEventChunk = new ComplexEventChunk();
        StreamEvent newInstance = this.streamEventFactory.newInstance();
        newInstance.setType(ComplexEvent.Type.CURRENT);
        newInstance.setTimestamp(j2);
        ArrayList arrayList = new ArrayList();
        arrayList.add(Long.valueOf(j));
        arrayList.add(Long.valueOf(j2));
        arrayList.add(null);
        newInstance.setOutputData(arrayList.toArray());
        if (!this.isProcessingExecutor) {
            if (getNextExecutor() != null) {
                this.next.execute(complexEventChunk);
                return;
            }
            return;
        }
        complexEventChunk.add(newInstance);
        int i = 0;
        while (true) {
            int i2 = i + 1;
            try {
                this.cudStreamProcessor.process(complexEventChunk);
                return;
            } catch (Exception e) {
                if (!(e.getCause() instanceof SQLException)) {
                    log.error("Error occurred while executing the aggregation for data between " + j + " - " + j2 + " for duration \n" + this.duration, e);
                    return;
                }
                if (!e.getCause().getLocalizedMessage().contains("try restarting transaction") || i2 >= 3) {
                    log.error("Error occurred while executing the aggregation for data between " + j + " - " + j2 + " for duration " + this.duration + ". Attempted re-executing the query for 9 seconds. This Should be investigated since this will lead to a data mismatch\n", e);
                }
                Logger logger = log;
                ZonedDateTime zonedDateTime = ofInstant;
                StringBuilder sb = new StringBuilder();
                sb.append("Error occurred while executing the aggregation for data between ");
                sb.append(j);
                sb.append(" - ");
                sb.append(j2);
                sb.append(" for duration ");
                sb.append(this.duration);
                sb.append(" Retrying the transaction attempt ");
                sb.append(i2 - 1);
                logger.error(sb.toString(), e);
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e2) {
                    log.error("Thread sleep interrupted while waiting to re-execute the aggregation query for duration " + this.duration, e2);
                }
                i = i2;
                ofInstant = zonedDateTime;
            }
        }
        log.error("Error occurred while executing the aggregation for data between " + j + " - " + j2 + " for duration " + this.duration + ". Attempted re-executing the query for 9 seconds. This Should be investigated since this will lead to a data mismatch\n", e);
    }

    private long getTimestamp(StreamEvent streamEvent) {
        return streamEvent.getType() == ComplexEvent.Type.CURRENT ? ((Long) this.timestampExpressionExecutor.execute(streamEvent)).longValue() : streamEvent.getTimestamp();
    }

    private void sendTimerEvent(ExecutorState executorState) {
        if (getNextExecutor() != null) {
            StreamEvent newInstance = this.streamEventFactory.newInstance();
            newInstance.setType(ComplexEvent.Type.TIMER);
            newInstance.setTimestamp(executorState.startTimeOfAggregates);
            ComplexEventChunk complexEventChunk = new ComplexEventChunk();
            complexEventChunk.add(newInstance);
            this.next.execute(complexEventChunk);
        }
    }

    @Override // io.siddhi.core.aggregation.Executor
    public void execute(ComplexEventChunk complexEventChunk) {
        Logger logger = log;
        if (logger.isDebugEnabled()) {
            logger.debug("Event Chunk received by " + this.duration + " incremental executor: " + complexEventChunk.toString() + " will be dropped since persisted aggregation has been scheduled ");
        }
        complexEventChunk.reset();
        while (complexEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent) complexEventChunk.next();
            complexEventChunk.remove();
            ExecutorState state = this.stateHolder.getState();
            try {
                long timestamp = getTimestamp(streamEvent);
                if (timestamp >= state.nextEmitTime) {
                    long j = state.nextEmitTime;
                    long j2 = state.startTimeOfAggregates;
                    state.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(timestamp, this.duration, this.timeZone);
                    state.nextEmitTime = IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, this.timeZone);
                    dispatchAggregateEvents(j2, j, this.timeZone);
                    sendTimerEvent(state);
                }
            } finally {
                this.stateHolder.returnState(state);
            }
        }
    }

    @Override // io.siddhi.core.aggregation.Executor
    public Executor getNextExecutor() {
        return this.next;
    }

    public /* synthetic */ State lambda$new$0$PersistedIncrementalExecutor() {
        return new ExecutorState();
    }

    @Override // io.siddhi.core.aggregation.Executor
    public void setEmitTime(long j) {
        ExecutorState state = this.stateHolder.getState();
        try {
            state.nextEmitTime = j;
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    @Override // io.siddhi.core.aggregation.Executor
    public void setNextExecutor(Executor executor) {
        this.next = executor;
    }

    public void setProcessingExecutor(boolean z) {
        this.isProcessingExecutor = z;
    }
}
