/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.di.trans.dataservice.streaming.execution;

import com.google.common.annotations.VisibleForTesting;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.trans.dataservice.client.api.IDataServiceClientService;

/**
 * Cuts a stream of rows into windows and forwards every window to a consumer.
 *
 * <p>Three buffers are derived from the same source stream:
 * <ul>
 *   <li>the <em>window buffer</em>, sized by {@code windowSize} (and optionally
 *       {@code windowEvery}) in milliseconds when the mode is {@code TIME_BASED} and in rows
 *       otherwise;</li>
 *   <li>the <em>fallback buffer</em>, bounded by {@code maxTime} milliseconds and
 *       {@code maxRows} rows;</li>
 *   <li>the <em>starter buffer</em>, which only matters until the first window has been
 *       produced: it accumulates the rows seen so far in the pre-window cache and publishes
 *       that cache on every emission.</li>
 * </ul>
 *
 * <p>Whenever the window buffer emits, the fallback buffer is re-subscribed, and vice versa.
 * The first emission of either one disposes the starter subscription and clears the
 * pre-window cache.
 */
public class StreamExecutionListener {
    private final IDataServiceClientService.StreamingMode windowMode;
    private final long windowSize;
    private final long windowEvery;
    private final int maxRows;
    private final long maxTime;
    private Disposable outputSubject;
    private Disposable starterSubject;
    private Disposable subject;
    private Disposable fallbackSubject;
    private Observable<List<RowMetaAndData>> buffer;
    private Observable<List<RowMetaAndData>> fallbackBuffer;
    private final List<RowMetaAndData> cachePreWindow = Collections.synchronizedList(new ArrayList<>());
    private PublishSubject<List<RowMetaAndData>> outputBufferPublisher;
    private final AtomicBoolean hasWindow = new AtomicBoolean(false);
    // Dedicated monitor for window hand-over; never lock on a reassignable field.
    private final Object windowLock = new Object();

    /**
     * Creates the listener and immediately subscribes all of its buffers to {@code stream}.
     *
     * @param stream         source of the rows to be windowed
     * @param windowConsumer receives every published window
     * @param windowMode     {@code TIME_BASED} selects millisecond windows; any other value
     *                       is handled as row based
     * @param windowSize     window size, in milliseconds or rows depending on the mode
     * @param windowEvery    distance between window starts (same unit as {@code windowSize});
     *                       a value {@code <= 0} means windows are not overlapped/skipped
     * @param maxRows        row limit of the fallback buffer
     * @param maxTime        time limit of the fallback buffer, in milliseconds
     */
    public StreamExecutionListener(PublishSubject<RowMetaAndData> stream, Consumer<List<RowMetaAndData>> windowConsumer, IDataServiceClientService.StreamingMode windowMode, long windowSize, long windowEvery, int maxRows, long maxTime) {
        this.windowMode = windowMode;
        this.windowSize = windowSize;
        this.windowEvery = windowEvery;
        this.maxRows = maxRows;
        this.maxTime = maxTime;
        this.init(stream, windowConsumer);
    }

    /**
     * Builds the window, fallback and starter buffers and subscribes to all of them.
     *
     * @param stream         source of the rows to be windowed
     * @param windowConsumer receives every published window
     */
    private void init(PublishSubject<RowMetaAndData> stream, Consumer<List<RowMetaAndData>> windowConsumer) {
        boolean timeBased = IDataServiceClientService.StreamingMode.TIME_BASED.equals(this.windowMode);
        boolean hasEvery = this.windowEvery > 0L;

        // Anything that is not time based is windowed by row count, with or without
        // windowEvery, so the buffer is always assigned.
        if (timeBased) {
            this.buffer = hasEvery
                ? stream.buffer(this.windowSize, this.windowEvery, TimeUnit.MILLISECONDS)
                : stream.buffer(this.windowSize, TimeUnit.MILLISECONDS);
        } else {
            this.buffer = hasEvery
                ? stream.buffer((int) this.windowSize, (int) this.windowEvery)
                : stream.buffer((int) this.windowSize);
        }

        this.fallbackBuffer = stream.buffer(this.maxTime, TimeUnit.MILLISECONDS, Schedulers.computation(), this.maxRows, () -> new ArrayList<>(), true);
        this.outputBufferPublisher = PublishSubject.create();
        this.outputSubject = this.outputBufferPublisher.subscribe(windowConsumer);
        this.resetBuffer();
        this.resetFallbackBuffer();

        // The starter ticks at the window cadence. The time span is kept as a long so that
        // large millisecond values are not truncated; only the row count needs an int.
        long starterSpan = hasEvery ? this.windowEvery : this.windowSize;
        Observable<List<RowMetaAndData>> starterBuffer = timeBased
            ? stream.buffer(starterSpan, TimeUnit.MILLISECONDS)
            : stream.buffer((int) starterSpan);
        this.starterSubject = starterBuffer.subscribe(this::processStarterWindow);
    }

