/*
 * Decompiled with CFR 0.152.
 */
package pt.webdetails.cda.push;

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.swing.table.TableModel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.pentaho.di.core.RowMetaAndData;
import pt.webdetails.cda.CdaCoreService;
import pt.webdetails.cda.CdaEngine;
import pt.webdetails.cda.dataaccess.StreamingDataservicesDataAccess;
import pt.webdetails.cda.exporter.TableExporter;
import pt.webdetails.cda.push.IWebsocketEndpoint;
import pt.webdetails.cda.query.QueryOptions;
import pt.webdetails.cda.settings.CdaSettings;
import pt.webdetails.cda.utils.DoQueryParameters;
import pt.webdetails.cda.utils.QueryParameters;
import pt.webdetails.cda.utils.kettle.RowMetaToTableModel;

public class WebsocketJsonQueryEndpoint
implements IWebsocketEndpoint {
    private static final Log logger = LogFactory.getLog(WebsocketJsonQueryEndpoint.class);
    public static final String ACCEPTED_SUB_PROTOCOL = "JSON-CDA-query";
    private PublishSubject<List<RowMetaAndData>> consumer;
    private WebsocketDisposableObserver<List<RowMetaAndData>> disposableConsumer;
    private QueryParameters queryParametersUtil;

    public WebsocketJsonQueryEndpoint() {
        this.queryParametersUtil = new QueryParameters();
    }

    public WebsocketJsonQueryEndpoint(QueryParameters queryParameters) {
        this.queryParametersUtil = queryParameters;
    }

    public void setQueryParametersUtil(QueryParameters queryParametersUtil) {
        this.queryParametersUtil = queryParametersUtil;
    }

    public QueryParameters getQueryParametersUtil() {
        return this.queryParametersUtil;
    }

    @Override
    public void onOpen(Consumer<String> outboundMessageConsumer) {
    }

    @Override
    public void onMessage(String message, Consumer<String> outboundMessageConsumer) {
        try {
            Map<String, List<String>> params = this.queryParametersUtil.getParametersFromJson(message);
            DoQueryParameters parameters = this.queryParametersUtil.getDoQueryParameters(params);
            CdaCoreService core = this.getCdaCoreService();
            String path = parameters.getPath();
            CdaEngine cdaEngine = CdaEngine.getInstance();
            CdaSettings cdaSettings = cdaEngine.getSettingsManager().parseSettingsFile(path);
            QueryOptions queryOptions = CdaCoreService.getQueryOptions(parameters);
            TableExporter tableExporter = cdaEngine.getExporter(queryOptions);
            StreamingDataservicesDataAccess dataAccess = (StreamingDataservicesDataAccess)cdaSettings.getDataAccess(queryOptions.getDataAccessId());
            this.consumer = PublishSubject.create();
            this.consumer.toFlowable(BackpressureStrategy.LATEST).onBackpressureBuffer(1L, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST).map(RowMetaToTableModel.getConverter()::apply).subscribe(tableModel -> {
                try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();){
                    tableExporter.export(outputStream, (TableModel)tableModel);
                    outboundMessageConsumer.accept(((Object)outputStream).toString());
                }
                catch (Exception e) {
                    logger.error((Object)"Error converting a stream push table model into a json output", (Throwable)e);
                }
            });
            this.disposableConsumer = new WebsocketDisposableObserver<List<RowMetaAndData>>((Observer<List<RowMetaAndData>>)this.consumer);
            dataAccess.doPushStreamQuery(queryOptions, this.disposableConsumer);
        }
        catch (Exception e) {
            logger.error((Object)"Error processing JSON query message.", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onClose() {
        if (this.consumer != null) {
            this.consumer.onComplete();
        }
        if (this.disposableConsumer != null && !this.disposableConsumer.isDisposed()) {
            this.disposableConsumer.dispose();
        }
    }

    public PublishSubject<List<RowMetaAndData>> getConsumer() {
        return this.consumer;
    }

    public WebsocketDisposableObserver<List<RowMetaAndData>> getDisposableConsumer() {
        return this.disposableConsumer;
    }

    @Override
    public List<String> getSubProtocols() {
        return Collections.singletonList(ACCEPTED_SUB_PROTOCOL);
    }

    private CdaCoreService getCdaCoreService() {
        return new CdaCoreService(CdaEngine.getInstance());
    }

    public static class WebsocketDisposableObserver<T>
    implements Observer<T>,
    Disposable {
        private Observer<T> actual;
        private Disposable toDispose;

        public WebsocketDisposableObserver(Observer<T> subject) {
            this.actual = subject;
        }

        public void dispose() {
            if (this.toDispose != null) {
                this.toDispose.dispose();
            }
        }

        public boolean isDisposed() {
            return this.toDispose != null && this.toDispose.isDisposed();
        }

        public void onSubscribe(Disposable d) {
            this.toDispose = d;
            this.actual.onSubscribe(d);
        }

        public void onNext(T t) {
            this.actual.onNext(t);
        }

        public void onError(Throwable e) {
            this.actual.onError(e);
        }

        public void onComplete() {
            this.actual.onComplete();
        }
    }
}

