package net.es.lookup.pubsub.amq;

import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import net.es.lookup.common.Message;
import net.es.lookup.common.exception.internal.DataFormatException;
import net.es.lookup.common.exception.internal.PubSubQueueException;
import net.es.lookup.protocol.json.JSONMessage;
import net.es.lookup.pubsub.Queue;
import net.es.lookup.utils.config.reader.QueueServiceConfigReader;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

/* loaded from: input_file:net/es/lookup/pubsub/amq/AMQueue.class */
public class AMQueue extends Queue {
    private String activemqUrl;
    private Connection connection;
    private Session session;
    private Topic topic;
    private MessageProducer producer;
    private String qid;
    private static Logger LOG = Logger.getLogger(AMQueue.class);
    private final int BATCH_SIZE = 500;
    private final long DELAY = 200;
    ConnectionFactory activemqFactory;
    long messageTtl;
    boolean isPersistent;

    public AMQueue(String str) throws PubSubQueueException {
        this.qid = "";
        String str2 = ActiveMQConnection.DEFAULT_USER;
        String str3 = ActiveMQConnection.DEFAULT_PASSWORD;
        QueueServiceConfigReader queueServiceConfigReader = QueueServiceConfigReader.getInstance();
        this.messageTtl = queueServiceConfigReader.getTtl();
        this.isPersistent = queueServiceConfigReader.isQueuePersistent();
        this.qid = str;
        this.activemqUrl = queueServiceConfigReader.getUrl();
        this.activemqFactory = new ActiveMQConnectionFactory(str2, str3, this.activemqUrl);
        LOG.info("net.es.lookup.pubsub.amq.AMQueue.AMQueue: Queue Creation Successful!");
    }

    @Override // net.es.lookup.pubsub.Queue
    public String getQid() {
        return this.qid;
    }

    @Override // net.es.lookup.pubsub.Queue
    public synchronized void push(List<Message> list) throws PubSubQueueException {
        try {
            this.connection = this.activemqFactory.createConnection();
            this.connection.start();
            LOG.debug("net.es.lookup.pubsub.amq.AMQueue.AMQueue: Created connection for queue ");
            try {
                this.session = this.connection.createSession(false, 1);
                this.topic = this.session.createTopic(this.qid);
                this.producer = this.session.createProducer(this.topic);
                this.producer.setTimeToLive(this.messageTtl);
                LOG.debug("net.es.lookup.pubsub.amq.AMQueue.AMQueue: Created ActiveMQ session, topic and producer for Queue");
                if (this.isPersistent) {
                    this.producer.setDeliveryMode(2);
                } else {
                    this.producer.setDeliveryMode(1);
                }
                if (list.size() > 500) {
                    int i = 0;
                    int i2 = 500;
                    while (true) {
                        int i3 = i2;
                        if (i >= list.size()) {
                            break;
                        }
                        try {
                            send(JSONMessage.toString(list.subList(i, i3)));
                            Thread.sleep(200L);
                            i += 500;
                            i2 = i + 500 < list.size() ? i + 500 : list.size();
                        } catch (InterruptedException e) {
                            LOG.error("net.es.lookup.pubsub.amq.AMQueue.push: Error delaying between messages - InterruptedException mapped to PubSubQueueException" + e.getMessage());
                            throw new PubSubQueueException(e.getMessage());
                        } catch (DataFormatException e2) {
                            LOG.error("net.es.lookup.pubsub.amq.AMQueue.push: Error pushing message to queue - DataFormatException mapped to PubSubQueueException" + e2.getMessage());
                            throw new PubSubQueueException(e2.getMessage());
                        }
                    }
                } else {
                    try {
                        send(JSONMessage.toString(list));
                    } catch (DataFormatException e3) {
                        LOG.error("net.es.lookup.pubsub.amq.AMQueue.push: Error pushing message to queue - DataFormatException mapped to PubSubQueueException" + e3.getMessage());
                        throw new PubSubQueueException(e3.getMessage());
                    }
                }
                close();
            } catch (JMSException e4) {
                LOG.error("net.es.lookup.pubsub.amq.AMQueue.AMQueue: Error creating session/producer for Queue. " + e4.getMessage());
                throw new PubSubQueueException(e4.getMessage());
            }
        } catch (JMSException e5) {
            LOG.error("net.es.lookup.pubsub.amq.AMQueue.AMQueue: Error creating connection for Queue. " + e5.getMessage());
            LOG.error(e5.getStackTrace());
            throw new PubSubQueueException(e5.getMessage());
        }
    }

    private synchronized void send(String str) throws PubSubQueueException {
        try {
            TextMessage createTextMessage = this.session.createTextMessage(str);
            LOG.debug("net.es.lookup.pubsub.amq.AMQueue.send: Received message to push - " + str);
            this.producer.send(createTextMessage);
            LOG.info("net.es.lookup.pubsub.amq.AMQueue.send: Pushed message to Queue - " + createTextMessage);
        } catch (JMSException e) {
            LOG.error("net.es.lookup.pubsub.amq.AMQueue.send: Error pushing message to queue - JMSException mapped to PubSubQueueException " + e.getMessage());
            throw new PubSubQueueException(e.getMessage());
        }
    }

    public void close() throws PubSubQueueException {
        try {
            if (this.session != null) {
                this.session.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
            LOG.info("net.es.lookup.pubsub.amq.AMQueue.close: Closed Queue ");
        } catch (JMSException e) {
            LOG.error("net.es.lookup.pubsub.amq.AMQueue.close: Error closing Queue - JMSException mapped to PubSubQueueException " + e.getMessage());
            throw new PubSubQueueException(e.getMessage());
        }
    }
}
