package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.AmqpMetricsProvider;
import com.azure.core.amqp.implementation.ClientConstants;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:payload/TIB_bwpluginamqp_6.4.0_common.zip:assemblies/assembly_tibco_com_tibco_bw_ms_asb_sdk_tpcl_feature_1.5.0.005.zip:source/plugins/com.tibco.bw.ms.asb.sdk.tpcl_1.5.0.004.jar:lib/azure-core-amqp-2.8.3.jar:com/azure/core/amqp/implementation/handler/SendLinkHandler.class */
/**
 * Proton-J event handler for an AMQP send link.
 *
 * <p>The handler republishes two kinds of link events as Reactor streams: the remote credit reported on every
 * link-flow event ({@link #getLinkCredits()}) and every delivery reported through {@code onDelivery}
 * ({@link #getDeliveredMessages()}). It also pushes {@link EndpointState#ACTIVE} and {@link EndpointState#CLOSED}
 * through the inherited {@code onNext} as the link opens and as this handler is closed.</p>
 */
public class SendLinkHandler extends LinkHandler {
    private final String linkName;
    private final String entityPath;
    /** Flipped to {@code true} the first time a remote open (with a target) or a flow event is observed. */
    private final AtomicBoolean isRemoteActive = new AtomicBoolean();
    /** Guards {@link #close()} so that its body runs at most once. */
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    /** Unicast sink carrying the remote credit observed on each flow event. */
    private final Sinks.Many<Integer> creditProcessor = Sinks.many().unicast().onBackpressureBuffer();
    /** Multicast sink carrying each delivery observed in {@link #onDelivery(Event)}. */
    private final Sinks.Many<Delivery> deliveryProcessor = Sinks.many().multicast().onBackpressureBuffer();

