package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/* loaded from: input_file:payload/TIB_bwpluginamqp_6.4.0_common.zip:assemblies/assembly_tibco_com_tibco_bw_ms_asb_sdk_tpcl_feature_1.5.0.005.zip:source/plugins/com.tibco.bw.ms.asb.sdk.tpcl_1.5.0.004.jar:lib/azure-core-amqp-2.8.3.jar:com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.class */
/**
 * Keeps track of the deliveries received on a receive link that have not been settled yet, together with
 * the disposition requests issued for them that are still waiting for an acknowledgment.
 *
 * <p>Deliveries are registered through {@link #onDelivery(UUID, Delivery)}. A disposition is requested
 * through {@link #sendDisposition(String, DeliveryState)} and is completed when
 * {@link #onDispositionAck(UUID, Delivery)} observes the remote state, when the periodic timer finds the
 * request expired, or when this object is terminated or closed.</p>
 */
public final class ReceiverUnsettledDeliveries implements AutoCloseable {
    /** Logging key under which a delivery tag is reported. */
    private static final String DELIVERY_TAG_KEY = "lockToken";
    private final String hostName;
    private final String entityPath;
    private final String receiveLinkName;
    private final ReactorDispatcher dispatcher;
    private final AmqpRetryPolicy retryPolicy;
    /** Try-timeout taken from the retry options; used both as per-work expiry and as timer period. */
    private final Duration timeout;
    /** Sentinel tag that {@link #containsDelivery(UUID)} never reports as a tracked delivery. */
    private final UUID deliveryEmptyTag;
    private final ClientLogger logger;
    /** Subscription of the periodic timer that completes expired disposition works. */
    private final Disposable timeoutTimer;
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    /** Deliveries registered via {@link #onDelivery(UUID, Delivery)}, keyed by delivery tag string. */
    private final ConcurrentHashMap<String, Delivery> deliveries = new ConcurrentHashMap<>();
    /** Disposition works that were dispatched and not yet removed, keyed by delivery tag string. */
    private final ConcurrentHashMap<String, DispositionWork> pendingDispositions = new ConcurrentHashMap<>();

    /**
     * State of a single disposition request for one delivery.
     *
     * <p>The inherited {@link AtomicBoolean} value is the completion flag: it is set to {@code true} by
     * {@link #onComplete()} and {@link #onComplete(Throwable)} and read by {@link #isCompleted()}.</p>
     */
    public static final class DispositionWork extends AtomicBoolean {
        private final String deliveryTag;
        private final DeliveryState desiredState;
        private final Duration timeout;
        private Mono<Void> mono;
        private Instant expirationTime;
        private Throwable rejectedOutcomeError;
        /** Number of attempts made so far; starts at 1 and grows on every retriable rejection. */
        private final AtomicInteger tryCount = new AtomicInteger(1);
        private MonoSink<Void> monoSink = null;

        /**
         * Creates the work.
         *
         * @param deliveryTag tag of the delivery the disposition applies to.
         * @param desiredState state the delivery should be updated to.
         * @param timeout duration after which an attempt is considered timed out.
         */
        DispositionWork(String deliveryTag, DeliveryState desiredState, Duration timeout) {
            this.deliveryTag = deliveryTag;
            this.desiredState = desiredState;
            this.timeout = timeout;
        }

        /** @return the tag of the delivery this work is for. */
        String getDeliveryTag() {
            return this.deliveryTag;
        }

        /** @return the state this work tries to apply to the delivery. */
        DeliveryState getDesiredState() {
            return this.desiredState;
        }

        /** @return the current attempt count. */
        int getTryCount() {
            return this.tryCount.get();
        }

        /** @return the error recorded by the last retriable rejection, or {@code null} if none. */
        Throwable getRejectedOutcomeError() {
            return this.rejectedOutcomeError;
        }

        /**
         * @return {@code true} if the work was started and its expiration time is in the past;
         *     {@code false} if it has not been started (no expiration time set) or has not expired.
         */
        boolean hasTimedout() {
            return this.expirationTime != null && this.expirationTime.isBefore(Instant.now());
        }

        /** @return the mono stored by {@link #setMono(Mono)}. */
        Mono<Void> getMono() {
            return this.mono;
        }

        /**
         * Stores the mono representing this work, wrapped with {@link Mono#cache()}.
         *
         * @param mono the mono whose subscription starts the work.
         */
        void setMono(Mono<Void> mono) {
            this.mono = mono.cache();
        }

        /** @return {@code true} once one of the {@code onComplete} methods has been called. */
        boolean isCompleted() {
            return get();
        }

