/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.jobhistory;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.EventWriter;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobQueueChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode;

public class JobHistoryEventHandler
extends AbstractService
implements EventHandler<JobHistoryEvent> {
    private final AppContext context;
    private final int startCount;
    private int eventCounter;
    private FileSystem stagingDirFS;
    private FileSystem doneDirFS;
    private Path stagingDirPath = null;
    private Path doneDirPrefixPath = null;
    private int maxUnflushedCompletionEvents;
    private int postJobCompletionMultiplier;
    private long flushTimeout;
    private int minQueueSizeForBatchingFlushes;
    private int numUnflushedCompletionEvents = 0;
    private boolean isTimerActive;
    protected BlockingQueue<JobHistoryEvent> eventQueue = new LinkedBlockingQueue<JobHistoryEvent>();
    protected Thread eventHandlingThread;
    private volatile boolean stopped;
    private final Object lock = new Object();
    private static final Log LOG = LogFactory.getLog(JobHistoryEventHandler.class);
    protected static final Map<JobId, MetaInfo> fileMap = Collections.synchronizedMap(new HashMap());
    protected volatile boolean forceJobCompletion = false;
    protected TimelineClient timelineClient;
    private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
    private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";

    public JobHistoryEventHandler(AppContext context, int startCount) {
        super("JobHistoryEventHandler");
        this.context = context;
        this.startCount = startCount;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        String userDoneDirStr;
        block13: {
            String jobId = TypeConverter.fromYarn((ApplicationId)this.context.getApplicationID()).toString();
            String stagingDirStr = null;
            String doneDirStr = null;
            userDoneDirStr = null;
            try {
                stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix((Configuration)conf, (String)jobId);
                doneDirStr = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix((Configuration)conf);
                userDoneDirStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser((Configuration)conf);
            }
            catch (IOException e) {
                LOG.error((Object)"Failed while getting the configured log directories", (Throwable)e);
                throw new YarnRuntimeException((Throwable)e);
            }
            try {
                this.stagingDirPath = FileContext.getFileContext((Configuration)conf).makeQualified(new Path(stagingDirStr));
                this.stagingDirFS = FileSystem.get((URI)this.stagingDirPath.toUri(), (Configuration)conf);
                this.mkdir(this.stagingDirFS, this.stagingDirPath, new FsPermission(JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
            }
            catch (IOException e) {
                LOG.error((Object)("Failed while checking for/creating  history staging path: [" + this.stagingDirPath + "]"), (Throwable)e);
                throw new YarnRuntimeException((Throwable)e);
            }
            Path doneDirPath = null;
            try {
                doneDirPath = FileContext.getFileContext((Configuration)conf).makeQualified(new Path(doneDirStr));
                this.doneDirFS = FileSystem.get((URI)doneDirPath.toUri(), (Configuration)conf);
                if (this.doneDirFS.exists(doneDirPath)) break block13;
                if (JobHistoryUtils.shouldCreateNonUserDirectory((Configuration)conf)) {
                    LOG.info((Object)("Creating intermediate history logDir: [" + doneDirPath + "] + based on conf. Should ideally be created by the JobHistoryServer: " + "yarn.app.mapreduce.am.create-intermediate-jh-base-dir"));
                    this.mkdir(this.doneDirFS, doneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
                    break block13;
                }
                String message = "Not creating intermediate history logDir: [" + doneDirPath + "] based on conf: " + "yarn.app.mapreduce.am.create-intermediate-jh-base-dir" + ". Either set to true or pre-create this directory with" + " appropriate permissions";
                LOG.error((Object)message);
                throw new YarnRuntimeException(message);
            }
            catch (IOException e) {
                LOG.error((Object)("Failed checking for the existance of history intermediate done directory: [" + doneDirPath + "]"));
                throw new YarnRuntimeException((Throwable)e);
            }
        }
        try {
            this.doneDirPrefixPath = FileContext.getFileContext((Configuration)conf).makeQualified(new Path(userDoneDirStr));
            this.mkdir(this.doneDirFS, this.doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
        }
        catch (IOException e) {
            LOG.error((Object)("Error creating user intermediate history done directory: [ " + this.doneDirPrefixPath + "]"), (Throwable)e);
            throw new YarnRuntimeException((Throwable)e);
        }
        this.maxUnflushedCompletionEvents = conf.getInt("yarn.app.mapreduce.am.history.max-unflushed-events", 200);
        this.postJobCompletionMultiplier = conf.getInt("yarn.app.mapreduce.am.history.job-complete-unflushed-multiplier", 30);
        this.flushTimeout = conf.getLong("yarn.app.mapreduce.am.history.complete-event-flush-timeout", 30000L);
        this.minQueueSizeForBatchingFlushes = conf.getInt("yarn.app.mapreduce.am.history.use-batched-flush.queue-size.threshold", 50);
        if (conf.getBoolean("mapreduce.job.emit-timeline-data", false)) {
            if (conf.getBoolean("yarn.timeline-service.enabled", false)) {
                this.timelineClient = TimelineClient.createTimelineClient();
                this.timelineClient.init(conf);
                LOG.info((Object)"Timeline service is enabled");
                LOG.info((Object)"Emitting job history data to the timeline server is enabled");
            } else {
                LOG.info((Object)"Timeline service is not enabled");
            }
        } else {
            LOG.info((Object)"Emitting job history data to the timeline server is not enabled");
        }
        super.serviceInit(conf);
    }

    private void mkdir(FileSystem fs, Path path, FsPermission fsp) throws IOException {
        if (!fs.exists(path)) {
            try {
                fs.mkdirs(path, fsp);
                FileStatus fsStatus = fs.getFileStatus(path);
                LOG.info((Object)("Perms after creating " + fsStatus.getPermission().toShort() + ", Expected: " + fsp.toShort()));
                if (fsStatus.getPermission().toShort() != fsp.toShort()) {
                    LOG.info((Object)("Explicitly setting permissions to : " + fsp.toShort() + ", " + fsp));
                    fs.setPermission(path, fsp);
                }
            }
            catch (FileAlreadyExistsException e) {
                LOG.info((Object)("Directory: [" + path + "] already exists."));
            }
        }
    }

    protected void serviceStart() throws Exception {
        if (this.timelineClient != null) {
            this.timelineClient.start();
        }
        this.eventHandlingThread = new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                JobHistoryEvent event = null;
                while (!JobHistoryEventHandler.this.stopped && !Thread.currentThread().isInterrupted()) {
                    if (JobHistoryEventHandler.this.eventCounter != 0 && JobHistoryEventHandler.this.eventCounter % 1000 == 0) {
                        JobHistoryEventHandler.this.eventCounter = 0;
                        LOG.info((Object)("Size of the JobHistory event queue is " + JobHistoryEventHandler.this.eventQueue.size()));
                    } else {
                        JobHistoryEventHandler.this.eventCounter++;
                    }
                    try {
                        event = JobHistoryEventHandler.this.eventQueue.take();
                    }
                    catch (InterruptedException e) {
                        LOG.info((Object)"EventQueue take interrupted. Returning");
                        return;
                    }
                    Object object = JobHistoryEventHandler.this.lock;
                    synchronized (object) {
                        boolean isInterrupted = Thread.interrupted();
                        JobHistoryEventHandler.this.handleEvent(event);
                        if (isInterrupted) {
                            LOG.debug((Object)"Event handling interrupted");
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }, "eventHandlingThread");
        this.eventHandlingThread.start();
        super.serviceStart();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void serviceStop() throws Exception {
        LOG.info((Object)("Stopping JobHistoryEventHandler. Size of the outstanding queue size is " + this.eventQueue.size()));
        this.stopped = true;
        Object object = this.lock;
        synchronized (object) {
            if (this.eventHandlingThread != null) {
                LOG.debug((Object)"Interrupting Event Handling thread");
                this.eventHandlingThread.interrupt();
            } else {
                LOG.debug((Object)"Null event handling thread");
            }
        }
        try {
            if (this.eventHandlingThread != null) {
                LOG.debug((Object)"Waiting for Event Handling thread to complete");
                this.eventHandlingThread.join();
            }
        }
        catch (InterruptedException ie) {
            LOG.info((Object)"Interrupted Exception while stopping", (Throwable)ie);
        }
        for (MetaInfo mi : fileMap.values()) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Shutting down timer for " + mi));
                }
                mi.shutDownTimer();
            }
            catch (IOException iOException) {
                LOG.info((Object)("Exception while cancelling delayed flush timer. Likely caused by a failed flush " + iOException.getMessage()));
            }
        }
        for (JobHistoryEvent ev : this.eventQueue) {
            LOG.info((Object)("In stop, writing event " + ev.getType()));
            this.handleEvent(ev);
        }
        if (this.forceJobCompletion) {
            for (Map.Entry entry : fileMap.entrySet()) {
                JobId toClose = (JobId)entry.getKey();
                MetaInfo mi = (MetaInfo)entry.getValue();
                if (mi == null || !mi.isWriterActive()) continue;
                LOG.warn((Object)("Found jobId " + toClose + " to have not been closed. Will close"));
                Job job = this.context.getJob(toClose);
                JobUnsuccessfulCompletionEvent jucEvent = new JobUnsuccessfulCompletionEvent((JobID)TypeConverter.fromYarn((JobId)toClose), System.currentTimeMillis(), job.getCompletedMaps(), job.getCompletedReduces(), this.createJobStateForJobUnsuccessfulCompletionEvent(mi.getForcedJobStateOnShutDown()), job.getDiagnostics());
                JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, (HistoryEvent)jucEvent);
                this.handleEvent(jfEvent);
            }
        }
        for (MetaInfo metaInfo : fileMap.values()) {
            try {
                metaInfo.closeWriter();
            }
            catch (IOException e) {
                LOG.info((Object)("Exception while closing file " + e.getMessage()));
            }
        }
        if (this.timelineClient != null) {
            this.timelineClient.stop();
        }
        LOG.info((Object)"Stopped JobHistoryEventHandler. super.stop()");
        super.serviceStop();
    }

    protected EventWriter createEventWriter(Path historyFilePath) throws IOException {
        FSDataOutputStream out = this.stagingDirFS.create(historyFilePath, true);
        return new EventWriter(out);
    }

    protected void setupEventWriter(JobId jobId, AMStartedEvent amStartedEvent) throws IOException {
        if (this.stagingDirPath == null) {
            LOG.error((Object)"Log Directory is null, returning");
            throw new IOException("Missing Log Directory for History");
        }
        MetaInfo oldFi = fileMap.get(jobId);
        Configuration conf = this.getConfig();
        Path historyFile = JobHistoryUtils.getStagingJobHistoryFile((Path)this.stagingDirPath, (JobId)jobId, (int)this.startCount);
        String user = UserGroupInformation.getCurrentUser().getShortUserName();
        if (user == null) {
            throw new IOException("User is null while setting up jobhistory eventwriter");
        }
        String jobName = this.context.getJob(jobId).getName();
        EventWriter writer = oldFi == null ? null : oldFi.writer;
        Path logDirConfPath = JobHistoryUtils.getStagingConfFile((Path)this.stagingDirPath, (JobId)jobId, (int)this.startCount);
        if (writer == null) {
            try {
                writer = this.createEventWriter(historyFile);
                LOG.info((Object)("Event Writer setup for JobId: " + jobId + ", File: " + historyFile));
            }
            catch (IOException ioe) {
                LOG.info((Object)("Could not create log file: [" + historyFile + "] + for job " + "[" + jobName + "]"));
                throw ioe;
            }
            if (conf != null) {
                FSDataOutputStream jobFileOut = null;
                try {
                    if (logDirConfPath != null) {
                        jobFileOut = this.stagingDirFS.create(logDirConfPath, true);
                        conf.writeXml((OutputStream)jobFileOut);
                        jobFileOut.close();
                    }
                }
                catch (IOException e) {
                    LOG.info((Object)"Failed to write the job configuration file", (Throwable)e);
                    throw e;
                }
            }
        }
        String queueName = "default";
        if (conf != null) {
            queueName = conf.get("mapreduce.job.queuename", "default");
        }
        MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, user, jobName, jobId, amStartedEvent.getForcedJobStateOnShutDown(), queueName);
        fi.getJobSummary().setJobId(jobId);
        fi.getJobSummary().setJobLaunchTime(amStartedEvent.getStartTime());
        fi.getJobSummary().setJobSubmitTime(amStartedEvent.getSubmitTime());
        fi.getJobIndexInfo().setJobStartTime(amStartedEvent.getStartTime());
        fi.getJobIndexInfo().setSubmitTime(amStartedEvent.getSubmitTime());
        fileMap.put(jobId, fi);
    }

    public void closeWriter(JobId id) throws IOException {
        try {
            MetaInfo mi = fileMap.get(id);
            if (mi != null) {
                mi.closeWriter();
            }
        }
        catch (IOException e) {
            LOG.error((Object)("Error closing writer for JobID: " + id));
            throw e;
        }
    }

    public void handle(JobHistoryEvent event) {
        try {
            if (this.isJobCompletionEvent(event.getHistoryEvent())) {
                this.maxUnflushedCompletionEvents *= this.postJobCompletionMultiplier;
            }
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new YarnRuntimeException((Throwable)e);
        }
    }

    private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
        return EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED, EventType.JOB_KILLED).contains(historyEvent.getEventType());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @InterfaceAudience.Private
    public void handleEvent(JobHistoryEvent event) {
        Object object = this.lock;
        synchronized (object) {
            JobUnsuccessfulCompletionEvent jucEvent;
            if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
                try {
                    AMStartedEvent amStartedEvent = (AMStartedEvent)event.getHistoryEvent();
                    this.setupEventWriter(event.getJobID(), amStartedEvent);
                }
                catch (IOException ioe) {
                    LOG.error((Object)("Error JobHistoryEventHandler in handleEvent: " + (Object)((Object)event)), (Throwable)ioe);
                    throw new YarnRuntimeException((Throwable)ioe);
                }
            }
            MetaInfo mi = fileMap.get(event.getJobID());
            try {
                HistoryEvent historyEvent = event.getHistoryEvent();
                if (!(historyEvent instanceof NormalizedResourceEvent)) {
                    mi.writeEvent(historyEvent);
                }
                this.processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID());
                if (this.timelineClient != null) {
                    this.processEventForTimelineServer(historyEvent, event.getJobID(), event.getTimestamp());
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("In HistoryEventHandler " + event.getHistoryEvent().getEventType()));
                }
            }
            catch (IOException e) {
                LOG.error((Object)("Error writing History Event: " + event.getHistoryEvent()), (Throwable)e);
                throw new YarnRuntimeException((Throwable)e);
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
                JobSubmittedEvent jobSubmittedEvent = (JobSubmittedEvent)event.getHistoryEvent();
                mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
                mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_INITED) {
                JobInitedEvent jie = (JobInitedEvent)event.getHistoryEvent();
                mi.getJobIndexInfo().setJobStartTime(jie.getLaunchTime());
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_QUEUE_CHANGED) {
                JobQueueChangeEvent jQueueEvent = (JobQueueChangeEvent)event.getHistoryEvent();
                mi.getJobIndexInfo().setQueueName(jQueueEvent.getJobQueueName());
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
                try {
                    JobFinishedEvent jFinishedEvent = (JobFinishedEvent)event.getHistoryEvent();
                    mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
                    mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
                    mi.getJobIndexInfo().setNumReduces(jFinishedEvent.getFinishedReduces());
                    mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
                    this.closeEventWriter(event.getJobID());
                    this.processDoneFiles(event.getJobID());
                }
                catch (IOException e) {
                    throw new YarnRuntimeException((Throwable)e);
                }
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_ERROR) {
                try {
                    jucEvent = (JobUnsuccessfulCompletionEvent)event.getHistoryEvent();
                    mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
                    mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
                    mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
                    mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
                    this.closeEventWriter(event.getJobID());
                    if (this.context.isLastAMRetry()) {
                        this.processDoneFiles(event.getJobID());
                    }
                }
                catch (IOException e) {
                    throw new YarnRuntimeException((Throwable)e);
                }
            }
            if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
                try {
                    jucEvent = (JobUnsuccessfulCompletionEvent)event.getHistoryEvent();
                    mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
                    mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
                    mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
                    mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
                    this.closeEventWriter(event.getJobID());
                    this.processDoneFiles(event.getJobID());
                }
                catch (IOException e) {
                    throw new YarnRuntimeException((Throwable)e);
                }
            }
        }
    }

    public void processEventForJobSummary(HistoryEvent event, JobSummary summary, JobId jobId) {
        switch (event.getEventType()) {
            case JOB_SUBMITTED: {
                JobSubmittedEvent jse = (JobSubmittedEvent)event;
                summary.setUser(jse.getUserName());
                summary.setQueue(jse.getJobQueueName());
                summary.setJobSubmitTime(jse.getSubmitTime());
                summary.setJobName(jse.getJobName());
                break;
            }
            case NORMALIZED_RESOURCE: {
                NormalizedResourceEvent normalizedResourceEvent = (NormalizedResourceEvent)event;
                if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
                    summary.setResourcesPerMap((int)normalizedResourceEvent.getMemory());
                    break;
                }
                if (normalizedResourceEvent.getTaskType() != TaskType.REDUCE) break;
                summary.setResourcesPerReduce((int)normalizedResourceEvent.getMemory());
                break;
            }
            case JOB_INITED: {
                JobInitedEvent jie = (JobInitedEvent)event;
                summary.setJobLaunchTime(jie.getLaunchTime());
                break;
            }
            case MAP_ATTEMPT_STARTED: {
                TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent)event;
                if (summary.getFirstMapTaskLaunchTime() != 0L) break;
                summary.setFirstMapTaskLaunchTime(mtase.getStartTime());
                break;
            }
            case REDUCE_ATTEMPT_STARTED: {
                TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent)event;
                if (summary.getFirstReduceTaskLaunchTime() != 0L) break;
                summary.setFirstReduceTaskLaunchTime(rtase.getStartTime());
                break;
            }
            case JOB_FINISHED: {
                JobFinishedEvent jfe = (JobFinishedEvent)event;
                summary.setJobFinishTime(jfe.getFinishTime());
                summary.setNumFinishedMaps(jfe.getFinishedMaps());
                summary.setNumFailedMaps(jfe.getFailedMaps());
                summary.setNumFinishedReduces(jfe.getFinishedReduces());
                summary.setNumFailedReduces(jfe.getFailedReduces());
                if (summary.getJobStatus() == null) {
                    summary.setJobStatus(JobStatus.State.SUCCEEDED.toString());
                }
                this.setSummarySlotSeconds(summary, jfe.getTotalCounters());
                break;
            }
            case JOB_FAILED: 
            case JOB_KILLED: {
                JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent)event;
                summary.setJobStatus(juce.getStatus());
                summary.setNumFinishedMaps(this.context.getJob(jobId).getTotalMaps());
                summary.setNumFinishedReduces(this.context.getJob(jobId).getTotalReduces());
                summary.setJobFinishTime(juce.getFinishTime());
                this.setSummarySlotSeconds(summary, this.context.getJob(jobId).getAllCounters());
                break;
            }
        }
    }

    private void processEventForTimelineServer(HistoryEvent event, JobId jobId, long timestamp) {
        TimelineEvent tEvent = new TimelineEvent();
        tEvent.setEventType(StringUtils.toUpperCase((String)event.getEventType().name()));
        tEvent.setTimestamp(timestamp);
        TimelineEntity tEntity = new TimelineEntity();
        switch (event.getEventType()) {
            case JOB_SUBMITTED: {
                JobSubmittedEvent jse = (JobSubmittedEvent)event;
                tEvent.addEventInfo("SUBMIT_TIME", (Object)jse.getSubmitTime());
                tEvent.addEventInfo("QUEUE_NAME", (Object)jse.getJobQueueName());
                tEvent.addEventInfo("JOB_NAME", (Object)jse.getJobName());
                tEvent.addEventInfo("USER_NAME", (Object)jse.getUserName());
                tEvent.addEventInfo("JOB_CONF_PATH", (Object)jse.getJobConfPath());
                tEvent.addEventInfo("ACLS", (Object)jse.getJobAcls());
                tEvent.addEventInfo("JOB_QUEUE_NAME", (Object)jse.getJobQueueName());
                tEvent.addEventInfo("WORKLFOW_ID", (Object)jse.getWorkflowId());
                tEvent.addEventInfo("WORKFLOW_NAME", (Object)jse.getWorkflowName());
                tEvent.addEventInfo("WORKFLOW_NAME_NAME", (Object)jse.getWorkflowNodeName());
                tEvent.addEventInfo("WORKFLOW_ADJACENCIES", (Object)jse.getWorkflowAdjacencies());
                tEvent.addEventInfo("WORKFLOW_TAGS", (Object)jse.getWorkflowTags());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
            case JOB_STATUS_CHANGED: {
                JobStatusChangedEvent jsce = (JobStatusChangedEvent)event;
                tEvent.addEventInfo("STATUS", (Object)jsce.getStatus());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
            case JOB_INFO_CHANGED: {
                JobInfoChangeEvent jice = (JobInfoChangeEvent)event;
                tEvent.addEventInfo("SUBMIT_TIME", (Object)jice.getSubmitTime());
                tEvent.addEventInfo("LAUNCH_TIME", (Object)jice.getLaunchTime());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
            case JOB_INITED: {
                JobInitedEvent jie = (JobInitedEvent)event;
                tEvent.addEventInfo("START_TIME", (Object)jie.getLaunchTime());
                tEvent.addEventInfo("STATUS", (Object)jie.getStatus());
                tEvent.addEventInfo("TOTAL_MAPS", (Object)jie.getTotalMaps());
                tEvent.addEventInfo("TOTAL_REDUCES", (Object)jie.getTotalReduces());
                tEvent.addEventInfo("UBERIZED", (Object)jie.getUberized());
                tEntity.setStartTime(Long.valueOf(jie.getLaunchTime()));
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
            case JOB_PRIORITY_CHANGED: {
                JobPriorityChangeEvent jpce = (JobPriorityChangeEvent)event;
                tEvent.addEventInfo("PRIORITY", (Object)jpce.getPriority().toString());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
            case JOB_QUEUE_CHANGED: {
                JobQueueChangeEvent jqe = (JobQueueChangeEvent)event;
                tEvent.addEventInfo("QUEUE_NAMES", (Object)jqe.getJobQueueName());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
            case JOB_FAILED: 
            case JOB_KILLED: 
            case JOB_ERROR: {
                JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent)event;
                tEvent.addEventInfo("FINISH_TIME", (Object)juce.getFinishTime());
                tEvent.addEventInfo("NUM_MAPS", (Object)juce.getFinishedMaps());
                tEvent.addEventInfo("NUM_REDUCES", (Object)juce.getFinishedReduces());
                tEvent.addEventInfo("JOB_STATUS", (Object)juce.getStatus());
                tEvent.addEventInfo("DIAGNOSTICS", (Object)juce.getDiagnostics());
                tEvent.addEventInfo("FINISHED_MAPS", (Object)juce.getFinishedMaps());
                tEvent.addEventInfo("FINISHED_REDUCES", (Object)juce.getFinishedReduces());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
            case JOB_FINISHED: {
                JobFinishedEvent jfe = (JobFinishedEvent)event;
                tEvent.addEventInfo("FINISH_TIME", (Object)jfe.getFinishTime());
                tEvent.addEventInfo("NUM_MAPS", (Object)jfe.getFinishedMaps());
                tEvent.addEventInfo("NUM_REDUCES", (Object)jfe.getFinishedReduces());
                tEvent.addEventInfo("FAILED_MAPS", (Object)jfe.getFailedMaps());
                tEvent.addEventInfo("FAILED_REDUCES", (Object)jfe.getFailedReduces());
                tEvent.addEventInfo("FINISHED_MAPS", (Object)jfe.getFinishedMaps());
                tEvent.addEventInfo("FINISHED_REDUCES", (Object)jfe.getFinishedReduces());
                tEvent.addEventInfo("MAP_COUNTERS_GROUPS", (Object)this.countersToJSON(jfe.getMapCounters()));
                tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS", (Object)this.countersToJSON(jfe.getReduceCounters()));
                tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS", (Object)this.countersToJSON(jfe.getTotalCounters()));
                tEvent.addEventInfo("JOB_STATUS", (Object)JobState.SUCCEEDED.toString());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
            case TASK_STARTED: {
                TaskStartedEvent tse = (TaskStartedEvent)event;
                tEvent.addEventInfo("TASK_TYPE", (Object)tse.getTaskType().toString());
                tEvent.addEventInfo("START_TIME", (Object)tse.getStartTime());
                tEvent.addEventInfo("SPLIT_LOCATIONS", (Object)tse.getSplitLocations());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(tse.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case TASK_FAILED: {
                TaskFailedEvent tfe = (TaskFailedEvent)event;
                tEvent.addEventInfo("TASK_TYPE", (Object)tfe.getTaskType().toString());
                tEvent.addEventInfo("STATUS", (Object)TaskStatus.State.FAILED.toString());
                tEvent.addEventInfo("FINISH_TIME", (Object)tfe.getFinishTime());
                tEvent.addEventInfo("ERROR", (Object)tfe.getError());
                tEvent.addEventInfo("FAILED_ATTEMPT_ID", (Object)(tfe.getFailedAttemptID() == null ? "" : tfe.getFailedAttemptID().toString()));
                tEvent.addEventInfo("COUNTERS_GROUPS", (Object)this.countersToJSON(tfe.getCounters()));
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(tfe.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case TASK_UPDATED: {
                TaskUpdatedEvent tue = (TaskUpdatedEvent)event;
                tEvent.addEventInfo("FINISH_TIME", (Object)tue.getFinishTime());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(tue.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case TASK_FINISHED: {
                TaskFinishedEvent tfe2 = (TaskFinishedEvent)event;
                tEvent.addEventInfo("TASK_TYPE", (Object)tfe2.getTaskType().toString());
                tEvent.addEventInfo("COUNTERS_GROUPS", (Object)this.countersToJSON(tfe2.getCounters()));
                tEvent.addEventInfo("FINISH_TIME", (Object)tfe2.getFinishTime());
                tEvent.addEventInfo("STATUS", (Object)TaskStatus.State.SUCCEEDED.toString());
                tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID", (Object)(tfe2.getSuccessfulTaskAttemptId() == null ? "" : tfe2.getSuccessfulTaskAttemptId().toString()));
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(tfe2.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case MAP_ATTEMPT_STARTED: 
            case REDUCE_ATTEMPT_STARTED: 
            case CLEANUP_ATTEMPT_STARTED: 
            case SETUP_ATTEMPT_STARTED: {
                TaskAttemptStartedEvent tase = (TaskAttemptStartedEvent)event;
                tEvent.addEventInfo("TASK_TYPE", (Object)tase.getTaskType().toString());
                tEvent.addEventInfo("TASK_ATTEMPT_ID", (Object)tase.getTaskAttemptId().toString());
                tEvent.addEventInfo("START_TIME", (Object)tase.getStartTime());
                tEvent.addEventInfo("HTTP_PORT", (Object)tase.getHttpPort());
                tEvent.addEventInfo("TRACKER_NAME", (Object)tase.getTrackerName());
                tEvent.addEventInfo("TASK_TYPE", (Object)tase.getTaskType().toString());
                tEvent.addEventInfo("SHUFFLE_PORT", (Object)tase.getShufflePort());
                tEvent.addEventInfo("CONTAINER_ID", (Object)(tase.getContainerId() == null ? "" : tase.getContainerId().toString()));
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(tase.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case MAP_ATTEMPT_FAILED: 
            case CLEANUP_ATTEMPT_FAILED: 
            case REDUCE_ATTEMPT_FAILED: 
            case SETUP_ATTEMPT_FAILED: 
            case MAP_ATTEMPT_KILLED: 
            case CLEANUP_ATTEMPT_KILLED: 
            case REDUCE_ATTEMPT_KILLED: 
            case SETUP_ATTEMPT_KILLED: {
                TaskAttemptUnsuccessfulCompletionEvent tauce = (TaskAttemptUnsuccessfulCompletionEvent)event;
                tEvent.addEventInfo("TASK_TYPE", (Object)tauce.getTaskType().toString());
                tEvent.addEventInfo("TASK_ATTEMPT_ID", (Object)(tauce.getTaskAttemptId() == null ? "" : tauce.getTaskAttemptId().toString()));
                tEvent.addEventInfo("FINISH_TIME", (Object)tauce.getFinishTime());
                tEvent.addEventInfo("ERROR", (Object)tauce.getError());
                tEvent.addEventInfo("STATUS", (Object)tauce.getTaskStatus());
                tEvent.addEventInfo("HOSTNAME", (Object)tauce.getHostname());
                tEvent.addEventInfo("PORT", (Object)tauce.getPort());
                tEvent.addEventInfo("RACK_NAME", (Object)tauce.getRackName());
                tEvent.addEventInfo("SHUFFLE_FINISH_TIME", (Object)tauce.getFinishTime());
                tEvent.addEventInfo("SORT_FINISH_TIME", (Object)tauce.getFinishTime());
                tEvent.addEventInfo("MAP_FINISH_TIME", (Object)tauce.getFinishTime());
                tEvent.addEventInfo("COUNTERS_GROUPS", (Object)this.countersToJSON(tauce.getCounters()));
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(tauce.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case MAP_ATTEMPT_FINISHED: {
                MapAttemptFinishedEvent mafe = (MapAttemptFinishedEvent)event;
                tEvent.addEventInfo("TASK_TYPE", (Object)mafe.getTaskType().toString());
                tEvent.addEventInfo("FINISH_TIME", (Object)mafe.getFinishTime());
                tEvent.addEventInfo("STATUS", (Object)mafe.getTaskStatus());
                tEvent.addEventInfo("STATE", (Object)mafe.getState());
                tEvent.addEventInfo("MAP_FINISH_TIME", (Object)mafe.getMapFinishTime());
                tEvent.addEventInfo("COUNTERS_GROUPS", (Object)this.countersToJSON(mafe.getCounters()));
                tEvent.addEventInfo("HOSTNAME", (Object)mafe.getHostname());
                tEvent.addEventInfo("PORT", (Object)mafe.getPort());
                tEvent.addEventInfo("RACK_NAME", (Object)mafe.getRackName());
                tEvent.addEventInfo("ATTEMPT_ID", (Object)(mafe.getAttemptId() == null ? "" : mafe.getAttemptId().toString()));
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(mafe.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case REDUCE_ATTEMPT_FINISHED: {
                ReduceAttemptFinishedEvent rafe = (ReduceAttemptFinishedEvent)event;
                tEvent.addEventInfo("TASK_TYPE", (Object)rafe.getTaskType().toString());
                tEvent.addEventInfo("ATTEMPT_ID", (Object)(rafe.getAttemptId() == null ? "" : rafe.getAttemptId().toString()));
                tEvent.addEventInfo("FINISH_TIME", (Object)rafe.getFinishTime());
                tEvent.addEventInfo("STATUS", (Object)rafe.getTaskStatus());
                tEvent.addEventInfo("STATE", (Object)rafe.getState());
                tEvent.addEventInfo("SHUFFLE_FINISH_TIME", (Object)rafe.getShuffleFinishTime());
                tEvent.addEventInfo("SORT_FINISH_TIME", (Object)rafe.getSortFinishTime());
                tEvent.addEventInfo("COUNTERS_GROUPS", (Object)this.countersToJSON(rafe.getCounters()));
                tEvent.addEventInfo("HOSTNAME", (Object)rafe.getHostname());
                tEvent.addEventInfo("PORT", (Object)rafe.getPort());
                tEvent.addEventInfo("RACK_NAME", (Object)rafe.getRackName());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(rafe.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case SETUP_ATTEMPT_FINISHED: 
            case CLEANUP_ATTEMPT_FINISHED: {
                TaskAttemptFinishedEvent tafe = (TaskAttemptFinishedEvent)event;
                tEvent.addEventInfo("TASK_TYPE", (Object)tafe.getTaskType().toString());
                tEvent.addEventInfo("ATTEMPT_ID", (Object)(tafe.getAttemptId() == null ? "" : tafe.getAttemptId().toString()));
                tEvent.addEventInfo("FINISH_TIME", (Object)tafe.getFinishTime());
                tEvent.addEventInfo("STATUS", (Object)tafe.getTaskStatus());
                tEvent.addEventInfo("STATE", (Object)tafe.getState());
                tEvent.addEventInfo("COUNTERS_GROUPS", (Object)this.countersToJSON(tafe.getCounters()));
                tEvent.addEventInfo("HOSTNAME", (Object)tafe.getHostname());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(tafe.getTaskId().toString());
                tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
                tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
                break;
            }
            case AM_STARTED: {
                AMStartedEvent ase = (AMStartedEvent)event;
                tEvent.addEventInfo("APPLICATION_ATTEMPT_ID", (Object)(ase.getAppAttemptId() == null ? "" : ase.getAppAttemptId().toString()));
                tEvent.addEventInfo("CONTAINER_ID", (Object)(ase.getContainerId() == null ? "" : ase.getContainerId().toString()));
                tEvent.addEventInfo("NODE_MANAGER_HOST", (Object)ase.getNodeManagerHost());
                tEvent.addEventInfo("NODE_MANAGER_PORT", (Object)ase.getNodeManagerPort());
                tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT", (Object)ase.getNodeManagerHttpPort());
                tEvent.addEventInfo("START_TIME", (Object)ase.getStartTime());
                tEvent.addEventInfo("SUBMIT_TIME", (Object)ase.getSubmitTime());
                tEntity.addEvent(tEvent);
                tEntity.setEntityId(jobId.toString());
                tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
                break;
            }
        }
        try {
            this.timelineClient.putEntities(new TimelineEntity[]{tEntity});
        }
        catch (IOException ex) {
            LOG.error((Object)("Error putting entity " + tEntity.getEntityId() + " to Timeline" + "Server"), (Throwable)ex);
        }
        catch (YarnException ex) {
            LOG.error((Object)("Error putting entity " + tEntity.getEntityId() + " to Timeline" + "Server"), (Throwable)ex);
        }
    }

    @InterfaceAudience.Private
    public JsonNode countersToJSON(Counters counters) {
        ObjectMapper mapper = new ObjectMapper();
        ArrayNode nodes = mapper.createArrayNode();
        if (counters != null) {
            for (CounterGroup counterGroup : counters) {
                ObjectNode groupNode = nodes.addObject();
                groupNode.put("NAME", counterGroup.getName());
                groupNode.put("DISPLAY_NAME", counterGroup.getDisplayName());
                ArrayNode countersNode = groupNode.putArray("COUNTERS");
                for (Counter counter : counterGroup) {
                    ObjectNode counterNode = countersNode.addObject();
                    counterNode.put("NAME", counter.getName());
                    counterNode.put("DISPLAY_NAME", counter.getDisplayName());
                    counterNode.put("VALUE", counter.getValue());
                }
            }
        }
        return nodes;
    }

    private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
        Counter slotMillisReduceCounter;
        Counter slotMillisMapCounter = allCounters.findCounter((Enum)JobCounter.SLOTS_MILLIS_MAPS);
        if (slotMillisMapCounter != null) {
            summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000L);
        }
        if ((slotMillisReduceCounter = allCounters.findCounter((Enum)JobCounter.SLOTS_MILLIS_REDUCES)) != null) {
            summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000L);
        }
    }

    protected void closeEventWriter(JobId jobId) throws IOException {
        MetaInfo mi = fileMap.get(jobId);
        if (mi == null) {
            throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
        }
        if (!mi.isWriterActive()) {
            throw new IOException("Inactive Writer: Likely received multiple JobFinished / JobUnsuccessful events for JobId: [" + jobId + "]");
        }
        try {
            mi.closeWriter();
        }
        catch (IOException e) {
            LOG.error((Object)("Error closing writer for JobID: " + jobId));
            throw e;
        }
    }

    protected void processDoneFiles(JobId jobId) throws IOException {
        MetaInfo mi = fileMap.get(jobId);
        if (mi == null) {
            throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
        }
        if (mi.getHistoryFile() == null) {
            LOG.warn((Object)("No file for job-history with " + jobId + " found in cache!"));
        }
        if (mi.getConfFile() == null) {
            LOG.warn((Object)("No file for jobconf with " + jobId + " found in cache!"));
        }
        Path qualifiedSummaryDoneFile = null;
        FSDataOutputStream summaryFileOut = null;
        try {
            String doneSummaryFileName = this.getTempFileName(JobHistoryUtils.getIntermediateSummaryFileName((JobId)jobId));
            qualifiedSummaryDoneFile = this.doneDirFS.makeQualified(new Path(this.doneDirPrefixPath, doneSummaryFileName));
            summaryFileOut = this.doneDirFS.create(qualifiedSummaryDoneFile, true);
            summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
            summaryFileOut.close();
            this.doneDirFS.setPermission(qualifiedSummaryDoneFile, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
        }
        catch (IOException e) {
            LOG.info((Object)("Unable to write out JobSummaryInfo to [" + qualifiedSummaryDoneFile + "]"), (Throwable)e);
            throw e;
        }
        try {
            Path qualifiedDoneFile = null;
            if (mi.getHistoryFile() != null) {
                Path historyFile = mi.getHistoryFile();
                Path qualifiedLogFile = this.stagingDirFS.makeQualified(historyFile);
                int jobNameLimit = this.getConfig().getInt("mapreduce.jobhistory.jobname.limit", 50);
                String doneJobHistoryFileName = this.getTempFileName(FileNameIndexUtils.getDoneFileName((JobIndexInfo)mi.getJobIndexInfo(), (int)jobNameLimit));
                qualifiedDoneFile = this.doneDirFS.makeQualified(new Path(this.doneDirPrefixPath, doneJobHistoryFileName));
                this.moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
            }
            Path qualifiedConfDoneFile = null;
            if (mi.getConfFile() != null) {
                Path confFile = mi.getConfFile();
                Path qualifiedConfFile = this.stagingDirFS.makeQualified(confFile);
                String doneConfFileName = this.getTempFileName(JobHistoryUtils.getIntermediateConfFileName((JobId)jobId));
                qualifiedConfDoneFile = this.doneDirFS.makeQualified(new Path(this.doneDirPrefixPath, doneConfFileName));
                this.moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
            }
            this.moveTmpToDone(qualifiedSummaryDoneFile);
            this.moveTmpToDone(qualifiedConfDoneFile);
            this.moveTmpToDone(qualifiedDoneFile);
        }
        catch (IOException e) {
            LOG.error((Object)("Error closing writer for JobID: " + jobId));
            throw e;
        }
    }

    private void moveTmpToDone(Path tmpPath) throws IOException {
        if (tmpPath != null) {
            String tmpFileName = tmpPath.getName();
            String fileName = this.getFileNameFromTmpFN(tmpFileName);
            Path path = new Path(tmpPath.getParent(), fileName);
            this.doneDirFS.rename(tmpPath, path);
            LOG.info((Object)("Moved tmp to done: " + tmpPath + " to " + path));
        }
    }

    private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
        if (this.stagingDirFS.exists(fromPath)) {
            boolean copied;
            LOG.info((Object)("Copying " + fromPath.toString() + " to " + toPath.toString()));
            if (this.doneDirFS.exists(toPath)) {
                this.doneDirFS.delete(toPath, true);
            }
            if (copied = FileUtil.copy((FileSystem)this.stagingDirFS, (Path)fromPath, (FileSystem)this.doneDirFS, (Path)toPath, (boolean)false, (Configuration)this.getConfig())) {
                LOG.info((Object)("Copied to done location: " + toPath));
            } else {
                LOG.info((Object)"copy failed");
            }
            this.doneDirFS.setPermission(toPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
        }
    }

    boolean pathExists(FileSystem fileSys, Path path) throws IOException {
        return fileSys.exists(path);
    }

    private String getTempFileName(String srcFile) {
        return srcFile + "_tmp";
    }

    private String getFileNameFromTmpFN(String tmpFileName) {
        return tmpFileName.substring(0, tmpFileName.length() - 4);
    }

    public void setForcejobCompletion(boolean forceJobCompletion) {
        this.forceJobCompletion = forceJobCompletion;
        LOG.info((Object)("JobHistoryEventHandler notified that forceJobCompletion is " + forceJobCompletion));
    }

    private String createJobStateForJobUnsuccessfulCompletionEvent(String forcedJobStateOnShutDown) {
        if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown.isEmpty()) {
            return JobState.KILLED.toString();
        }
        if (forcedJobStateOnShutDown.equals(JobStateInternal.ERROR.toString()) || forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) {
            return JobState.FAILED.toString();
        }
        if (forcedJobStateOnShutDown.equals(JobStateInternal.SUCCEEDED.toString())) {
            return JobState.SUCCEEDED.toString();
        }
        return JobState.KILLED.toString();
    }

    @VisibleForTesting
    boolean getFlushTimerStatus() {
        return this.isTimerActive;
    }

    protected class MetaInfo {
        private Path historyFile;
        private Path confFile;
        private EventWriter writer;
        JobIndexInfo jobIndexInfo;
        JobSummary jobSummary;
        Timer flushTimer;
        FlushTimerTask flushTimerTask;
        private boolean isTimerShutDown = false;
        private String forcedJobStateOnShutDown;

        MetaInfo(Path historyFile, Path conf, EventWriter writer, String user, String jobName, JobId jobId, String forcedJobStateOnShutDown, String queueName) {
            this.historyFile = historyFile;
            this.confFile = conf;
            this.writer = writer;
            this.jobIndexInfo = new JobIndexInfo(-1L, -1L, user, jobName, jobId, -1, -1, null, queueName);
            this.jobSummary = new JobSummary();
            this.flushTimer = new Timer("FlushTimer", true);
            this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
        }

        Path getHistoryFile() {
            return this.historyFile;
        }

        Path getConfFile() {
            return this.confFile;
        }

        JobIndexInfo getJobIndexInfo() {
            return this.jobIndexInfo;
        }

        JobSummary getJobSummary() {
            return this.jobSummary;
        }

        boolean isWriterActive() {
            return this.writer != null;
        }

        boolean isTimerShutDown() {
            return this.isTimerShutDown;
        }

        String getForcedJobStateOnShutDown() {
            return this.forcedJobStateOnShutDown;
        }

        public String toString() {
            return "Job MetaInfo for " + this.jobSummary.getJobId() + " history file " + this.historyFile;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void closeWriter() throws IOException {
            LOG.debug((Object)"Closing Writer");
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                if (this.writer != null) {
                    this.writer.close();
                }
                this.writer = null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void writeEvent(HistoryEvent event) throws IOException {
            LOG.debug((Object)"Writing event");
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                if (this.writer != null) {
                    this.writer.write(event);
                    this.processEventForFlush(event);
                    this.maybeFlush(event);
                }
            }
        }

        void processEventForFlush(HistoryEvent historyEvent) throws IOException {
            if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED, new EventType[]{EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED, EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED, EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED, EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED, EventType.JOB_KILLED}).contains(historyEvent.getEventType())) {
                JobHistoryEventHandler.this.numUnflushedCompletionEvents++;
                if (!JobHistoryEventHandler.this.isTimerActive) {
                    this.resetFlushTimer();
                    if (!this.isTimerShutDown) {
                        this.flushTimerTask = new FlushTimerTask(this);
                        this.flushTimer.schedule((TimerTask)this.flushTimerTask, JobHistoryEventHandler.this.flushTimeout);
                        JobHistoryEventHandler.this.isTimerActive = true;
                    }
                }
            }
        }

        void resetFlushTimer() throws IOException {
            if (this.flushTimerTask != null) {
                IOException exception = this.flushTimerTask.getException();
                this.flushTimerTask.stop();
                if (exception != null) {
                    throw exception;
                }
                this.flushTimerTask = null;
            }
            JobHistoryEventHandler.this.isTimerActive = false;
        }

        void maybeFlush(HistoryEvent historyEvent) throws IOException {
            if (JobHistoryEventHandler.this.eventQueue.size() < JobHistoryEventHandler.this.minQueueSizeForBatchingFlushes && JobHistoryEventHandler.this.numUnflushedCompletionEvents > 0 || JobHistoryEventHandler.this.numUnflushedCompletionEvents >= JobHistoryEventHandler.this.maxUnflushedCompletionEvents || JobHistoryEventHandler.this.isJobCompletionEvent(historyEvent)) {
                this.flush();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void flush() throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Flushing " + this.toString()));
            }
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                if (JobHistoryEventHandler.this.numUnflushedCompletionEvents != 0) {
                    this.writer.flush();
                    JobHistoryEventHandler.this.numUnflushedCompletionEvents = 0;
                    this.resetFlushTimer();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void shutDownTimer() throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Shutting down timer " + this.toString()));
            }
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                this.isTimerShutDown = true;
                this.flushTimer.cancel();
                if (this.flushTimerTask != null && this.flushTimerTask.getException() != null) {
                    throw this.flushTimerTask.getException();
                }
            }
        }
    }

    private class FlushTimerTask
    extends TimerTask {
        private MetaInfo metaInfo;
        private IOException ioe = null;
        private volatile boolean shouldRun = true;

        FlushTimerTask(MetaInfo metaInfo) {
            this.metaInfo = metaInfo;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            LOG.debug((Object)"In flush timer task");
            Object object = JobHistoryEventHandler.this.lock;
            synchronized (object) {
                try {
                    if (!this.metaInfo.isTimerShutDown() && this.shouldRun) {
                        this.metaInfo.flush();
                    }
                }
                catch (IOException e) {
                    this.ioe = e;
                }
            }
        }

        public IOException getException() {
            return this.ioe;
        }

        public void stop() {
            this.shouldRun = false;
            this.cancel();
        }
    }
}

