package org.pentaho.di.trans.ael.websocket;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.engine.api.remote.ExecutionRequest;
import org.pentaho.di.engine.api.remote.Message;
import org.pentaho.di.engine.api.remote.MessageDecoder;
import org.pentaho.di.engine.api.remote.MessageEncoder;
import org.pentaho.di.engine.api.remote.StopMessage;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.ael.websocket.exception.MessageEventFireEventException;

/* loaded from: input_file:org/pentaho/di/trans/ael/websocket/DaemonMessagesClientEndpoint.class */
public class DaemonMessagesClientEndpoint extends Endpoint {
    private static Class<?> PKG = DaemonMessagesClientEndpoint.class;
    private static final String EXCEPTION_SESSION_CLOSED = "DaemonMessagesClientEndpoint.Exception.SessionIsClosed";
    private static final String KETTLE_AEL_PDI_DAEMON_PRINCIPAL = "KETTLE_AEL_PDI_DAEMON_PRINCIPAL";
    private static final String KETTLE_AEL_PDI_DAEMON_KEYTAB = "KETTLE_AEL_PDI_DAEMON_KEYTAB";
    private static final String KETTLE_AEL_PDI_DAEMON_CONTEXT_REUSE = "KETTLE_AEL_PDI_DAEMON_CONTEXT_REUSE";
    private static final String Y_LWC = "y";
    private static final int MAX_TXT_MSG_BUF_SIZE = 500000;
    private static final int MAX_BIN_MSG_BUF_SIZE = 500000;
    private static final String URL_TEMPLATE = "%s://%s:%s/execution";
    private static final String PRFX_WS = "ws";
    private static final String PRFX_WS_SSL = "wss";
    private final MessageEventService messageEventService;
    private Session userSession = null;
    private String principal = null;
    private String keytab = null;
    private boolean reuseSparkContext = false;
    private AtomicBoolean alReadySendedStopMessage = new AtomicBoolean(false);

    public DaemonMessagesClientEndpoint(String str, int i, boolean z, MessageEventService messageEventService) throws KettleException {
        try {
            setAuthProperties();
            Object[] objArr = new Object[3];
            objArr[0] = z ? PRFX_WS_SSL : PRFX_WS;
            objArr[1] = str;
            objArr[2] = Integer.valueOf(i);
            URI uri = new URI(String.format(URL_TEMPLATE, objArr));
            this.messageEventService = messageEventService;
            ContainerProvider.getWebSocketContainer().connectToServer(this, ClientEndpointConfig.Builder.create().encoders(Collections.singletonList(MessageEncoder.class)).decoders(Collections.singletonList(MessageDecoder.class)).configurator(new SessionConfigurator(uri, this.keytab, this.principal)).build(), uri);
        } catch (Exception e) {
            throw new KettleException(e);
        }
    }

    private void setAuthProperties() {
        Variables variables = new Variables();
        variables.initializeVariablesFrom((VariableSpace) null);
        this.principal = variables.getVariable(KETTLE_AEL_PDI_DAEMON_PRINCIPAL, (String) null);
        this.keytab = variables.getVariable(KETTLE_AEL_PDI_DAEMON_KEYTAB, (String) null);
        String variable = variables.getVariable(KETTLE_AEL_PDI_DAEMON_CONTEXT_REUSE, Boolean.FALSE.toString());
        this.reuseSparkContext = Boolean.TRUE.toString().equals(variable.toLowerCase()) || Y_LWC.equals(variable.toLowerCase());
    }

    public void onOpen(Session session, EndpointConfig endpointConfig) {
        this.userSession = session;
        this.userSession.setMaxTextMessageBufferSize(500000);
        this.userSession.setMaxBinaryMessageBufferSize(500000);
        session.addMessageHandler(new MessageHandler.Whole<Message>() { // from class: org.pentaho.di.trans.ael.websocket.DaemonMessagesClientEndpoint.1
            public void onMessage(Message message) {
                try {
                    DaemonMessagesClientEndpoint.this.messageEventService.fireEvent(message);
                } catch (MessageEventFireEventException e) {
                    throw new RuntimeException((Throwable) e);
                }
            }
        });
    }

    public void onClose(Session session, CloseReason closeReason) {
        this.userSession = null;
    }

    public void onError(Session session, Throwable th) {
        throw new RuntimeException(th);
    }

    public void sendMessage(ExecutionRequest executionRequest) throws KettleException {
        sessionValid();
        try {
            executionRequest.setReuseSparkContext(this.reuseSparkContext);
            this.userSession.getBasicRemote().sendObject(executionRequest);
        } catch (Exception e) {
            throw new KettleException(e);
        }
    }

    public void sendMessage(StopMessage stopMessage) throws KettleException {
        sessionValid();
        try {
            if (!this.alReadySendedStopMessage.getAndSet(true)) {
                this.userSession.getBasicRemote().sendObject(stopMessage);
            }
        } catch (Exception e) {
            throw new KettleException(e);
        }
    }

    public void close(String str) throws KettleException {
        sessionValid();
        try {
            if (this.userSession != null && this.userSession.isOpen()) {
                this.userSession.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, str));
            }
        } catch (IOException e) {
            throw new KettleException(e);
        }
    }

    public void sessionValid() throws KettleException {
        if (this.userSession == null || !this.userSession.isOpen()) {
            throw new KettleException(BaseMessages.getString(PKG, EXCEPTION_SESSION_CLOSED, new String[0]));
        }
    }
}
