/*
 * Decompiled with CFR 0.152.
 */
package htsjdk.io;

import htsjdk.io.Writer;
import htsjdk.samtools.util.RuntimeIOException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * A pool of worker threads shared by any number of {@link Writer}s so that the actual
 * writing happens off the calling thread.
 *
 * <p>Each writer handed to {@link #pool(Writer, BlockingQueue, int)} is wrapped in a proxy that
 * buffers items in the supplied queue. Once the queue holds at least {@code writeThreshold}
 * items, a task is submitted to the pool's executor that drains the queue into the wrapped
 * writer. Failures of that task are rethrown as {@link RuntimeException}s from a later
 * {@code write} or {@code close} call on the proxy.
 *
 * <p>Closing the pool closes every writer that was created through it and then shuts the
 * executor down.
 *
 * <p>NOTE(review): none of the mutable state here is synchronized or volatile, so the pool and
 * each returned writer are presumably meant to be driven by a single thread at a time -
 * confirm against callers.
 */
public class AsyncWriterPool implements Closeable {
    private final ExecutorService executor;
    private final List<PooledWriter<?>> writers = new ArrayList<>();
    private int timeoutSeconds = 5;
    private boolean poolClosed = false;

    /**
     * Creates a pool backed by a work-stealing executor.
     *
     * @param threads parallelism passed to {@link Executors#newWorkStealingPool(int)}; must be >= 1
     * @throws IllegalArgumentException if {@code threads} is less than 1
     */
    public AsyncWriterPool(int threads) {
        if (threads < 1) {
            throw new IllegalArgumentException("Threads must be >= 1: " + threads);
        }
        this.executor = Executors.newWorkStealingPool(threads);
    }

    /** Creates a pool with one thread per available processor. */
    public AsyncWriterPool() {
        this(Runtime.getRuntime().availableProcessors());
    }

    /**
     * Closes every writer created by this pool, waits for those closes to finish, and shuts the
     * executor down. Subsequent calls are no-ops.
     *
     * <p>If closing any writer fails, the failure propagates out of
     * {@link CompletableFuture#join()} as an unchecked exception; the executor is shut down
     * regardless.
     */
    @Override
    public void close() throws IOException {
        if (this.poolClosed) {
            return;
        }
        this.poolClosed = true;
        try {
            final CompletableFuture<?>[] pendingCloses = this.writers.stream()
                    .map(PooledWriter::nonBlockingClose)
                    .toArray(CompletableFuture[]::new);
            // allOf only completes once every close has completed, successfully or not.
            CompletableFuture.allOf(pendingCloses).join();
        } finally {
            // Always release the worker threads, even when a writer failed to close.
            this.executor.shutdown();
        }
    }

    /**
     * @return how long, in seconds, a single attempt to enqueue an item waits for queue space
     */
    public int getTimeoutSeconds() {
        return this.timeoutSeconds;
    }

    /**
     * @param timeoutSeconds how long, in seconds, a single attempt to enqueue an item waits for
     *                       queue space before the writer re-checks its state and retries
     */
    public void setTimeoutSeconds(int timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    /**
     * Wraps {@code writer} so that its writes are performed by this pool.
     *
     * @param writer         the writer that ultimately receives the items
     * @param queue          buffer holding items until they are drained into {@code writer}
     * @param writeThreshold number of queued items at which a drain task is started; must be
     *                       >= 1 and no larger than the queue's remaining capacity
     * @param <A>            the item type
     * @return a writer whose {@code write} enqueues items and whose {@code close} flushes the
     *         queue and closes {@code writer}
     * @throws IllegalArgumentException if {@code writeThreshold} is out of range
     */
    public <A> Writer<A> pool(Writer<A> writer, BlockingQueue<A> queue, int writeThreshold) {
        PooledWriter<A> pooledWriter = new PooledWriter<>(writer, queue, writeThreshold);
        this.writers.add(pooledWriter);
        return pooledWriter;
    }

    /**
     * Proxy around a {@link Writer} that buffers items in a queue and drains them on the
     * enclosing pool's executor. At most one drain task is tracked at a time.
     */
    private class PooledWriter<A> implements Writer<A> {
        private final BlockingQueue<A> queue;
        private final Writer<A> writer;
        private final int writeThreshold;
        private boolean isClosed = false;
        /** The drain task currently tracked, or {@code null} when none is outstanding. */
        private Future<Void> currentTask;

        private PooledWriter(Writer<A> writer, BlockingQueue<A> queue, int writeThreshold) {
            if (writeThreshold <= 0) {
                throw new IllegalArgumentException("writeThreshold must be >= 1: " + writeThreshold);
            }
            if (writeThreshold > queue.remainingCapacity()) {
                throw new IllegalArgumentException("writeThreshold (" + writeThreshold + ") can't be larger then queue capacity (" + queue.remainingCapacity() + ").");
            }
            this.writer = writer;
            this.queue = queue;
            this.writeThreshold = writeThreshold;
        }

        /** Collects the outcome of the tracked drain task only if it has already finished. */
        private void nonBlockingCheckAndRethrow() {
            if (this.currentTask != null && this.currentTask.isDone()) {
                this.checkAndRethrow();
            }
        }

        /** Waits for the tracked drain task, if any, and collects its outcome. */
        private void blockingCheckAndRethrow() {
            if (this.currentTask != null) {
                this.checkAndRethrow();
            }
        }

        /**
         * Waits for the tracked drain task and clears it. If the task failed, was cancelled, or
         * the wait was interrupted, marks this writer closed and throws a
         * {@link RuntimeException} wrapping the cause.
         */
        private void checkAndRethrow() {
            try {
                this.currentTask.get();
            } catch (InterruptedException | CancellationException | ExecutionException e) {
                this.isClosed = true;
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                Throwable cause = e instanceof ExecutionException ? e.getCause() : e;
                throw new RuntimeException("Exception while writing records asynchronously", cause);
            } finally {
                this.currentTask = null;
            }
        }

        /**
         * Enqueues {@code item}, starting a drain task once the queue reaches the write
         * threshold and no drain is outstanding.
         *
         * @throws RuntimeIOException if this writer is already closed
         * @throws RuntimeException   if an earlier drain task failed or the thread is interrupted
         *                            while waiting for queue space
         */
        @Override
        public void write(A item) {
            if (this.isClosed) {
                throw new RuntimeIOException("Attempt to add record to closed writer.");
            }
            this.nonBlockingCheckAndRethrow();
            try {
                while (!this.isClosed && !this.queue.offer(item, AsyncWriterPool.this.getTimeoutSeconds(), TimeUnit.SECONDS)) {
                    // The queue stayed full for a whole timeout. Surface a drain failure that
                    // happened in the meantime, and make sure something is emptying the queue;
                    // otherwise this loop could never make progress.
                    this.nonBlockingCheckAndRethrow();
                    if (this.currentTask == null) {
                        this.drain();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Exception while placing item in queue", e);
            }
            if (this.currentTask == null && this.queue.size() >= this.writeThreshold) {
                this.drain();
            }
        }

        /**
         * Submits a task that writes queued items to the wrapped writer until the queue is
         * observed empty.
         *
         * @throws IllegalStateException if a drain task is already being tracked
         */
        private void drain() {
            if (this.currentTask != null) {
                throw new IllegalStateException("drain() called while currentTask is not null");
            }
            this.currentTask = AsyncWriterPool.this.executor.submit(() -> {
                A item = this.queue.poll();
                while (item != null) {
                    this.writer.write(item);
                    item = this.queue.poll();
                }
                return null;
            });
        }

        /**
         * Runs {@link #close()} on the pool's executor.
         *
         * @return a future that completes when the close has finished; any exception from the
         *         close is wrapped in a {@link RuntimeException}
         */
        private CompletableFuture<Void> nonBlockingClose() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    this.close();
                    return null;
                } catch (Exception e) {
                    throw new RuntimeException("Caught exception while closing PooledWriter.", e);
                }
            }, AsyncWriterPool.this.executor);
        }

        /**
         * Waits for any outstanding drain, drains whatever is left in the queue, and closes the
         * wrapped writer. Does nothing if this writer is already marked closed.
         *
         * <p>The wrapped writer is closed even when the final drain fails; in that case the
         * drain failure is rethrown and any failure from closing is attached as suppressed.
         */
        @Override
        public void close() throws IOException {
            if (this.isClosed) {
                return;
            }
            this.isClosed = true;
            try {
                this.blockingCheckAndRethrow();
                this.drain();
                this.blockingCheckAndRethrow();
            } catch (RuntimeException drainFailure) {
                // Still release the underlying resource, but keep the drain failure primary.
                try {
                    this.writer.close();
                } catch (Exception closeFailure) {
                    drainFailure.addSuppressed(closeFailure);
                }
                throw drainFailure;
            }
            this.writer.close();
        }
    }
}

