/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;

public class JoinGroupOptimizerSpark
extends SparkOpPlanVisitor {
    private static final Log LOG = LogFactory.getLog(JoinGroupOptimizerSpark.class);

    public JoinGroupOptimizerSpark(SparkOperPlan plan) {
        super(plan, (PlanWalker<SparkOperator, SparkOperPlan>)new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
    }

    @Override
    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
        if (sparkOp.physicalPlan != null) {
            GlobalRearrangeDiscover glrDiscover = new GlobalRearrangeDiscover(sparkOp.physicalPlan);
            glrDiscover.visit();
            List<PhysicalPlan> plans = glrDiscover.getPlansWithJoinAndGroup();
            this.handlePlans(plans, sparkOp);
        }
    }

    private void handlePlans(List<PhysicalPlan> plans, SparkOperator sparkOp) throws VisitorException {
        for (int i = 0; i < plans.size(); ++i) {
            PhysicalPlan planWithJoinAndGroup = plans.get(i);
            POGlobalRearrangeSpark glrSpark = PlanHelper.getPhysicalOperators(planWithJoinAndGroup, POGlobalRearrangeSpark.class).get(0);
            if (!this.verifyJoinOrGroupCase(plans.get(i), glrSpark)) continue;
            try {
                this.restructSparkOp(planWithJoinAndGroup, glrSpark, sparkOp);
                continue;
            }
            catch (PlanException e) {
                throw new VisitorException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
            }
        }
    }

    private void restructSparkOp(PhysicalPlan plan, POGlobalRearrangeSpark glaOp, SparkOperator sparkOp) throws PlanException {
        List<POGlobalRearrangeSpark> predes = plan.getPredecessors(glaOp);
        if (predes != null) {
            ArrayList<POLocalRearrange> lraOps = new ArrayList<POLocalRearrange>();
            ArrayList<PhysicalOperator> allPredsOfLRA = new ArrayList<PhysicalOperator>();
            Collections.sort(predes);
            for (PhysicalOperator physicalOperator : predes) {
                lraOps.add((POLocalRearrange)physicalOperator);
                List<PhysicalOperator> predOfLRAList = plan.getPredecessors(physicalOperator);
                if (predOfLRAList == null || predOfLRAList.size() != 1) continue;
                PhysicalOperator predOfLRA = predOfLRAList.get(0);
                plan.disconnect(predOfLRA, physicalOperator);
                allPredsOfLRA.add(predOfLRA);
            }
            POPackage pkgOp = (POPackage)((Object)plan.getSuccessors(glaOp).get(0));
            PhysicalOperator physicalOperator = plan.getSuccessors(pkgOp).get(0);
            POJoinGroupSpark joinSpark = new POJoinGroupSpark(lraOps, glaOp, pkgOp);
            if (allPredsOfLRA.size() > 0) {
                joinSpark.setPredecessors(allPredsOfLRA);
            }
            plan.add(joinSpark);
            for (PhysicalOperator predOfLRA : allPredsOfLRA) {
                plan.connect(predOfLRA, joinSpark);
            }
            plan.disconnect(pkgOp, physicalOperator);
            plan.connect(joinSpark, physicalOperator);
            for (POLocalRearrange lra : lraOps) {
                this.replaceMultiqueryMapping(sparkOp, lra, joinSpark);
                plan.remove(lra);
            }
            plan.remove(glaOp);
            plan.remove(pkgOp);
        }
    }

    private void replaceMultiqueryMapping(SparkOperator sparkOperator, PhysicalOperator from, PhysicalOperator to) {
        MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionItems = sparkOperator.getMultiQueryOptimizeConnectionItem();
        if (multiQueryOptimizeConnectionItems.containsKey(from.getOperatorKey())) {
            List<OperatorKey> value = multiQueryOptimizeConnectionItems.get(from.getOperatorKey());
            multiQueryOptimizeConnectionItems.removeKey(from.getOperatorKey());
            multiQueryOptimizeConnectionItems.put(to.getOperatorKey(), value);
        }
    }

    private boolean verifyJoinOrGroupCase(PhysicalPlan plan, POGlobalRearrangeSpark glaOp) {
        List<PhysicalOperator> lraOps = plan.getPredecessors(glaOp);
        List<PhysicalOperator> pkgOps = plan.getSuccessors(glaOp);
        boolean isAllPredecessorLRA = this.isAllPredecessorLRA(lraOps);
        boolean isSuccessorPKG = this.isSuccessorPKG(pkgOps);
        return isAllPredecessorLRA && isSuccessorPKG;
    }

    private boolean isSuccessorPKG(List<PhysicalOperator> pkgOps) {
        boolean result = false;
        if (pkgOps != null && pkgOps.size() == 1) {
            if (pkgOps.get(0) instanceof POPackage) {
                result = true;
            }
        } else {
            result = false;
        }
        return result;
    }

    private boolean isAllPredecessorLRA(List<PhysicalOperator> lraOps) {
        boolean result = true;
        if (lraOps != null) {
            for (PhysicalOperator lraOp : lraOps) {
                if (lraOp instanceof POLocalRearrange) continue;
                result = false;
                break;
            }
        } else {
            result = false;
        }
        return result;
    }

    static class GlobalRearrangeDiscover
    extends PhyPlanVisitor {
        private List<PhysicalPlan> plansWithJoinAndGroup = new ArrayList<PhysicalPlan>();

        public GlobalRearrangeDiscover(PhysicalPlan plan) {
            super(plan, (PlanWalker<PhysicalOperator, PhysicalPlan>)new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
        }

        @Override
        public void visitGlobalRearrange(POGlobalRearrange glr) throws VisitorException {
            PhysicalPlan currentPlan = (PhysicalPlan)this.mCurrentWalker.getPlan();
            if (currentPlan != null) {
                this.plansWithJoinAndGroup.add(currentPlan);
            }
        }

        public List<PhysicalPlan> getPlansWithJoinAndGroup() {
            return this.plansWithJoinAndGroup;
        }
    }
}

