package net.es.lookup.client;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import net.es.lookup.common.ReservedKeys;
import net.es.lookup.common.ReservedValues;
import net.es.lookup.common.exception.LSClientException;
import net.es.lookup.common.exception.ParserException;
import net.es.lookup.common.exception.RecordException;
import net.es.lookup.protocol.json.JSONParser;
import net.es.lookup.queries.Query;
import net.es.lookup.records.ErrorRecord;
import net.es.lookup.records.PubSub.SubscribeRecord;
import net.es.lookup.records.Record;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

/* loaded from: input_file:lib/simple-lookup-service-client-1.1-SNAPSHOT.jar:net/es/lookup/client/Subscriber.class */
public class Subscriber {
    private SimpleLS server;
    private Query query;
    private String subscribeRequestUrl;
    private Connection conn;
    private String queueUrl;
    private String queue;
    private Session session;
    private Topic topic;
    private MessageConsumer consumer;
    private Thread recordWaitThread;
    private Subscriber instance;
    private List<WeakReference<SubscriberListener>> listeners;
    private static Logger LOG = Logger.getLogger(Subscriber.class);

    public Subscriber(SimpleLS simpleLS, Query query, String str) throws LSClientException {
        this.instance = null;
        this.server = simpleLS;
        this.server.connect();
        this.query = query;
        LOG.info("net.es.lookup.client.Subscriber: Creating Subscriber");
        if (str == null || str.isEmpty()) {
            throw new LSClientException("subscribe URL is not specified");
        }
        this.subscribeRequestUrl = str;
        this.listeners = new LinkedList();
        LOG.info("net.es.lookup.client.Subscriber: Created Subscriber listener");
        initiateSubscription();
        LOG.info("net.es.lookup.client.Subscriber: Created Subscriber");
        this.instance = this;
    }

    public SimpleLS getServer() {
        return this.server;
    }

    public Query getQuery() {
        return this.query;
    }

    public String getSubscribeRequestUrl() {
        return this.subscribeRequestUrl;
    }

    public String getQueueUrl() {
        return this.queueUrl;
    }

    public String getQueue() {
        return this.queue;
    }

    public void initiateSubscription() throws LSClientException {
        SubscribeRecord heartbeat = heartbeat();
        this.queueUrl = heartbeat.getLocator().get(0);
        this.queue = heartbeat.getQueues().get(0);
        LOG.info("net.es.lookup.client.Subscriber: Initialized Subscriber");
    }

    public SubscribeRecord heartbeat() throws LSClientException {
        if (this.server == null || !this.server.getStatus().equals(ReservedValues.SERVER_STATUS_ALIVE)) {
            LOG.debug("net.es.lookup.client.Subscriber.heartbeat: Error initializing server");
            throw new LSClientException("Subscriber heartbeat: Server Initialization Error");
        }
        LOG.info("net.es.lookup.client.Subscriber: Initiating subscription");
        LOG.debug("net.es.lookup.client.Subscriber: Parsing query");
        String str = "";
        if (this.query != null) {
            try {
                str = JSONParser.toString(this.query);
            } catch (ParserException e) {
                throw new LSClientException(e.getMessage());
            }
        }
        LOG.debug("net.es.lookup.client.Subscriber: Query=" + str);
        LOG.debug("net.es.lookup.client.Subscriber.heartbeat: Setting server config");
        this.server.setRelativeUrl(this.subscribeRequestUrl);
        this.server.setConnectionType("POST");
        this.server.setData(str);
        LOG.debug("net.es.lookup.client.Subscriber: Sending subscribe request to server");
        this.server.send();
        LOG.debug("net.es.lookup.client.Subscriber: Response Code from server=" + this.server.getResponseCode());
        if (this.server.getResponseCode() != 200) {
            LOG.debug("net.es.lookup.client.Subscriber.heartbeat: Error in response:" + this.server.getErrorMessage());
            throw new LSClientException("Heartbeat Error in response. Response code: " + this.server.getResponseCode() + ". Error Message: " + this.server.getErrorMessage());
        }
        LOG.debug("net.es.lookup.client.Subscriber: Parsing response");
        try {
            SubscribeRecord subscribeRecord = (SubscribeRecord) JSONParser.toRecord(this.server.getResponse());
            LOG.info("Sent Heartbeat message");
            return subscribeRecord;
        } catch (ParserException e2) {
            LOG.error("net.es.lookup.client.Subscriber: Error Parsing response");
            throw new LSClientException(e2.getMessage());
        }
    }

