package io.siddhi.core.util;

import com.google.common.collect.TreeMultimap;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.util.extension.holder.ExternalReferencedHolder;
import io.siddhi.core.util.lock.LockWrapper;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.metrics.Level;
import io.siddhi.core.util.timestamp.TimestampGeneratorImpl;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/* loaded from: classes3.dex */
public class Scheduler implements ExternalReferencedHolder {
    private static final Logger log = Logger.getLogger(Scheduler.class);
    private LatencyTracker latencyTracker;
    private LockWrapper lockWrapper;
    private final Semaphore mutex = new Semaphore(1);
    protected String queryName;
    private ScheduledExecutorService scheduledExecutorService;
    private SiddhiQueryContext siddhiQueryContext;
    private final Schedulable singleThreadEntryValve;
    private StateHolder<SchedulerState> stateHolder;
    private boolean stop;
    private StreamEventFactory streamEventFactory;
    private final ThreadBarrier threadBarrier;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    public class EventCaller implements Runnable {
        private String key;
        private SchedulerState state;

        public EventCaller(SchedulerState schedulerState, String str) {
            this.state = schedulerState;
            this.key = str;
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            Semaphore semaphore;
            if (Scheduler.this.stop) {
                return;
            }
            SiddhiAppContext.startPartitionFlow(this.key);
            try {
                if (Scheduler.this.siddhiQueryContext.getSiddhiAppContext().isPlayback()) {
                    this.state.running = false;
                } else {
                    try {
                        Scheduler.this.sendTimerEvents(this.state);
                        Long l = (Long) this.state.toNotifyQueue.peek();
                        long currentTime = Scheduler.this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
                        try {
                            if (l != null) {
                                this.state.scheduledFuture = Scheduler.this.scheduledExecutorService.schedule(this, l.longValue() - currentTime, TimeUnit.MILLISECONDS);
                            } else {
                                try {
                                    Scheduler.this.mutex.acquire();
                                    this.state.running = false;
                                    if (this.state.toNotifyQueue.peek() != null) {
                                        this.state.running = true;
                                        this.state.scheduledFuture = Scheduler.this.scheduledExecutorService.schedule(this, 0L, TimeUnit.MILLISECONDS);
                                    }
                                    semaphore = Scheduler.this.mutex;
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    Scheduler.log.error("Error when scheduling System Time Based Scheduler", e);
                                    semaphore = Scheduler.this.mutex;
                                }
                                semaphore.release();
                            }
                        } catch (Throwable th) {
                            Scheduler.this.mutex.release();
                            throw th;
                        }
                    } catch (Throwable th2) {
                        th = th2;
                        try {
                            Scheduler.log.error("Error while executing Scheduled Timer Event Caller, " + th.getMessage(), th);
                        } finally {
                            SiddhiAppContext.stopPartitionFlow();
                        }
                    }
                }
                SiddhiAppContext.stopPartitionFlow();
            } catch (Throwable th3) {
                th = th3;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: classes3.dex */
    public class SchedulerState extends State implements Comparable {
        private EventCaller eventCaller;
        private final String key;
        private ScheduledFuture scheduledFuture;
        private final BlockingQueue<Long> toNotifyQueue = new LinkedBlockingQueue();
        private volatile boolean running = false;

        public SchedulerState() {
            String partitionFlowId = SiddhiAppContext.getPartitionFlowId();
            this.key = partitionFlowId;
            this.eventCaller = new EventCaller(this, partitionFlowId);
        }

        @Override // io.siddhi.core.util.snapshot.state.State
        public boolean canDestroy() {
            ScheduledFuture scheduledFuture;
            return this.toNotifyQueue.isEmpty() && ((scheduledFuture = this.scheduledFuture) == null || scheduledFuture.isDone());
        }

        @Override // java.lang.Comparable
        public int compareTo(Object obj) {
            return 0;
        }

        @Override // io.siddhi.core.util.snapshot.state.State
        public void restore(Map<String, Object> map) {
            Iterator it = ((BlockingQueue) map.get("ToNotifyQueue")).iterator();
            while (it.hasNext()) {
                Scheduler.this.notifyAt(((Long) it.next()).longValue());
            }
        }

        @Override // io.siddhi.core.util.snapshot.state.State
        public Map<String, Object> snapshot() {
            HashMap hashMap = new HashMap();
            hashMap.put("ToNotifyQueue", this.toNotifyQueue);
            return hashMap;
        }
    }

    public Scheduler(Schedulable schedulable, SiddhiQueryContext siddhiQueryContext) {
        this.threadBarrier = siddhiQueryContext.getSiddhiAppContext().getThreadBarrier();
        this.siddhiQueryContext = siddhiQueryContext;
        this.singleThreadEntryValve = schedulable;
        this.scheduledExecutorService = siddhiQueryContext.getSiddhiAppContext().getScheduledExecutorService();
        siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().addTimeChangeListener(new TimestampGeneratorImpl.TimeChangeListener() { // from class: io.siddhi.core.util.Scheduler.1
            @Override // io.siddhi.core.util.timestamp.TimestampGeneratorImpl.TimeChangeListener
            public synchronized void onTimeChange(long j) {
                Map allStates = Scheduler.this.stateHolder.getAllStates();
                try {
                    TreeMultimap create = TreeMultimap.create();
                    Iterator it = allStates.entrySet().iterator();
                    while (it.hasNext()) {
                        try {
                            for (Map.Entry entry : ((Map) ((Map.Entry) it.next()).getValue()).entrySet()) {
                                Long l = (Long) ((SchedulerState) entry.getValue()).toNotifyQueue.peek();
                                if (l != null && l.longValue() <= j) {
                                    create.put(l, entry.getValue());
                                }
                            }
                        } catch (Throwable th) {
                            th = th;
                            Scheduler.this.stateHolder.returnAllStates(allStates);
                            throw th;
                        }
                    }
                    for (Map.Entry entry2 : create.entries()) {
                        try {
                            SiddhiAppContext.startPartitionFlow(((SchedulerState) entry2.getValue()).key);
                            Scheduler.this.sendTimerEvents((SchedulerState) entry2.getValue());
                            SiddhiAppContext.stopPartitionFlow();
                        } finally {
                        }
                    }
                    Scheduler.this.stateHolder.returnAllStates(allStates);
                } catch (Throwable th2) {
                    th = th2;
                }
            }
        });
    }

    private void schedule(long j, SchedulerState schedulerState, boolean z) {
        if (this.siddhiQueryContext.getSiddhiAppContext().isPlayback() || schedulerState.running) {
            return;
        }
        if (schedulerState.toNotifyQueue.size() == 1 || z) {
            try {
                try {
                    this.mutex.acquire();
                    if (!schedulerState.running) {
                        schedulerState.running = true;
                        long currentTime = j - this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
                        if (currentTime > 0) {
                            schedulerState.scheduledFuture = this.scheduledExecutorService.schedule(schedulerState.eventCaller, currentTime, TimeUnit.MILLISECONDS);
                        } else {
                            schedulerState.scheduledFuture = this.scheduledExecutorService.schedule(schedulerState.eventCaller, 0L, TimeUnit.MILLISECONDS);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("Error when scheduling System Time Based Scheduler", e);
                }
            } finally {
                this.mutex.release();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendTimerEvents(SchedulerState schedulerState) {
        LockWrapper lockWrapper;
        LatencyTracker latencyTracker;
        Long l = (Long) schedulerState.toNotifyQueue.peek();
        long currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
        while (l != null && l.longValue() - currentTime <= 0) {
            schedulerState.toNotifyQueue.poll();
            StreamEvent newInstance = this.streamEventFactory.newInstance();
            newInstance.setType(ComplexEvent.Type.TIMER);
            newInstance.setTimestamp(l.longValue());
            LockWrapper lockWrapper2 = this.lockWrapper;
            if (lockWrapper2 != null) {
                lockWrapper2.lock();
            }
            this.threadBarrier.enter();
            try {
                ComplexEventChunk complexEventChunk = new ComplexEventChunk();
                complexEventChunk.add(newInstance);
                if (Level.BASIC.compareTo(this.siddhiQueryContext.getSiddhiAppContext().getRootMetricsLevel()) <= 0 && (latencyTracker = this.latencyTracker) != null) {
                    try {
                        latencyTracker.markIn();
                        this.singleThreadEntryValve.process(complexEventChunk);
                        this.latencyTracker.markOut();
                    } catch (Throwable th) {
                        this.latencyTracker.markOut();
                        throw th;
                        break;
                    }
                } else {
                    this.singleThreadEntryValve.process(complexEventChunk);
                }
                lockWrapper = this.lockWrapper;
            } finally {
                try {
                    if (lockWrapper == null) {
                        this.threadBarrier.exit();
                        l = (Long) schedulerState.toNotifyQueue.peek();
                        currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
                    }
                    lockWrapper.unlock();
                    this.threadBarrier.exit();
                    l = (Long) schedulerState.toNotifyQueue.peek();
                    currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
                } catch (Throwable th2) {
                }
            }
            if (lockWrapper == null) {
                this.threadBarrier.exit();
                l = (Long) schedulerState.toNotifyQueue.peek();
                currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
            }
            lockWrapper.unlock();
            this.threadBarrier.exit();
            l = (Long) schedulerState.toNotifyQueue.peek();
            currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
        }
    }

    public void init(LockWrapper lockWrapper, String str) {
        this.lockWrapper = lockWrapper;
        this.queryName = str;
        this.stateHolder = this.siddhiQueryContext.generateStateHolder("Scheduler_" + str + "_" + this.siddhiQueryContext.generateNewId(), false, new StateFactory() { // from class: io.siddhi.core.util.-$$Lambda$Scheduler$JtMCJnjzrxi7jqVXPs4zdtsbcOw
            @Override // io.siddhi.core.util.snapshot.state.StateFactory
            public final State createNewState() {
                return Scheduler.this.lambda$init$0$Scheduler();
            }
        });
    }

    public /* synthetic */ State lambda$init$0$Scheduler() {
        return new SchedulerState();
    }

    public void notifyAt(long j) {
        SchedulerState state = this.stateHolder.getState();
        try {
            try {
                state.toNotifyQueue.put(Long.valueOf(j));
                schedule(j, state, false);
            } catch (InterruptedException e) {
                if (!this.scheduledExecutorService.isShutdown()) {
                    log.error("Error when adding time:" + j + " to toNotifyQueue at Scheduler", e);
                }
            }
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    public void setLatencyTracker(LatencyTracker latencyTracker) {
        this.latencyTracker = latencyTracker;
    }

    public void setStreamEventFactory(StreamEventFactory streamEventFactory) {
        this.streamEventFactory = streamEventFactory;
    }

    @Override // io.siddhi.core.util.extension.holder.ExternalReferencedHolder
    public void start() {
        this.stop = false;
    }

    @Override // io.siddhi.core.util.extension.holder.ExternalReferencedHolder
    public void stop() {
        this.stop = true;
    }

    public void switchToLiveMode() {
        Map<String, Map<String, SchedulerState>> allStates = this.stateHolder.getAllStates();
        try {
            for (Map.Entry<String, Map<String, SchedulerState>> entry : allStates.entrySet()) {
                for (Map.Entry<String, SchedulerState> entry2 : entry.getValue().entrySet()) {
                    Long l = (Long) entry2.getValue().toNotifyQueue.peek();
                    if (l != null) {
                        SiddhiAppContext.startPartitionFlow(entry.getKey());
                        SiddhiAppContext.startGroupByFlow(entry2.getKey());
                        try {
                            schedule(l.longValue(), entry2.getValue(), true);
                            SiddhiAppContext.stopGroupByFlow();
                            SiddhiAppContext.stopPartitionFlow();
                        } finally {
                        }
                    }
                }
            }
        } finally {
            this.stateHolder.returnAllStates(allStates);
        }
    }

    public void switchToPlayBackMode() {
        Map<String, Map<String, SchedulerState>> allStates = this.stateHolder.getAllStates();
        try {
            Iterator<Map.Entry<String, Map<String, SchedulerState>>> it = allStates.entrySet().iterator();
            while (it.hasNext()) {
                for (Map.Entry<String, SchedulerState> entry : it.next().getValue().entrySet()) {
                    if (entry.getValue().scheduledFuture != null) {
                        entry.getValue().scheduledFuture.cancel(true);
                    }
                    entry.getValue().running = false;
                }
            }
        } finally {
            this.stateHolder.returnAllStates(allStates);
        }
    }
}
