package com.tibco.bw.palette.amqp.runtime.receive.qpid;

import com.azure.core.util.IterableStream;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusFailureReason;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverClient;
import com.tibco.bw.palette.amqp.model.utils.Constants;
import com.tibco.bw.palette.amqp.model.utils.Messages;
import com.tibco.bw.palette.amqp.runtime.RuntimeMessageBundle;
import com.tibco.bw.palette.amqp.runtime.common.BeanUtils;
import com.tibco.bw.palette.amqp.runtime.common.LoggerUtil;
import com.tibco.bw.palette.amqp.runtime.common.MessageConstructor;
import com.tibco.bw.palette.amqp.runtime.fault.AmqpConnectionException;
import com.tibco.bw.palette.amqp.runtime.fault.AmqpEventSourceFault;
import com.tibco.bw.palette.amqp.runtime.receive.AmqpBaseReceiver;
import com.tibco.bw.palette.amqp.runtime.receive.IReceiver;
import com.tibco.bw.runtime.ActivityLifecycleFault;
import com.tibco.bw.runtime.ActivityLogger;
import com.tibco.bw.runtime.EventSourceContext;
import com.tibco.bw.runtime.util.XMLUtils;
import com.tibco.bw.sharedresource.amqp.model.helper.AmqpConstants;
import com.tibco.bw.sharedresource.amqp.runtime.AmqpConnectionResource;
import com.tibco.bw.sharedresource.amqp.runtime.connection.AmqpBuildFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.NamingException;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.message.JmsMessage;
import org.eclipse.emf.ecore.EObject;
import reactor.core.Disposable;

/* loaded from: input_file:payload/TIB_bwpluginamqp_6.4.0_common.zip:assemblies/assembly_tibco_com_tibco_bw_palette_amqp_runtime_feature_6.4.0.006.zip:source/plugins/com.tibco.bw.palette.amqp.runtime_6.4.0.006.jar:com/tibco/bw/palette/amqp/runtime/receive/qpid/QpidReceiver.class */
public class QpidReceiver<N> extends AmqpBaseReceiver implements IReceiver {
    // JMS connection; assigned in init() only on the non-Azure (Qpid) path.
    Connection connection;
    // Logger supplied to the constructor; passed to every LoggerUtil call in this class.
    ActivityLogger activityLogger;
    // Runtime callback target: received messages and faults are delivered through newEvent(...).
    EventSourceContext<N> eventSourceContext;
    // Broker type read from the connection configuration in init().
    String brokerType;
    // Session settings below are read from the activity configuration via BeanUtils in the constructor.
    private boolean isSessionEnabled;
    private String sessionId;
    private int maxConcurrentSessions;
    private String sessionReceiveType;
    // Acknowledge mode read via BeanUtils.getAckMode(...) and passed to session/receiver factories.
    int ackmode;
    // Value of BeanUtils.getMaxMessages(...); used as maxConcurrentCalls and prefetchCount of the session processor.
    int maxFetchCount;
    // Disposed in deactivate("D"); never assigned in the decompiled code shown here.
    private Disposable subscription;
    // Not referenced by any decompiled method in this class.
    Context context = null;
    // Thread created by activate() to execute run(); the reference is cleared by deactivate().
    private Thread receiverThread = null;
    // Most recent JMS message returned by messageReceiver.receive() in run().
    JmsMessage msg = null;
    // JMS session and consumer created in init() on the Qpid path; closed and cleared in deactivate("D").
    Session session = null;
    MessageConsumer messageReceiver = null;
    // Loop condition of run(); set to true by activate() and to false by deactivate().
    private volatile boolean qpidactive = false;
    // Set to false by processAzureSessionMessage(); not read by any decompiled method
    // (NOTE(review): possibly consumed by executeAzureReceiver, whose body was not decompiled).
    private boolean available = true;
    // Azure Service Bus receiver: built directly in init() or obtained by accepting a session.
    private ServiceBusReceiverClient azureReceiverClient = null;
    // Builder and resulting callback-based processor created in receiveFromSession().
    private ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder azureProcessorClientBuilder = null;
    private ServiceBusProcessorClient azureSessionProcessor = null;
    // Session receiver client from which a concrete session receiver is accepted.
    private ServiceBusSessionReceiverClient azureSessionReceiverClient = null;
    // Not referenced by any decompiled method (NOTE(review): likely used by executeAzureReceiver — confirm).
    IterableStream<ServiceBusReceivedMessage> messages = null;

