/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.dfs;

import java.io.File;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.Block;
import org.apache.hadoop.dfs.BlockCrcUpgradeUtils;
import org.apache.hadoop.dfs.DNBlockUpgradeInfo;
import org.apache.hadoop.dfs.DataNode;
import org.apache.hadoop.dfs.DatanodeProtocol;
import org.apache.hadoop.dfs.FSDataset;
import org.apache.hadoop.dfs.NamespaceInfo;
import org.apache.hadoop.dfs.UpgradeCommand;
import org.apache.hadoop.dfs.UpgradeObjectDatanode;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.StringUtils;

/**
 * Datanode-side object for the Block CRC upgrade.
 *
 * <p>For every block reported by the local {@link FSDataset} that does not yet
 * have a meta file, an {@link UpgradeExecutor} is queued on a fixed-size thread
 * pool. The calling thread waits for the executors to finish, counts errors and
 * periodically reports progress to the namenode.
 *
 * <p>When {@link #preUpgradeAction(NamespaceInfo)} decides that this datanode
 * missed the cluster-wide upgrade, the upgrade runs "offline": it is started
 * from {@link #startUpgrade()} and no status is sent to the namenode.
 */
class BlockCrcUpgradeObjectDatanode
extends UpgradeObjectDatanode {
    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.BlockCrcUpgradeObjectDatanode");

    /** Retrying proxy to the namenode; created in {@code doUpgradeInternal()}. */
    DatanodeProtocol namenode;
    /** Blocks that already had a meta file when the upgrade scan ran. */
    int blocksPreviouslyUpgraded;
    /** Blocks without a meta file that were queued for upgrade. */
    int blocksToUpgrade;
    /** Queued blocks whose executor has finished (successfully or not). */
    int blocksUpgraded;
    /** Blocks whose file could not be located plus executors that ended with a throwable. */
    int errors;
    /** Number of worker threads used to upgrade blocks. */
    static final int poolSize = 5;
    /** Finished executors; worker threads append and notify, the upgrade thread drains. */
    List<UpgradeExecutor> completedList = new LinkedList<UpgradeExecutor>();
    boolean offlineUpgrade = false;
    boolean upgradeCompleted = false;

    /** @return {@code true} if this upgrade is running in offline mode. */
    boolean isOfflineUpgradeOn() {
        return this.offlineUpgrade;
    }

    /** @return the version number of this upgrade object ({@code -6}). */
    public int getVersion() {
        return -6;
    }

    /**
     * Runs the upgrade synchronously when in offline mode; otherwise does nothing.
     *
     * @return always {@code null}
     * @throws IOException if {@link #doUpgrade()} fails
     */
    public synchronized UpgradeCommand startUpgrade() throws IOException {
        if (this.offlineUpgrade) {
            this.doUpgrade();
        }
        return null;
    }

    public String getDescription() {
        return "Block CRC Upgrade at Datanode";
    }

    /**
     * @return completion percentage: 100 when every queued block has been
     *         processed (including the case where nothing was queued),
     *         otherwise the floor of {@code blocksUpgraded * 100 / blocksToUpgrade}
     */
    public short getUpgradeStatus() {
        if (this.blocksToUpgrade == this.blocksUpgraded) {
            return (short)100;
        }
        return (short)Math.floor((double)this.blocksUpgraded * 100.0 / (double)this.blocksToUpgrade);
    }

    /**
     * Builds the final statistics command. Expected to be called only after
     * the upgrade status has reached 100 (checked with an assertion).
     *
     * @return a {@code DatanodeStatsCommand} carrying the current counters
     * @throws IOException declared by the signature; not thrown by this body
     */
    public UpgradeCommand completeUpgrade() throws IOException {
        assert (this.getUpgradeStatus() == 100);
        return this.buildStatsCommand();
    }

    /**
     * Decides whether this datanode has to perform the upgrade offline.
     *
     * @param nsInfo namespace information providing the distributed upgrade version
     * @return {@code false} if {@code nsInfo}'s distributed upgrade version is
     *         greater than or equal to {@link #getVersion()}; otherwise switches
     *         to offline mode and returns {@code true}
     * @throws IOException declared by the signature; a failure to notify the
     *         namenode is logged and not propagated
     */
    boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
        int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
        if (nsUpgradeVersion >= this.getVersion()) {
            return false;
        }
        LOG.info("\n  This Datanode has missed a cluster wide Block CRC Upgrade.\n  Will perform an 'offline' upgrade of the blocks.\n  During this time, Datanode does not heartbeat.");
        DataNode dataNode = this.getDatanode();
        try {
            dataNode.namenode.errorReport(dataNode.dnRegistration, 0, "Performing an offline upgrade. Will be back online once the ugprade completes. Please see datanode logs.");
        }
        catch (IOException e) {
            // The notification is best effort: record the failure and carry on
            // with the offline upgrade anyway.
            LOG.warn("Could not notify NameNode about the offline upgrade : " + StringUtils.stringifyException(e));
        }
        this.offlineUpgrade = true;
        return true;
    }

    /**
     * Performs the block upgrade; see {@link #doUpgradeInternal()}.
     *
     * @throws IOException declared by the signature
     */
    void doUpgrade() throws IOException {
        this.doUpgradeInternal();
    }

    /**
     * Upgrades every block that lacks a meta file and blocks until all of
     * them have been processed, then reports completion to the namenode.
     * A second call after completion is a no-op (expected only in offline mode).
     */
    private void doUpgradeInternal() {
        if (this.upgradeCompleted) {
            assert (this.offlineUpgrade) : "Multiple calls to doUpgrade is expected only during offline upgrade";
            return;
        }
        FSDataset dataset = (FSDataset)this.getDatanode().data;

        LOG.info("Starting Block CRC upgrade.");
        this.namenode = this.connectToNamenode();

        List<UpgradeExecutor> pending = this.queueBlocksNeedingUpgrade(dataset);

        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            LOG.info("Starting upgrade of " + this.blocksToUpgrade + " blocks out of " + (this.blocksToUpgrade + this.blocksPreviouslyUpgraded));
            for (UpgradeExecutor task : pending) {
                pool.submit(task);
            }
            this.sendStatus();
            this.drainCompletedExecutors(pending.size());
        }
        finally {
            // Release the worker threads; without this the pool's non-daemon
            // threads would stay alive after the upgrade is over.
            pool.shutdown();
        }

        this.upgradeCompleted = true;
        LOG.info("Completed BlockCrcUpgrade. total of " + (this.blocksPreviouslyUpgraded + this.blocksToUpgrade) + " blocks : " + this.blocksPreviouslyUpgraded + " blocks previously " + "upgraded, " + this.blocksUpgraded + " blocks upgraded this time " + "with " + this.errors + " errors.");

        // Keep trying until the namenode has acknowledged the final status.
        // NOTE(review): there is no delay between attempts here; any pacing
        // has to come from BlockCrcUpgradeUtils.sendCommand - confirm.
        while (!this.sendStatus()) {
        }
    }

    /**
     * Creates the retrying namenode proxy, looping until a connection is made.
     * Socket timeouts on {@code processUpgradeCommand} and
     * {@code blockCrcUpgradeGetBlockLocations} are retried up to 5 times with
     * a 1 ms sleep; every other exception fails after the first attempt.
     */
    private DatanodeProtocol connectToNamenode() {
        Configuration conf = new Configuration();
        conf.set("ipc.client.timeout", "60000");

        RetryPolicy timeoutPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 1L, TimeUnit.MILLISECONDS);
        HashMap<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>();
        exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
        RetryPolicy methodPolicy = RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);

        HashMap<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
        methodNameToPolicyMap.put("processUpgradeCommand", methodPolicy);
        methodNameToPolicyMap.put("blockCrcUpgradeGetBlockLocations", methodPolicy);

        while (true) {
            try {
                // 12L is the protocol version passed to waitForProxy
                // (presumably DatanodeProtocol's version id - TODO confirm).
                return (DatanodeProtocol)RetryProxy.create(DatanodeProtocol.class, RPC.waitForProxy(DatanodeProtocol.class, 12L, this.getDatanode().getNameNodeAddr(), conf), methodNameToPolicyMap);
            }
            catch (IOException e) {
                LOG.warn("Exception while trying to connect to NameNode at " + this.getDatanode().getNameNodeAddr().toString() + " : " + StringUtils.stringifyException(e));
                BlockCrcUpgradeUtils.sleep(10, "will retry connecting to NameNode");
            }
        }
    }

    /**
     * Walks the dataset's block report and classifies each block: blocks
     * whose file cannot be located count as errors, blocks that already have
     * a meta file count as previously upgraded, and the rest are queued.
     *
     * @return one executor per block that still needs upgrading
     */
    private List<UpgradeExecutor> queueBlocksNeedingUpgrade(FSDataset dataset) {
        List<UpgradeExecutor> pending = new LinkedList<UpgradeExecutor>();
        for (Block b : dataset.getBlockReport()) {
            File blockFile;
            try {
                blockFile = dataset.getBlockFile(b);
            }
            catch (IOException e) {
                LOG.warn("Could not find file location for " + b + ". It might already be deleted. Exception : " + StringUtils.stringifyException(e));
                ++this.errors;
                continue;
            }
            if (FSDataset.getMetaFile(blockFile).exists()) {
                ++this.blocksPreviouslyUpgraded;
                continue;
            }
            ++this.blocksToUpgrade;
            pending.add(new UpgradeExecutor(b));
        }
        return pending;
    }

    /**
     * Waits until {@code nLeft} executors have shown up in
     * {@link #completedList}, updating the counters as they arrive. Sends a
     * status report roughly once a minute and warns if nothing has completed
     * for five minutes.
     */
    private void drainCompletedExecutors(int nLeft) {
        final long statusReportIntervalMilliSec = 60000L;
        final long stallWarnIntervalMilliSec = 300000L;

        long now = System.currentTimeMillis();
        long lastStatusReportTime = now;
        long lastUpdateTime = now;
        long lastWarnTime = now;

        while (nLeft > 0) {
            synchronized (this.completedList) {
                if (this.completedList.isEmpty()) {
                    try {
                        this.completedList.wait(1000L);
                    }
                    catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop re-checks the list
                        // and keeps waiting until all executors are done.
                    }
                }
                now = System.currentTimeMillis();
                if (!this.completedList.isEmpty()) {
                    UpgradeExecutor exe = this.completedList.remove(0);
                    --nLeft;
                    if (exe.throwable != null) {
                        ++this.errors;
                        LOG.error("Got an exception during upgrade of " + exe.block + ": " + StringUtils.stringifyException(exe.throwable));
                    }
                    // Failed blocks are counted as processed too, so the
                    // status can still reach 100%.
                    ++this.blocksUpgraded;
                    lastUpdateTime = now;
                } else if (now - lastUpdateTime >= stallWarnIntervalMilliSec && now - lastWarnTime >= stallWarnIntervalMilliSec) {
                    lastWarnTime = now;
                    LOG.warn("No block was updated in last 5 minutes! will keep waiting... ");
                }
            }
            if (now - lastStatusReportTime > statusReportIntervalMilliSec) {
                this.sendStatus();
                lastStatusReportTime = System.currentTimeMillis();
            }
        }
    }

    /** Builds a statistics command from the current counters. */
    private BlockCrcUpgradeUtils.DatanodeStatsCommand buildStatsCommand() {
        return new BlockCrcUpgradeUtils.DatanodeStatsCommand(this.getUpgradeStatus(), this.getDatanode().dnRegistration, this.blocksPreviouslyUpgraded + this.blocksUpgraded, this.blocksToUpgrade - this.blocksUpgraded, this.errors);
    }

    /**
     * Logs the current progress and, unless running offline, sends it to the
     * namenode.
     *
     * @return {@code true} in offline mode or when the namenode replied;
     *         {@code false} when no reply was received
     */
    boolean sendStatus() {
        LOG.info((this.offlineUpgrade ? "Offline " : "") + "Block CRC Upgrade : " + this.getUpgradeStatus() + "% completed.");
        if (this.offlineUpgrade) {
            return true;
        }
        BlockCrcUpgradeUtils.DatanodeStatsCommand cmd;
        synchronized (this) {
            cmd = this.buildStatsCommand();
        }
        UpgradeCommand reply = BlockCrcUpgradeUtils.sendCommand(this.namenode, cmd, 0);
        if (reply == null) {
            LOG.warn("Could not send status to Namenode. Namenode might be over loaded or down.");
        }
        return reply != null;
    }

    /**
     * Task that upgrades a single block and then hands itself to
     * {@link BlockCrcUpgradeObjectDatanode#completedList}.
     */
    class UpgradeExecutor
    implements Runnable {
        Block block;
        /** Whatever {@code upgradeBlock} threw, or {@code null} on success. */
        Throwable throwable;

        UpgradeExecutor(Block b) {
            this.block = b;
        }

        public void run() {
            try {
                DNBlockUpgradeInfo blockInfo = new DNBlockUpgradeInfo();
                blockInfo.block = this.block;
                blockInfo.dataNode = BlockCrcUpgradeObjectDatanode.this.getDatanode();
                blockInfo.namenode = BlockCrcUpgradeObjectDatanode.this.namenode;
                blockInfo.offlineUpgrade = BlockCrcUpgradeObjectDatanode.this.offlineUpgrade;
                BlockCrcUpgradeUtils.upgradeBlock(blockInfo);
            }
            catch (Throwable t) {
                // Captured rather than rethrown so the waiting thread can
                // count the failure and still see this executor complete.
                this.throwable = t;
            }
            synchronized (BlockCrcUpgradeObjectDatanode.this.completedList) {
                BlockCrcUpgradeObjectDatanode.this.completedList.add(this);
                BlockCrcUpgradeObjectDatanode.this.completedList.notify();
            }
        }
    }
}

