/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.spark.converter;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;

/**
 * {@link RDDConverter} for the {@link POMergeCogroup} physical operator.
 *
 * <p>The single predecessor RDD is processed partition by partition: every
 * tuple of a partition is attached to the operator, and whatever the operator
 * returns from {@code getNextTuple()} is surfaced through an
 * {@link OutputConsumerIterator}.
 */
public class MergeCogroupConverter
implements RDDConverter<Tuple, Tuple, POMergeCogroup> {
    /**
     * Builds the RDD that applies {@code physicalOperator} to its predecessor.
     *
     * @param predecessors     the input RDDs; {@code SparkUtil.assertPredecessorSize}
     *                         is invoked with an expected size of 1
     * @param physicalOperator the merge-cogroup operator to run on each partition
     * @return the RDD produced by mapping every partition of the first
     *         predecessor through the operator
     */
    @Override
    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POMergeCogroup physicalOperator) {
        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
        RDD<Tuple> rdd = predecessors.get(0);
        MergeCogroupFunction mergeCogroupFunction = new MergeCogroupFunction(physicalOperator);
        // No raw-type cast: MergeCogroupFunction already is a
        // FlatMapFunction<Iterator<Tuple>, Tuple>, so the call stays fully typed.
        // The second argument is mapPartitions' "preservesPartitioning" flag.
        return rdd.toJavaRDD().mapPartitions(mergeCogroupFunction, true).rdd();
    }

    /**
     * Per-partition function that drives a {@link POMergeCogroup} over the
     * tuples of one partition. It is {@link Serializable} and holds the
     * operator as a field, so the operator is part of its serialized state.
     */
    private static class MergeCogroupFunction
    implements FlatMapFunction<Iterator<Tuple>, Tuple>,
    Serializable {
        /** Operator that receives the partition's tuples; assigned once in the constructor. */
        private final POMergeCogroup poMergeCogroup;

        private MergeCogroupFunction(POMergeCogroup poMergeCogroup) {
            this.poMergeCogroup = poMergeCogroup;
        }

        /**
         * Wraps the partition's tuples in an iterable whose iterator feeds
         * them through the operator.
         *
         * @param input the tuples of one partition
         * @return an iterable over the operator's results; every call to
         *         {@code iterator()} wraps the same {@code input} iterator
         * @throws Exception declared by the {@link FlatMapFunction} contract
         */
        @Override
        public Iterable<Tuple> call(final Iterator<Tuple> input) throws Exception {
            return new Iterable<Tuple>(){

                @Override
                public Iterator<Tuple> iterator() {
                    return new OutputConsumerIterator(input){

                        @Override
                        protected void attach(Tuple tuple) {
                            // Clear the operator's inputs before attaching the tuple.
                            // NOTE(review): presumably so the operator consumes the
                            // attached tuple instead of pulling from plan predecessors
                            // -- confirm against PhysicalOperator.
                            poMergeCogroup.setInputs(null);
                            poMergeCogroup.attachInput(tuple);
                        }

                        @Override
                        protected Result getNextResult() throws ExecException {
                            return poMergeCogroup.getNextTuple();
                        }

                        @Override
                        protected void endOfInput() {
                            poMergeCogroup.setEndOfInput(true);
                        }
                    };
                }
            };
        }
    }
}