    /**
     * Handles an emission of the starter buffer. Does nothing once a real window has been
     * produced; otherwise restarts the fallback buffer, appends the rows to the pre-window
     * cache and publishes the rows cached so far.
     *
     * @param items rows collected by the starter buffer since its previous emission
     */
    private void processStarterWindow(List<RowMetaAndData> items) {
        if (this.hasWindow.get()) {
            return;
        }
        this.resetFallbackBuffer();
        this.cachePreWindow.addAll(items);
        // Publish a snapshot: the cache keeps growing and is cleared when the starter is
        // unsubscribed, so the live list must not escape to the consumer.
        this.outputBufferPublisher.onNext(new ArrayList<>(this.cachePreWindow));
    }

    /**
     * Returns the list holding the rows received before the first window was produced.
     *
     * @return the internal (synchronized) pre-window cache, not a copy
     */
    public List<RowMetaAndData> getCachePreWindow() {
        return this.cachePreWindow;
    }

    /**
     * Disposes the output, starter, window and fallback subscriptions.
     */
    public void unSubscribe() {
        this.unSubscribeOutput();
        this.unSubscribeStarter();
        this.unSubscribeBuffer();
        this.unSubscribeFallbackBuffer();
    }

    /**
     * Disposes the consumer's subscription to the output publisher, if still active.
     */
    @VisibleForTesting
    protected void unSubscribeOutput() {
        if (this.outputSubject != null && !this.outputSubject.isDisposed()) {
            this.outputSubject.dispose();
        }
        this.outputSubject = null;
    }

    /**
     * Disposes the starter subscription, if still active, and clears the pre-window cache.
     */
    @VisibleForTesting
    protected void unSubscribeStarter() {
        if (this.starterSubject != null && !this.starterSubject.isDisposed()) {
            this.starterSubject.dispose();
        }
        this.starterSubject = null;
        this.cachePreWindow.clear();
    }

    /**
     * Disposes the window buffer subscription, if still active.
     */
    @VisibleForTesting
    protected void unSubscribeBuffer() {
        if (this.subject != null && !this.subject.isDisposed()) {
            this.subject.dispose();
        }
        this.subject = null;
    }

    /**
     * Disposes the fallback buffer subscription, if still active.
     */
    @VisibleForTesting
    protected void unSubscribeFallbackBuffer() {
        if (this.fallbackSubject != null && !this.fallbackSubject.isDisposed()) {
            this.fallbackSubject.dispose();
        }
        this.fallbackSubject = null;
    }

    /**
     * Handles a window emitted by the window buffer: retires the starter on the first
     * window, restarts the fallback buffer and publishes the window.
     *
     * @param windowList rows of the emitted window
     */
    private void processBufferWindow(List<RowMetaAndData> windowList) {
        synchronized (this.windowLock) {
            this.retireStarterOnFirstWindow();
            this.resetFallbackBuffer();
            this.publishIfSubscribed(windowList);
        }
    }

    /**
     * Handles a window emitted by the fallback buffer: retires the starter on the first
     * window, restarts the window buffer and publishes the window.
     *
     * @param windowList rows of the emitted window
     */
    private void processFallbackWindow(List<RowMetaAndData> windowList) {
        synchronized (this.windowLock) {
            this.retireStarterOnFirstWindow();
            this.resetBuffer();
            this.publishIfSubscribed(windowList);
        }
    }

    /**
     * Flags that a window has been produced; the caller that flips the flag also disposes
     * the starter subscription.
     */
    private void retireStarterOnFirstWindow() {
        if (this.hasWindow.compareAndSet(false, true)) {
            this.unSubscribeStarter();
        }
    }

    /**
     * Publishes a window unless the consumer's subscription is gone.
     *
     * @param windowList rows of the window to publish
     */
    private void publishIfSubscribed(List<RowMetaAndData> windowList) {
        if (this.outputSubject != null && !this.outputSubject.isDisposed()) {
            this.outputBufferPublisher.onNext(windowList);
        }
    }

    /**
     * Drops the current window buffer subscription and subscribes again, which starts a
     * fresh window.
     */
    private void resetBuffer() {
        this.unSubscribeBuffer();
        this.subject = this.buffer.subscribe(this::processBufferWindow);
    }

    /**
     * Drops the current fallback buffer subscription and subscribes again, which starts a
     * fresh fallback window.
     */
    private void resetFallbackBuffer() {
        this.unSubscribeFallbackBuffer();
        this.fallbackSubject = this.fallbackBuffer.subscribe(this::processFallbackWindow);
    }
}

