package com.tibco.bw.palette.salesforce.streaming.runtime;

import com.tibco.bw.palette.salesforce.runtime.resource.StreamingMessageBundle;
import com.tibco.bw.palette.salesforce.runtime.resource.xml.SchemaUtils;
import com.tibco.bw.palette.salesforce.streaming.runtime.exception.SalesforceStreamingLifecycleFault;
import com.tibco.bw.runtime.EventSourceFault;
import java.net.ConnectException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.axis.client.async.Status;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.BasicAuthentication;

/* loaded from: input_file:payload/common/assembly_tibco_com_tibco_bw_palette_salesforce_runtime_feature_6.9.0.006.zip:source/plugins/com.tibco.bw.palette.salesforce.streaming.runtime_6.9.0.005.jar:com/tibco/bw/palette/salesforce/streaming/runtime/CometDSubscriber.class */
public class CometDSubscriber<N> {
    // Keys looked up on Bayeux reply messages to find the failure reason.
    private static final String ERROR = "error";
    private static final String FAILURE = "failure";
    // Standard JVM proxy system properties; consulted before the plugin-specific ones by the constructor.
    public static final String PROXY_HOST = "http.proxyHost";
    public static final String PROXY_PORT = "http.proxyPort";
    public static final String PROXY_USER = "http.proxyUser";
    public static final String PROXY_PASS = "http.proxyPassword";
    // Plugin-specific proxy system properties; used only when the standard key is absent.
    public static final String SALESFORCE_PROXY_HOST = "com.tibco.plugin.salesforce.proxyHost";
    public static final String SALESFORCE_PROXY_PORT = "com.tibco.plugin.salesforce.proxyPort";
    public static final String SALESFORCE_PROXY_USER = "com.tibco.plugin.salesforce.proxyUser";
    public static final String SALESFORCE_PROXY_PASS = "com.tibco.plugin.salesforce.proxyPwd";
    // System property read by configLongTransport(); the value is multiplied by 1024 * 1024 (i.e. given in MB).
    public static final String SALESFORCE_BUFFER_LIMIT = "com.tibco.plugin.salesforce.streaming.buffer";
    // System property read by configLongTransport(); the value is multiplied by 1000 (i.e. given in seconds).
    public static final String SALESFORCE_NETWORK_DELAY = "com.tibco.plugin.salesforce.streaming.network.delay";
    // HTTP header name set on every long-polling request in connect().
    // NOTE(review): never reassigned in this file; could be declared final.
    private static String AUTHORIZATION = "Authorization";
    // Current Bayeux client; replaced on every connect() and cleared by stop().
    private volatile BayeuxClient client;
    // HTTP transport created in the constructor, started by connect(false) and stopped by stop().
    private final HttpClient httpClient;
    // Cancelled and cleared by stop(); no assignment is visible in this file.
    private volatile ScheduledFuture<?> keepAlive;
    // Connection parameters; replaced with the result of login() when connect(true) re-authenticates.
    private BayeuxParameters parameters;
    // Topic -> replay id map; handed to ReplayAdapterExtension in connect().
    private final ConcurrentMap<String, Long> replay;
    // Set by start(); cleared by stop() and by a failed handshake.
    private final AtomicBoolean running;
    // Created in subscribe(StreamingModel, ...) and cleared in unsubscribe().
    private CometDSubscriber<N>.Poller poller;
    // Consumer produced by the current poller; cleared in unsubscribe().
    private Consumer<Map<String, Object>> consumer;
    // Every TopicSubscription registers itself here from its constructor.
    private final Set<CometDSubscriber<N>.TopicSubscription> subscriptions;

    /* loaded from: input_file:payload/common/assembly_tibco_com_tibco_bw_palette_salesforce_runtime_feature_6.9.0.006.zip:source/plugins/com.tibco.bw.palette.salesforce.streaming.runtime_6.9.0.005.jar:com/tibco/bw/palette/salesforce/streaming/runtime/CometDSubscriber$Poller.class */
    /**
     * Holds a running flag and builds the event consumer passed to a {@code TopicSubscription}.
     *
     * <p>NOTE(review): the decompiler did not recover the body of the consumer lambda; what
     * remains below ({@code ?? r0 = this;} and an empty synchronized block) is residue and does
     * not compile. Restore the real body from the bytecode before relying on this class.
     */
    private class Poller {
        // Written by the constructor and setRunning(); not read anywhere in the visible code.
        private boolean running;

