/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.service;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.jms.ConnectionContext;
import org.apache.oozie.jms.DefaultConnectionContext;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.jms.JMSExceptionListener;
import org.apache.oozie.jms.MessageHandler;
import org.apache.oozie.jms.MessageReceiver;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;

public class JMSAccessorService
implements Service {
    public static final String CONF_PREFIX = "oozie.service.JMSAccessorService.";
    public static final String JMS_CONNECTION_CONTEXT_IMPL = "oozie.service.JMSAccessorService.connectioncontext.impl";
    public static final String SESSION_OPTS = "oozie.service.JMSAccessorService.jms.sessionOpts";
    public static final String CONF_RETRY_INITIAL_DELAY = "oozie.service.JMSAccessorService.retry.initial.delay";
    public static final String CONF_RETRY_MULTIPLIER = "oozie.service.JMSAccessorService.retry.multiplier";
    public static final String CONF_RETRY_MAX_ATTEMPTS = "oozie.service.JMSAccessorService.retry.max.attempts";
    private static XLog LOG;
    private Configuration conf;
    private int sessionOpts;
    private int retryInitialDelay;
    private int retryMultiplier;
    private int retryMaxAttempts;
    private ConnectionContext jmsProducerConnContext;
    private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap = new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
    private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap = new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
    private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap = new HashMap<JMSConnectionInfo, ConnectionRetryInfo>();

    @Override
    public void init(Services services) throws ServiceException {
        LOG = XLog.getLog(this.getClass());
        this.conf = services.getConf();
        this.sessionOpts = this.conf.getInt(SESSION_OPTS, 1);
        this.retryInitialDelay = this.conf.getInt(CONF_RETRY_INITIAL_DELAY, 60);
        this.retryMultiplier = this.conf.getInt(CONF_RETRY_MULTIPLIER, 2);
        this.retryMaxAttempts = this.conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
        if (!this.isTopicInRetryList(connInfo, topic)) {
            if (this.isConnectionInRetryList(connInfo)) {
                this.queueTopicForRetry(connInfo, topic, msgHandler);
            } else {
                Map<String, MessageReceiver> topicsMap = this.getReceiversTopicsMap(connInfo);
                if (!topicsMap.containsKey(topic)) {
                    Map<String, MessageReceiver> map = topicsMap;
                    synchronized (map) {
                        if (!topicsMap.containsKey(topic)) {
                            ConnectionContext connCtxt = this.createConnectionContext(connInfo);
                            if (connCtxt == null) {
                                this.queueTopicForRetry(connInfo, topic, msgHandler);
                                return;
                            }
                            MessageReceiver receiver = this.registerForTopic(connInfo, connCtxt, topic, msgHandler);
                            if (receiver == null) {
                                this.queueTopicForRetry(connInfo, topic, msgHandler);
                            } else {
                                LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
                                topicsMap.put(topic, receiver);
                            }
                        }
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) {
        LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", connInfo, topic);
        if (this.isTopicInRetryList(connInfo, topic)) {
            this.removeTopicFromRetryList(connInfo, topic);
        } else {
            Map topicsMap = (Map)this.receiversMap.get(connInfo);
            if (topicsMap != null) {
                MessageReceiver receiver = null;
                Map map = topicsMap;
                synchronized (map) {
                    receiver = (MessageReceiver)topicsMap.remove(topic);
                    if (topicsMap.isEmpty()) {
                        this.receiversMap.remove(connInfo);
                    }
                }
                if (receiver != null) {
                    try {
                        receiver.getSession().close();
                    }
                    catch (JMSException e) {
                        LOG.warn((Object)("Unable to close session " + receiver.getSession()), e);
                    }
                } else {
                    LOG.warn("Received request to unregister from topic [{0}] on [{1}], but no matching session.", topic, connInfo);
                }
            }
        }
    }

    private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) {
        Map exists;
        Map topicsMap = (HashMap)this.receiversMap.get(connInfo);
        if (topicsMap == null && (exists = (Map)this.receiversMap.putIfAbsent(connInfo, topicsMap = new HashMap())) != null) {
            topicsMap = exists;
        }
        return topicsMap;
    }

    @VisibleForTesting
    boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) {
        Map topicsMap = (Map)this.receiversMap.get(connInfo);
        return topicsMap != null && topicsMap.containsKey(topic);
    }

    @VisibleForTesting
    boolean isConnectionInRetryList(JMSConnectionInfo connInfo) {
        return this.retryConnectionsMap.containsKey(connInfo);
    }

    @VisibleForTesting
    boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) {
        ConnectionRetryInfo connRetryInfo = this.retryConnectionsMap.get(connInfo);
        if (connRetryInfo == null) {
            return false;
        }
        Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
        return topicsMap.containsKey(topic);
    }

    @VisibleForTesting
    int getNumConnectionAttempts(JMSConnectionInfo connInfo) {
        return this.retryConnectionsMap.get(connInfo).getNumAttempt();
    }

    private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) {
        ConnectionRetryInfo connRetryInfo = this.retryConnectionsMap.get(connInfo);
        if (connRetryInfo == null) {
            LOG.info("Queueing connection {0} for retry", connInfo);
            connRetryInfo = new ConnectionRetryInfo(0, this.retryInitialDelay);
            this.retryConnectionsMap.put(connInfo, connRetryInfo);
            this.scheduleRetry(connInfo, this.retryInitialDelay);
        }
        return connRetryInfo;
    }

    private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
        LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo);
        ConnectionRetryInfo connRetryInfo = this.queueConnectionForRetry(connInfo);
        Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
        topicsMap.put(topic, msgHandler);
        return connRetryInfo;
    }

    private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) {
        LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo);
        ConnectionRetryInfo connRetryInfo = this.retryConnectionsMap.get(connInfo);
        if (connRetryInfo != null) {
            Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
            topicsMap.remove(topic);
        }
    }

    private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic, MessageHandler msgHandler) {
        try {
            Session session = connCtxt.createSession(this.sessionOpts);
            MessageConsumer consumer = connCtxt.createConsumer(session, topic);
            MessageReceiver receiver = new MessageReceiver(msgHandler, session, consumer);
            consumer.setMessageListener((MessageListener)receiver);
            return receiver;
        }
        catch (JMSException e) {
            LOG.warn("Error while registering to listen to topic {0} from {1}", new Object[]{topic, connInfo, e});
            return null;
        }
    }

    public ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
        ConnectionContext connCtxt = (ConnectionContext)this.connectionMap.get(connInfo);
        if (connCtxt == null) {
            try {
                connCtxt = this.getConnectionContextImpl();
                connCtxt.createConnection(connInfo.getJNDIProperties());
                connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt, true));
                this.connectionMap.put(connInfo, connCtxt);
                LOG.info("Connection established to JMS Server for [{0}]", connInfo);
            }
            catch (Exception e) {
                LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
                return null;
            }
        }
        return connCtxt;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ConnectionContext createProducerConnectionContext(JMSConnectionInfo connInfo) {
        if (this.jmsProducerConnContext != null && this.jmsProducerConnContext.isConnectionInitialized()) {
            return this.jmsProducerConnContext;
        }
        JMSAccessorService jMSAccessorService = this;
        synchronized (jMSAccessorService) {
            if (this.jmsProducerConnContext == null || !this.jmsProducerConnContext.isConnectionInitialized()) {
                try {
                    this.jmsProducerConnContext = this.getConnectionContextImpl();
                    this.jmsProducerConnContext.createConnection(connInfo.getJNDIProperties());
                    this.jmsProducerConnContext.setExceptionListener(new JMSExceptionListener(connInfo, this.jmsProducerConnContext, false));
                    LOG.info("Connection established to JMS Server for [{0}]", connInfo);
                }
                catch (Exception e) {
                    LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
                    return null;
                }
            }
        }
        return this.jmsProducerConnContext;
    }

    private ConnectionContext getConnectionContextImpl() {
        Class<?> defaultClazz = ConfigurationService.getClass(this.conf, JMS_CONNECTION_CONTEXT_IMPL);
        ConnectionContext connCtx = null;
        connCtx = defaultClazz == DefaultConnectionContext.class ? new DefaultConnectionContext() : (ConnectionContext)ReflectionUtils.newInstance(defaultClazz, null);
        return connCtx;
    }

    @VisibleForTesting
    MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) {
        Map topicsMap = (Map)this.receiversMap.get(connInfo);
        if (topicsMap != null) {
            return (MessageReceiver)topicsMap.get(topic);
        }
        return null;
    }

    @Override
    public void destroy() {
        LOG.info("Destroying JMSAccessor service ");
        this.receiversMap.clear();
        LOG.info("Closing JMS connections");
        for (ConnectionContext conn : this.connectionMap.values()) {
            conn.close();
        }
        if (this.jmsProducerConnContext != null) {
            this.jmsProducerConnContext.close();
        }
        this.connectionMap.clear();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return JMSAccessorService.class;
    }

    public void reestablishConnection(JMSConnectionInfo connInfo) {
        this.connectionMap.remove(connInfo);
        ConnectionRetryInfo connRetryInfo = this.queueConnectionForRetry(connInfo);
        Map listeningTopicsMap = (Map)this.receiversMap.remove(connInfo);
        if (listeningTopicsMap != null) {
            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
            for (Map.Entry topicEntry : listeningTopicsMap.entrySet()) {
                MessageReceiver receiver = (MessageReceiver)topicEntry.getValue();
                retryTopicsMap.put((String)topicEntry.getKey(), receiver.getMessageHandler());
            }
        }
    }

    private void scheduleRetry(JMSConnectionInfo connInfo, long delay) {
        LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay);
        JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo);
        SchedulerService scheduler = Services.get().get(SchedulerService.class);
        scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    boolean retryConnection(JMSConnectionInfo connInfo) {
        ConnectionRetryInfo connRetryInfo = this.retryConnectionsMap.get(connInfo);
        if (connRetryInfo.getNumAttempt() >= this.retryMaxAttempts) {
            LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, this.retryMaxAttempts);
            return false;
        }
        LOG.info("Attempting retry of connection [{0}]", connInfo);
        connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
        connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * this.retryMultiplier);
        ConnectionContext connCtxt = this.createConnectionContext(connInfo);
        boolean shouldRetry = false;
        if (connCtxt == null) {
            shouldRetry = true;
        } else {
            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
            Map<String, MessageReceiver> listeningTopicsMap = this.getReceiversTopicsMap(connInfo);
            ArrayList<String> topicsToRemoveList = new ArrayList<String>();
            for (Map.Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) {
                String topic = topicEntry.getKey();
                if (listeningTopicsMap.containsKey(topic)) continue;
                Map<String, MessageReceiver> map = listeningTopicsMap;
                synchronized (map) {
                    if (!listeningTopicsMap.containsKey(topic)) {
                        MessageReceiver receiver = this.registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue());
                        if (receiver == null) {
                            LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo);
                        } else {
                            listeningTopicsMap.put(topic, receiver);
                            topicsToRemoveList.add(topic);
                            LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
                        }
                    }
                }
            }
            for (String topic : topicsToRemoveList) {
                retryTopicsMap.remove(topic);
            }
            if (retryTopicsMap.isEmpty()) {
                shouldRetry = false;
            }
        }
        if (shouldRetry) {
            this.scheduleRetry(connInfo, connRetryInfo.getNextDelay());
        } else {
            this.retryConnectionsMap.remove(connInfo);
        }
        return true;
    }

    public class JMSRetryRunnable
    implements Runnable {
        private JMSConnectionInfo connInfo;

        public JMSRetryRunnable(JMSConnectionInfo connInfo) {
            this.connInfo = connInfo;
        }

        public JMSConnectionInfo getJMSConnectionInfo() {
            return this.connInfo;
        }

        @Override
        public void run() {
            JMSAccessorService.this.retryConnection(this.connInfo);
        }
    }

    private static class ConnectionRetryInfo {
        private int numAttempt;
        private int nextDelay;
        private Map<String, MessageHandler> retryTopicsMap;

        public ConnectionRetryInfo(int numAttempt, int nextDelay) {
            this.numAttempt = numAttempt;
            this.nextDelay = nextDelay;
            this.retryTopicsMap = new HashMap<String, MessageHandler>();
        }

        public int getNumAttempt() {
            return this.numAttempt;
        }

        public void setNumAttempt(int numAttempt) {
            this.numAttempt = numAttempt;
        }

        public int getNextDelay() {
            return this.nextDelay;
        }

        public void setNextDelay(int nextDelay) {
            this.nextDelay = nextDelay;
        }

        public Map<String, MessageHandler> getTopicsToRetry() {
            return this.retryTopicsMap;
        }
    }
}

