package org.eclipse.hono.client.impl;

import io.opentracing.Span;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonDelivery;
import java.util.Objects;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandTargetMapper;
import org.eclipse.hono.client.DelegatedCommandSender;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.DeviceConnectionConstants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.8.0.jar:org/eclipse/hono/client/impl/MappingAndDelegatingCommandHandler.class */
public class MappingAndDelegatingCommandHandler {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MappingAndDelegatingCommandHandler.class);
    private static final Boolean FORCED_COMMAND_REROUTING_ENABLED = Boolean.valueOf(System.getProperty("enableForcedCommandRerouting", "false"));
    private static final String FORCE_COMMAND_REROUTING_APPLICATION_PROPERTY = "force-command-rerouting";
    private final HonoConnection connection;
    private final CommandTargetMapper commandTargetMapper;
    private final AdapterInstanceCommandHandler adapterInstanceCommandHandler;
    private final String adapterInstanceId;
    private final CachingClientFactory<DelegatedCommandSender> delegatedCommandSenderFactory;
    private final SendMessageSampler sampler;

    public MappingAndDelegatingCommandHandler(HonoConnection honoConnection, CommandTargetMapper commandTargetMapper, AdapterInstanceCommandHandler adapterInstanceCommandHandler, String str, SendMessageSampler sendMessageSampler) {
        this.connection = (HonoConnection) Objects.requireNonNull(honoConnection);
        this.commandTargetMapper = (CommandTargetMapper) Objects.requireNonNull(commandTargetMapper);
        this.adapterInstanceCommandHandler = (AdapterInstanceCommandHandler) Objects.requireNonNull(adapterInstanceCommandHandler);
        this.adapterInstanceId = (String) Objects.requireNonNull(str);
        this.sampler = sendMessageSampler;
        this.delegatedCommandSenderFactory = new CachingClientFactory<>(honoConnection.getVertx(), delegatedCommandSender -> {
            return delegatedCommandSender.isOpen();
        });
        this.connection.addDisconnectListener(honoConnection2 -> {
            this.delegatedCommandSenderFactory.clearState();
        });
    }

    public void mapAndDelegateIncomingCommandMessage(String str, ProtonDelivery protonDelivery, Message message) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(protonDelivery);
        Objects.requireNonNull(message);
        if (!ResourceIdentifier.isValid(message.getAddress())) {
            LOG.debug("command message has no valid address");
            Rejected rejected = new Rejected();
            rejected.setError(new ErrorCondition(Constants.AMQP_BAD_REQUEST, "missing or invalid command target address"));
            protonDelivery.disposition(rejected, true);
            return;
        }
        ResourceIdentifier fromString = ResourceIdentifier.fromString(message.getAddress());
        String resourceId = fromString.getResourceId();
        if (!str.equals(fromString.getTenantId())) {
            LOG.debug("command message address contains invalid tenant [expected: {}, found: {}]", str, fromString.getTenantId());
            Rejected rejected2 = new Rejected();
            rejected2.setError(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, "unauthorized to send command to tenant"));
            protonDelivery.disposition(rejected2, true);
            return;
        }
        if (resourceId == null) {
            LOG.debug("invalid command message address: {}", message.getAddress());
            Rejected rejected3 = new Rejected();
            rejected3.setError(new ErrorCondition(Constants.AMQP_BAD_REQUEST, "invalid command target address"));
            protonDelivery.disposition(rejected3, true);
            return;
        }
        Command from = Command.from(message, str, resourceId);
        if (from.isValid()) {
            LOG.trace("received valid command message: [{}]", from);
        } else {
            LOG.debug("received invalid command message: {}", from);
        }
        Span createSpan = CommandConsumer.createSpan("map and delegate command", str, resourceId, null, this.connection.getTracer(), TracingHelper.extractSpanContext(this.connection.getTracer(), message));
        CommandConsumer.logReceivedCommandToSpan(from, createSpan);
        mapAndDelegateIncomingCommand(str, resourceId, CommandContext.from(from, protonDelivery, createSpan));
    }

    private void mapAndDelegateIncomingCommand(String str, String str2, CommandContext commandContext) {
        LOG.trace("determine command target gateway/adapter for [{}]", commandContext.getCommand());
        this.commandTargetMapper.getTargetGatewayAndAdapterInstance(str, str2, commandContext.getTracingContext()).onComplete2(asyncResult -> {
            if (asyncResult.succeeded()) {
                delegateIncomingCommand(str, str2, commandContext, ((JsonObject) asyncResult.result()).getString("device-id"), ((JsonObject) asyncResult.result()).getString(DeviceConnectionConstants.FIELD_ADAPTER_INSTANCE_ID));
                return;
            }
            if ((asyncResult.cause() instanceof ServiceInvocationException) && ((ServiceInvocationException) asyncResult.cause()).getErrorCode() == 404) {
                LOG.debug("no target adapter instance found for command for device {}", str2);
                TracingHelper.logError(commandContext.getTracingSpan(), "no target adapter instance found for command with device id " + str2);
            } else {
                LOG.debug("error getting target gateway and adapter instance for command with device id {}", str2, asyncResult.cause());
                TracingHelper.logError(commandContext.getTracingSpan(), "error getting target gateway and adapter instance for command with device id " + str2, asyncResult.cause());
            }
            commandContext.release();
        });
    }

    private void delegateIncomingCommand(String str, String str2, CommandContext commandContext, String str3, String str4) {
        Command command = commandContext.getCommand();
        String str5 = str3.equals(str2) ? null : str3;
        CommandHandlerWrapper deviceSpecificCommandHandler = this.adapterInstanceId.equals(str4) ? this.adapterInstanceCommandHandler.getDeviceSpecificCommandHandler(str, str3) : null;
        if (this.adapterInstanceId.equals(str4) && deviceSpecificCommandHandler == null) {
            Logger logger = LOG;
            Object[] objArr = new Object[3];
            objArr[0] = str3.equals(str2) ? "device" : "gateway";
            objArr[1] = str3;
            objArr[2] = command;
            logger.info("local command handler not found for target {} {} [{}]", objArr);
            TracingHelper.logError(commandContext.getTracingSpan(), "local command handler not found for command; target device: " + str3);
            if (command.isValid()) {
                commandContext.release();
                return;
            } else {
                commandContext.reject(getMalformedMessageError());
                return;
            }
        }
        boolean isForcedCommandReroutingSet = isForcedCommandReroutingSet(commandContext);
        if (deviceSpecificCommandHandler != null && isForcedCommandReroutingSet) {
            LOG.debug("forced command rerouting is set, skip usage of local {} for {}", deviceSpecificCommandHandler, command);
        }
        if (deviceSpecificCommandHandler != null && !isForcedCommandReroutingSet) {
            CommandContext adaptCommandContextToGatewayIfNeeded = command.isValid() ? adaptCommandContextToGatewayIfNeeded(commandContext, deviceSpecificCommandHandler.getGatewayId() != null ? deviceSpecificCommandHandler.getGatewayId() : str5) : commandContext;
            LOG.trace("use local {} for {}", deviceSpecificCommandHandler, adaptCommandContextToGatewayIfNeeded.getCommand());
            deviceSpecificCommandHandler.handleCommand(adaptCommandContextToGatewayIfNeeded);
        } else if (command.isValid()) {
            delegateCommandMessageToAdapterInstance(str4, adaptCommandContextToGatewayIfNeeded(commandContext, str5));
        } else {
            commandContext.reject(getMalformedMessageError());
        }
    }

    private ErrorCondition getMalformedMessageError() {
        return new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed command message");
    }

    private CommandContext adaptCommandContextToGatewayIfNeeded(CommandContext commandContext, String str) {
        CommandContext from;
        Command command = commandContext.getCommand();
        String tenant = command.getTenant();
        String deviceId = command.getDeviceId();
        if (str == null) {
            LOG.trace("command not mapped to gateway, use original device id {}", deviceId);
            from = commandContext;
        } else {
            LOG.trace("determined target gateway {} for device {}", str, deviceId);
            commandContext.getTracingSpan().log("determined target gateway " + str);
            if (!command.isOneWay()) {
                command.getCommandMessage().setReplyTo(String.format("%s/%s/%s", "command_response", tenant, command.getReplyToId()));
            }
            from = CommandContext.from(Command.from(command.getCommandMessage(), tenant, str), commandContext.getDelivery(), commandContext.getTracingSpan());
        }
        return from;
    }

    private void delegateCommandMessageToAdapterInstance(String str, CommandContext commandContext) {
        LOG.trace("delegate command to target adapter instance '{}' [command: {}]", str, commandContext.getCommand());
        getOrCreateDelegatedCommandSender(str).compose(delegatedCommandSender -> {
            return delegatedCommandSender.sendCommandMessage(commandContext.getCommand(), commandContext.getTracingContext());
        }).onComplete2(asyncResult -> {
            if (asyncResult.succeeded()) {
                ProtonDelivery protonDelivery = (ProtonDelivery) asyncResult.result();
                LOG.trace("command [{}] sent to downstream peer; remote state of delivery: {}", commandContext.getCommand(), protonDelivery.getRemoteState());
                commandContext.disposition(protonDelivery.getRemoteState());
            } else {
                LOG.debug("failed to send command [{}] to downstream peer", commandContext.getCommand(), asyncResult.cause());
                TracingHelper.logError(commandContext.getTracingSpan(), "failed to send command message to downstream peer: " + asyncResult.cause());
                commandContext.release();
            }
        });
    }

    private boolean isForcedCommandReroutingSet(CommandContext commandContext) {
        if (!FORCED_COMMAND_REROUTING_ENABLED.booleanValue() || !commandContext.getCommand().isValid()) {
            return false;
        }
        return Boolean.TRUE.equals(MessageHelper.getApplicationProperty(commandContext.getCommand().getCommandMessage().getApplicationProperties(), FORCE_COMMAND_REROUTING_APPLICATION_PROPERTY, Boolean.class));
    }

    private Future<DelegatedCommandSender> getOrCreateDelegatedCommandSender(String str) {
        return this.connection.executeOnContext(promise -> {
            this.delegatedCommandSenderFactory.getOrCreateClient(str, () -> {
                return DelegatedCommandSenderImpl.create(this.connection, str, this.sampler, str2 -> {
                    this.delegatedCommandSenderFactory.removeClient(str);
                });
            }, promise);
        });
    }
}