    /**
     * Creates a receiver bound to one activity configuration and one shared AMQP connection resource.
     *
     * <p>The protocol version is fixed to {@link Constants#PROTOCOL_VERSION_10}; acknowledge mode and all
     * session-related settings are read from the activity configuration through {@link BeanUtils}.
     *
     * @param eObject activity configuration model
     * @param amqpConnectionResource shared connection resource providing the build factory and broker settings
     * @param messageConstructor converts received messages into the activity output
     * @param activityLogger logger used for all diagnostics of this receiver
     * @param eventSourceContext runtime context that receives new events and faults
     */
    /* JADX WARN: Multi-variable type inference failed */
    public QpidReceiver(EObject eObject, AmqpConnectionResource amqpConnectionResource, MessageConstructor<N> messageConstructor, ActivityLogger activityLogger, EventSourceContext<N> eventSourceContext) {
        // Collaborators handed in by the runtime.
        this.activityConfig = eObject;
        this.connInfo = amqpConnectionResource;
        this.constructor = messageConstructor;
        this.activityLogger = activityLogger;
        this.eventSourceContext = eventSourceContext;
        this.protocolVersion = Constants.PROTOCOL_VERSION_10;

        // Settings derived from the activity configuration. Each field is assigned exactly once;
        // the lookups are kept in their original order.
        this.ackmode = BeanUtils.getAckMode(eObject);
        this.isSessionEnabled = BeanUtils.isSessionEnabled(eObject);
        this.sessionId = BeanUtils.getSessionId(this.activityConfig);
        this.maxConcurrentSessions = BeanUtils.getMaxConcurrentSessions(eObject);
        this.maxFetchCount = BeanUtils.getMaxMessages(eObject);
        this.sessionReceiveType = BeanUtils.getReceiveType(eObject);
    }