        /**
         * Records the sink to signal on completion and starts the timeout window.
         *
         * @param monoSink sink of the mono backing this work.
         */
        void onStart(MonoSink<Void> monoSink) {
            this.monoSink = monoSink;
            this.expirationTime = Instant.now().plus(this.timeout);
        }

        /**
         * Records a rejection that will be retried: remembers the error, restarts the timeout window and
         * increments the attempt count.
         *
         * @param error the error derived from the rejected outcome.
         */
        void onRetriableRejectedOutcome(Throwable error) {
            this.rejectedOutcomeError = error;
            this.expirationTime = Instant.now().plus(this.timeout);
            this.tryCount.incrementAndGet();
        }

        /**
         * Marks the work completed and signals success on the sink.
         *
         * @throws NullPointerException if the work was never started (no sink recorded).
         */
        void onComplete() {
            set(true);
            Objects.requireNonNull(this.monoSink);
            this.monoSink.success();
        }

        /**
         * Marks the work completed and signals the given error on the sink.
         *
         * @param error the error to signal.
         * @throws NullPointerException if the work was never started (no sink recorded).
         */
        void onComplete(Throwable error) {
            set(true);
            Objects.requireNonNull(this.monoSink);
            this.monoSink.error(error);
        }
    }

    /**
     * Creates the tracker and starts a periodic timer, with a period equal to the try-timeout of
     * {@code retryOptions}, that completes disposition works whose timeout has elapsed.
     *
     * @param hostName host name used when building error contexts.
     * @param entityPath entity path used when building error contexts.
     * @param receiveLinkName name of the receive link, used in error messages.
     * @param dispatcher dispatcher used to invoke disposition calls.
     * @param retryOptions options providing the retry policy and the try-timeout.
     * @param deliveryEmptyTag sentinel tag that is never reported as a tracked delivery.
     * @param logger logger to write to.
     */
    public ReceiverUnsettledDeliveries(String hostName, String entityPath, String receiveLinkName,
        ReactorDispatcher dispatcher, AmqpRetryOptions retryOptions, UUID deliveryEmptyTag, ClientLogger logger) {
        this.hostName = hostName;
        this.entityPath = entityPath;
        this.receiveLinkName = receiveLinkName;
        this.dispatcher = dispatcher;
        this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
        this.timeout = retryOptions.getTryTimeout();
        this.deliveryEmptyTag = deliveryEmptyTag;
        this.logger = logger;
        this.timeoutTimer = Flux.interval(this.timeout)
            .subscribe(tick -> completeDispositionWorksOnTimeout("timer"));
    }

    /**
     * Registers a delivery under its tag unless this object was terminated or closed. An already
     * registered delivery with the same tag is left in place.
     *
     * @param deliveryTag tag identifying the delivery.
     * @param delivery the delivery to track.
     * @return {@code false} if this object is terminated (the delivery is not tracked), {@code true} otherwise.
     */
    public boolean onDelivery(UUID deliveryTag, Delivery delivery) {
        if (this.isTerminated.get()) {
            return false;
        }
        this.deliveries.putIfAbsent(deliveryTag.toString(), delivery);
        return true;
    }

    /**
     * Checks whether a delivery with the given tag is tracked.
     *
     * @param deliveryTag the tag to look up.
     * @return {@code true} if the tag is not the empty-tag sentinel and a delivery is registered under it.
     */
    public boolean containsDelivery(UUID deliveryTag) {
        // Compare by value: an equal UUID that is a different instance than the sentinel must still be
        // recognized as the empty tag.
        return !Objects.equals(deliveryTag, this.deliveryEmptyTag)
            && this.deliveries.containsKey(deliveryTag.toString());
    }

    /**
     * Requests that the delivery with the given tag be updated to the desired state.
     *
     * @param deliveryTag tag of the delivery to update.
     * @param desiredState the state to apply.
     * @return a mono that completes when the disposition work completes; it errors with
     *     {@link IllegalStateException} if this object was terminated or closed.
     */
    public Mono<Void> sendDisposition(String deliveryTag, DeliveryState desiredState) {
        if (this.isTerminated.get()) {
            return FluxUtil.monoError(this.logger,
                new IllegalStateException("Cannot perform sendDisposition on a disposed receiver."));
        }
        return sendDispositionImpl(deliveryTag, desiredState);
    }

