package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpManagementNode;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.core.amqp.models.DeliveryOutcome;
import com.azure.core.util.logging.ClientLogger;
import java.util.HashMap;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.Metrics;

/* loaded from: input_file:payload/TIB_bwpluginamqp_6.4.0_common.zip:assemblies/assembly_tibco_com_tibco_bw_ms_asb_sdk_tpcl_feature_1.5.0.005.zip:source/plugins/com.tibco.bw.ms.asb.sdk.tpcl_1.5.0.004.jar:lib/azure-core-amqp-2.8.3.jar:com/azure/core/amqp/implementation/ManagementChannel.class */
public class ManagementChannel implements AmqpManagementNode {
    private final TokenManager tokenManager;
    private final AmqpChannelProcessor<RequestResponseChannel> createChannel;
    private final String fullyQualifiedNamespace;
    private final ClientLogger logger;
    private final String entityPath;

    public ManagementChannel(AmqpChannelProcessor<RequestResponseChannel> amqpChannelProcessor, String str, String str2, TokenManager tokenManager) {
        this.createChannel = (AmqpChannelProcessor) Objects.requireNonNull(amqpChannelProcessor, "'createChannel' cannot be null.");
        this.fullyQualifiedNamespace = (String) Objects.requireNonNull(str, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = (String) Objects.requireNonNull(str2, "'entityPath' cannot be null.");
        HashMap hashMap = new HashMap();
        hashMap.put(ClientConstants.ENTITY_PATH_KEY, str2);
        this.logger = new ClientLogger((Class<?>) ManagementChannel.class, hashMap);
        this.tokenManager = (TokenManager) Objects.requireNonNull(tokenManager, "'tokenManager' cannot be null.");
    }

    @Override // com.azure.core.amqp.AmqpManagementNode
    public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage amqpAnnotatedMessage) {
        return isAuthorized().then(this.createChannel.flatMap(requestResponseChannel -> {
            return requestResponseChannel.sendWithAck(MessageUtils.toProtonJMessage(amqpAnnotatedMessage)).handle((message, synchronousSink) -> {
                handleResponse(message, synchronousSink, requestResponseChannel.getErrorContext());
            }).switchIfEmpty(Mono.error((Supplier<? extends Throwable>) () -> {
                return new AmqpException(true, String.format("entityPath[%s] No response received from management channel.", this.entityPath), requestResponseChannel.getErrorContext());
            }));
        }));
    }

    @Override // com.azure.core.amqp.AmqpManagementNode
    public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage amqpAnnotatedMessage, DeliveryOutcome deliveryOutcome) {
        return isAuthorized().then(this.createChannel.flatMap(requestResponseChannel -> {
            return requestResponseChannel.sendWithAck(MessageUtils.toProtonJMessage(amqpAnnotatedMessage), MessageUtils.toProtonJDeliveryState(deliveryOutcome)).handle((message, synchronousSink) -> {
                handleResponse(message, synchronousSink, requestResponseChannel.getErrorContext());
            }).switchIfEmpty(Mono.error((Supplier<? extends Throwable>) () -> {
                return new AmqpException(true, String.format("entityPath[%s] outcome[%s] No response received from management channel.", this.entityPath, deliveryOutcome.getDeliveryState()), requestResponseChannel.getErrorContext());
            }));
        }));
    }

    @Override // com.azure.core.util.AsyncCloseable
    public Mono<Void> closeAsync() {
        return this.createChannel.flatMap(requestResponseChannel -> {
            return requestResponseChannel.closeAsync();
        }).cache();
    }

    private void handleResponse(Message message, SynchronousSink<AmqpAnnotatedMessage> synchronousSink, AmqpErrorContext amqpErrorContext) {
        if (RequestResponseUtils.isSuccessful(message)) {
            synchronousSink.next(MessageUtils.toAmqpAnnotatedMessage(message));
            return;
        }
        AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
        if (statusCode == AmqpResponseCode.NO_CONTENT) {
            synchronousSink.next(MessageUtils.toAmqpAnnotatedMessage(message));
            return;
        }
        String errorCondition = RequestResponseUtils.getErrorCondition(message);
        if (statusCode != AmqpResponseCode.NOT_FOUND) {
            String statusDescription = RequestResponseUtils.getStatusDescription(message);
            this.logger.atWarning().addKeyValue(Metrics.STATUS, statusCode).addKeyValue(ClientConstants.ERROR_DESCRIPTION_KEY, statusDescription).addKeyValue(ClientConstants.ERROR_CONDITION_KEY, errorCondition).log("Operation not successful.");
            synchronousSink.error(ExceptionUtil.toException(errorCondition, statusDescription, amqpErrorContext));
            return;
        }
        AmqpErrorCondition fromString = AmqpErrorCondition.fromString(errorCondition);
        if (fromString == AmqpErrorCondition.MESSAGE_NOT_FOUND) {
            this.logger.info("There was no matching message found.");
            synchronousSink.next(MessageUtils.toAmqpAnnotatedMessage(message));
        } else if (fromString == AmqpErrorCondition.SESSION_NOT_FOUND) {
            this.logger.info("There was no matching session found.");
            synchronousSink.next(MessageUtils.toAmqpAnnotatedMessage(message));
        }
    }

    private Mono<Void> isAuthorized() {
        return this.tokenManager.getAuthorizationResults().next().switchIfEmpty(Mono.error((Supplier<? extends Throwable>) () -> {
            return new AmqpException(false, "Did not get response from tokenManager: " + this.entityPath, getErrorContext());
        })).handle((amqpResponseCode, synchronousSink) -> {
            if (RequestResponseUtils.isSuccessful(amqpResponseCode)) {
                synchronousSink.complete();
            } else {
                synchronousSink.error(ExceptionUtil.amqpResponseCodeToException(amqpResponseCode.getValue(), String.format("User does not have authorization to perform operation on entity [%s]. Response: [%s]", this.entityPath, amqpResponseCode), getErrorContext()));
            }
        });
    }

    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(this.fullyQualifiedNamespace, this.entityPath);
    }
}
