/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.di.trans.dataservice.optimization.cache;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleValueException;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.dataservice.DataServiceExecutor;
import org.pentaho.di.trans.dataservice.optimization.cache.CachedService;
import org.pentaho.di.trans.step.RowAdapter;
import org.pentaho.di.trans.step.RowListener;
import org.pentaho.di.trans.step.StepAdapter;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepListener;
import org.pentaho.di.trans.step.StepMeta;

public class ServiceObserver
extends AbstractFuture<CachedService>
implements Runnable {
    private final DataServiceExecutor executor;
    List<RowMetaAndData> rowMetaAndData = Lists.newArrayList();
    boolean isRunning = true;
    CountDownLatch latch = new CountDownLatch(1);

    public ServiceObserver(DataServiceExecutor executor) {
        this.executor = executor;
    }

    public Iterator<RowMetaAndData> rows() {
        return new Iterator<RowMetaAndData>(){
            int index = 0;

            @Override
            public boolean hasNext() {
                if (ServiceObserver.this.rowMetaAndData.size() > this.index) {
                    return true;
                }
                if (ServiceObserver.this.isRunning) {
                    ServiceObserver.this.latch = new CountDownLatch(1);
                    try {
                        while (!ServiceObserver.this.latch.await(1L, TimeUnit.SECONDS)) {
                            if (ServiceObserver.this.isRunning) continue;
                            return ServiceObserver.this.rowMetaAndData.size() > this.index;
                        }
                        return ServiceObserver.this.rowMetaAndData.size() > this.index;
                    }
                    catch (InterruptedException e) {
                        return ServiceObserver.this.rowMetaAndData.size() > this.index;
                    }
                }
                return ServiceObserver.this.rowMetaAndData.size() > this.index;
            }

            @Override
            public RowMetaAndData next() {
                return ServiceObserver.this.rowMetaAndData.get(this.index++);
            }
        };
    }

    public ListenableFuture<CachedService> install() {
        List serviceReady = this.executor.getListenerMap().get((Object)DataServiceExecutor.ExecutionPoint.READY);
        if (Iterables.any((Iterable)serviceReady, (Predicate)Predicates.instanceOf(ServiceObserver.class))) {
            this.setException(new IllegalStateException("More than one cache is configured for this data service."));
        } else {
            serviceReady.add(this);
        }
        return this;
    }

    @Override
    public void run() {
        StepInterface serviceStep = this.executor.getServiceTrans().findRunThread(this.executor.getService().getStepname());
        serviceStep.addRowListener((RowListener)new RowAdapter(){

            public synchronized void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row) {
                Object[] clonedRow;
                try {
                    clonedRow = rowMeta.cloneRow(row);
                }
                catch (KettleValueException e) {
                    ServiceObserver.this.setException(e);
                    return;
                }
                ServiceObserver.this.rowMetaAndData.add(new RowMetaAndData(rowMeta, clonedRow));
                ServiceObserver.this.latch.countDown();
            }
        });
        serviceStep.addStepListener((StepListener)new StepAdapter(){

            public void stepFinished(Trans trans, StepMeta stepMeta, StepInterface step) {
                ServiceObserver.this.isRunning = false;
                if (ServiceObserver.this.executor.getGenTrans().getErrors() > 0) {
                    ServiceObserver.this.setException(new KettleException("Dynamic transformation finished with errors, could not cache results"));
                } else if (step.isStopped()) {
                    ServiceObserver.this.set(CachedService.partial(ServiceObserver.this.rowMetaAndData, ServiceObserver.this.executor));
                } else {
                    ServiceObserver.this.set(CachedService.complete(ServiceObserver.this.rowMetaAndData));
                }
            }
        });
    }
}