    /**
     * Handles a disposition update received for a delivery.
     *
     * <p>If the remote state carries no outcome, or there is no pending work for the tag, a warning is
     * logged and nothing else happens. If the remote state type equals the desired state type the work is
     * completed successfully. Otherwise a {@code Rejected} remote state goes through the retry path and any
     * other state completes the work with an error.</p>
     *
     * @param deliveryTag tag of the delivery the update is for.
     * @param delivery the delivery carrying the remote state.
     */
    public void onDispositionAck(UUID deliveryTag, Delivery delivery) {
        final DeliveryState remoteState = delivery.getRemoteState();

        this.logger.atVerbose()
            .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
            .addKeyValue("deliveryState", remoteState)
            .log("Received update disposition delivery.");

        final Outcome remoteOutcome;
        if (remoteState instanceof Outcome) {
            remoteOutcome = (Outcome) remoteState;
        } else if (remoteState instanceof TransactionalState) {
            remoteOutcome = ((TransactionalState) remoteState).getOutcome();
        } else {
            remoteOutcome = null;
        }

        if (remoteOutcome == null) {
            this.logger.atWarning()
                .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
                .addKeyValue("delivery", delivery)
                .log("No outcome associated with delivery.");
            return;
        }

        final DispositionWork work = this.pendingDispositions.get(deliveryTag.toString());
        if (work == null) {
            this.logger.atWarning()
                .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
                .addKeyValue("delivery", delivery)
                .log("No pending update for delivery.");
            return;
        }

        final DeliveryState.DeliveryStateType desiredType = work.getDesiredState().getType();
        final DeliveryState.DeliveryStateType remoteType = remoteState.getType();
        if (desiredType == remoteType) {
            completeDispositionWorkWithSettle(work, delivery, null);
            return;
        }

        this.logger.atInfo()
            .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
            .addKeyValue("receivedDeliveryState", remoteState)
            .addKeyValue("deliveryState", work.getDesiredState())
            .log("Received delivery state doesn't match expected state.");

        if (remoteType == DeliveryState.DeliveryStateType.Rejected) {
            // The remote state reported the Rejected type, so the outcome is narrowed to Rejected here.
            handleRetriableRejectedRemoteOutcome(work, delivery, (Rejected) remoteOutcome);
        } else {
            handleReleasedOrUnknownRemoteOutcome(work, delivery, remoteOutcome);
        }
    }

    /**
     * Marks this object terminated, completes the works that already timed out, and returns a mono that
     * waits for the remaining pending works.
     *
     * <p>For a pending work whose desired state is a {@link TransactionalState}, a new disposition with the
     * {@code Released} state is issued and awaited instead of the work's own mono. Errors from the awaited
     * works are logged and swallowed. The timeout timer is disposed when the returned mono terminates.</p>
     *
     * @return a mono that completes once the awaited works have finished.
     */
    public Mono<Void> terminateAndAwaitForDispositionsInProgressToComplete() {
        this.isTerminated.set(true);
        completeDispositionWorksOnTimeout("terminateAndAwaitForDispositionsInProgressToComplete");

        final ArrayList<Mono<Void>> workMonoList = new ArrayList<>();
        final StringJoiner deliveryTags = new StringJoiner(", ");
        for (DispositionWork work : this.pendingDispositions.values()) {
            if (work == null || work.hasTimedout()) {
                continue;
            }
            if (work.getDesiredState() instanceof TransactionalState) {
                workMonoList.add(sendDispositionImpl(work.getDeliveryTag(), Released.getInstance()));
            } else {
                workMonoList.add(work.getMono());
            }
            deliveryTags.add(work.getDeliveryTag());
        }

        final Mono<Void> workMonoListMerged;
        if (workMonoList.isEmpty()) {
            workMonoListMerged = Mono.empty();
        } else {
            this.logger.info("Waiting for pending updates to complete. Locks: {}", deliveryTags.toString());
            workMonoListMerged = Mono.whenDelayError(workMonoList).onErrorResume(error -> {
                this.logger.info("There was exception(s) while disposing of all disposition work.", error);
                return Mono.empty();
            });
        }
        return workMonoListMerged.doFinally(signalType -> this.timeoutTimer.dispose());
    }

    /**
     * Marks this object terminated, disposes the timeout timer and completes every pending, not yet
     * completed disposition work with an error.
     */
    @Override
    public void close() {
        this.isTerminated.set(true);
        this.timeoutTimer.dispose();
        completeDispositionWorksOnClose();
    }

