package biz.chitec.quarterback.gjsaserver.amqp;

import biz.chitec.quarterback.gjsa.amqp.AMQPDeserializationException;
import biz.chitec.quarterback.gjsa.amqp.AMQPMessage;
import biz.chitec.quarterback.gjsa.client.SessionedServerConnector;
import biz.chitec.quarterback.gjsa.consolidation.StringRepresentationFormat;
import biz.chitec.quarterback.gjsa.core.GJSACore;
import biz.chitec.quarterback.gjsa.core.ServerEnvelope;
import biz.chitec.quarterback.gjsa.core.ServerReply;
import biz.chitec.quarterback.gjsaserver.GJSAServer;
import biz.chitec.quarterback.gjsaserver.ServerThreadBase;
import biz.chitec.quarterback.util.ThreadInterface;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
import de.cantamen.quarterback.core.Catcher;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:biz/chitec/quarterback/gjsaserver/amqp/AMQPServerHandler.class */
/**
 * Serves one GJSA connection handler over a pair of AMQP queues.
 *
 * <p>Incoming messages are consumed from the queue {@code <prefix>TOSERVER}, deserialized into
 * {@link ServerEnvelope}s and processed on a dedicated handler thread. Replies, as well as envelopes
 * pushed through the connection handler's free-floating interface, are published to the queue
 * {@code <prefix>TOCLIENT} by a dedicated writer thread.
 *
 * <p>{@link #serve()} and {@link #close()} are not synchronized against each other; callers are
 * expected to drive the lifecycle from a single thread.
 */