        /**
         * @param z initial value of the running flag
         */
        public Poller(boolean z) {
            this.running = z;
        }

        /**
         * Updates the running flag.
         *
         * @param z new value of the running flag
         */
        public void setRunning(boolean z) {
            this.running = z;
        }

        /**
         * Creates the consumer invoked with each event's data map.
         *
         * @param streamingModel model whose logger receives the error report
         * @param semaphore not referenced in the recovered body
         * @param salesforceStreamingEventSource event source whose context receives an
         *        {@code EventSourceFault} when processing throws
         * @return the consumer lambda
         */
        public Consumer<Map<String, Object>> createConsumer(StreamingModel streamingModel, Semaphore semaphore, SalesforceStreamingEventSource<N> salesforceStreamingEventSource) {
            return map -> {
                // Decompiler residue: the original statements of the try body were lost.
                ?? r0 = this;
                try {
                    synchronized (r0) {
                    }
                } catch (Throwable th) {
                    // NOTE(review): the format string "Error : " has no %s specifier, so
                    // th.getMessage() is silently dropped from the logged text.
                    streamingModel.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{String.format("Error : ", th.getMessage())});
                    // Any failure is surfaced to the engine as a fault event rather than rethrown.
                    salesforceStreamingEventSource.getEventSourceContext().newEvent(new EventSourceFault(salesforceStreamingEventSource.getEventSourceContext(), StreamingMessageBundle.PARSE_ERROR.format(new String[]{th.getLocalizedMessage()})));
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:payload/common/assembly_tibco_com_tibco_bw_palette_salesforce_runtime_feature_6.9.0.006.zip:source/plugins/com.tibco.bw.palette.salesforce.streaming.runtime_6.9.0.005.jar:com/tibco/bw/palette/salesforce/streaming/runtime/CometDSubscriber$TopicSubscription.class */
    public class TopicSubscription {
        // Topic name; concatenated with the filter string to obtain the channel id.
        private final String topic;
        // Receives the data map of every message delivered on the channel.
        private final Consumer<Map<String, Object>> consumer;
        // Assigned when a subscribe path obtains the channel; null until then.
        private ClientSessionChannel channel;
        // Source of the replay setting, DB-storage flag and logger used by this subscription.
        private StreamingModel model;

        private TopicSubscription(String str, Consumer<Map<String, Object>> consumer, StreamingModel streamingModel) {
            this.topic = str;
            this.consumer = consumer;
            this.model = streamingModel;
            CometDSubscriber.this.subscriptions.add(this);
        }

        /**
         * Unsubscribes from the channel, if there is one.
         *
         * <p>Does nothing when the enclosing subscriber is not running, has no client, or when
         * this subscription never obtained a channel (the field is only assigned once a
         * subscribe path reaches {@code getChannel}).
         */
        public void cancel() {
            if (!CometDSubscriber.this.running.get() || CometDSubscriber.this.client == null) {
                return;
            }
            // Snapshot the field: it is null until a subscribe call has fetched the channel.
            final ClientSessionChannel current = this.channel;
            if (current != null) {
                current.unsubscribe();
            }
        }

        /**
         * @return the topic name given to the constructor
         */
        public String getTopic() {
            return this.topic;
        }

        /**
         * @return {@code "Subscription [<topic>:<replayFrom>]"}, where the replay value is read
         *         from the enclosing subscriber's parameters
         */
        @Override
        public String toString() {
            final String topicName = getTopic();
            final Long replayFrom = Long.valueOf(CometDSubscriber.this.parameters.getReplayFrom());
            return String.format("Subscription [%s:%s]", topicName, replayFrom);
        }

        /**
         * Handles the server's reply to a subscribe request.
         *
         * <ul>
         *   <li>Success: completes the future with this subscription.</li>
         *   <li>Invalid replay id, first failure only, DB storage enabled and the model's replay
         *       setting is "-1" or "-2": resubscribes via {@link #processMessageWithError}.</li>
         *   <li>"Request requires authentication": reconnects with a fresh login.</li>
         *   <li>Anything else: completes the future exceptionally.</li>
         * </ul>
         *
         * @param message subscribe reply received from the server
         * @param completableFuture future completed with the outcome of the subscription
         * @param atomicInteger number of invalid-replay-id retries already attempted
         */
        private void processMessage(Message message, CompletableFuture<CometDSubscriber<N>.TopicSubscription> completableFuture, AtomicInteger atomicInteger) {
            if (message.isSuccessful()) {
                CometDSubscriber.this.parameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"TopicSubscription successfully suscribed to [" + CometDSubscriber.this.parameters.getTopic() + " : " + message.toString() + "]"});
                completableFuture.complete(this);
                return;
            }
            Object obj = message.get(CometDSubscriber.ERROR);
            if (obj == null) {
                obj = message.get(CometDSubscriber.FAILURE);
                if (obj instanceof Map) {
                    obj = ((Map<?, ?>) obj).get(Status.EXCEPTION_STR);
                }
            }
            // The reply may carry neither key; String.valueOf keeps the checks below null-safe
            // and renders exactly what string concatenation of obj would have produced.
            final String reason = String.valueOf(obj);
            final boolean retryInvalidReplayId = atomicInteger.get() < 1
                    && this.model.isDBStorage()
                    && isSentinelReplayId(this.model.getReplayFrom())
                    && reason.contains("you provided was invalid.  Please provide a valid ID");
            if (retryInvalidReplayId) {
                // Atomic increment: the listener may be invoked from a transport thread.
                atomicInteger.incrementAndGet();
                CometDSubscriber.this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"TopicSubscription failed to suscribe to [" + CometDSubscriber.this.parameters.getTopic() + "] for reason " + reason + " retrying subscribe from " + this.model.getReplayFrom()});
                processMessageWithError(completableFuture, atomicInteger);
            } else if (reason.contains("Request requires authentication")) {
                // NOTE(review): completableFuture is not completed on this path; the reconnect
                // resubscribes every registered subscription from the handshake callback.
                CometDSubscriber.this.connect(true);
            } else {
                final String failureText = "TopicSubscription failed to suscribe to [" + CometDSubscriber.this.parameters.getTopic() + "] for reason " + reason;
                CometDSubscriber.this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{failureText});
                // Report the resolved reason (which may come from "failure"), not only "error".
                completableFuture.completeExceptionally(new SalesforceStreamingLifecycleFault(StreamingMessageBundle.GENERIC_ERROR.format(new String[]{failureText})));
            }
        }

        /** Returns true when the configured replay setting is one of the literals "-1" or "-2". */
        private boolean isSentinelReplayId(String replayFrom) {
            return "-1".equals(replayFrom) || "-2".equals(replayFrom);
        }

        /**
         * Resubscribes after the server rejected the stored replay id.
         *
         * <p>Overwrites the topic's entry in the replay map with the value parsed from the
         * model's replay setting, fetches the channel again and subscribes with the same
         * consumer; the reply is routed back to {@link #processMessage}.
         *
         * @param completableFuture future to complete once the new subscribe reply arrives
         * @param atomicInteger retry counter forwarded to {@code processMessage}
         */
        private void processMessageWithError(CompletableFuture<CometDSubscriber<N>.TopicSubscription> completableFuture, AtomicInteger atomicInteger) {
            final CometDSubscriber<N> owner = CometDSubscriber.this;
            owner.parameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Resubscribing after error due to invalid replayid"});

            final ConcurrentMap<String, Long> replayIds = owner.replay;
            final String topicKey = owner.parameters.getTopic();
            final long replayId = owner.getReplayFrom(this.model.getReplayFrom());
            replayIds.put(topicKey, Long.valueOf(replayId));
            final String replayText = String.format(" Replaying from : %s", owner.replay);
            this.model.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{replayText});

            final String channelId = String.valueOf(this.topic) + owner.parameters.getFilterString();
            this.channel = owner.client.getChannel(channelId);
            owner.parameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Fetched channel : " + this.channel.getId()});

            this.channel.subscribe(
                    (sourceChannel, event) -> this.consumer.accept(event.getDataAsMap()),
                    reply -> processMessage(reply, completableFuture, atomicInteger));
        }

        /* JADX WARN: Code restructure failed: missing block: B:10:0x0032, code lost:
        
            r0 = r0.createStatement(1003, 1007).executeQuery("Select replayid from sf_replay WHERE id='" + r11.model.getDbActivityKey() + "'");
         */
        /* JADX WARN: Code restructure failed: missing block: B:11:0x006e, code lost:
        
            if (r0.next() == false) goto L11;
         */
        /* JADX WARN: Code restructure failed: missing block: B:12:0x0071, code lost:
        
            r0 = r0.getString("replayid");
         */
        /* JADX WARN: Code restructure failed: missing block: B:14:0x0099, code lost:
        
            if (r0 == null) goto L34;
         */
        /* JADX WARN: Code restructure failed: missing block: B:15:0x009c, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:16:0x00e5, code lost:
        
            r11.this$0.replay.put(r11.this$0.parameters.getTopic(), java.lang.Long.valueOf(r11.this$0.getReplayFrom(r0)));
            r11.model.getLogger().debug(com.tibco.bw.palette.salesforce.runtime.resource.StreamingMessageBundle.GENERIC_DEBUG, new java.lang.String[]{java.lang.String.format(" Replaying from : %s", r11.this$0.replay)});
         */
        /* JADX WARN: Code restructure failed: missing block: B:18:0x0096, code lost:
        
            throw new com.tibco.bw.palette.salesforce.streaming.runtime.exception.SalesforceStreamingLifecycleFault(com.tibco.bw.palette.salesforce.runtime.resource.StreamingMessageBundle.START_ERROR.format(new java.lang.String[]{"First entry not found in database for replayid"}));
         */
        /* JADX WARN: Code restructure failed: missing block: B:20:0x00a6, code lost:
        
            r13 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:22:0x00a9, code lost:
        
            if (r0 != null) goto L19;
         */
        /* JADX WARN: Code restructure failed: missing block: B:23:0x00ac, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:25:0x00b4, code lost:
        
            throw r13;
         */
        /* JADX WARN: Code restructure failed: missing block: B:26:0x00cb, code lost:
        
            r13 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:28:0x00e4, code lost:
        
            throw new com.tibco.bw.palette.salesforce.streaming.runtime.exception.SalesforceStreamingLifecycleFault(com.tibco.bw.palette.salesforce.runtime.resource.StreamingMessageBundle.START_ERROR.format(new java.lang.String[]{"Error during connecting and fetching from database"}), r13);
         */
        /* JADX WARN: Code restructure failed: missing block: B:29:0x00b5, code lost:
        
            r14 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:2:0x000f, code lost:
        
            if (r11.model.isDBStorage() != false) goto L4;
         */
        /* JADX WARN: Code restructure failed: missing block: B:31:0x00b7, code lost:
        
            if (0 == 0) goto L25;
         */
        /* JADX WARN: Code restructure failed: missing block: B:32:0x00ba, code lost:
        
            r13 = r14;
         */
        /* JADX WARN: Code restructure failed: missing block: B:34:0x00ca, code lost:
        
            throw r13;
         */
        /* JADX WARN: Code restructure failed: missing block: B:36:0x00c1, code lost:
        
            if (null != r14) goto L28;
         */
        /* JADX WARN: Code restructure failed: missing block: B:37:0x00c4, code lost:
        
            r13.addSuppressed(r14);
         */
        /* JADX WARN: Code restructure failed: missing block: B:39:0x0130, code lost:
        
            r11.channel = r11.this$0.client.getChannel(java.lang.String.valueOf(r11.topic) + r11.this$0.parameters.getFilterString());
            r11.this$0.parameters.getLogger().debug(com.tibco.bw.palette.salesforce.runtime.resource.StreamingMessageBundle.GENERIC_DEBUG, new java.lang.String[]{"Fetched channel : " + r11.channel.getId()});
            r0 = new java.util.concurrent.CompletableFuture();
            r0 = new java.util.concurrent.atomic.AtomicInteger(0);
            r11.channel.subscribe((v1, v2) -> { // org.cometd.bayeux.client.ClientSessionChannel.MessageListener.onMessage(org.cometd.bayeux.client.ClientSessionChannel, org.cometd.bayeux.Message):void
                lambda$2(v1, v2);
            }, (v3) -> { // org.cometd.bayeux.client.ClientSession.MessageListener.onMessage(org.cometd.bayeux.Message):void
                lambda$3(r3, r4, v3);
            });
         */
        /* JADX WARN: Code restructure failed: missing block: B:40:0x01b8, code lost:
        
            return r0;
         */
        /* JADX WARN: Code restructure failed: missing block: B:4:0x001d, code lost:
        
            if (r11.model.getMutexLock().availablePermits() < 1) goto L40;
         */
        /* JADX WARN: Code restructure failed: missing block: B:6:0x0020, code lost:
        
            r13 = null;
         */
        /* JADX WARN: Code restructure failed: missing block: B:8:0x0024, code lost:
        
            r0 = r11.model.getDataSource().getConnection();
         */
        /* JADX WARN: Finally extract failed */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        /**
         * Subscribes this topic on the current client.
         *
         * <p>NOTE(review): the decompiler could not reconstruct this method, so the body below
         * is a stub that always throws. The bytecode fragments in the comment preceding this
         * method suggest that the real implementation, under a DB-storage condition, reads the
         * stored replay id from table {@code sf_replay}, stores it in the replay map, then
         * obtains the channel for topic + filter string, subscribes, and returns a new
         * {@code CompletableFuture} — TODO confirm against the bytecode.
         *
         * <p>NOTE(review): that query is built by concatenating {@code getDbActivityKey()} into
         * the SQL text; a {@code PreparedStatement} should be used when the method is restored.
         *
         * @return never returns in this decompiled form
         * @throws UnsupportedOperationException always, in this decompiled form
         */
        java.util.concurrent.Future<com.tibco.bw.palette.salesforce.streaming.runtime.CometDSubscriber<N>.TopicSubscription> subscribe() {
            /*
                Method dump skipped, instructions count: 441
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: com.tibco.bw.palette.salesforce.streaming.runtime.CometDSubscriber.TopicSubscription.subscribe():java.util.concurrent.Future");
        }

        // Compiler-generated accessor that lets the enclosing class call the private constructor.
        // The cometDSubscriber and topicSubscription arguments are not used; it only delegates.
        /* synthetic */ TopicSubscription(CometDSubscriber cometDSubscriber, String str, Consumer consumer, StreamingModel streamingModel, TopicSubscription topicSubscription) {
            this(str, consumer, streamingModel);
        }
    }

    /**
     * @return the live topic-to-replay-id map (the internal instance, not a copy)
     */
    public ConcurrentMap<String, Long> getReplay() {
        return this.replay;
    }

    /**
     * @return the internal running flag (the live instance, so callers can observe or change it)
     */
    public AtomicBoolean getRunning() {
        return this.running;
    }

    /**
     * Convenience constructor that supplies a new single-thread scheduled executor.
     *
     * <p>NOTE(review): the three-argument constructor it delegates to does not use either the
     * executor or {@code httpClient}; it builds its own {@code HttpClient}.
     *
     * @param bayeuxParameters connection parameters
     * @param httpClient forwarded to the three-argument constructor
     */
    public CometDSubscriber(BayeuxParameters bayeuxParameters, HttpClient httpClient) {
        this(bayeuxParameters, Executors.newSingleThreadScheduledExecutor(), httpClient);
    }

    /**
     * Creates the subscriber and its HTTP transport, applying proxy settings taken from system
     * properties.
     *
     * <p>For each proxy setting the standard JVM property (e.g. {@link #PROXY_HOST}) is used if
     * present, otherwise the plugin-specific one (e.g. {@link #SALESFORCE_PROXY_HOST}). A proxy
     * is configured only when both host and port are non-empty; proxy credentials are added only
     * when both user and password are non-empty.
     *
     * <p>NOTE(review): {@code scheduledExecutorService} and {@code httpClient} are accepted but
     * not used; the transport is always a new {@code HttpClient} built from the parameters'
     * SSL context factory. Behaviour kept as is.
     *
     * @param bayeuxParameters connection parameters (SSL context factory, logger, ...)
     * @param scheduledExecutorService unused
     * @param httpClient unused
     * @throws NumberFormatException if the configured proxy port is not an integer
     */
    public CometDSubscriber(BayeuxParameters bayeuxParameters, ScheduledExecutorService scheduledExecutorService, HttpClient httpClient) {
        this.replay = new ConcurrentHashMap<>();
        this.running = new AtomicBoolean();
        this.subscriptions = new CopyOnWriteArraySet<>();
        this.parameters = bayeuxParameters;
        this.httpClient = new HttpClient(bayeuxParameters.getSslContextFactory());
        configureProxy(this.httpClient, bayeuxParameters);
    }

    /** Returns the value of {@code standardKey} if that property is set, else that of {@code pluginKey}, else null. */
    private static String resolveProxySetting(String standardKey, String pluginKey) {
        if (System.getProperties().containsKey(standardKey)) {
            return System.getProperty(standardKey);
        }
        if (System.getProperties().containsKey(pluginKey)) {
            return System.getProperty(pluginKey);
        }
        return null;
    }

    /** Adds the proxy, and optionally its basic-auth credentials, to {@code target} when configured. */
    private static void configureProxy(HttpClient target, BayeuxParameters bayeuxParameters) {
        final String host = resolveProxySetting(PROXY_HOST, SALESFORCE_PROXY_HOST);
        final String port = resolveProxySetting(PROXY_PORT, SALESFORCE_PROXY_PORT);
        final String user = resolveProxySetting(PROXY_USER, SALESFORCE_PROXY_USER);
        final String password = resolveProxySetting(PROXY_PASS, SALESFORCE_PROXY_PASS);
        if (SchemaUtils.isStringEmpty(host) || SchemaUtils.isStringEmpty(port)) {
            return;
        }
        final int portNumber;
        try {
            portNumber = Integer.parseInt(port);
        } catch (NumberFormatException e) {
            // Same exception type as before, but now with a diagnostic naming the bad value.
            bayeuxParameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"Proxy port provided is not a number : " + port});
            throw e;
        }
        final HttpProxy httpProxy = new HttpProxy(host, portNumber);
        target.getProxyConfiguration().getProxies().add(httpProxy);
        bayeuxParameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Using proxy host : " + host + " and port : " + port});
        if (SchemaUtils.isStringEmpty(user) || SchemaUtils.isStringEmpty(password)) {
            return;
        }
        target.getAuthenticationStore().addAuthentication(new BasicAuthentication(httpProxy.getURI(), Authentication.ANY_REALM, user, password));
        bayeuxParameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Using user credentials for proxy"});
    }

    /**
     * Starts the subscriber if it is not already running.
     *
     * <p>The first caller flips the running flag and triggers the initial connect. If that
     * connect throws, the flag is reset so that a later {@code start()} retries instead of
     * reporting success for a connection that never happened.
     *
     * @return the handshake future from {@code connect(false)}, or an already-completed
     *         {@code true} future when the subscriber was already running
     */
    public Future<Boolean> start() {
        if (!this.running.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(Boolean.TRUE);
        }
        try {
            return connect(false);
        } catch (RuntimeException e) {
            // connect(false) throws when the HTTP transport cannot be started.
            this.running.set(false);
            throw e;
        }
    }

    /**
     * Stops the current poller, cancels every registered topic subscription and clears the
     * registry.
     *
     * <p>Safe to call when no subscription was ever made, or more than once: a missing poller
     * is simply skipped.
     */
    public void unsubscribe() {
        // poller is null before the first subscribe and after a previous unsubscribe.
        final CometDSubscriber<N>.Poller active = this.poller;
        if (active != null) {
            active.setRunning(false);
        }
        this.poller = null;
        this.consumer = null;
        this.subscriptions.forEach(TopicSubscription::cancel);
        this.subscriptions.clear();
    }

    /**
     * Shuts the subscriber down: cancels the keep-alive task, disconnects the Bayeux client,
     * forgets the topic's replay id, stops the HTTP transport and clears the running flag.
     *
     * <p>A failure to stop the HTTP transport is logged and does not prevent the running flag
     * from being cleared.
     */
    public void stop() {
        // Read each volatile field once so the null check and the call see the same reference.
        final ScheduledFuture<?> heartbeat = this.keepAlive;
        if (heartbeat != null) {
            heartbeat.cancel(true);
            this.keepAlive = null;
        }
        final BayeuxClient current = this.client;
        if (current != null) {
            current.disconnect();
            this.client = null;
            this.subscriptions.clear();
        }
        this.replay.remove(this.parameters.getTopic());
        // httpClient is final and always assigned by the constructor, so no null check is needed.
        try {
            this.httpClient.stop();
        } catch (Exception e) {
            this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"Unable to stop HTTP transport :" + e.getMessage()});
        }
        this.running.set(false);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10 */
    /* JADX WARN: Type inference failed for: r0v11, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v13, types: [com.tibco.bw.palette.salesforce.streaming.runtime.CometDSubscriber<N>$Poller] */
    /* JADX WARN: Type inference failed for: r0v19, types: [java.util.concurrent.Future<com.tibco.bw.palette.salesforce.streaming.runtime.CometDSubscriber<N>$TopicSubscription>] */
    /* JADX WARN: Type inference failed for: r0v9, types: [java.util.concurrent.ConcurrentMap<java.lang.String, java.lang.Long>] */
    /**
     * Creates a poller and a topic subscription for the configured topic.
     *
     * <p>NOTE(review): the decompiler failed type inference here (see the warnings preceding
     * this method); the cast of {@code this.replay} to {@code Poller} and the comparison
     * {@code future != 0} are residue and do not compile. Presumably the original synchronizes
     * on {@code this.replay} and waits while {@code this.poller} is non-null — TODO confirm
     * against the bytecode.
     *
     * @param streamingModel supplies the replay setting and the mutex passed to the consumer
     * @param salesforceStreamingEventSource event source handed to the poller's consumer
     * @return the future returned by {@code TopicSubscription.subscribe()}
     * @throws IllegalStateException if the subscriber has not been started
     */
    public Future<CometDSubscriber<N>.TopicSubscription> subscribe(StreamingModel streamingModel, SalesforceStreamingEventSource<N> salesforceStreamingEventSource) {
        if (!this.running.get()) {
            // NOTE(review): the message text has mismatched brackets ("[%s}").
            throw new IllegalStateException(String.format("Connector[%s} has not been started", this.parameters.getServerURL()));
        }
        // Seed the replay id for the topic only if none is recorded yet.
        this.replay.putIfAbsent(this.parameters.getTopic(), Long.valueOf(getReplayFrom(streamingModel.getReplayFrom())));
        Future<CometDSubscriber<N>.TopicSubscription> future = (CometDSubscriber<N>.Poller) this.replay;
        synchronized (future) {
            do {
                future = this.poller;
            } while (future != 0);
            this.poller = new Poller(true);
            this.consumer = this.poller.createConsumer(streamingModel, streamingModel.getMutexLock(), salesforceStreamingEventSource);
            future = new TopicSubscription(this, this.parameters.getTopic(), this.consumer, streamingModel, null).subscribe();
        }
        return future;
    }

    /**
     * Converts a replay setting to a numeric replay id.
     *
     * @param str textual replay id; may be null or non-numeric
     * @return the parsed value, or {@code -2} when {@code str} cannot be parsed as a long
     */
    public long getReplayFrom(String str) {
        long replayId;
        try {
            replayId = Long.parseLong(str);
        } catch (NumberFormatException ignored) {
            // Anything unparseable (including null) maps to the -2 fallback.
            replayId = -2L;
        }
        return replayId;
    }

    /**
     * Applies the optional buffer-limit and network-delay system properties to the transport.
     *
     * <p>{@link #SALESFORCE_BUFFER_LIMIT} is read in megabytes and
     * {@link #SALESFORCE_NETWORK_DELAY} in seconds. The maximum network delay is first set to
     * 40 seconds and then overridden when a valid value is supplied. Values that are not
     * integers, or that overflow once converted to bytes / milliseconds, are rejected with an
     * error log and the previous setting is kept.
     *
     * @param longPollingTransport transport to configure
     */
    private void configLongTransport(LongPollingTransport longPollingTransport) {
        final String bufferLimit = System.getProperty(SALESFORCE_BUFFER_LIMIT);
        if (bufferLimit != null && !bufferLimit.trim().isEmpty()) {
            try {
                // Parse the trimmed text: the emptiness check above already ignores whitespace.
                final int megabytes = Integer.parseInt(bufferLimit.trim());
                // multiplyExact rejects values that would silently wrap around in int arithmetic.
                final int bytes = Math.multiplyExact(megabytes, 1024 * 1024);
                longPollingTransport.setOption(ClientTransport.MAX_MESSAGE_SIZE_OPTION, Integer.valueOf(bytes));
                this.parameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Buffer Limit set to : " + megabytes + " MB"});
            } catch (NumberFormatException ignored) {
                this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"Buffer Limit provided is not a number. Buffer limit is set to default 1MB"});
            } catch (ArithmeticException ignored) {
                this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"Buffer Limit provided is too large. Buffer limit is set to default 1MB"});
            }
        }
        longPollingTransport.setOption(ClientTransport.MAX_NETWORK_DELAY_OPTION, 40000);
        this.parameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Default Max Network Delay set to : 40 seconds"});
        final String networkDelay = System.getProperty(SALESFORCE_NETWORK_DELAY);
        if (networkDelay == null || networkDelay.trim().isEmpty()) {
            return;
        }
        try {
            final int seconds = Integer.parseInt(networkDelay.trim());
            final int millis = Math.multiplyExact(seconds, 1000);
            longPollingTransport.setOption(ClientTransport.MAX_NETWORK_DELAY_OPTION, Integer.valueOf(millis));
            this.parameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Max Network Delay set to : " + seconds + " seconds"});
        } catch (NumberFormatException ignored) {
            this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"Max Network Delay provided is not a number. Max Network Delay is set to default 40 seconds"});
        } catch (ArithmeticException ignored) {
            this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"Max Network Delay provided is too large. Max Network Delay is set to default 40 seconds"});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a Bayeux session and starts the handshake.
     *
     * <p>With {@code z == false} (initial connect) the replay map is cleared and the HTTP
     * transport is started. With {@code z == true} (re-authentication) the current client is
     * disconnected and a new login is attempted; if the login fails, the failure is logged and
     * the handshake proceeds with the existing parameters, as before.
     *
     * @param z {@code true} to re-authenticate and reconnect, {@code false} for the first connect
     * @return a future completed by the handshake callback
     * @throws SalesforceStreamingLifecycleFault if the HTTP transport cannot be started
     */
    public Future<Boolean> connect(boolean z) {
        final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        if (z) {
            final BayeuxClient stale = this.client;
            if (stale != null) {
                stale.disconnect();
            }
            try {
                this.parameters = new SalesforceStreamingEventSource().login(this.parameters);
            } catch (Exception e2) {
                // Report through the plugin logger instead of dumping a stack trace to stderr.
                this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"Unable to re-authenticate with Salesforce : " + e2.getMessage()});
            }
        } else {
            this.replay.clear();
            try {
                this.httpClient.start();
                this.parameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Started httpClient"});
            } catch (Exception e) {
                this.parameters.getLogger().error(StreamingMessageBundle.GENERIC_ERROR, new String[]{"Unable to start HTTP transport : " + e.getMessage()});
                throw new SalesforceStreamingLifecycleFault(StreamingMessageBundle.GENERIC_ERROR.format(new String[]{"SalesforceStreamingEventSource init failed starting httpClient " + e.getLocalizedMessage()}), e);
            }
        }
        final LongPollingTransport longPollingTransport = new LongPollingTransport(this.parameters.getLongPollingOptions(), this.httpClient) {
            @Override // org.cometd.client.transport.LongPollingTransport
            protected void customize(Request request) {
                // Read the session id on every request so a re-login is picked up immediately.
                request.header(CometDSubscriber.AUTHORIZATION, CometDSubscriber.this.parameters.getSessionID());
            }
        };
        configLongTransport(longPollingTransport);
        this.client = new BayeuxClient(this.parameters.getServerURL().toExternalForm(), longPollingTransport, new ClientTransport[0]);
        this.client.addExtension(new ReplayAdapterExtension(this.replay, this.parameters.getLogger()));
        this.client.handshake(message -> processHandshakeMessage(message, completableFuture));
        return completableFuture;
    }

    /**
     * Handles the server's reply to a handshake.
     *
     * <ul>
     *   <li>"Request requires authentication": reconnects with a fresh login and forwards the
     *       outcome of that reconnect to {@code completableFuture}, so the original caller is
     *       not left waiting on a future that nobody completes.</li>
     *   <li>Success: resubscribes every registered subscription and completes with
     *       {@code true}.</li>
     *   <li>Failure: completes exceptionally with a {@link ConnectException} and clears the
     *       running flag.</li>
     * </ul>
     *
     * @param message handshake reply
     * @param completableFuture future returned to the caller of {@code connect}
     */
    private void processHandshakeMessage(Message message, CompletableFuture<Boolean> completableFuture) {
        if (message.getJSON().contains("Request requires authentication")) {
            final Future<Boolean> retry = connect(true);
            if (retry instanceof CompletableFuture) {
                // Chain the retry's result onto the future the original caller is holding.
                ((CompletableFuture<Boolean>) retry).whenComplete((connected, failure) -> {
                    if (failure != null) {
                        completableFuture.completeExceptionally(failure);
                    } else {
                        completableFuture.complete(connected);
                    }
                });
            }
            return;
        }
        if (message.isSuccessful()) {
            this.parameters.getLogger().debug(StreamingMessageBundle.GENERIC_DEBUG, new String[]{"Handshake successful : " + this.parameters.getServerURL().toString()});
            this.subscriptions.forEach((v0) -> {
                v0.subscribe();
            });
            completableFuture.complete(true);
        } else {
            Object obj = message.get(ERROR);
            if (obj == null) {
                obj = message.get(FAILURE);
            }
            this.parameters.getLogger().info(StreamingMessageBundle.HANDSHAKE_ERROR, new String[]{this.parameters.getServerURL().toString()});
            completableFuture.completeExceptionally(new ConnectException(String.format("Cannot connect [%s] : %s", this.parameters.getServerURL(), obj)));
            this.running.set(false);
        }
    }
}
