/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.service;

import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleStatusTransitXCommand;
import org.apache.oozie.command.coord.CoordStatusTransitXCommand;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLog;

public class StatusTransitService
implements Service {
    private static final String CONF_PREFIX = "oozie.service.StatusTransitService.";
    private static final String CONF_STATUSTRANSIT_INTERVAL = "oozie.service.StatusTransitService.statusTransit.interval";
    public static final String CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS = "oozie.service.StatusTransitService.backward.support.for.coord.status";
    public static final String CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR = "oozie.service.StatusTransitService.backward.support.for.states.without.error";
    public static int limit = -1;
    public static Date lastInstanceStartTime = null;
    public static final XLog LOG = XLog.getLog(StatusTransitRunnable.class);

    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        StatusTransitRunnable stateTransitRunnable = new StatusTransitRunnable();
        services.get(SchedulerService.class).schedule(stateTransitRunnable, 10L, (long)ConfigurationService.getInt(conf, CONF_STATUSTRANSIT_INTERVAL), SchedulerService.Unit.SEC);
    }

    @Override
    public void destroy() {
    }

    @Override
    public Class<? extends Service> getInterface() {
        return StatusTransitService.class;
    }

    public static class StatusTransitRunnable
    implements Runnable {
        private JPAService jpaService = null;
        private LockToken lock;
        private Set<String> coordFailedIds = new HashSet<String>();
        private Set<String> bundleFailedIds = new HashSet<String>();

        public StatusTransitRunnable() {
            this.jpaService = Services.get().get(JPAService.class);
            if (this.jpaService == null) {
                LOG.error("Missing JPAService");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block7: {
                try {
                    Date curDate = new Date();
                    this.lock = Services.get().get(MemoryLocksService.class).getWriteLock(StatusTransitService.class.getName(), Service.lockTimeout);
                    if (this.lock == null) {
                        LOG.info("This StatusTransitService instance will not run since there is already an instance running");
                    } else {
                        LOG.info("Acquired lock for [{0}]", StatusTransitService.class.getName());
                        this.coordTransit();
                        this.bundleTransit();
                        lastInstanceStartTime = curDate;
                    }
                    if (this.lock == null) break block7;
                    this.lock.release();
                }
                catch (Exception ex) {
                    try {
                        LOG.warn((Object)"Exception happened during StatusTransitRunnable ", ex);
                        if (this.lock == null) break block7;
                        this.lock.release();
                    }
                    catch (Throwable throwable) {
                        if (this.lock != null) {
                            this.lock.release();
                            LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
                        }
                        throw throwable;
                    }
                    LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
                }
                LOG.info("Released lock for [{0}]", StatusTransitService.class.getName());
            }
        }

        private void bundleTransit() throws JPAExecutorException, CommandException {
            List<BundleJobBean> pendingJobCheckList;
            HashSet<String> bundleIds = new HashSet<String>();
            if (lastInstanceStartTime == null) {
                LOG.info("Running bundle status service first instance");
                pendingJobCheckList = this.jpaService.execute(new BundleJobsGetRunningOrPendingJPAExecutor(limit));
            } else {
                LOG.info("Running bundle status service from last instance time =  " + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                pendingJobCheckList = BundleJobQueryExecutor.getInstance().getList(BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_IDS_FOR_STATUS_TRANSIT, lastInstanceStartTime);
            }
            for (BundleJobBean job : pendingJobCheckList) {
                bundleIds.add(job.getId());
            }
            bundleIds.addAll(this.bundleFailedIds);
            this.bundleFailedIds.clear();
            for (String jobId : bundleIds) {
                try {
                    new BundleStatusTransitXCommand(jobId).call();
                }
                catch (CommandException e) {
                    if (e.getErrorCode() == ErrorCode.E0606) {
                        this.bundleFailedIds.add(jobId);
                        LOG.info("Unable to acquire lock for " + jobId + ". Will try next time");
                        continue;
                    }
                    LOG.error((Object)("Error running BundleStatusTransitXCommand for job " + jobId), e);
                }
            }
        }

        private void coordTransit() throws JPAExecutorException, CommandException {
            List<CoordinatorJobBean> pendingJobCheckList = null;
            HashSet<String> coordIds = new HashSet<String>();
            if (lastInstanceStartTime == null) {
                LOG.info("Running coordinator status service first instance");
                pendingJobCheckList = this.jpaService.execute(new CoordJobsGetPendingJPAExecutor(limit));
            } else {
                LOG.info("Running coordinator status service from last instance time =  " + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                pendingJobCheckList = CoordJobQueryExecutor.getInstance().getList(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_IDS_FOR_STATUS_TRANSIT, lastInstanceStartTime);
                pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime));
            }
            for (CoordinatorJobBean job : pendingJobCheckList) {
                coordIds.add(job.getId());
            }
            coordIds.addAll(this.coordFailedIds);
            this.coordFailedIds.clear();
            for (String coordId : coordIds) {
                try {
                    new CoordStatusTransitXCommand(coordId).call();
                }
                catch (CommandException e) {
                    if (e.getErrorCode() == ErrorCode.E0606) {
                        this.coordFailedIds.add(coordId);
                        LOG.info("Unable to acquire lock for " + coordId + ". Will try next time");
                        continue;
                    }
                    LOG.error((Object)("Error running CoordStatusTransitXCommand for job " + coordId), e);
                }
            }
        }
    }
}