    public void startSubscription() throws LSClientException {
        LOG.info("net.es.lookup.client.Subscriber: Starting Subscriber Connection");
        try {
            this.conn = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, this.queueUrl).createConnection();
            this.session = this.conn.createSession(false, 1);
            this.topic = this.session.createTopic(this.queue);
            this.consumer = this.session.createConsumer(this.topic);
            this.conn.start();
            if (!this.listeners.isEmpty() && this.recordWaitThread == null) {
                createRecordWaitThread();
            }
            LOG.info("net.es.lookup.client.Subscriber: Started Subscriber Connection");
        } catch (JMSException e) {
            throw new LSClientException("Error starting connection: " + e.getMessage());
        }
    }

    public Record retrieveMessage() throws LSClientException {
        LOG.info("net.es.lookup.client.Subscriber: REtrieving message");
        try {
            return JSONParser.toRecord(((TextMessage) this.consumer.receive()).getText());
        } catch (JMSException e) {
            LOG.error("net.es.lookup.client.Subscriber: Connection exception" + e.getMessage());
            throw new LSClientException(e.getMessage());
        } catch (ParserException e2) {
            LOG.error("net.es.lookup.client.Subscriber: Connection exception" + e2.getMessage());
            throw new LSClientException(e2.getMessage());
        }
    }

    public synchronized SubscriberListener addListener(SubscriberListener subscriberListener) {
        LOG.info("net.es.lookup.client.Subscriber: Adding listener");
        if (subscriberListener != null) {
            this.listeners.add(new WeakReference<>(subscriberListener));
            if (this.recordWaitThread == null && this.conn != null) {
                createRecordWaitThread();
            }
        }
        LOG.info("net.es.lookup.client.Subscriber: Successfully Added listener");
        return subscriberListener;
    }

    public synchronized SubscriberListener removeListener(SubscriberListener subscriberListener) {
        LOG.info("net.es.lookup.client.Subscriber: Removing listener");
        ListIterator<WeakReference<SubscriberListener>> listIterator = this.listeners.listIterator();
        boolean z = false;
        while (true) {
            if (!listIterator.hasNext()) {
                break;
            }
            if (listIterator.next().get().equals(subscriberListener)) {
                listIterator.remove();
                z = true;
                break;
            }
        }
        if (this.listeners.isEmpty()) {
            this.recordWaitThread = null;
        }
        LOG.info("net.es.lookup.client.Subscriber: Successfully removed listener");
        if (z) {
            return subscriberListener;
        }
        return null;
    }

    private void createRecordWaitThread() {
        LOG.info("net.es.lookup.client.Subscriber: Creating message wait thread");
        this.recordWaitThread = new Thread() { // from class: net.es.lookup.client.Subscriber.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                Subscriber.this.waitForRecords();
            }
        };
        this.recordWaitThread.start();
        LOG.info("Message wait Thread started");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitForRecords() {
        while (true) {
            try {
                LOG.debug("net.es.lookup.client.Subscriber: Waiting for records");
                TextMessage textMessage = (TextMessage) this.consumer.receive();
                System.out.println(textMessage);
                recordNotifier(JSONParser.toRecords(textMessage.getText()));
            } catch (JMSException e) {
                LOG.error("net.es.lookup.client.Subscriber: Error in connection" + e.getMessage());
                List<Record> arrayList = new ArrayList<>();
                ErrorRecord errorRecord = new ErrorRecord();
                try {
                    errorRecord.setErrorMessage(e.getMessage());
                } catch (RecordException e2) {
                    LOG.error("net.es.lookup.client.Subscriber: Unable to create error Record");
                }
                LOG.error("net.es.lookup.client.Subscriber: Subscriber instance" + this.instance);
                LOG.error("net.es.lookup.client.Subscriber: type" + errorRecord.getRecordType());
                errorRecord.add(ReservedKeys.SUBSCRIBER, this.instance.getSubscribeRequestUrl());
                errorRecord.add(ReservedKeys.QUEUE_URL, this.instance.getQueueUrl());
                errorRecord.add(ReservedKeys.QUEUE, this.instance.getQueue());
                arrayList.add(errorRecord);
                recordNotifier(arrayList);
                return;
            } catch (ParserException e3) {
                LOG.error("net.es.lookup.client.Subscriber: Parser error" + e3.getMessage());
            }
        }
    }

    private synchronized void recordNotifier(List<Record> list) {
        ListIterator<WeakReference<SubscriberListener>> listIterator = this.listeners.listIterator();
        while (listIterator.hasNext()) {
            LOG.info("net.es.lookup.client.Subscriber: Notifying listener");
            LOG.debug("Received records" + list.size());
            SubscriberListener subscriberListener = listIterator.next().get();
            Iterator<Record> it = list.iterator();
            while (it.hasNext()) {
                try {
                    Record duplicate = it.next().duplicate();
                    if (subscriberListener != null) {
                        try {
                            subscriberListener.onRecord(duplicate);
                        } catch (Exception e) {
                            LOG.error("net.es.lookup.client.Subscriber: Exception = " + e.getMessage());
                        }
                    } else {
                        listIterator.remove();
                    }
                } catch (RecordException e2) {
                    LOG.error("net.es.lookup.client.Subscriber: Record Exception = " + e2.getMessage());
                }
            }
        }
    }

    public void stopSubscription() throws LSClientException {
        LOG.info("net.es.lookup.client.Subscriber: Stopping connection");
        if (this.conn != null) {
            try {
                this.conn.close();
                this.recordWaitThread = null;
            } catch (JMSException e) {
                LOG.error("net.es.lookup.client.Subscriber: Connection Exception = " + e.getMessage());
                throw new LSClientException(e.getMessage());
            }
        }
    }

    public void shutdown() throws LSClientException {
        LOG.info("net.es.lookup.client.Subscriber: Stopping connection");
        if (this.conn != null) {
            try {
                this.recordWaitThread = null;
                this.listeners = null;
                this.consumer.close();
                this.conn.close();
            } catch (JMSException e) {
                LOG.error("net.es.lookup.client.Subscriber: Connection Exception = " + e.getMessage());
                throw new LSClientException(e.getMessage());
            }
        }
    }
}
