package io.siddhi.core.stream.output.sink;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.StreamJunction;
import io.siddhi.core.stream.output.sink.distributed.DistributedTransport;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.SiddhiConstants;
import io.siddhi.core.util.StringUtil;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.error.handler.model.ErroneousEvent;
import io.siddhi.core.util.error.handler.util.ErrorOccurrence;
import io.siddhi.core.util.error.handler.util.ErrorStoreHelper;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.snapshot.state.EmptyStateHolder;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.core.util.statistics.metrics.Level;
import io.siddhi.core.util.transport.BackoffRetryCounter;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.StreamDefinition;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

/* loaded from: classes.dex */
public abstract class Sink<S extends State> implements SinkListener {
    // Class-wide logger used for connection and publish failures.
    private static final Logger LOG = Logger.getLogger(Sink.class);
    // Optional handler; assigned in init(...) only when a non-null SinkHandler is supplied.
    private SinkHandler handler;
    // Optional mapper; assigned in init(...) only when a non-null SinkMapper is supplied.
    private SinkMapper mapper;
    // Created in init(...) only when the app context has a statistics manager; otherwise stays null.
    private LatencyTracker mapperLatencyTracker;
    // Error-handling strategy parsed in init(...); initOnlyTransport(...) never assigns it.
    private OnErrorAction onErrorAction;
    // Executor used by connectWithRetry(boolean) to schedule delayed reconnect attempts.
    private ScheduledExecutorService scheduledExecutorService;
    // Result of exposeServiceDeploymentInfo(); null when the sink exposes no service.
    private ServiceDeploymentInfo serviceDeploymentInfo;
    private SiddhiAppContext siddhiAppContext;
    // Built in init(...) from the StateFactory returned by the abstract init; initOnlyTransport(...) leaves it unset.
    private StateHolder<S> stateHolder;
    private StreamDefinition streamDefinition;
    // Receives failed events when the error action is STREAM (see onError).
    private StreamJunction streamJunction;
    // Created in init(...) only when the app context has a statistics manager; otherwise stays null.
    private ThroughputTracker throughputTracker;
    // Per-thread dynamic options; injected through setTrpDynamicOptions(...).
    private ThreadLocal<DynamicOptions> trpDynamicOptions;
    // Sink type name, used in log messages and metric names.
    private String type;
    // True while some thread owns the (re)connection attempt; used as a claim flag via getAndSet(true).
    protected AtomicBoolean isTryingToConnect = new AtomicBoolean(false);
    // Optional connection callback; only initOnlyTransport(...) assigns it.
    private DistributedTransport.ConnectionCallback connectionCallback = null;
    // Supplies the delay between scheduled reconnect attempts in connectWithRetry(boolean).
    private BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();
    // Current connection status as tracked by this class (see setConnected / isConnected).
    private AtomicBoolean isConnected = new AtomicBoolean(false);
    // Set once by shutdown(); never reset within this class.
    private AtomicBoolean isShutdown = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.siddhi.core.stream.output.sink.Sink$2, reason: invalid class name */
    /* loaded from: classes3.dex */
    /**
     * Lookup table that maps {@link OnErrorAction#ordinal()} to the case index used by
     * {@code onError}: STREAM = 1, WAIT = 2, STORE = 3, LOG = 4. Any constant that cannot be
     * resolved keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$io$siddhi$core$stream$output$sink$Sink$OnErrorAction;

        static {
            // Fill a local table first, then publish it through the static field.
            int[] caseIndexByOrdinal = new int[OnErrorAction.values().length];
            try {
                caseIndexByOrdinal[OnErrorAction.STREAM.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant missing at link time: leave its slot at 0.
            }
            try {
                caseIndexByOrdinal[OnErrorAction.WAIT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant missing at link time: leave its slot at 0.
            }
            try {
                caseIndexByOrdinal[OnErrorAction.STORE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant missing at link time: leave its slot at 0.
            }
            try {
                caseIndexByOrdinal[OnErrorAction.LOG.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // Constant missing at link time: leave its slot at 0.
            }
            $SwitchMap$io$siddhi$core$stream$output$sink$Sink$OnErrorAction = caseIndexByOrdinal;
        }
    }

    /* loaded from: classes3.dex */
    /**
     * Strategy applied by {@code onError} when an event cannot be published.
     * The value is parsed by name (upper-cased) in {@code init(...)}, defaulting to "LOG".
     */
    public enum OnErrorAction {
        /** Triggers a reconnect attempt via connectWithRetry() and logs the dropped event. */
        LOG,
        /**
         * For ConnectionUnavailableException only: keeps the calling thread waiting and retrying
         * until the sink is connected or shut down, then publishes the event. Other exception
         * types are logged and the event is dropped.
         */
        WAIT,
        /** Triggers a reconnect attempt and hands the event and exception to the stream junction's handleError. */
        STREAM,
        /** Wraps the event in an ErroneousEvent and passes it to the error store of the Siddhi context. */
        STORE
    }