    /**
     * Creates the disposition work for a tracked delivery and returns its mono.
     *
     * <p>On subscription the work is started and the disposition call is handed to the dispatcher. Inside
     * the dispatched task the disposition is applied to the delivery and the work is registered as pending;
     * if another work is already pending for the same tag, the new work is completed with an error.</p>
     *
     * @param deliveryTag tag of the delivery to update.
     * @param desiredState the state to apply.
     * @return the (cached) mono of the work, or an error mono if no delivery is tracked under the tag.
     */
    private Mono<Void> sendDispositionImpl(String deliveryTag, DeliveryState desiredState) {
        final Delivery delivery = this.deliveries.get(deliveryTag);
        if (delivery == null) {
            this.logger.atWarning()
                .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
                .log("Delivery not found to update disposition.");
            return FluxUtil.monoError(this.logger,
                Exceptions.propagate(new IllegalArgumentException("Delivery not on receive link.")));
        }

        final DispositionWork work = new DispositionWork(deliveryTag, desiredState, this.timeout);
        work.setMono(Mono.create(sink -> {
            work.onStart(sink);
            try {
                this.dispatcher.invoke(() -> {
                    delivery.disposition(desiredState);
                    if (this.pendingDispositions.putIfAbsent(deliveryTag, work) != null) {
                        work.onComplete(new AmqpException(false, "A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.", null));
                    }
                });
            } catch (IOException | RejectedExecutionException e) {
                work.onComplete(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", e, getErrorContext(delivery)));
            }
        }));
        return work.getMono();
    }

    /**
     * Handles a {@code Rejected} remote outcome for a work whose desired state differs.
     *
     * <p>The rejection is converted to an exception and the retry policy is consulted. If no retry delay is
     * returned, the work is completed with that exception. Otherwise the rejection is recorded on the work
     * and the disposition is dispatched again; a dispatch failure completes the work with an error.</p>
     *
     * @param work the pending work.
     * @param delivery the delivery the work is for.
     * @param remoteOutcome the rejected outcome reported by the remote peer.
     */
    private void handleRetriableRejectedRemoteOutcome(DispositionWork work, Delivery delivery,
        Rejected remoteOutcome) {
        final AmqpErrorContext errorContext = getErrorContext(delivery);
        final ErrorCondition errorCondition = remoteOutcome.getError();
        final Exception error = ExceptionUtil.toException(errorCondition.getCondition().toString(),
            errorCondition.getDescription(), errorContext);

        if (this.retryPolicy.calculateRetryDelay(error, work.getTryCount()) == null) {
            this.logger.atInfo()
                .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
                .addKeyValue("deliveryState", delivery.getRemoteState())
                .log("Retry attempts exhausted.", error);
            completeDispositionWorkWithSettle(work, delivery, error);
            return;
        }

        work.onRetriableRejectedOutcome(error);
        try {
            this.dispatcher.invoke(() -> delivery.disposition(work.getDesiredState()));
        } catch (IOException | RejectedExecutionException e) {
            final RuntimeException dispatchError = new AmqpException(false,
                String.format("linkName[%s], deliveryTag[%s]. Retrying updateDisposition failed to dispatch to Reactor.",
                    this.receiveLinkName, work.getDeliveryTag()),
                e, getErrorContext(delivery));
            final Throwable loggedError = this.logger.atError()
                .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
                .log(dispatchError);
            completeDispositionWorkWithSettle(work, delivery, loggedError);
        }
    }

    /**
     * Completes a work with an error when the remote outcome is neither the desired state nor
     * {@code Rejected}. A {@code Released} remote state maps to an operation-cancelled error; any other
     * state maps to an error whose message is the outcome's string form.
     *
     * @param work the pending work.
     * @param delivery the delivery the work is for.
     * @param outcome the outcome reported by the remote peer.
     */
    private void handleReleasedOrUnknownRemoteOutcome(DispositionWork work, Delivery delivery, Outcome outcome) {
        final AmqpErrorContext errorContext = getErrorContext(delivery);
        final AmqpException completionError;
        if (delivery.getRemoteState().getType() == DeliveryState.DeliveryStateType.Released) {
            completionError = new AmqpException(false, AmqpErrorCondition.OPERATION_CANCELLED,
                "AMQP layer unexpectedly aborted or disconnected.", errorContext);
        } else {
            completionError = new AmqpException(false, outcome.toString(), errorContext);
        }

        this.logger.atInfo()
            .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
            .addKeyValue("deliveryState", delivery.getRemoteState())
            .log("Completing pending updateState operation with exception.", completionError);
        completeDispositionWorkWithSettle(work, delivery, completionError);
    }

    /**
     * Completes, with an error, every pending work whose timeout has elapsed. The error is the last
     * recorded rejection error if there is one, otherwise a transient timeout {@link AmqpException}.
     *
     * @param callSite label of the caller, used only in log messages.
     */
    private void completeDispositionWorksOnTimeout(String callSite) {
        if (this.pendingDispositions.isEmpty()) {
            return;
        }

        // Single-element array so the lambda below can mutate the counter.
        final int[] completionCount = new int[1];
        final StringJoiner deliveryTags = new StringJoiner(", ");

        this.pendingDispositions.forEach((tag, work) -> {
            if (work == null || !work.hasTimedout()) {
                return;
            }
            if (completionCount[0] == 0) {
                this.logger.info("Starting completion of timed out disposition works (call site:{}).", callSite);
            }

            final Throwable completionError;
            if (work.getRejectedOutcomeError() != null) {
                completionError = work.getRejectedOutcomeError();
            } else {
                completionError = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR,
                    "Update disposition request timed out.",
                    getErrorContext(this.deliveries.get(work.getDeliveryTag())));
            }

            deliveryTags.add(work.getDeliveryTag());
            completeDispositionWork(work, completionError);
            completionCount[0]++;
        });

        if (completionCount[0] > 0) {
            // Argument order follows the placeholders: count first, then call site, then the tags.
            this.logger.info("Completed {} timed-out disposition works (call site:{}). Locks {}",
                completionCount[0], callSite, deliveryTags.toString());
        }
    }

