/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.service;

import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;

public class HadoopAccessorService
implements Service {
    private static XLog LOG = XLog.getLog(HadoopAccessorService.class);

    // Keys read from the Oozie service configuration.
    public static final String CONF_PREFIX = "oozie.service.HadoopAccessorService.";
    public static final String JOB_TRACKER_WHITELIST = "oozie.service.HadoopAccessorService.jobTracker.whitelist";
    public static final String NAME_NODE_WHITELIST = "oozie.service.HadoopAccessorService.nameNode.whitelist";
    public static final String HADOOP_CONFS = "oozie.service.HadoopAccessorService.hadoop.configurations";
    public static final String ACTION_CONFS = "oozie.service.HadoopAccessorService.action.configurations";
    public static final String KERBEROS_AUTH_ENABLED = "oozie.service.HadoopAccessorService.kerberos.enabled";
    public static final String KERBEROS_KEYTAB = "oozie.service.HadoopAccessorService.keytab.file";
    public static final String KERBEROS_PRINCIPAL = "oozie.service.HadoopAccessorService.kerberos.principal";

    // Alias under which createJobClient stores the MR delegation token; also the renewer used when security is off.
    public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");

    // Marker set by createJobConf and required by createJobClient/createFileSystem.
    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";

    // Hadoop property names consulted when resolving the delegation-token renewer.
    protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
    protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
    protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";

    // Renewers resolved by getMRTokenRenewerInternal, keyed by service principal.
    // NOTE(review): plain HashMap shared through a static field with no visible synchronization.
    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();

    // Created on first use by getCachedConf().
    private static Configuration cachedConf;

    // Action name whose configuration createActionDefaultConf loads first, as the base for any action.
    private static final String DEFAULT_ACTIONNAME = "default";

    // Lower-cased, trimmed whitelist entries; validate() skips the check when a whitelist is empty.
    private Set<String> jobTrackerWhitelist = new HashSet<String>();
    private Set<String> nameNodeWhitelist = new HashSet<String>();

    // Hadoop configurations keyed by the lower-cased left-hand side of each HADOOP_CONFS entry ("*" is the fallback key).
    private Map<String, Configuration> hadoopConfigs = new HashMap<String, Configuration>();

    // Action configuration directories keyed the same way, parsed from ACTION_CONFS.
    private Map<String, File> actionConfigDirs = new HashMap<String, File>();

    // Per-hostPort cache of action name -> default configuration, filled lazily by createActionDefaultConf.
    private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();

    // Taken from Services in init(Services); init(Configuration) creates one if it is still null.
    private UserGroupInformationService ugiService;

    public static final String SUPPORTED_FILESYSTEMS = "oozie.service.HadoopAccessorService.supported.filesystems";

    // Trimmed scheme names from SUPPORTED_FILESYSTEMS (contains "*" itself when the wildcard is configured).
    private Set<String> supportedSchemes;

    // True when SUPPORTED_FILESYSTEMS is the single wildcard "*".
    private boolean allSchemesSupported;

    // File names looked up by loadHadoopConf; assigned in the static initializer of this class.
    private static final String[] HADOOP_CONF_FILES;

    /**
     * Service entry point: picks up the {@link UserGroupInformationService} registered in
     * {@code services} and then initializes this service from the services configuration.
     *
     * @param services the services singleton providing the configuration and sibling services.
     * @throws ServiceException if {@link #init(Configuration)} fails.
     */
    @Override
    public void init(Services services) throws ServiceException {
        this.ugiService = services.get(UserGroupInformationService.class);
        this.init(services.getConf());
    }

    /**
     * Initializes the service from the given configuration: loads both whitelists, sets up
     * Kerberos or simple authentication, loads the Hadoop and action configuration
     * directories, reads the supported filesystem schemes and finally hands a Hadoop
     * configuration to Hadoop's {@code SecurityUtil}.
     *
     * @param conf the Oozie service configuration.
     * @throws ServiceException if Kerberos login fails, a configuration directory is
     *         invalid, or the supported-filesystems list mixes the wildcard with explicit
     *         schemes.
     */
    public void init(Configuration conf) throws ServiceException {
        collectWhitelist(ConfigurationService.getStrings(conf, JOB_TRACKER_WHITELIST), this.jobTrackerWhitelist);
        LOG.info("JOB_TRACKER_WHITELIST :" + this.jobTrackerWhitelist + ", Total entries :" + this.jobTrackerWhitelist.size());

        collectWhitelist(ConfigurationService.getStrings(conf, NAME_NODE_WHITELIST), this.nameNodeWhitelist);
        LOG.info("NAME_NODE_WHITELIST :" + this.nameNodeWhitelist + ", Total entries :" + this.nameNodeWhitelist.size());

        boolean kerberosAuthOn = ConfigurationService.getBoolean(conf, KERBEROS_AUTH_ENABLED);
        LOG.info("Oozie Kerberos Authentication [{0}]", kerberosAuthOn ? "enabled" : "disabled");
        if (!kerberosAuthOn) {
            // Without Kerberos, pin Hadoop's UGI to simple authentication.
            Configuration simpleAuthConf = new Configuration();
            simpleAuthConf.set("hadoop.security.authentication", "simple");
            UserGroupInformation.setConfiguration(simpleAuthConf);
        } else {
            this.kerberosInit(conf);
        }

        // init(Services) normally provides this; direct callers of init(Configuration) get a fresh one.
        if (this.ugiService == null) {
            this.ugiService = new UserGroupInformationService();
        }

        this.loadHadoopConfigs(conf);
        this.preLoadActionConfigs(conf);

        this.supportedSchemes = new HashSet<String>();
        String[] configuredSchemes = ConfigurationService.getStrings(conf, SUPPORTED_FILESYSTEMS);
        if (configuredSchemes != null) {
            for (String rawScheme : configuredSchemes) {
                String scheme = rawScheme.trim();
                boolean wildcard = "*".equals(scheme);
                if (wildcard && configuredSchemes.length > 1) {
                    throw new ServiceException(ErrorCode.E0100, this.getClass().getName(), "oozie.service.HadoopAccessorService.supported.filesystems should contain either only wildcard or explicit list, not both");
                }
                if (wildcard) {
                    this.allSchemesSupported = true;
                }
                // The wildcard itself is also recorded in the set.
                this.supportedSchemes.add(scheme);
            }
        }

        this.setConfigForHadoopSecurityUtil(conf);
    }

    /**
     * Adds every non-blank entry of {@code rawEntries}, lower-cased and trimmed, to
     * {@code whitelist}.
     */
    private static void collectWhitelist(String[] rawEntries, Set<String> whitelist) {
        for (String rawEntry : rawEntries) {
            String entry = rawEntry.toLowerCase().trim();
            if (!entry.isEmpty()) {
                whitelist.add(entry);
            }
        }
    }

    /**
     * Chooses one of the loaded Hadoop configurations and passes it to
     * {@code SecurityUtil.setConfiguration} via reflection.
     *
     * The configuration is selected by, in order: the {@code oozie.actions.default.name-node}
     * property (when non-blank), the {@code "*"} entry of the loaded Hadoop configurations,
     * or the first non-blank key of those configurations. If none applies nothing is done.
     *
     * @param conf the Oozie service configuration.
     */
    private void setConfigForHadoopSecurityUtil(Configuration conf) {
        String nameNode = conf.get("oozie.actions.default.name-node");
        if (nameNode != null) {
            nameNode = nameNode.trim();
            if (nameNode.isEmpty()) {
                nameNode = null;
            }
        }

        if (nameNode == null) {
            if (this.hadoopConfigs.containsKey("*")) {
                nameNode = "*";
            } else {
                for (String key : this.hadoopConfigs.keySet()) {
                    String candidate = key.trim();
                    if (!candidate.isEmpty()) {
                        nameNode = candidate;
                        break;
                    }
                }
            }
        }

        if (nameNode == null) {
            return;
        }

        Configuration hConf = this.getConfiguration(nameNode);
        try {
            // Looked up reflectively because the method may be missing from SecurityUtil
            // (see the NoSuchMethodException branch).
            SecurityUtil.class.getMethod("setConfiguration", Configuration.class).invoke(null, hConf);
            LOG.debug("Setting Hadoop SecurityUtil Configuration to that of {0}", nameNode);
        }
        catch (NoSuchMethodException e) {
            LOG.debug("Not setting Hadoop SecurityUtil Configuration because this version of Hadoop doesn't support it");
        }
        catch (Exception e) {
            LOG.error("An Exception occurred while trying to call setConfiguration on {0} via Reflection.  It won't be called.", SecurityUtil.class.getName(), e);
        }
    }

    /**
     * Logs the Oozie server in from its keytab and switches Hadoop's
     * {@link UserGroupInformation} to Kerberos authentication.
     *
     * @param serviceConf the Oozie service configuration holding the keytab and principal.
     * @throws ServiceException with E0026 if the keytab or the resolved principal is empty,
     *         or with E0100 wrapping any other failure.
     */
    private void kerberosInit(Configuration serviceConf) throws ServiceException {
        try {
            String keytabFile = ConfigurationService.get(serviceConf, KERBEROS_KEYTAB).trim();
            if (keytabFile.isEmpty()) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
            }

            String configuredPrincipal = serviceConf.get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
            String localHostName = InetAddress.getLocalHost().getCanonicalHostName();
            String principal = SecurityUtil.getServerPrincipal(configuredPrincipal, localHostName);
            if (principal.isEmpty()) {
                throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
            }

            Configuration kerberosConf = new Configuration();
            kerberosConf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(kerberosConf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabFile);
            LOG.info("Got Kerberos ticket, keytab [{0}], Oozie principal principal [{1}]", keytabFile, principal);
        }
        catch (ServiceException ex) {
            // Already carries the right error code; do not re-wrap.
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, this.getClass().getName(), ex.getMessage(), ex);
        }
    }

    /**
     * Loads the Hadoop site files found in {@code dir} and merges them, in the order of
     * {@code HADOOP_CONF_FILES}, into a single configuration. Files that do not exist are
     * skipped.
     *
     * @param dir directory containing the Hadoop configuration files.
     * @return the merged configuration (empty if none of the files exist).
     * @throws IOException if a file cannot be opened, parsed or closed.
     */
    private Configuration loadHadoopConf(File dir) throws IOException {
        XConfiguration hadoopConf = new XConfiguration();
        for (String file : HADOOP_CONF_FILES) {
            File f = new File(dir, file);
            if (!f.exists()) {
                continue;
            }
            InputStream is = new FileInputStream(f);
            XConfiguration conf;
            try {
                conf = new XConfiguration(is);
            }
            finally {
                // Close even when parsing fails so the file descriptor is not leaked.
                is.close();
            }
            XConfiguration.copy(conf, hadoopConf);
        }
        return hadoopConf;
    }

    /**
     * Parses {@code key=directory} definitions into a map from lower-cased key to directory.
     * Relative directories are resolved against the Oozie configuration directory; blank
     * definitions are ignored.
     *
     * @param confDefs the raw definitions, one {@code key=directory} pair each.
     * @param type label used in error messages (e.g. "hadoop" or "action").
     * @return map of lower-cased key to existing directory.
     * @throws ServiceException if a definition is not exactly {@code key=directory} or the
     *         directory does not exist.
     * @throws IOException declared for callers; not thrown by the visible code.
     */
    private Map<String, File> parseConfigDirs(String[] confDefs, String type) throws ServiceException, IOException {
        Map<String, File> dirsByHostPort = new HashMap<String, File>();
        File oozieConfigDir = new File(ConfigurationService.getConfigurationDirectory());
        for (String definition : confDefs) {
            if (definition.trim().isEmpty()) {
                continue;
            }

            String[] keyAndDir = definition.split("=");
            if (keyAndDir.length != 2) {
                throw new ServiceException(ErrorCode.E0100, this.getClass().getName(), "Incorrect " + type + " configuration definition: " + definition);
            }

            String hostPort = keyAndDir[0];
            String dirName = keyAndDir[1];
            File candidate = new File(dirName);
            File resolved = candidate.isAbsolute() ? candidate : new File(oozieConfigDir, dirName);
            if (!resolved.exists()) {
                throw new ServiceException(ErrorCode.E0100, this.getClass().getName(), "could not find " + type + " configuration directory: " + resolved.getAbsolutePath());
            }
            dirsByHostPort.put(hostPort.toLowerCase(), resolved);
        }
        return dirsByHostPort;
    }

    /**
     * Reads the {@code HADOOP_CONFS} definitions and loads the Hadoop configuration of each
     * listed directory into {@code hadoopConfigs}.
     *
     * @param serviceConf the Oozie service configuration.
     * @throws ServiceException if a definition is invalid or a configuration cannot be
     *         loaded (non-service failures are wrapped with E0100).
     */
    private void loadHadoopConfigs(Configuration serviceConf) throws ServiceException {
        try {
            String[] definitions = ConfigurationService.getStrings(serviceConf, HADOOP_CONFS);
            Map<String, File> confDirs = this.parseConfigDirs(definitions, "hadoop");
            for (Map.Entry<String, File> confDir : confDirs.entrySet()) {
                String hostPort = confDir.getKey();
                Configuration loaded = this.loadHadoopConf(confDir.getValue());
                this.hadoopConfigs.put(hostPort, loaded);
            }
        }
        catch (ServiceException ex) {
            // Already a service-level error; propagate unchanged.
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, this.getClass().getName(), ex.getMessage(), ex);
        }
    }

    /**
     * Parses the {@code ACTION_CONFS} definitions into {@code actionConfigDirs} and creates
     * an empty per-hostPort cache in {@code actionConfigs} for each of them. The action
     * configurations themselves are loaded later, on demand.
     *
     * @param serviceConf the Oozie service configuration.
     * @throws ServiceException if a definition is invalid (non-service failures are wrapped
     *         with E0100).
     */
    private void preLoadActionConfigs(Configuration serviceConf) throws ServiceException {
        try {
            this.actionConfigDirs = this.parseConfigDirs(ConfigurationService.getStrings(serviceConf, ACTION_CONFS), "action");
            for (String hostport : this.actionConfigDirs.keySet()) {
                // Parameterized instead of a raw ConcurrentHashMap to avoid an unchecked conversion.
                this.actionConfigs.put(hostport, new ConcurrentHashMap<String, XConfiguration>());
            }
        }
        catch (ServiceException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, this.getClass().getName(), ex.getMessage(), ex);
        }
    }

    /**
     * Service shutdown hook; this implementation does nothing.
     */
    @Override
    public void destroy() {
    }

    /**
     * Returns the class under which this service is exposed.
     *
     * @return {@code HadoopAccessorService.class}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return HadoopAccessorService.class;
    }

    /**
     * Obtains the proxy-user {@link UserGroupInformation} for {@code user} from the
     * {@link UserGroupInformationService}.
     *
     * @param user the user to obtain the proxy UGI for.
     * @return the UGI returned by {@code ugiService.getProxyUser(user)}.
     * @throws IOException propagated from the UGI service.
     */
    private UserGroupInformation getUGI(String user) throws IOException {
        return this.ugiService.getProxyUser(user);
    }

    /**
     * Creates a {@link JobConf} seeded from the cached base configuration, overlaid with
     * the Hadoop configuration registered for {@code hostPort}, and marked as created by
     * this service.
     *
     * @param hostPort key of the Hadoop configuration to apply (the lookup falls back to
     *        {@code "*"}, then to an empty configuration).
     * @return a new, marked job configuration.
     */
    public JobConf createJobConf(String hostPort) {
        Configuration baseConf = this.getCachedConf();
        JobConf result = new JobConf(baseConf);
        Configuration clusterConf = this.getConfiguration(hostPort);
        XConfiguration.copy(clusterConf, result);
        // createJobClient/createFileSystem reject configurations lacking this marker.
        result.setBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, true);
        return result;
    }

    /**
     * Returns the shared base {@link Configuration}, creating it on first use.
     *
     * @return the cached configuration instance.
     */
    public Configuration getCachedConf() {
        if (cachedConf != null) {
            return cachedConf;
        }
        this.loadCachedConf();
        return cachedConf;
    }

    /**
     * Creates the shared base configuration and stores it in the static {@code cachedConf}
     * field.
     */
    private void loadCachedConf() {
        cachedConf = new Configuration();
        // Result is discarded; presumably called to force Configuration to load its
        // resources eagerly — confirm against Hadoop's Configuration implementation.
        cachedConf.size();
    }

    /**
     * Builds the configuration for {@code action} from the action configuration directory
     * registered for {@code hostPort}.
     *
     * If a sub-directory named after the action exists, its {@code *.xml} files are merged
     * in file-name order. Afterwards {@code <action>.xml} in the directory itself, if
     * present, is merged on top. Unreadable files are logged and skipped.
     *
     * @param hostPort key into {@code actionConfigDirs}.
     * @param action action name (also used as directory and file base name).
     * @return the merged configuration; empty when no directory is registered for
     *         {@code hostPort} or nothing could be read.
     */
    private XConfiguration loadActionConf(String hostPort, String action) {
        XConfiguration actionConf = new XConfiguration();
        File dir = this.actionConfigDirs.get(hostPort);
        if (dir == null) {
            // No directory registered: without this guard new File(null, action + ".xml")
            // would silently resolve against the process working directory.
            return actionConf;
        }

        File actionConfDir = new File(dir, action);
        if (actionConfDir.exists() && actionConfDir.isDirectory()) {
            LOG.info("Processing configuration files under [{0}] for action [{1}] and hostPort [{2}]", actionConfDir.getAbsolutePath(), action, hostPort);
            File[] xmlFiles = actionConfDir.listFiles(new FilenameFilter(){

                @Override
                public boolean accept(File parent, String name) {
                    return name.endsWith(".xml");
                }
            });
            if (xmlFiles == null) {
                // listFiles returns null on an I/O error or if the path stopped being a directory.
                LOG.warn("Could not list configuration files under [{0}] for action [{1}] and hostPort [{2}]", actionConfDir.getAbsolutePath(), action, hostPort);
                xmlFiles = new File[0];
            }
            // Deterministic merge order: later file names override earlier ones.
            Arrays.sort(xmlFiles, new Comparator<File>(){

                @Override
                public int compare(File o1, File o2) {
                    return o1.getName().compareTo(o2.getName());
                }
            });
            for (File f : xmlFiles) {
                if (!f.isFile() || !f.canRead()) {
                    continue;
                }
                LOG.info("Processing configuration file [{0}]", f.getName());
                try {
                    mergeConfFile(f, actionConf);
                }
                catch (IOException ex) {
                    LOG.warn("Could not read file [{0}] for action [{1}] configuration and hostPort [{2}]", f.getAbsolutePath(), action, hostPort, ex);
                }
            }
        }

        // Merged last so that <action>.xml overrides the files of the <action>/ directory.
        File actionConfFile = new File(dir, action + ".xml");
        if (actionConfFile.exists()) {
            try {
                mergeConfFile(actionConfFile, actionConf);
            }
            catch (IOException ex) {
                LOG.warn("Could not read file [{0}] for action [{1}] configuration for hostPort [{2}]", actionConfFile.getAbsolutePath(), action, hostPort, ex);
            }
        }
        return actionConf;
    }

    /**
     * Parses {@code file} as an XML configuration and copies its properties into
     * {@code target}, always closing the underlying stream.
     */
    private static void mergeConfFile(File file, XConfiguration target) throws IOException {
        InputStream in = new FileInputStream(file);
        try {
            XConfiguration.copy(new XConfiguration(in), target);
        }
        finally {
            try {
                in.close();
            }
            catch (IOException ignored) {
                // Best effort: a failed close must not discard a successfully read file.
            }
        }
    }

    /**
     * Returns a copy of the default configuration for {@code action} on the cluster
     * identified by {@code hostPort}.
     *
     * The per-hostPort cache is consulted first; on a miss the configuration is built from
     * the {@code "default"} action configuration overlaid with the action-specific one and
     * then cached. If {@code hostPort} has no action configurations the {@code "*"} entry
     * is used.
     *
     * @param hostPort cluster key (compared lower-cased); may be {@code null}.
     * @param action the action name.
     * @return a fresh {@link XConfiguration} the caller may modify; empty when neither
     *         {@code hostPort} nor {@code "*"} has action configurations.
     */
    public XConfiguration createActionDefaultConf(String hostPort, String action) {
        hostPort = hostPort != null ? hostPort.toLowerCase() : null;
        Map<String, XConfiguration> hostPortActionConfigs = this.actionConfigs.get(hostPort);
        if (hostPortActionConfigs == null) {
            hostPortActionConfigs = this.actionConfigs.get("*");
            hostPort = "*";
        }
        if (hostPortActionConfigs == null) {
            // Neither a specific nor a wildcard entry is configured: return empty defaults
            // (mirroring getConfiguration) rather than failing with a NullPointerException.
            return new XConfiguration();
        }

        XConfiguration actionConf = hostPortActionConfigs.get(action);
        if (actionConf == null) {
            // Start from the shared "default" action settings, then let the action override them.
            actionConf = this.loadActionConf(hostPort, DEFAULT_ACTIONNAME);
            XConfiguration.copy(this.loadActionConf(hostPort, action), actionConf);
            hostPortActionConfigs.put(action, actionConf);
        }
        // Hand out a copy so callers cannot mutate the cached instance.
        return new XConfiguration(actionConf.toProperties());
    }

    /**
     * Looks up the Hadoop configuration registered for {@code hostPort}.
     *
     * @param hostPort cluster key (compared lower-cased); may be {@code null}.
     * @return the matching configuration, else the {@code "*"} configuration, else a new
     *         empty {@link XConfiguration}.
     */
    private Configuration getConfiguration(String hostPort) {
        String key = (hostPort == null) ? null : hostPort.toLowerCase();
        Configuration exact = this.hadoopConfigs.get(key);
        if (exact != null) {
            return exact;
        }
        Configuration wildcard = this.hadoopConfigs.get("*");
        if (wildcard != null) {
            return wildcard;
        }
        return new XConfiguration();
    }

    /**
     * Creates a {@link JobClient} as the given user and stores an MR delegation token in
     * the credentials of {@code conf} under {@code MR_TOKEN_ALIAS}.
     *
     * @param user the user the client acts as; must not be empty.
     * @param conf a job configuration obtained from {@link #createJobConf(String)}.
     * @return the job client.
     * @throws HadoopAccessorException E0903 if {@code conf} was not created by this
     *         service, E0900 if the job tracker is not whitelisted, E0902 on I/O failure
     *         or interruption.
     */
    public JobClient createJobClient(String user, final JobConf conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903, new Object[0]);
        }
        String jobTracker = conf.get(HADOOP_JOB_TRACKER);
        this.validateJobTracker(jobTracker);
        try {
            UserGroupInformation ugi = this.getUGI(user);
            JobClient jobClient = ugi.doAs(new PrivilegedExceptionAction<JobClient>(){

                @Override
                public JobClient run() throws Exception {
                    return new JobClient(conf);
                }
            });
            Token<?> mrdt = jobClient.getDelegationToken(this.getMRDelegationTokenRenewer(conf));
            conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
            return jobClient;
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Creates a {@link FileSystem} for {@code uri} as the given user.
     *
     * The name node checked against the whitelist is the authority of {@code uri}, or, when
     * the URI has none, the authority of the {@code fs.default.name} property of
     * {@code conf}.
     *
     * @param user the user the file system acts as; must not be empty.
     * @param uri the file system URI.
     * @param conf a configuration obtained from {@link #createJobConf(String)}.
     * @return the file system.
     * @throws HadoopAccessorException E0903 if {@code conf} was not created by this
     *         service, E0904 if the scheme is unsupported, E0901 if the name node is not
     *         whitelisted, E0902 on a malformed default file system URI, I/O failure or
     *         interruption.
     */
    public FileSystem createFileSystem(String user, final URI uri, final Configuration conf) throws HadoopAccessorException {
        ParamChecker.notEmpty(user, "user");
        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
            throw new HadoopAccessorException(ErrorCode.E0903, new Object[0]);
        }
        this.checkSupportedFilesystem(uri);
        String nameNode = uri.getAuthority();
        if (nameNode == null) {
            nameNode = conf.get("fs.default.name");
            if (nameNode != null) {
                try {
                    nameNode = new URI(nameNode).getAuthority();
                }
                catch (URISyntaxException ex) {
                    throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
                }
            }
        }
        this.validateNameNode(nameNode);
        try {
            UserGroupInformation ugi = this.getUGI(user);
            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>(){

                @Override
                public FileSystem run() throws Exception {
                    return FileSystem.get(uri, conf);
                }
            });
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
        }
    }

    /**
     * Checks {@code jobTrackerUri} against the job tracker whitelist.
     *
     * @param jobTrackerUri the job tracker address; {@code null} is accepted without a check.
     * @throws HadoopAccessorException with E0900 if the whitelist is non-empty and does not
     *         contain the address.
     */
    protected void validateJobTracker(String jobTrackerUri) throws HadoopAccessorException {
        this.validate(jobTrackerUri, this.jobTrackerWhitelist, ErrorCode.E0900);
    }

    /**
     * Checks {@code nameNodeUri} against the name node whitelist.
     *
     * @param nameNodeUri the name node address; {@code null} is accepted without a check.
     * @throws HadoopAccessorException with E0901 if the whitelist is non-empty and does not
     *         contain the address.
     */
    protected void validateNameNode(String nameNodeUri) throws HadoopAccessorException {
        this.validate(nameNodeUri, this.nameNodeWhitelist, ErrorCode.E0901);
    }

    /**
     * Verifies that {@code uri}, lower-cased and trimmed, is in {@code whitelist}.
     * A {@code null} address or an empty whitelist always passes.
     *
     * @param uri the address to check; may be {@code null}.
     * @param whitelist the allowed addresses; empty means no restriction.
     * @param error the error code to raise on rejection.
     * @throws HadoopAccessorException with {@code error} if the address is not whitelisted.
     */
    private void validate(String uri, Set<String> whitelist, ErrorCode error) throws HadoopAccessorException {
        if (uri == null) {
            return;
        }
        String normalized = uri.toLowerCase().trim();
        if (whitelist.isEmpty() || whitelist.contains(normalized)) {
            return;
        }
        throw new HadoopAccessorException(error, normalized, whitelist);
    }

    /**
     * Returns the renewer to request MR delegation tokens with.
     *
     * @param jobConf job configuration used to resolve the renewer when security is enabled.
     * @return the resolved renewer when Hadoop security is enabled, otherwise
     *         {@code MR_TOKEN_ALIAS}.
     * @throws IOException if the renewer cannot be resolved.
     */
    public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException {
        return UserGroupInformation.isSecurityEnabled()
                ? this.getMRTokenRenewerInternal(jobConf)
                : MR_TOKEN_ALIAS;
    }

    /**
     * Resolves the delegation-token renewer from the service principal configured in
     * {@code jobConf}, caching the result per principal.
     *
     * The principal is read from {@code RM_PRINCIPAL}, falling back to
     * {@code JT_PRINCIPAL}. The target address is read from {@code HADOOP_YARN_RM}, then
     * {@code HADOOP_JOB_TRACKER_2}, then {@code HADOOP_JOB_TRACKER}. If the address cannot
     * be turned into a socket address, the first component of the principal is used.
     *
     * @param jobConf the job configuration to read principal and target address from.
     * @return the renewer, or {@code MR_TOKEN_ALIAS} when no principal is configured.
     * @throws IOException if the server principal cannot be resolved.
     */
    Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
        String servicePrincipal = jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
        if (servicePrincipal == null) {
            return MR_TOKEN_ALIAS;
        }

        Text cached = mrTokenRenewers.get(servicePrincipal);
        if (cached != null) {
            return cached;
        }

        String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
        if (target == null) {
            target = jobConf.get(HADOOP_JOB_TRACKER);
        }

        Text renewer;
        try {
            String host = NetUtils.createSocketAddr(target).getHostName();
            renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, host));
            LOG.info("Delegation Token Renewer details: Principal=" + servicePrincipal + ",Target=" + target + ",Renewer=" + renewer);
        }
        catch (IllegalArgumentException iae) {
            // Unusable target address: fall back to the part of the principal before '/' or '@'.
            renewer = new Text(servicePrincipal.split("[/@]")[0]);
            LOG.info("Delegation Token Renewer for " + servicePrincipal + " is " + renewer);
        }
        mrTokenRenewers.put(servicePrincipal, renewer);
        return renewer;
    }

    /**
     * Adds {@code file} to the class path recorded in {@code conf}, running as the given
     * user.
     *
     * @param user the user to act as; must not be empty.
     * @param file the file to add.
     * @param conf the configuration to update.
     * @throws IOException on I/O failure, or wrapping an {@link InterruptedException} if
     *         the calling thread is interrupted (the interrupt flag is restored).
     */
    public void addFileToClassPath(String user, final Path file, final Configuration conf) throws IOException {
        ParamChecker.notEmpty(user, "user");
        try {
            UserGroupInformation ugi = this.getUGI(user);
            ugi.doAs(new PrivilegedExceptionAction<Void>(){

                @Override
                public Void run() throws Exception {
                    JobUtils.addFileToClassPath(file, conf, null);
                    return null;
                }
            });
        }
        catch (InterruptedException ex) {
            // Wrapping alone would lose the interruption; restore the flag first.
            Thread.currentThread().interrupt();
            throw new IOException(ex);
        }
    }

    /**
     * Verifies that the scheme of {@code uri} is one of the supported filesystem schemes.
     * The check is skipped when all schemes are supported, when the URI has no scheme, or
     * when no supported schemes are configured.
     *
     * @param uri the URI whose scheme is checked.
     * @throws HadoopAccessorException with E0904 if the scheme is not supported.
     */
    public void checkSupportedFilesystem(URI uri) throws HadoopAccessorException {
        if (this.allSchemesSupported) {
            return;
        }
        String uriScheme = uri.getScheme();
        if (uriScheme == null || this.supportedSchemes.isEmpty()) {
            return;
        }
        LOG.debug("Checking if filesystem " + uriScheme + " is supported");
        if (this.supportedSchemes.contains(uriScheme)) {
            return;
        }
        throw new HadoopAccessorException(ErrorCode.E0904, uriScheme, uri.toString());
    }

    /**
     * Returns the supported filesystem schemes read during initialization.
     *
     * @return the internal set itself (not a copy); {@code null} until
     *         {@link #init(Configuration)} has run.
     */
    public Set<String> getSupportedSchemes() {
        return this.supportedSchemes;
    }

    // Hadoop site files that loadHadoopConf looks for, in the order they are merged.
    static {
        HADOOP_CONF_FILES = new String[]{"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml", "hadoop-site.xml", "ssl-client.xml"};
    }
}