    /**
     * Prepares the broker-specific receiver.
     *
     * <p>For non-Azure brokers a JMS session and a queue consumer are created on the Qpid connection.
     * For Azure Service Bus either a plain receiver client is built or, when sessions are enabled,
     * {@link #receiveFromSession} sets up a session receiver or processor.
     *
     * <p>A generic {@link JMSException} while creating the session or consumer is reported to the event
     * source context as an {@link AmqpConnectionException} and does not abort initialization.
     *
     * @throws ActivityLifecycleFault if the configuration is incomplete (empty entity, subscription or
     *         session receive type), the destination is invalid, security validation fails, or the
     *         connection cannot be created
     */
    @Override // com.tibco.bw.palette.amqp.runtime.receive.IReceiver
    public void init() throws ActivityLifecycleFault {
        AmqpBuildFactory amqpBuildFactory = this.connInfo.getAmqpBuildFactory();
        LoggerUtil.onInfo(this.activityLogger, RuntimeMessageBundle.AMQP_INFO_INFORMATION, "Finished initialize context");
        try {
            this.brokerType = this.connInfo.getAmqpConfig().getAmqpBasicConfiguration().getBrokerType();
            String entityType = BeanUtils.getEntityTypeForAzureSB(this.activityConfig);

            if (!AmqpConstants.AZURESB.equalsIgnoreCase(this.brokerType)) {
                this.connection = amqpBuildFactory.getQpidConnection();
                String queueName = BeanUtils.getQueueName(this.activityConfig);
                try {
                    if (!this.protocolVersion.equals(Constants.PROTOCOL_VERSION_10)) {
                        this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                    } else if (this.connection instanceof JmsConnection) {
                        this.session = ((JmsConnection) this.connection).createSession(false, this.ackmode);
                    }
                    if (this.session == null) {
                        // Without this guard the next statement fails with a message-less NullPointerException.
                        String reason = "Qpid JMS connection is not available";
                        LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_CREATE_CONNECTION, reason);
                        throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURED_CREATE_CONNECTION.format(reason), new IllegalStateException(reason));
                    }
                    LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "AMQP Receiver locked into entity " + queueName);
                    Queue queue = this.session.createQueue(queueName);
                    LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Creating the consumer");
                    this.messageReceiver = this.session.createConsumer(queue);
                    LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Created the consumer" + this.messageReceiver);
                    LoggerUtil.onInfo(this.activityLogger, RuntimeMessageBundle.AMQP_INFO_INFORMATION, "Activates the connection");
                } catch (InvalidDestinationException e) {
                    LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_SEND_MESSAGE_INVALID_DESTINATION, queueName, "Queue");
                    throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURED_SEND_MESSAGE_INVALID_DESTINATION.format(e.getMessage()), e);
                } catch (JMSSecurityException e) {
                    // Must precede the JMSException handler: JMSSecurityException is a subclass of it.
                    LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_DUE_TO_SECURITY_INFO, e.getMessage());
                    LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_DUE_TO_SECURITY_INFO, LoggerUtil.getStrackTrace(e));
                    throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURED_DUE_TO_SECURITY_INFO.format(e.getMessage()), e);
                } catch (JMSException e) {
                    // Reported as an event rather than thrown, so initialization itself does not fail here.
                    LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_START_CONNECTION, e.getMessage());
                    LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_START_CONNECTION, LoggerUtil.getStrackTrace(e));
                    this.eventSourceContext.newEvent(new AmqpConnectionException(this.eventSourceContext, RuntimeMessageBundle.ERROR_OCCURED_START_CONNECTION.getErrorCode(), RuntimeMessageBundle.ERROR_OCCURED_START_CONNECTION.format(e.getMessage()), e));
                }
                return;
            }

            // Azure Service Bus: without an entity type the configured queue name is used as the entity.
            String entityName = entityType == null
                    ? BeanUtils.getQueueName(this.activityConfig)
                    : BeanUtils.getEntityName(this.activityConfig);
            requireConfigValue(entityName, "Entity name is empty", "Entity name is empty");

            boolean isTopic = false;
            String subscriptionName = null;
            if ("Topic".equals(entityType)) {
                isTopic = true;
                subscriptionName = BeanUtils.getSubscriptionName(this.activityConfig);
                requireConfigValue(subscriptionName, "Subscription name is empty", "Subscription name is empty");
            }

            if (!this.isSessionEnabled) {
                LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Creating the receiver");
                this.azureReceiverClient = amqpBuildFactory.azureReceiverClient(entityName, isTopic, subscriptionName, this.ackmode);
                return;
            }
            // The value checked here is the receive type, so the message must say so on every path.
            requireConfigValue(this.sessionReceiveType, "Session receive type is empty", "Session receive type cannot be empty");
            LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Sessions are enabled");
            receiveFromSession(amqpBuildFactory, isTopic, entityName, subscriptionName);
        } catch (ActivityLifecycleFault fault) {
            // Already logged with a specific message; do not re-wrap it as a generic create-connection error.
            throw fault;
        } catch (NamingException e) {
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_INITIALIZE_CONNECTIONFACTORY, e.getMessage());
            throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURED_INITIALIZE_CONNECTIONFACTORY.format(e.getMessage()), e);
        } catch (IOException e) {
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_GENERATE_CONNECTION_FILE, e.getMessage());
            throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURED_GENERATE_CONNECTION_FILE.format(e.getMessage()), e);
        } catch (Exception e) {
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_CREATE_CONNECTION, e.getMessage());
            throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURED_CREATE_CONNECTION.format(e.getMessage()), e);
        }
    }

    /** Logs {@code logMessage} and raises a fault carrying {@code faultMessage} when {@code value} is null or empty. */
    private void requireConfigValue(String value, String logMessage, String faultMessage) throws ActivityLifecycleFault {
        if (value == null || value.isEmpty()) {
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.AMQP_ERROR_INFORMATION, logMessage);
            throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURED_GENERATE_CONNECTION_FILE.format(faultMessage), new RuntimeException());
        }
    }

    /**
     * Sets up session-based receiving on Azure Service Bus.
     *
     * <p>A synchronous session receiver is used unless the receive type is "all available sessions"
     * with either more than one concurrent session or a fetch count other than one; in that case a
     * callback-based session processor is built (it is started later by {@link #activate()}).
     *
     * @param amqpBuildFactory factory that creates the Azure clients
     * @param isTopic whether the entity is a topic
     * @param entityName queue or topic name
     * @param subscriptionName subscription name for topics, otherwise {@code null}
     */
    private void receiveFromSession(AmqpBuildFactory amqpBuildFactory, boolean isTopic, String entityName, String subscriptionName) {
        amqpBuildFactory.setSessionEnabled(true);

        boolean allSessions = this.sessionReceiveType.equalsIgnoreCase(Messages.AMQP_ALL_AVAILABLE_SESSIONS);
        boolean singleMessageSingleSession = this.maxFetchCount == 1 && this.maxConcurrentSessions == 1;
        if (!allSessions || singleMessageSingleSession) {
            createSyncSessionReceiverClient(amqpBuildFactory, isTopic, entityName, subscriptionName);
            return;
        }

        LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Creating the callback based session processor. Max concurrent sessions = " + this.maxConcurrentSessions);
        this.azureProcessorClientBuilder = amqpBuildFactory.azureSessionProcessorClient(entityName, isTopic, subscriptionName, this.ackmode);
        this.azureSessionProcessor = this.azureProcessorClientBuilder
                .processMessage(this::processAzureSessionMessage)
                .processError(this::processAzureSessionError)
                .maxConcurrentSessions(this.maxConcurrentSessions)
                .maxConcurrentCalls(this.maxFetchCount)
                .prefetchCount(this.maxFetchCount)
                .buildProcessorClient();
    }

    /**
     * Builds the synchronous session receiver client and accepts a session from it.
     *
     * <p>For the "specific session" receive type the configured session id is accepted; for any other
     * receive type the next available session is accepted. Nothing is accepted when sessions are
     * disabled or the factory returned no client.
     *
     * @param amqpBuildFactory factory that creates the Azure clients
     * @param isTopic whether the entity is a topic
     * @param entityName queue or topic name
     * @param subscriptionName subscription name for topics, otherwise {@code null}
     */
    private void createSyncSessionReceiverClient(AmqpBuildFactory amqpBuildFactory, boolean isTopic, String entityName, String subscriptionName) {
        boolean specificSession = this.sessionReceiveType.equals(Messages.AMQP_SPECIFIC_SESSION);
        String creationNote = specificSession
                ? "Creating the synchronous session receiver to receive from specific session"
                : "Creating the synchronous session receiver to receive from the first available session";
        LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, creationNote);

        this.azureSessionReceiverClient = amqpBuildFactory.azureSessionReceiverClient(entityName, isTopic, subscriptionName, this.ackmode);
        if (!(this.isSessionEnabled && this.azureSessionReceiverClient != null)) {
            return;
        }

        if (specificSession) {
            if (this.sessionId == null || this.sessionId.isEmpty()) {
                LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.AMQP_ERROR_INFORMATION, "SessionId field is empty");
                throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURRED_SESSION_LOCK.format("SessionId field cannot be empty"), new RuntimeException());
            }
            LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Acquiring lock on session: " + this.sessionId);
            this.azureReceiverClient = this.azureSessionReceiverClient.acceptSession(this.sessionId);
        } else {
            LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Acquiring lock on the first available session ");
            this.azureReceiverClient = this.azureSessionReceiverClient.acceptNextSession();
        }
    }

    /**
     * Receiver thread body.
     *
     * <p>For Azure Service Bus the work is delegated to {@link #executeAzureReceiver()}. Otherwise the
     * method blocks on the JMS consumer while {@code qpidactive} is set, converts each message with the
     * message constructor and hands it to the event source context. The thread context class loader is
     * switched to this class's loader for the whole loop and restored on exit.
     *
     * <p>A failure while receiving or converting a message is logged and reported as an
     * {@link AmqpEventSourceFault}; the loop then continues.
     */
    @Override // java.lang.Runnable
    public void run() {
        // Case-insensitive to agree with the broker check in init().
        if (AmqpConstants.AZURESB.equalsIgnoreCase(this.brokerType)) {
            executeAzureReceiver();
            return;
        }
        Thread current = Thread.currentThread();
        ClassLoader previousLoader = current.getContextClassLoader();
        current.setContextClassLoader(getClass().getClassLoader());
        try {
            while (this.qpidactive) {
                // Snapshot: deactivate() may null the field from another thread.
                MessageConsumer consumer = this.messageReceiver;
                if (consumer == null) {
                    return;
                }
                try {
                    this.msg = (JmsMessage) consumer.receive();
                    if (this.msg == null) {
                        // receive() returns null when the consumer is closed concurrently; re-check the loop flag.
                        continue;
                    }
                    Object constructMessage = this.constructor.constructMessage(this.msg, this.connInfo.getAmqpConfiguration().getAmqpBasicConfiguration().getBrokerType(), null, null);
                    if (this.activityLogger.isDebugEnabled()) {
                        this.activityLogger.debug(RuntimeMessageBundle.DEBUG_PLUGIN_ACTIVITY_OUTPUT, new Object[]{this.eventSourceContext.getEventSourceName(), XMLUtils.serializeNode(constructMessage, this.eventSourceContext.getXMLProcessingContext()), AmqpConstants.RECEIVE_MESSAGE_ACTIVITY});
                    }
                    this.eventSourceContext.newEvent(constructMessage, new QpidReceiverEventContext(this.msg, this.eventSourceContext));
                } catch (Exception e) {
                    reportReceiveFailure(e);
                }
            }
        } finally {
            current.setContextClassLoader(previousLoader);
        }
    }

    /** Marks the receiver as not started, logs the failure and forwards it to the event source context. */
    private void reportReceiveFailure(Exception e) {
        this.start = false;
        if (e.getMessage() != null) {
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE, e.getMessage());
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE, LoggerUtil.getStrackTrace(e));
        }
        try {
            if (this.eventSourceContext != null && this.connection != null) {
                this.eventSourceContext.newEvent(new AmqpEventSourceFault(this.eventSourceContext, RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE.getErrorCode(), RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE.format(e.getMessage()), null));
            }
        } catch (Throwable ignored) {
            // Reporting is best effort: if the runtime rejects the fault event, only log the original failure again.
            if (e.getMessage() != null) {
                LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE, e.getMessage());
                LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE, LoggerUtil.getStrackTrace(e));
            }
        }
    }

    /**
     * Azure Service Bus branch of {@link #run()}: called when the broker type equals
     * {@code AmqpConstants.AZURESB}.
     *
     * <p>The decompiler could not reconstruct this method (see the JADX warnings below). The body shown
     * here is only a placeholder that always throws {@link UnsupportedOperationException}; the real
     * logic exists only in the original bytecode.
     * NOTE(review): presumably the polling receive loop over {@code azureReceiverClient} — confirm
     * against the instruction dump before relying on it.
     */
    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
    /* JADX WARN: Failed to find 'out' block for switch in B:56:0x0248. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:84:0x03b8 A[Catch: all -> 0x0487, TryCatch #2 {all -> 0x0487, blocks: (B:10:0x00da, B:14:0x00e1, B:16:0x00eb, B:18:0x00f8, B:19:0x012b, B:21:0x0138, B:23:0x010a, B:26:0x0152, B:27:0x0226, B:29:0x0175, B:32:0x0186, B:34:0x01a5, B:35:0x01dc, B:38:0x021d, B:44:0x0230, B:46:0x0145, B:55:0x023a, B:56:0x0248, B:57:0x026c, B:61:0x02fd, B:62:0x0325, B:63:0x027a, B:66:0x0326, B:67:0x0458, B:70:0x0288, B:76:0x029a, B:78:0x02a3, B:79:0x02cf, B:80:0x02d0, B:81:0x02fc, B:82:0x03ab, B:84:0x03b8, B:86:0x03e8, B:88:0x03ef, B:90:0x03f6, B:92:0x041f, B:94:0x0428, B:96:0x0464), top: B:9:0x00da, inners: #0, #4 }] */
    /* JADX WARN: Removed duplicated region for block: B:88:0x03ef A[Catch: Throwable -> 0x041f, all -> 0x0487, TryCatch #0 {Throwable -> 0x041f, blocks: (B:86:0x03e8, B:88:0x03ef, B:90:0x03f6), top: B:85:0x03e8, outer: #2 }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void executeAzureReceiver() {
        /*
            Method dump skipped, instructions count: 1182
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.tibco.bw.palette.amqp.runtime.receive.qpid.QpidReceiver.executeAzureReceiver():void");
    }

    /**
     * Error callback of the Azure session processor.
     *
     * <ul>
     *   <li>Non-Service-Bus exceptions and unrecoverable reasons (entity disabled, entity not found,
     *       unauthorized) are logged and raised as {@link ActivityLifecycleFault}.</li>
     *   <li>A lost message lock is only logged.</li>
     *   <li>A busy service is logged and followed by a one-second pause before returning.</li>
     *   <li>Every other reason is logged with its stack trace and raised as a fault.</li>
     * </ul>
     *
     * @param serviceBusErrorContext error details supplied by the processor
     */
    private void processAzureSessionError(ServiceBusErrorContext serviceBusErrorContext) {
        Throwable failure = serviceBusErrorContext.getException();
        if (!(failure instanceof ServiceBusException)) {
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURRED_NON_SERVICEBUS_EXCEPTION, failure.getMessage());
            throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURRED_NON_SERVICEBUS_EXCEPTION.format(failure.getMessage()));
        }

        ServiceBusException serviceBusException = (ServiceBusException) failure;
        ServiceBusFailureReason reason = serviceBusException.getReason();

        if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) {
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURRED_SESSION_UNRECOVERABLE, reason.toString(), serviceBusException.getMessage());
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURRED_SESSION_UNRECOVERABLE.format(reason.toString(), serviceBusException.getMessage()), serviceBusException);
        }

        if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURRED_MESSAGE_LOCK_LOST, serviceBusException.getMessage());
            return;
        }

        if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
            try {
                LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURRED_SERVICE_BUSY, "1");
                // Back off briefly before the processor retries.
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_UNABLE_TO_SLEEP.format(String.valueOf(reason), serviceBusException.getMessage()), interrupted);
            }
            return;
        }

        LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE, serviceBusException.getMessage());
        LoggerUtil.onError(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE, LoggerUtil.getStrackTrace(failure));
        // String.valueOf tolerates a missing failure reason instead of failing inside the error handler.
        throw new ActivityLifecycleFault(RuntimeMessageBundle.ERROR_OCCURED_RECEIVE_MESSAGE.format(String.valueOf(reason), serviceBusException.getMessage()), serviceBusException);
    }

    /**
     * Message callback of the Azure session processor: converts the received message with the message
     * constructor and submits it to the event source context as a new event.
     *
     * <p>The {@code available} flag is cleared on every invocation, including when the context carries
     * no message.
     *
     * @param serviceBusReceivedMessageContext callback context holding the received message
     */
    private void processAzureSessionMessage(ServiceBusReceivedMessageContext serviceBusReceivedMessageContext) {
        LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Callback process message triggered.");
        ServiceBusReceivedMessage received = serviceBusReceivedMessageContext.getMessage();
        this.available = false;
        LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Processing received messages.... ");
        if (received == null) {
            return;
        }

        Object payload = this.constructor.constructMessage(received, this.brokerType, null, null);
        if (this.activityLogger.isDebugEnabled()) {
            Object[] debugArgs = new Object[]{
                this.eventSourceContext.getEventSourceName(),
                XMLUtils.serializeNode(payload, this.eventSourceContext.getXMLProcessingContext()),
                AmqpConstants.RECEIVE_MESSAGE_ACTIVITY
            };
            this.activityLogger.debug(RuntimeMessageBundle.DEBUG_PLUGIN_ACTIVITY_OUTPUT, debugArgs);
        }
        LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Triggering new job.... ");
        this.eventSourceContext.newEvent(payload, new QpidReceiverEventContext(received, serviceBusReceivedMessageContext, (EventSourceContext) this.eventSourceContext, true, this.ackmode, true));
    }

    /**
     * Starts message delivery: either starts the Azure session processor (when sessions are enabled
     * and a processor was built) or launches a new thread that executes {@link #run()}.
     */
    @Override // com.tibco.bw.palette.amqp.runtime.receive.IReceiver
    public void activate() {
        this.qpidactive = true;
        boolean useSessionProcessor = this.isSessionEnabled && this.azureSessionProcessor != null;
        if (useSessionProcessor) {
            this.azureSessionProcessor.start();
        } else {
            LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Starting new receiver thread.");
            Thread worker = new Thread(this);
            this.receiverThread = worker;
            worker.start();
            this.start = true;
        }
    }

    /**
     * Stops message delivery. The receive loop flag is cleared in every case.
     *
     * @param str deactivation mode: {@code "S"} stops the Azure session processor (if any) and drops the
     *        receiver thread reference; {@code "D"} additionally closes the JMS consumer and session and
     *        all Azure clients; any other value (including {@code null}) only clears the loop flag
     */
    @Override // com.tibco.bw.palette.amqp.runtime.receive.IReceiver
    public void deactivate(String str) {
        this.qpidactive = false;

        if ("S".equals(str)) {
            try {
                LoggerUtil.onDebug(this.activityLogger, RuntimeMessageBundle.AMQP_DEBUG_INFORMATION, "Stopping the receive operation.");
                if (this.isSessionEnabled && this.azureSessionProcessor != null) {
                    this.azureSessionProcessor.stop();
                }
            } finally {
                this.receiverThread = null;
            }
            return;
        }
        if (!"D".equals(str)) {
            return;
        }

        // Close consumer and session independently: the consumer can be null when init() failed after
        // creating the session, and a failing consumer close must not leave the session open.
        if (this.messageReceiver != null) {
            try {
                this.messageReceiver.close();
            } catch (JMSException e) {
                LoggerUtil.onWarn(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_STOP_CONNECTION, "Interrupting the receiver thread ", e.getMessage());
            }
        }
        if (this.session != null) {
            try {
                this.session.close();
            } catch (JMSException e) {
                LoggerUtil.onWarn(this.activityLogger, RuntimeMessageBundle.ERROR_OCCURED_STOP_CONNECTION, "Interrupting the receiver thread ", e.getMessage());
            }
        }
        this.messageReceiver = null;
        this.session = null;
        this.receiverThread = null;

        if (this.subscription != null) {
            this.subscription.dispose();
            this.subscription = null;
        }
        if (this.azureReceiverClient != null) {
            this.azureReceiverClient.close();
            this.azureReceiverClient = null;
        }
        // The session receiver client that produced the accepted receiver must be released as well.
        if (this.azureSessionReceiverClient != null) {
            this.azureSessionReceiverClient.close();
            this.azureSessionReceiverClient = null;
        }
        if (this.azureSessionProcessor != null) {
            this.azureSessionProcessor.close();
            this.azureSessionProcessor = null;
        }
    }
}