    /**
     * Completes, with a non-transient {@link AmqpException}, every pending work that is not completed yet.
     * Called from {@link #close()}.
     */
    private void completeDispositionWorksOnClose() {
        if (this.pendingDispositions.isEmpty()) {
            return;
        }

        // Single-element array so the lambda below can mutate the counter.
        final int[] completionCount = new int[1];
        final StringJoiner deliveryTags = new StringJoiner(", ");
        final AmqpException completionError = new AmqpException(false, "The receiver didn't receive the disposition acknowledgment due to receive link closure.", null);

        this.pendingDispositions.forEach((tag, work) -> {
            if (work == null || work.isCompleted()) {
                return;
            }
            if (completionCount[0] == 0) {
                this.logger.info("Starting completion of disposition works as part of receive link closure.");
            }
            deliveryTags.add(work.getDeliveryTag());
            completeDispositionWork(work, completionError);
            completionCount[0]++;
        });

        if (completionCount[0] > 0) {
            this.logger.info("Completed {} disposition works as part of receive link closure. Locks {}",
                completionCount[0], deliveryTags.toString());
        }
    }

    /**
     * Completes a work in response to a remote update. The order is: settle the delivery (if remotely
     * settled), complete the work, then remove the work and the delivery from tracking (again only if
     * remotely settled).
     *
     * @param work the work to complete.
     * @param delivery the delivery the work is for.
     * @param completionError the error to complete with, or {@code null} to complete successfully.
     */
    private void completeDispositionWorkWithSettle(DispositionWork work, Delivery delivery,
        Throwable completionError) {
        final boolean isRemotelySettled = delivery.remotelySettled();
        if (isRemotelySettled) {
            delivery.settle();
        }

        if (completionError == null) {
            work.onComplete();
        } else if (completionError instanceof RuntimeException) {
            work.onComplete(this.logger.logExceptionAsError((RuntimeException) completionError));
        } else {
            work.onComplete(completionError);
        }

        if (isRemotelySettled) {
            final String deliveryTag = work.getDeliveryTag();
            this.pendingDispositions.remove(deliveryTag);
            this.deliveries.remove(deliveryTag);
        }
    }

    /**
     * Removes a work from the pending set and completes it with the given error. Runtime exceptions are
     * logged as errors before being signalled.
     *
     * @param work the work to complete.
     * @param completionError the error to complete with.
     */
    private void completeDispositionWork(DispositionWork work, Throwable completionError) {
        this.pendingDispositions.remove(work.getDeliveryTag());
        final Throwable loggedError = completionError instanceof RuntimeException
            ? this.logger.logExceptionAsError((RuntimeException) completionError)
            : completionError;
        work.onComplete(loggedError);
    }

    /**
     * Builds the error context for a delivery.
     *
     * @param delivery the delivery, may be {@code null}.
     * @return the context built from the host name, entity path and the delivery's link, or {@code null}
     *     if the delivery or its link is {@code null}.
     */
    private AmqpErrorContext getErrorContext(Delivery delivery) {
        if (delivery == null || delivery.getLink() == null) {
            return null;
        }
        return LinkHandler.getErrorContext(this.hostName, this.entityPath, delivery.getLink());
    }
}
