/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.di.trans.streaming.common;

import com.google.common.annotations.VisibleForTesting;
import io.reactivex.Observable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.ReplayProcessor;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.streaming.api.StreamSource;
import org.pentaho.di.trans.streaming.common.BaseStreamStep;

/**
 * Base {@link StreamSource} that publishes rows through an RxJava processor and
 * supports pausing and resuming row intake.
 *
 * <p>Row intake is gated by a single-permit {@link Semaphore}: {@link #acceptRows(List)}
 * holds the permit while it publishes a batch, and {@link #pause()} holds the same
 * permit for as long as the source is paused, so batches block until
 * {@link #resume()} hands the permit back.
 *
 * @param <T> type of the rows emitted by this source
 */
public abstract class BlockingQueueStreamSource<T>
implements StreamSource<T> {
    private static final Class<?> PKG = BlockingQueueStreamSource.class;
    /** True only while {@link #pause()} is holding the semaphore permit. */
    private final AtomicBoolean paused = new AtomicBoolean(false);
    /** Processor created with a size-bounded replay buffer of 1000 items. */
    private final FlowableProcessor<T> publishProcessor = ReplayProcessor.createWithSize(1000);
    protected final BaseStreamStep streamStep;
    /** Single permit shared between {@link #acceptRows(List)} and pause/resume. */
    @VisibleForTesting
    Semaphore acceptingRowsSemaphore = new Semaphore(1);
    @VisibleForTesting
    LogChannel logChannel = new LogChannel(this);

    /**
     * @param streamStep step whose "lines input" counter is incremented for every published row
     */
    protected BlockingQueueStreamSource(BaseStreamStep streamStep) {
        this.streamStep = streamStep;
    }

    /**
     * Returns an {@link Observable} view over the rows published by this source.
     */
    @Override
    public Observable<T> observable() {
        return Observable.fromPublisher(this.publishProcessor);
    }

    /**
     * Completes the underlying processor unless it has already completed.
     */
    @Override
    public void close() {
        if (!this.publishProcessor.hasComplete()) {
            this.publishProcessor.onComplete();
        }
    }

    /**
     * Pauses row intake by taking the semaphore permit. Blocks until any batch
     * currently being published by {@link #acceptRows(List)} has finished.
     * Calling this while already paused is a no-op.
     *
     * <p>If the calling thread is interrupted while waiting, the source stays
     * un-paused, the error is logged, and the thread's interrupt flag is restored.
     */
    @Override
    public synchronized void pause() {
        if (this.paused.get()) {
            return;
        }
        try {
            // No assertion on availablePermits() here: acceptRows may legitimately
            // be holding the permit right now, in which case we simply wait for it.
            this.acceptingRowsSemaphore.acquire();
            // Flag the paused state only once the permit is really held, so that
            // resume() never releases a permit that pause() failed to take.
            this.paused.set(true);
        }
        catch (InterruptedException e) {
            this.logChannel.logError(BaseMessages.getString(PKG, "BlockingQueueStream.PauseInterrupt", new String[0]));
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Resumes row intake by returning the permit taken by {@link #pause()}.
     * Calling this while not paused is a no-op.
     */
    @Override
    public synchronized void resume() {
        if (this.paused.getAndSet(false)) {
            assert (this.acceptingRowsSemaphore.availablePermits() == 0);
            this.acceptingRowsSemaphore.release();
        }
    }

    /**
     * Publishes a batch of rows, incrementing the step's input counter once per row.
     * Blocks while the source is paused.
     *
     * <p>If the calling thread is interrupted while waiting for the permit, the
     * batch is not published, the error is logged, and the thread's interrupt
     * flag is restored.
     *
     * @param rows rows to publish, in iteration order
     */
    protected void acceptRows(List<T> rows) {
        try {
            this.acceptingRowsSemaphore.acquire();
        }
        catch (InterruptedException e) {
            // The permit was never obtained, so it must not be released.
            this.logChannel.logError(BaseMessages.getString(PKG, "BlockingQueueStream.AcceptRowsInterrupt", new String[]{Arrays.toString(rows.toArray())}));
            Thread.currentThread().interrupt();
            return;
        }
        try {
            rows.forEach(row -> {
                this.streamStep.incrementLinesInput();
                this.publishProcessor.onNext(row);
            });
        }
        finally {
            // Always hand the permit back, even if publishing a row throws.
            this.acceptingRowsSemaphore.release();
        }
    }

    /**
     * Signals a terminal error to subscribers of this source.
     *
     * @param throwable the error to propagate
     */
    public void error(Throwable throwable) {
        this.publishProcessor.onError(throwable);
    }
}

