/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class MultithreadedMapRunner<K1 extends WritableComparable, V1 extends Writable, K2 extends WritableComparable, V2 extends Writable>
implements MapRunnable<K1, V1, K2, V2> {
    private static final Log LOG = LogFactory.getLog((String)MultithreadedMapRunner.class.getName());
    private JobConf job;
    private Mapper<K1, V1, K2, V2> mapper;
    private ExecutorService executorService;
    private volatile IOException ioException;
    private volatile RuntimeException runtimeException;

    @Override
    public void configure(JobConf jobConf) {
        int numberOfThreads = jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Configuring jobConf " + jobConf.getJobName() + " to use " + numberOfThreads + " threads"));
        }
        this.job = jobConf;
        this.mapper = (Mapper)ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
        this.executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS, new BlockingArrayQueue(numberOfThreads));
    }

    private void checkForExceptionsFromProcessingThreads() throws IOException, RuntimeException {
        if (this.ioException != null) {
            throw this.ioException;
        }
        if (this.runtimeException != null) {
            throw this.runtimeException;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, Reporter reporter) throws IOException {
        try {
            WritableComparable key = (WritableComparable)input.createKey();
            Writable value = (Writable)input.createValue();
            while (input.next(key, value)) {
                this.executorService.execute(new MapperInvokeRunable(this, key, value, output, reporter));
                this.checkForExceptionsFromProcessingThreads();
                key = (WritableComparable)input.createKey();
                value = (Writable)input.createValue();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Finished dispatching all Mappper.map calls, job " + this.job.getJobName()));
            }
            this.executorService.shutdown();
            try {
                while (!this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS)) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Awaiting all running Mappper.map calls to finish, job " + this.job.getJobName()));
                    }
                    this.checkForExceptionsFromProcessingThreads();
                }
                this.checkForExceptionsFromProcessingThreads();
            }
            catch (IOException ioEx) {
                this.executorService.shutdownNow();
                throw ioEx;
            }
            catch (InterruptedException iEx) {
                throw new RuntimeException(iEx);
            }
        }
        finally {
            this.mapper.close();
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class MapperInvokeRunable
    implements Runnable {
        private K1 key;
        private V1 value;
        private OutputCollector<K2, V2> output;
        private Reporter reporter;
        final /* synthetic */ MultithreadedMapRunner this$0;

        public MapperInvokeRunable(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) {
            this.this$0 = var1_1;
            this.key = key;
            this.value = value;
            this.output = output;
            this.reporter = reporter;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                this.this$0.mapper.map(this.key, this.value, this.output, this.reporter);
            }
            catch (IOException ex) {
                MultithreadedMapRunner multithreadedMapRunner = this.this$0;
                synchronized (multithreadedMapRunner) {
                    if (this.this$0.ioException == null) {
                        this.this$0.ioException = ex;
                    }
                }
            }
            catch (RuntimeException ex) {
                MultithreadedMapRunner multithreadedMapRunner = this.this$0;
                synchronized (multithreadedMapRunner) {
                    if (this.this$0.runtimeException == null) {
                        this.this$0.runtimeException = ex;
                    }
                }
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class BlockingArrayQueue
    extends ArrayBlockingQueue<Runnable> {
        public BlockingArrayQueue(int capacity) {
            super(capacity);
        }

        @Override
        public boolean offer(Runnable r) {
            return this.add(r);
        }

        @Override
        public boolean add(Runnable r) {
            try {
                this.put(r);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            return true;
        }
    }
}