public class AMQPServerHandler implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Suffix appended to the queue name prefix for the server-to-client queue. */
    private static final String SEND_QUEUE_NAME_BASE = "TOCLIENT";

    /** Suffix appended to the queue name prefix for the client-to-server queue. */
    private static final String RECEIVE_QUEUE_NAME_BASE = "TOSERVER";

    private final GJSAServer server;
    private final ConnectionFactory factory;
    private final String label;
    private final ServerEnvelope initialCommand;
    private final URI target;
    private final String sendQueueName;
    private final String receiveQueueName;

    private Connection connection;
    private Channel sendChannel;
    private Channel receiveChannel;
    private ServerThreadBase serverThreadBase;
    private ThreadInterface<ServerEnvelope> readti;
    private ThreadInterface<ServerEnvelope> writeti;

    /**
     * Creates a handler for the given AMQP target. No connection is opened until {@link #serve()}.
     *
     * @param server          server that supplies the connection handler
     * @param target          AMQP URI of the broker; must be classified as AMQP by
     *                        {@code SessionedServerConnector.SocketLevel.of}
     * @param queueNamePrefix prefix of both queue names ({@code TOCLIENT} / {@code TOSERVER} are appended)
     * @param label           human-readable label used in thread names and log messages
     * @param initialCommand  optional command executed once when serving starts; may be {@code null}
     * @throws IllegalArgumentException if {@code target} is not an AMQP URI
     * @throws KeyManagementException   if the connection factory rejects the URI
     * @throws NoSuchAlgorithmException if the connection factory rejects the URI
     * @throws URISyntaxException       if the connection factory rejects the URI
     */
    public AMQPServerHandler(GJSAServer server, URI target, String queueNamePrefix, String label, ServerEnvelope initialCommand) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException {
        if (SessionedServerConnector.SocketLevel.of(target) != SessionedServerConnector.SocketLevel.AMQP) {
            throw new IllegalArgumentException("AMQPServerHandler cannot handle URL " + target);
        }
        this.server = server;
        this.target = target;
        this.sendQueueName = queueNamePrefix + SEND_QUEUE_NAME_BASE;
        this.receiveQueueName = queueNamePrefix + RECEIVE_QUEUE_NAME_BASE;
        this.label = label;
        this.initialCommand = initialCommand;
        this.factory = new ConnectionFactory();
        this.factory.setUri(target);
    }

    /**
     * Opens the AMQP connection, declares both queues, runs the initial command (if any), starts the
     * handler and writer threads and begins consuming incoming messages. Does nothing if a connection
     * is already present.
     *
     * <p>On any failure during setup everything opened so far is closed again before the exception
     * is propagated.
     *
     * @throws IOException           if the broker cannot be reached or a channel operation fails; a
     *                               connection timeout is wrapped into an {@code IOException}
     * @throws IllegalStateException if the initial command yields a non-positive reply
     */
    public void serve() throws IOException {
        if (this.connection != null) {
            return;
        }
        try {
            final Connection conn = this.factory.newConnection();
            this.connection = conn;

            final Channel outChannel = conn.createChannel();
            this.sendChannel = outChannel;
            outChannel.queueDeclare(this.sendQueueName, false, false, false, null);

            final Channel inChannel = conn.createChannel();
            this.receiveChannel = inChannel;
            inChannel.queueDeclare(this.receiveQueueName, false, false, false, null);

            final ServerThreadBase connectionHandler = this.server.createConnectionHandler("AMQP " + this.label, false, (InetAddress) Catcher.wrap(InetAddress::getLocalHost), StringRepresentationFormat.JSON, ServerThreadBase.TransportProtocol.AMQP);
            this.serverThreadBase = connectionHandler;

            if (this.initialCommand != null
                    && connectionHandler.reply((ServerEnvelope) this.initialCommand.clone()).getReplies().stream().anyMatch(reply -> !reply.isPositive())) {
                throw new IllegalStateException("Initial command failed");
            }

            // The worker threads and callbacks get fixed references instead of re-reading the
            // fields, because close() resets the fields to null from another thread.
            final ThreadInterface<ServerEnvelope> in = new ThreadInterface<>();
            final ThreadInterface<ServerEnvelope> out = new ThreadInterface<>();
            this.readti = in;
            this.writeti = out;

            new Thread(() -> handler(in, out, connectionHandler), "AMQP GJSA " + this.label + " handler").start();
            new Thread(() -> writer(out, outChannel), "AMQP GJSA " + this.label + " writer").start();
            connectionHandler.setFreeFloatingInterface(envelope -> out.accept(envelope));
            inChannel.basicConsume(this.receiveQueueName, true, (consumerTag, delivery) -> handleIncoming(delivery, in, out), consumerTag -> {
            });
        } catch (TimeoutException e) {
            close();
            throw new IOException(e);
        } catch (IOException | RuntimeException e) {
            // Also covers the failed initial command, which previously leaked the open connection.
            close();
            throw e;
        }
    }

    /**
     * Delivery callback: deserializes an incoming AMQP message and queues its envelope for the
     * handler thread. A message that cannot be deserialized is answered directly with an envelope
     * carrying reply code 100 (the same code this class uses when request processing fails).
     */
    private void handleIncoming(Delivery delivery, ThreadInterface<ServerEnvelope> in, ThreadInterface<ServerEnvelope> out) {
        try {
            in.accept(AMQPMessage.deserialize(delivery.getBody()).getServerEnvelope());
        } catch (AMQPDeserializationException e) {
            logger.warn("Problem deserializing an incoming AMQP message for handler \"{}\"", this.label, e);
            out.accept(new ServerEnvelope(10, new ServerReply(100)));
        }
    }

    /**
     * Handler thread body: takes envelopes from {@code in}, processes them and hands the resulting
     * envelope to {@code out}. Ends when {@code in} yields {@code null} or fails.
     */
    private void handler(ThreadInterface<ServerEnvelope> in, ThreadInterface<ServerEnvelope> out, ServerThreadBase connectionHandler) {
        try {
            while (true) {
                final ServerEnvelope request = in.consume();
                if (request == null) {
                    // Routed through the IOException branch so a null read ends the thread quietly.
                    throw new IOException("Null read");
                }
                out.accept(process(request, connectionHandler));
            }
        } catch (IOException e) {
            logger.info("AMQP GJSA {} handler: Closing", this.label);
        } catch (Exception e) {
            logger.error("AMQP GJSA {} handler: Fatal exception in handler loop", this.label, e);
        }
    }

    /**
     * Produces the answer envelope for one request.
     *
     * <ul>
     *   <li>A closing request is answered in place with reply code 10.</li>
     *   <li>A request whose first command equals the initial command's first command is not executed
     *       again; it is answered in place with reply code 20.</li>
     *   <li>Everything else is delegated to the connection handler.</li>
     *   <li>If any of the above throws, the request is answered in place with reply code 100.</li>
     * </ul>
     *
     * NOTE(review): the meaning of the numeric reply codes is defined outside this class — confirm.
     */
    private ServerEnvelope process(ServerEnvelope request, ServerThreadBase connectionHandler) {
        try {
            if (request.isClosingRequest()) {
                clear(request);
                request.addFirstReply(new ServerReply(10));
                return request;
            }
            if (this.initialCommand == null || request.getFirstRequest().command != this.initialCommand.getFirstRequest().command) {
                return connectionHandler.reply(request);
            }
            clear(request);
            request.addFirstReply(new ServerReply(20));
            return request;
        } catch (Exception e) {
            logger.warn("AMQP GJSA {} handler: Problem processing request, answering with error reply", this.label, e);
            clear(request);
            request.addReply(new ServerReply(100));
            return request;
        }
    }

    /** Strips all requests and replies from the envelope so it can be reused as an answer. */
    private static void clear(ServerEnvelope envelope) {
        envelope.removeRequests();
        envelope.removeReplies();
    }

    /**
     * Writer thread body: takes envelopes from {@code out} and publishes them to the send queue via
     * the default exchange. A failed publish is logged and the loop continues. Ends when {@code out}
     * yields {@code null} or fails.
     */
    private void writer(ThreadInterface<ServerEnvelope> out, Channel channel) {
        try {
            while (true) {
                final ServerEnvelope envelope = out.consume();
                if (envelope == null) {
                    // Routed through the IOException branch so a null read ends the thread quietly.
                    throw new IOException("Null read");
                }
                try {
                    channel.basicPublish("", this.sendQueueName, null, new AMQPMessage(envelope).serialize());
                } catch (IOException e) {
                    logger.error("Problem sending {}", GJSACore.debugPrint(envelope), e);
                }
            }
        } catch (IOException e) {
            logger.info("AMQP GJSA {} writer: Closing", this.label);
        } catch (Exception e) {
            logger.error("AMQP GJSA {} writer: Fatal exception in handler loop", this.label, e);
        }
    }

    /**
     * Closes the given resource if it is non-null, logging instead of propagating an
     * {@link IOException}.
     *
     * @return always {@code null}, so the caller can reset its field in the same statement
     */
    private static <T extends Closeable> T closeQuietly(T resource) {
        if (resource != null) {
            try {
                resource.close();
            } catch (IOException e) {
                // Best effort: shutdown must continue with the remaining resources.
                logger.debug("Ignoring problem while closing {}", resource, e);
            }
        }
        return null;
    }

    /**
     * Closes both thread interfaces, the connection handler and the AMQP connection, in that order,
     * and resets the corresponding fields. Safe to call repeatedly and on a handler that never served.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.readti = closeQuietly(this.readti);
        this.writeti = closeQuietly(this.writeti);
        this.serverThreadBase = closeQuietly(this.serverThreadBase);
        this.connection = closeQuietly(this.connection);
    }
}
