package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:payload/TIB_bwpluginamqp_6.4.0_common.zip:assemblies/assembly_tibco_com_tibco_bw_ms_asb_sdk_tpcl_feature_1.5.0.005.zip:source/plugins/com.tibco.bw.ms.asb.sdk.tpcl_1.5.0.004.jar:lib/azure-messaging-servicebus-7.13.2.jar:com/azure/messaging/servicebus/SynchronousReceiveWork.class */
/**
 * Represents one unit of synchronous receive work: up to a requested number of messages are pushed to a
 * downstream sink, and the sink is terminated when that number has been emitted, when the overall timeout
 * elapses, or when no further message arrives within {@link #TIMEOUT_BETWEEN_MESSAGES} of the previous one.
 *
 * <p>{@link #start()}, {@link #isTerminal()} and {@link #emitNext(ServiceBusReceivedMessage)} synchronize on
 * this instance. {@link #complete(String, Throwable)} is not synchronized; it relies on an atomic flag so that
 * only the first caller performs the termination.</p>
 *
 * <p>Note: the members marked {@code public} were widened from package-private by the decompiler.</p>
 */
public class SynchronousReceiveWork {
    /** Quiet period after an emitted message after which the work is completed. */
    private static final Duration TIMEOUT_BETWEEN_MESSAGES = Duration.ofMillis(1000);

    private final ClientLogger logger;
    private final Duration timeout;
    private final long id;
    /** Count of messages that may still be emitted; decremented on every emit attempt. */
    private final AtomicInteger remaining;
    private final int numberToReceive;
    private final Sinks.Many<ServiceBusReceivedMessage> downstreamEmitter;
    private final AtomicBoolean isStarted = new AtomicBoolean();
    private final AtomicBoolean isTerminal = new AtomicBoolean();
    /** Holds both timer subscriptions so they can be disposed together on completion. */
    private final Disposable.Composite timeoutSubscriptions = Disposables.composite();

    /**
     * Creates a new unit of receive work.
     *
     * @param id Identifier for this work; also added to the logging context.
     * @param numberToReceive Maximum number of messages to emit downstream.
     * @param timeout Overall time allowed for the work once it has been started.
     * @param emitter Sink that receives the messages and the terminal signal.
     */
    public SynchronousReceiveWork(long id, int numberToReceive, Duration timeout,
        Sinks.Many<ServiceBusReceivedMessage> emitter) {
        this.id = id;
        this.remaining = new AtomicInteger(numberToReceive);
        this.numberToReceive = numberToReceive;
        this.timeout = timeout;
        this.downstreamEmitter = emitter;

        final HashMap<String, Object> loggingContext = new HashMap<>(1);
        loggingContext.put(ServiceBusConstants.WORK_ID_KEY, Long.valueOf(id));
        this.logger = new ClientLogger(SynchronousReceiveWork.class, loggingContext);
    }

    /**
     * Gets the identifier of this work.
     *
     * @return The work id supplied at construction.
     */
    public long getId() {
        return this.id;
    }

    /**
     * Gets the overall timeout for this work.
     *
     * @return The timeout supplied at construction.
     */
    public Duration getTimeout() {
        return this.timeout;
    }

    /**
     * Gets the number of messages that was originally requested.
     *
     * @return The requested message count; this value does not change as messages are emitted.
     */
    public int getNumberOfEvents() {
        return this.numberToReceive;
    }

    /**
     * Gets the current value of the remaining-message counter.
     *
     * @return The counter value. It can be negative because {@link #emitNext(ServiceBusReceivedMessage)}
     *     decrements it before checking whether any capacity is left.
     */
    int getRemainingEvents() {
        return this.remaining.get();
    }

    /**
     * Starts the two timers that can complete this work. Calls after the first one are no-ops.
     */
    public synchronized void start() {
        if (this.isStarted.getAndSet(true)) {
            return;
        }

        // Overall timer: completes the work once the configured timeout has elapsed.
        final Disposable overallTimer = Mono.delay(this.timeout).subscribe(
            tick -> complete("Timeout elapsed for work."),
            error -> complete("Error occurred while waiting for timeout.", error));
        this.timeoutSubscriptions.add(overallTimer);

        // Between-message timer: every message seen on the sink switches to a fresh delay, replacing the
        // previous one, so the work completes only when a delay runs out without another message arriving.
        final Disposable betweenMessagesTimer = Flux
            .switchOnNext(this.downstreamEmitter.asFlux().map(message -> Mono.delay(TIMEOUT_BETWEEN_MESSAGES)))
            .subscribe(
                tick -> complete("Timeout between the messages occurred. Completing the work."),
                error -> complete("Error occurred while waiting for timeout between messages.", error));
        this.timeoutSubscriptions.add(betweenMessagesTimer);
    }

    /**
     * Gets whether this work has been completed, successfully or with an error.
     *
     * @return {@code true} once {@link #complete(String, Throwable)} has run; {@code false} otherwise.
     */
    public synchronized boolean isTerminal() {
        return this.isTerminal.get();
    }

    /**
     * Attempts to push a message to the downstream sink, starting the timers first if needed.
     *
     * @param serviceBusReceivedMessage Message to emit.
     * @return {@code true} if the message was emitted; {@code false} if the work is already terminal, the
     *     requested number of messages has already been reached, or the sink did not accept the message.
     */
    public synchronized boolean emitNext(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        if (this.isTerminal.get()) {
            return false;
        }

        if (!this.isStarted.get()) {
            start();
        }

        final int numberLeft = this.remaining.decrementAndGet();
        if (numberLeft < 0) {
            this.logger.info("Number left {} < 0. Not emitting downstream.", numberLeft);
            return false;
        }

        final Sinks.EmitResult result = this.downstreamEmitter.tryEmitNext(serviceBusReceivedMessage);
        if (result != Sinks.EmitResult.OK) {
            this.logger.info("Could not emit downstream. EmitResult: {}", result);
            return false;
        }

        // The last requested message was just delivered; terminate the sink without logging a message.
        if (numberLeft == 0) {
            complete(null);
        }
        return true;
    }

    /**
     * Completes the work successfully.
     *
     * @param str Message to log at verbose level, or {@code null} to log nothing.
     */
    void complete(String str) {
        complete(str, null);
    }

    /**
     * Completes the work, disposing the timers and sending the terminal signal downstream. Only the first
     * invocation has any effect.
     *
     * @param str Message to log, or {@code null} to log nothing. Logged at verbose level when {@code th} is
     *     {@code null}, otherwise as a warning together with {@code th}.
     * @param th Error to signal downstream, or {@code null} to signal normal completion.
     */
    public void complete(String str, Throwable th) {
        if (this.isTerminal.getAndSet(true)) {
            return;
        }

        if (str != null) {
            if (th == null) {
                this.logger.verbose(str);
            } else {
                this.logger.warning(str, th);
            }
        }

        try {
            this.timeoutSubscriptions.dispose();
        } finally {
            // Runs whether or not dispose() threw, and runs exactly once: a failure of the emission itself
            // propagates to the caller instead of triggering a second terminal emission attempt.
            if (th == null) {
                this.downstreamEmitter.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            } else {
                this.downstreamEmitter.emitError(th, Sinks.EmitFailureHandler.FAIL_FAST);
            }
        }
    }
}