    /**
     * Connects the transport, marks the sink as connected, publishes the payload and finally
     * notifies the connection callback (when one is registered).
     *
     * @param obj            payload to publish
     * @param dynamicOptions dynamic options passed through to {@code publish}
     * @param s              sink state passed through to {@code publish}
     * @throws ConnectionUnavailableException if connecting or publishing reports the connection as unavailable
     */
    private void connectAndPublish(Object obj, DynamicOptions dynamicOptions, S s) throws ConnectionUnavailableException {
        connect();
        setConnected(true);
        publish(obj, dynamicOptions, s);
        final DistributedTransport.ConnectionCallback callback = this.connectionCallback;
        if (callback == null) {
            return;
        }
        callback.connectionEstablished();
    }

    /**
     * Attempts to connect the sink, scheduling another attempt with back-off when the
     * connection is unavailable.
     *
     * <p>Nothing happens when the sink is already connected. Otherwise the calling thread claims
     * the attempt through {@code isTryingToConnect}; if another attempt already owns the flag the
     * call returns unless {@code z} forces it (the scheduled retry task passes {@code true}).
     *
     * <p>On {@link ConnectionUnavailableException} the error is logged and a retry is scheduled
     * after the current back-off interval. On any other {@link RuntimeException} the error is
     * logged, the claim flag is released (no retry is scheduled, so leaving it set would block
     * every later non-forced attempt) and the exception is rethrown.
     *
     * @param z {@code true} to attempt the connection even if {@code isTryingToConnect} was already set
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void connectWithRetry(boolean z) {
        if (this.isConnected.get()) {
            return;
        }
        // Always set the flag (as before); only skip when someone else owns it and we are not forced.
        boolean alreadyTrying = this.isTryingToConnect.getAndSet(true);
        if (alreadyTrying && !z) {
            return;
        }
        try {
            connect();
            setConnected(true);
            this.isTryingToConnect.set(false);
            DistributedTransport.ConnectionCallback callback = this.connectionCallback;
            if (callback != null) {
                callback.connectionEstablished();
            }
            this.backoffRetryCounter.reset();
        } catch (ConnectionUnavailableException e) {
            LOG.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + ", error while connecting at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "', will retry in '" + this.backoffRetryCounter.getTimeInterval() + "'."), e);
            // The flag stays set on purpose: the scheduled task below owns the next attempt.
            this.scheduledExecutorService.schedule(new Runnable() { // from class: io.siddhi.core.stream.output.sink.Sink.1
                @Override // java.lang.Runnable
                public void run() {
                    Sink.this.connectWithRetry(true);
                }
            }, this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
            this.backoffRetryCounter.increment();
        } catch (RuntimeException e2) {
            // No retry is scheduled on this path, so release the claim before propagating.
            this.isTryingToConnect.set(false);
            LOG.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(e2, this.siddhiAppContext)) + ", error while connecting at Sink '" + StringUtil.removeCRLFCharacters(this.type) + "' at '" + StringUtil.removeCRLFCharacters(this.streamDefinition.getId()) + "'.", e2);
            throw e2;
        }
    }

    /**
     * Pauses the calling thread for {@code j} milliseconds between retry attempts.
     * An interruption ends the pause early and is otherwise ignored.
     *
     * @param j pause length in milliseconds
     */
    private void retryWait(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException ignored) {
            // NOTE(review): the interrupt is swallowed and the flag is not restored. Restoring it
            // would make every later sleep in the onError retry loops return immediately; confirm
            // the intended interruption policy before changing this.
        }
    }

    /**
     * Establishes the connection to the external endpoint. Invoked by
     * {@code connectWithRetry(boolean)} and {@code connectAndPublish}.
     *
     * @throws ConnectionUnavailableException when the endpoint cannot be reached; callers in this
     *                                        class react by retrying
     */
    public abstract void connect() throws ConnectionUnavailableException;

    /**
     * Attempts to connect without forcing: delegates to {@code connectWithRetry(false)}, so the
     * call is a no-op when the sink is connected or another attempt already holds the
     * {@code isTryingToConnect} flag.
     */
    public void connectWithRetry() {
        connectWithRetry(false);
    }

    /**
     * Implementation hook invoked by {@code shutdown()} immediately after {@code disconnect()}.
     * Presumably releases resources held by the implementation — confirm against implementations.
     */
    public abstract void destroy();

    /**
     * Implementation hook invoked by {@code shutdown()} before {@code destroy()}.
     * Presumably closes the connection opened by {@code connect()} — confirm against implementations.
     */
    public abstract void disconnect();

    /**
     * Supplies deployment information for a service exposed by this sink. Called at the end of
     * both initialisation paths, after the implementation's {@code init} hook has run.
     *
     * @return the deployment info, or {@code null} when no service is exposed; with {@code null},
     *         initialisation fails if deployment properties were supplied
     */
    protected abstract ServiceDeploymentInfo exposeServiceDeploymentInfo();

    /**
     * @return the sink handler registered in {@code init(...)}, or {@code null} if none was supplied
     */
    public final SinkHandler getHandler() {
        return this.handler;
    }

    /**
     * @return the sink mapper registered in {@code init(...)}, or {@code null} if none was supplied
     */
    public final SinkMapper getMapper() {
        return this.mapper;
    }

    /**
     * Returns the deployment info of this sink wrapped in a new list.
     *
     * @return a new, mutable list holding the single {@link ServiceDeploymentInfo} of this sink,
     *         or an empty list when the sink exposes no service
     */
    public List<ServiceDeploymentInfo> getServiceDeploymentInfoList() {
        ServiceDeploymentInfo info = this.serviceDeploymentInfo;
        // Parameterized instead of raw ArrayList so no unchecked conversion is needed.
        List<ServiceDeploymentInfo> result = new ArrayList<ServiceDeploymentInfo>(info == null ? 0 : 1);
        if (info != null) {
            result.add(info);
        }
        return result;
    }

    /**
     * @return the stream definition this sink was initialised with
     */
    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    /**
     * Not used within this class.
     * NOTE(review): presumably lists the option names whose values may vary per event — confirm
     * against the callers.
     *
     * @return the supported dynamic option names
     */
    public abstract String[] getSupportedDynamicOptions();

    /**
     * Not used within this class.
     * NOTE(review): presumably lists the payload classes this sink accepts from its mapper —
     * confirm against the callers.
     *
     * @return the supported payload classes
     */
    public abstract Class[] getSupportedInputEventClasses();

    /**
     * @return the sink type name supplied at initialisation
     */
    public final String getType() {
        return this.type;
    }

    /**
     * Implementation-specific initialisation hook, invoked from both the full {@code init(...)}
     * and {@code initOnlyTransport(...)}. The full {@code init(...)} passes the returned factory
     * to {@code siddhiAppContext.generateStateHolder}; {@code initOnlyTransport(...)} ignores
     * the return value.
     *
     * @param streamDefinition definition of the stream this sink is attached to
     * @param optionHolder     transport options
     * @param configReader     configuration reader for this sink
     * @param siddhiAppContext context of the owning Siddhi app
     * @return the factory used to create this sink's state
     */
    protected abstract StateFactory<S> init(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext);

    /**
     * Fully initialises the sink: records its context, resolves the error action, creates
     * metrics trackers and the state holder, wires the optional mapper and handler, and
     * validates the deployment properties.
     *
     * <p>The error action is read from the {@code SiddhiConstants.ANNOTATION_ELEMENT_ON_ERROR}
     * option (default {@code "LOG"}) and upper-cased with {@link Locale#ROOT} so the lookup does
     * not depend on the JVM's default locale (e.g. "wait" would not map to "WAIT" under a
     * Turkish locale).
     *
     * @param streamDefinition definition of the stream this sink is attached to
     * @param str              sink type name
     * @param optionHolder     transport options
     * @param configReader     configuration reader for the sink
     * @param sinkMapper       mapper to initialise and register; may be {@code null}
     * @param str2             mapper type name (also used in the mapper metric name)
     * @param optionHolder2    mapper options
     * @param sinkHandler      handler to initialise and register; may be {@code null}
     * @param list             elements forwarded to the mapper's {@code init}
     * @param configReader2    configuration reader for the mapper
     * @param map              deployment properties; must be empty when the sink exposes no service
     * @param streamJunction   junction that receives failed events for the STREAM error action
     * @param siddhiAppContext context of the owning Siddhi app
     * @throws IllegalArgumentException   if the configured error action is not an {@link OnErrorAction} name
     * @throws SiddhiAppCreationException if deployment properties are given but no service is exposed
     */
    public final void init(StreamDefinition streamDefinition, String str, OptionHolder optionHolder, ConfigReader configReader, SinkMapper sinkMapper, String str2, OptionHolder optionHolder2, SinkHandler sinkHandler, List<Element> list, ConfigReader configReader2, Map<String, String> map, StreamJunction streamJunction, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.type = str;
        this.streamJunction = streamJunction;
        this.siddhiAppContext = siddhiAppContext;
        String configuredAction = optionHolder.getOrCreateOption(SiddhiConstants.ANNOTATION_ELEMENT_ON_ERROR, "LOG").getValue();
        this.onErrorAction = OnErrorAction.valueOf(configuredAction.toUpperCase(Locale.ROOT));
        if (this.onErrorAction == OnErrorAction.STORE && siddhiAppContext.getSiddhiContext().getErrorStore() == null) {
            // Only reported here; initialisation continues without an error store.
            LOG.error("On error action is 'STORE' for sink connected to stream " + streamDefinition.getId() + " in Siddhi App " + siddhiAppContext.getName() + " but error store is not configured in Siddhi Manager");
        }
        if (siddhiAppContext.getStatisticsManager() != null) {
            this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext, streamDefinition.getId(), SiddhiConstants.METRIC_INFIX_SINKS, str);
            this.mapperLatencyTracker = QueryParserHelper.createLatencyTracker(siddhiAppContext, streamDefinition.getId(), SiddhiConstants.METRIC_INFIX_SINK_MAPPERS, str + SiddhiConstants.METRIC_DELIMITER + str2);
        }
        this.stateHolder = siddhiAppContext.generateStateHolder(streamDefinition.getId() + "-" + getClass().getName(), init(streamDefinition, optionHolder, configReader, siddhiAppContext));
        if (sinkMapper != null) {
            sinkMapper.init(streamDefinition, str2, optionHolder2, list, this, configReader2, this.mapperLatencyTracker, optionHolder, siddhiAppContext);
            this.mapper = sinkMapper;
        }
        if (sinkHandler != null) {
            sinkHandler.initSinkHandler(siddhiAppContext.getName(), streamDefinition, new SinkHandlerCallback(sinkMapper), siddhiAppContext);
            this.handler = sinkHandler;
        }
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        this.serviceDeploymentInfo = exposeServiceDeploymentInfo();
        if (this.serviceDeploymentInfo != null) {
            this.serviceDeploymentInfo.addDeploymentProperties(map);
            return;
        }
        if (map.isEmpty()) {
            return;
        }
        throw new SiddhiAppCreationException("Deployment properties '" + map + "' are defined for sink '" + str + "' which does not expose a service");
    }

    /**
     * Initialises only the transport part of the sink: no mapper, handler, metrics, error action
     * or state holder is set up here, and the factory returned by the implementation's
     * {@code init} hook is ignored.
     *
     * @param streamDefinition   definition of the stream this sink is attached to
     * @param optionHolder       transport options
     * @param configReader       configuration reader for the sink
     * @param str                sink type name
     * @param connectionCallback callback notified about connection changes
     * @param map                deployment properties; must be empty when the sink exposes no service
     * @param siddhiAppContext   context of the owning Siddhi app
     * @throws SiddhiAppCreationException if deployment properties are given but no service is exposed
     */
    public final void initOnlyTransport(StreamDefinition streamDefinition, OptionHolder optionHolder, ConfigReader configReader, String str, DistributedTransport.ConnectionCallback connectionCallback, Map<String, String> map, SiddhiAppContext siddhiAppContext) {
        this.type = str;
        this.streamDefinition = streamDefinition;
        this.connectionCallback = connectionCallback;
        this.siddhiAppContext = siddhiAppContext;
        init(streamDefinition, optionHolder, configReader, siddhiAppContext);
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();

        final ServiceDeploymentInfo deploymentInfo = exposeServiceDeploymentInfo();
        this.serviceDeploymentInfo = deploymentInfo;
        if (deploymentInfo != null) {
            deploymentInfo.addDeploymentProperties(map);
        } else if (!map.isEmpty()) {
            // Properties without an exposed service are a configuration error.
            throw new SiddhiAppCreationException("Deployment properties '" + map + "' are defined for sink '" + str + "' which does not expose a service");
        }
    }

    /**
     * @return the connection status currently recorded by this sink
     */
    public boolean isConnected() {
        return this.isConnected.get();
    }

    /**
     * Reports whether this sink keeps state.
     *
     * @return {@code true} when a state holder exists and it is not an {@link EmptyStateHolder}
     */
    public boolean isStateful() {
        final StateHolder<S> holder = this.stateHolder;
        if (holder == null) {
            return false;
        }
        return !(holder instanceof EmptyStateHolder);
    }

    /** Pause between reconnect attempts while the WAIT error action blocks the caller. */
    private static final long WAIT_RETRY_INTERVAL_MILLIS = 5000L;

    /** While waiting, a "still waiting" warning is logged whenever the attempt count is a multiple of this. */
    private static final int WAIT_LOG_EVERY_ATTEMPTS = 12;

    /**
     * Handles a failed publish according to the configured {@link OnErrorAction}.
     *
     * <p>For a {@link ConnectionUnavailableException} the sink is first marked disconnected and
     * the connection callback (if any) is notified. For any other exception type the WAIT action
     * cannot help, so the event is logged and dropped. Otherwise:
     * <ul>
     *   <li>STREAM: triggers a reconnect attempt and forwards the event to the stream junction.</li>
     *   <li>WAIT: blocks until the sink is connected (or shut down) and re-publishes the event.</li>
     *   <li>STORE: saves the event and payload to the configured error store.</li>
     *   <li>LOG (and anything else): triggers a reconnect attempt and logs the dropped event.</li>
     * </ul>
     * Any {@link Throwable} raised while handling the error (including a missing error action)
     * is logged together with the original exception and not propagated.
     *
     * @param obj            payload that could not be published
     * @param dynamicOptions dynamic options of the failed event
     * @param exc            the failure being handled
     */
    public void onError(Object obj, DynamicOptions dynamicOptions, Exception exc) {
        OnErrorAction action = this.onErrorAction;
        if (exc instanceof ConnectionUnavailableException) {
            setConnected(false);
            DistributedTransport.ConnectionCallback callback = this.connectionCallback;
            if (callback != null) {
                callback.connectionFailed();
            }
        } else if (action == OnErrorAction.WAIT) {
            LOG.error("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as on.error='wait' does not handle '" + exc.getClass().getName() + "' error: '" + exc.getMessage() + "', events dropped '" + obj + "'", exc);
            return;
        }
        try {
            switch (action) {
                case STREAM:
                    connectWithRetry();
                    this.streamJunction.handleError(dynamicOptions.getEvent(), exc);
                    break;
                case WAIT:
                    waitForConnectionAndPublish(obj, dynamicOptions, exc);
                    break;
                case STORE:
                    ErroneousEvent erroneousEvent = new ErroneousEvent(dynamicOptions.getEvent(), exc, exc.getMessage());
                    erroneousEvent.setOriginalPayload(obj);
                    ErrorStoreHelper.storeErroneousEvent(this.siddhiAppContext.getSiddhiContext().getErrorStore(), ErrorOccurrence.STORE_ON_SINK_ERROR, this.siddhiAppContext.getName(), erroneousEvent, this.streamDefinition.getId());
                    break;
                case LOG:
                default:
                    connectWithRetry();
                    LOG.error("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as its still trying to reconnect!, events dropped '" + obj + "'");
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(exc);
                    }
                    break;
            }
        } catch (Throwable t) {
            // Error handling must not fail the caller: report and drop the event.
            LOG.error("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as there is an issue when handling the error: '" + t.getMessage() + "', events dropped '" + obj + "'", exc);
        }
    }

    /**
     * WAIT error action: blocks the caller until the sink is connected, then publishes the payload.
     *
     * <p>While disconnected, the thread sleeps between checks. If no other thread owns the
     * reconnection ({@code isTryingToConnect}), this thread claims it and repeatedly tries to
     * connect and publish itself. The method returns without publishing when the sink is shut
     * down. The borrowed state is returned exactly once per borrow.
     */
    private void waitForConnectionAndPublish(Object payload, DynamicOptions dynamicOptions, Exception cause) {
        LOG.error(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(cause, this.siddhiAppContext) + ", error while connecting Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "', will retry every '5 sec'."), cause);
        int attempts = 0;
        while (!this.isConnected.get()) {
            if (this.isShutdown.get()) {
                this.isTryingToConnect.set(false);
                return;
            }
            retryWait(WAIT_RETRY_INTERVAL_MILLIS);
            attempts++;
            if (!this.isConnected.get() && !this.isTryingToConnect.getAndSet(true)) {
                // This thread now owns the reconnection attempt.
                while (!this.isConnected.get()) {
                    if (this.isShutdown.get()) {
                        this.isTryingToConnect.set(false);
                        return;
                    }
                    S state = this.stateHolder.getState();
                    try {
                        connectAndPublish(payload, dynamicOptions, state);
                        this.isTryingToConnect.set(false);
                        return;
                    } catch (ConnectionUnavailableException ignored) {
                        // Still unavailable: fall through to log, wait and try again.
                    } catch (RuntimeException fatal) {
                        // Nobody is connecting any more; release the claim before propagating.
                        this.isTryingToConnect.set(false);
                        throw fatal;
                    } finally {
                        this.stateHolder.returnState(state);
                    }
                    if (attempts % WAIT_LOG_EVERY_ATTEMPTS == 0) {
                        LOG.warn(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(cause, this.siddhiAppContext) + ", still waiting to connect Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' retrying every '5 sec'."), cause);
                    }
                    retryWait(WAIT_RETRY_INTERVAL_MILLIS);
                    attempts++;
                }
            }
            if (attempts % WAIT_LOG_EVERY_ATTEMPTS == 0) {
                LOG.warn(StringUtil.removeCRLFCharacters(ExceptionUtil.getMessageWithContext(cause, this.siddhiAppContext) + ", still waiting to connect Sink '" + this.type + "' at '" + this.streamDefinition.getId() + SiddhiConstants.METRIC_DELIMITER), cause);
            }
        }
        // Connected (possibly by another thread): publish the pending payload.
        S state = this.stateHolder.getState();
        try {
            publish(payload, dynamicOptions, state);
        } catch (ConnectionUnavailableException ignored) {
            // Connection dropped again: restart error handling with the original cause.
            onError(payload, dynamicOptions, cause);
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Legacy error entry point that looks up the dynamic options of the current thread itself.
     * The WAIT action is not supported here when no dynamic options are available: the event is
     * logged and dropped instead.
     *
     * @param obj payload that could not be published
     * @param exc the failure being handled
     */
    @Deprecated
    void onError(Object obj, Exception exc) {
        final DynamicOptions options = this.trpDynamicOptions.get();
        if (options == null && this.onErrorAction == OnErrorAction.WAIT) {
            LOG.error("Error on '" + this.siddhiAppContext.getName() + "'. Dropping event at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "' as its does not support 'WAIT' as it uses deprecated onError(Object payload, Exception e) method!, events dropped '" + obj + "'");
            return;
        }
        onError(obj, options, exc);
    }

    /**
     * Publishes a mapped payload through the transport.
     *
     * <p>When the sink is not connected the event goes straight to {@code onError} (or is
     * silently dropped after shutdown). When publishing fails with a
     * {@link ConnectionUnavailableException} the sink is marked disconnected, the callback is
     * notified and, if no other thread is reconnecting, one immediate reconnect-and-publish is
     * attempted; if that is not possible or fails, the event goes to {@code onError} with the
     * original exception. The {@code isTryingToConnect} claim is released on every exit from the
     * immediate attempt, including unexpected runtime exceptions, so a failure cannot block
     * later reconnect attempts.
     *
     * @param obj payload to publish
     */
    @Override // io.siddhi.core.stream.output.sink.SinkListener
    public final void publish(Object obj) {
        if (this.mapperLatencyTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
            this.mapperLatencyTracker.markOut();
        }
        DynamicOptions dynamicOptions = this.trpDynamicOptions.get();
        if (!isConnected()) {
            if (this.isShutdown.get()) {
                return;
            }
            onError(obj, dynamicOptions, new ConnectionUnavailableException("Connection unavailable at Sink '" + this.type + "' at '" + this.streamDefinition.getId() + "'. Connection retrying is in progress from a different thread"));
            return;
        }
        S state = this.stateHolder.getState();
        try {
            publish(obj, dynamicOptions, state);
            if (this.throughputTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.throughputTracker.eventIn();
            }
        } catch (ConnectionUnavailableException e) {
            setConnected(false);
            DistributedTransport.ConnectionCallback callback = this.connectionCallback;
            if (callback != null) {
                callback.connectionFailed();
            }
            if (this.isTryingToConnect.getAndSet(true)) {
                // Another thread owns the reconnection; let the error action decide.
                onError(obj, dynamicOptions, e);
            } else {
                try {
                    connectAndPublish(obj, dynamicOptions, state);
                    this.isTryingToConnect.set(false);
                } catch (ConnectionUnavailableException e2) {
                    // Release the claim first so onError may trigger a new attempt.
                    this.isTryingToConnect.set(false);
                    onError(obj, dynamicOptions, e);
                } catch (RuntimeException unexpected) {
                    // Do not leave the claim set when the attempt aborts abnormally.
                    this.isTryingToConnect.set(false);
                    throw unexpected;
                }
            }
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Transport-specific publish, implemented by concrete sinks. Invoked by this class with the
     * state borrowed from the state holder.
     *
     * @param obj            payload to send
     * @param dynamicOptions dynamic options of the current event
     * @param s              sink state for this call
     * @throws ConnectionUnavailableException when the connection is unavailable; callers in this
     *                                        class react by reconnecting or running the error action
     */
    public abstract void publish(Object obj, DynamicOptions dynamicOptions, S s) throws ConnectionUnavailableException;

    /**
     * Publishes the payload once with the current thread's dynamic options, without any of the
     * connection checks or error handling done by {@code publish(Object)}. The borrowed state is
     * always returned.
     *
     * @param obj payload to publish
     * @throws ConnectionUnavailableException if the transport reports the connection as unavailable
     */
    protected void retryPublish(Object obj) throws ConnectionUnavailableException {
        final DynamicOptions options = this.trpDynamicOptions.get();
        final S borrowedState = this.stateHolder.getState();
        try {
            publish(obj, options, borrowedState);
            if (this.throughputTracker == null) {
                return;
            }
            // Count the event only when metrics are enabled at BASIC level or above.
            if (Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.throughputTracker.eventIn();
            }
        } finally {
            this.stateHolder.returnState(borrowedState);
        }
    }

    /**
     * Records the connection status of this sink.
     *
     * @param z {@code true} when connected, {@code false} otherwise
     */
    public void setConnected(boolean z) {
        this.isConnected.set(z);
    }

    /**
     * Injects the thread-local holder from which the publish methods read the dynamic options of
     * the current event.
     *
     * @param threadLocal holder of the per-thread dynamic options
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void setTrpDynamicOptions(ThreadLocal<DynamicOptions> threadLocal) {
        this.trpDynamicOptions = threadLocal;
    }

    /**
     * Shuts the sink down: flags it as shut down, disconnects and destroys the transport, clears
     * the connection flags and finally reports the connection as failed to the callback, if any.
     */
    public void shutdown() {
        // Flag first so waiting retry loops can observe the shutdown.
        this.isShutdown.set(true);
        disconnect();
        destroy();
        setConnected(false);
        this.isTryingToConnect.set(false);
        final DistributedTransport.ConnectionCallback callback = this.connectionCallback;
        if (callback == null) {
            return;
        }
        callback.connectionFailed();
    }
}
