package org.eclipse.hono.client.impl;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonSender;
import java.net.HttpURLConnection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.AddressHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.TelemetryConstants;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.11.2.jar:org/eclipse/hono/client/impl/TelemetrySenderImpl.class */
public class TelemetrySenderImpl extends AbstractDownstreamSender {
    /* JADX INFO: Access modifiers changed from: protected */
    public TelemetrySenderImpl(HonoConnection honoConnection, ProtonSender protonSender, String str, String str2, SendMessageSampler sendMessageSampler) {
        super(honoConnection, protonSender, str, str2, sendMessageSampler);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public String getEndpoint() {
        return TelemetryConstants.TELEMETRY_ENDPOINT;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.hono.client.impl.AbstractSender
    public String getTo(String str) {
        return AddressHelper.getTargetAddress(TelemetryConstants.TELEMETRY_ENDPOINT, this.tenantId, str, null);
    }

    public static Future<DownstreamSender> create(HonoConnection honoConnection, String str, SendMessageSampler sendMessageSampler, Handler<String> handler) {
        Objects.requireNonNull(honoConnection);
        Objects.requireNonNull(str);
        Objects.requireNonNull(sendMessageSampler);
        String targetAddress = AddressHelper.getTargetAddress(TelemetryConstants.TELEMETRY_ENDPOINT, str, null, honoConnection.getConfig());
        return honoConnection.createSender(targetAddress, ProtonQoS.AT_LEAST_ONCE, handler).compose(protonSender -> {
            return Future.succeededFuture(new TelemetrySenderImpl(honoConnection, protonSender, str, targetAddress, sendMessageSampler));
        });
    }

    @Override // org.eclipse.hono.client.MessageSender
    public Future<ProtonDelivery> sendAndWaitForOutcome(Message message) {
        return sendAndWaitForOutcome(message, null);
    }

    @Override // org.eclipse.hono.client.MessageSender
    public Future<ProtonDelivery> sendAndWaitForOutcome(Message message, SpanContext spanContext) {
        Objects.requireNonNull(message);
        Span startChildSpan = startChildSpan(spanContext);
        Tags.MESSAGE_BUS_DESTINATION.set(startChildSpan, getMessageAddress(message));
        TracingHelper.TAG_QOS.set(startChildSpan, this.sender.getQoS().toString());
        TracingHelper.setDeviceTags(startChildSpan, this.tenantId, MessageHelper.getDeviceId(message));
        TracingHelper.injectSpanContext(this.connection.getTracer(), startChildSpan.context(), message);
        return this.connection.executeOnContext(promise -> {
            if (!this.sender.sendQueueFull()) {
                sendMessageAndWaitForOutcome(message, startChildSpan).onComplete2(promise);
                return;
            }
            NoConsumerException noConsumerException = new NoConsumerException("no credit available");
            logMessageSendingError("error sending message [ID: {}, address: {}], no credit available (drain={})", message.getMessageId(), getMessageAddress(message), Boolean.valueOf(this.sender.getDrain()));
            TracingHelper.TAG_CREDIT.set(startChildSpan, (Integer) 0);
            logError(startChildSpan, noConsumerException);
            startChildSpan.finish();
            promise.fail(noConsumerException);
        });
    }

    @Override // org.eclipse.hono.client.impl.AbstractSender
    protected Future<ProtonDelivery> sendMessage(Message message, Span span) {
        Objects.requireNonNull(message);
        Objects.requireNonNull(span);
        String format = String.format("%s-%d", getClass().getSimpleName(), Long.valueOf(MESSAGE_COUNTER.getAndIncrement()));
        message.setMessageId(format);
        logMessageIdAndSenderInfo(span, format);
        SendMessageSampler.Sample start = this.sampler.start(this.tenantId);
        AtomicReference atomicReference = new AtomicReference();
        ClientConfigProperties config = this.connection.getConfig();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Long valueOf = config.getSendMessageTimeout() > 0 ? Long.valueOf(this.connection.getVertx().setTimer(config.getSendMessageTimeout(), l -> {
            if (atomicBoolean.compareAndSet(false, true)) {
                handleSendMessageTimeout(message, config.getSendMessageTimeout(), (ProtonDelivery) atomicReference.get(), start, null, span);
            }
        })) : null;
        ProtonDelivery send = this.sender.send(message, protonDelivery -> {
            if (valueOf != null) {
                this.connection.getVertx().cancelTimer(valueOf.longValue());
            }
            DeliveryState remoteState = protonDelivery.getRemoteState();
            start.completed(remoteState);
            if (atomicBoolean.get()) {
                this.log.debug("ignoring received delivery update for message [ID: {}, address: {}]: waiting for the update has already timed out", format, getMessageAddress(message));
            } else if (protonDelivery.remotelySettled()) {
                logUpdatedDeliveryState(span, message, protonDelivery);
            } else {
                logMessageSendingError("peer did not settle message [ID: {}, address: {}, remote state: {}], failing delivery", format, getMessageAddress(message), remoteState.getClass().getSimpleName());
                TracingHelper.logError(span, new ServerErrorException(JsonLocation.MAX_CONTENT_SNIPPET, "peer did not settle message, failing delivery"));
            }
            span.finish();
        });
        atomicReference.set(send);
        this.log.trace("sent AT_MOST_ONCE message [ID: {}, address: {}], remaining credit: {}, queued messages: {}", format, getMessageAddress(message), Integer.valueOf(this.sender.getCredit()), Integer.valueOf(this.sender.getQueued()));
        return Future.succeededFuture(send);
    }

    @Override // org.eclipse.hono.client.impl.AbstractSender
    protected Span startSpan(SpanContext spanContext, Message message) {
        Span newFollowingSpan = newFollowingSpan(spanContext, "forward Telemetry data");
        Tags.SPAN_KIND.set(newFollowingSpan, Tags.SPAN_KIND_PRODUCER);
        return newFollowingSpan;
    }

    private Span startChildSpan(SpanContext spanContext) {
        Span newChildSpan = newChildSpan(spanContext, "forward Telemetry data");
        Tags.SPAN_KIND.set(newChildSpan, Tags.SPAN_KIND_PRODUCER);
        return newChildSpan;
    }
}