    /**
     * Creates a handler that uses a metrics provider built from {@code hostname} only.
     *
     * <p>NOTE(review): the names {@code connectionId} and {@code hostname} follow the upstream azure-core-amqp
     * signature; confirm against {@code LinkHandler}'s constructor.</p>
     *
     * @param connectionId First argument forwarded to the {@code LinkHandler} constructor.
     * @param hostname Second argument forwarded to the {@code LinkHandler} constructor; also used to build the
     *     {@link AmqpMetricsProvider}.
     * @param linkName Name of the send link. Must not be {@code null}.
     * @param entityPath Entity path, used in log entries and forwarded to {@code LinkHandler}.
     * @throws NullPointerException if {@code linkName} is {@code null}.
     * @deprecated Use
     *     {@link #SendLinkHandler(String, String, String, String, AmqpMetricsProvider)} instead.
     */
    @Deprecated
    public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) {
        this(connectionId, hostname, linkName, entityPath, new AmqpMetricsProvider(null, hostname, null));
    }

    /**
     * Creates a handler for the send link named {@code linkName}.
     *
     * <p>NOTE(review): the names {@code connectionId} and {@code hostname} follow the upstream azure-core-amqp
     * signature; confirm against {@code LinkHandler}'s constructor.</p>
     *
     * @param connectionId First argument forwarded to the {@code LinkHandler} constructor.
     * @param hostname Second argument forwarded to the {@code LinkHandler} constructor.
     * @param linkName Name of the send link. Must not be {@code null}.
     * @param entityPath Entity path, used in log entries and forwarded to {@code LinkHandler}.
     * @param amqpMetricsProvider Metrics provider forwarded to the {@code LinkHandler} constructor.
     * @throws NullPointerException if {@code linkName} is {@code null}.
     */
    public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath,
        AmqpMetricsProvider amqpMetricsProvider) {
        super(connectionId, hostname, entityPath, amqpMetricsProvider);
        this.linkName = Objects.requireNonNull(linkName, "'linkName' cannot be null.");
        this.entityPath = entityPath;
    }

    /**
     * Gets the name of the link this handler was created for.
     *
     * @return The link name supplied at construction.
     */
    public String getLinkName() {
        return linkName;
    }

    /**
     * Gets the stream of remote credit values, one per link-flow event.
     *
     * @return A {@link Flux} view of the credit sink. The sink is unicast.
     */
    public Flux<Integer> getLinkCredits() {
        return creditProcessor.asFlux();
    }

    /**
     * Gets the stream of deliveries observed by {@link #onDelivery(Event)}.
     *
     * @return A {@link Flux} view of the delivery sink.
     */
    public Flux<Delivery> getDeliveredMessages() {
        return deliveryProcessor.asFlux();
    }

    /**
     * Completes both sinks and publishes {@link EndpointState#CLOSED}. Only the first invocation has any effect.
     */
    @Override
    public void close() {
        if (isTerminated.getAndSet(true)) {
            return;
        }

        creditProcessor.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        deliveryProcessor.emitComplete((signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
                .addKeyValue(ClientConstants.LINK_NAME_KEY, linkName)
                .addKeyValue(ClientConstants.ENTITY_PATH_KEY, entityPath)
                .log("Unable to emit complete on deliverySink.");
            // Never retry a failed completion.
            return false;
        });

        onNext(EndpointState.CLOSED);
    }

    /**
     * Logs the local target when a {@link Sender} link is opened locally. Other link types are ignored.
     *
     * @param event The link event.
     */
    @Override
    public void onLinkLocalOpen(Event event) {
        final Link link = event.getLink();
        if (!(link instanceof Sender)) {
            return;
        }

        logger.atVerbose()
            .addKeyValue(ClientConstants.LINK_NAME_KEY, link.getName())
            .addKeyValue(ClientConstants.ENTITY_PATH_KEY, entityPath)
            .addKeyValue("localTarget", link.getTarget())
            .log("onLinkLocalOpen");
    }

    /**
     * Handles the remote peer opening a {@link Sender} link. When the remote target is present the endpoint is
     * marked active (publishing {@link EndpointState#ACTIVE} once); otherwise only a log entry is written.
     * Other link types are ignored.
     *
     * @param event The link event.
     */
    @Override
    public void onLinkRemoteOpen(Event event) {
        final Link link = event.getLink();
        if (!(link instanceof Sender)) {
            return;
        }

        final LoggingEventBuilder logBuilder = logger.atInfo()
            .addKeyValue(ClientConstants.LINK_NAME_KEY, link.getName())
            .addKeyValue(ClientConstants.ENTITY_PATH_KEY, entityPath);

        if (link.getRemoteTarget() != null) {
            logBuilder.addKeyValue("remoteTarget", link.getRemoteTarget());
            if (!isRemoteActive.getAndSet(true)) {
                onNext(EndpointState.ACTIVE);
            }
        } else {
            logBuilder.addKeyValue("remoteTarget", ClientConstants.NOT_APPLICABLE)
                .addKeyValue("action", "waitingForError");
        }

        logBuilder.log("onLinkRemoteOpen");
    }

    /**
     * Handles a link-flow event: marks the endpoint active if it was not already, then publishes the sender's
     * remote credit to {@link #getLinkCredits()}.
     *
     * @param event The flow event; its sender supplies the remote credit and unsettled count.
     */
    @Override
    public void onLinkFlow(Event event) {
        if (!isRemoteActive.getAndSet(true)) {
            onNext(EndpointState.ACTIVE);
        }

        // Capture the sender once; it is needed for both the credit emission and the log entry below.
        final Sender sender = event.getSender();
        final int credits = sender.getRemoteCredit();

        creditProcessor.emitNext(credits, (signalType, emitResult) -> {
            logger.atVerbose()
                .addKeyValue(ClientConstants.LINK_NAME_KEY, linkName)
                .addKeyValue(ClientConstants.EMIT_RESULT_KEY, emitResult)
                .addKeyValue("credits", credits)
                .log("Unable to emit credits.");
            // Never retry a failed credit emission.
            return false;
        });

        logger.atVerbose()
            .addKeyValue(ClientConstants.LINK_NAME_KEY, linkName)
            .addKeyValue("unsettled", sender.getUnsettled())
            .addKeyValue("credits", credits)
            .log("onLinkFlow.");
    }

    /**
     * Delegates to the superclass and, if the link never became remotely active, closes the superclass handler.
     *
     * @param event The link event.
     */
    @Override
    public void onLinkLocalClose(Event event) {
        super.onLinkLocalClose(event);

        if (!isRemoteActive.get()) {
            logger.atInfo()
                .addKeyValue(ClientConstants.LINK_NAME_KEY, getLinkName())
                .addKeyValue(ClientConstants.ENTITY_PATH_KEY, entityPath)
                .log("Sender link was never active. Closing endpoint states.");

            // Deliberately the superclass close(), not this class's override.
            super.close();
        }
    }

    /**
     * Publishes the event's delivery, and every subsequent delivery returned by {@link Sender#current()}, to
     * {@link #getDeliveredMessages()}, settling each one after it has been emitted.
     *
     * @param event The delivery event.
     */
    @Override
    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();

        while (delivery != null) {
            final Sender sender = (Sender) delivery.getLink();
            final String deliveryTag = new String(delivery.getTag(), StandardCharsets.UTF_8);

            logger.atVerbose()
                .addKeyValue(ClientConstants.LINK_NAME_KEY, getLinkName())
                .addKeyValue("unsettled", sender.getUnsettled())
                .addKeyValue("credit", sender.getRemoteCredit())
                .addKeyValue("deliveryState", delivery.getRemoteState())
                .addKeyValue("delivery.isBuffered", delivery.isBuffered())
                .addKeyValue("delivery.id", deliveryTag)
                .log("onDelivery");

            deliveryProcessor.emitNext(delivery, (signalType, emitResult) -> {
                logger.atWarning()
                    .addKeyValue(ClientConstants.LINK_NAME_KEY, getLinkName())
                    .addKeyValue(ClientConstants.EMIT_RESULT_KEY, emitResult)
                    .addKeyValue("delivery.id", deliveryTag)
                    .log("Unable to emit delivery.");
                // Retry only when the sink reported an overflow; give up on any other failure.
                return emitResult == Sinks.EmitResult.FAIL_OVERFLOW;
            });

            delivery.settle();
            delivery = sender.current();
        }
    }

    // The overrides below only delegate to LinkHandler. They are kept as explicit methods so the set of methods
    // declared on this class stays the same as before.

    /**
     * Delegates to the superclass implementation.
     *
     * @param link The link to build the error context for.
     * @return The error context produced by the superclass.
     */
    @Override
    public AmqpErrorContext getErrorContext(Link link) {
        return super.getErrorContext(link);
    }

    /**
     * Delegates to the superclass implementation.
     *
     * @param event The link event.
     */
    @Override
    public void onLinkFinal(Event event) {
        super.onLinkFinal(event);
    }

    /**
     * Delegates to the superclass implementation.
     *
     * @param event The link event.
     */
    @Override
    public void onLinkRemoteDetach(Event event) {
        super.onLinkRemoteDetach(event);
    }

    /**
     * Delegates to the superclass implementation.
     *
     * @param event The link event.
     */
    @Override
    public void onLinkRemoteClose(Event event) {
        super.onLinkRemoteClose(event);
    }
}
