/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/**
 * Keeps track of the buffers that are currently available to one {@link InputChannel}.
 *
 * <p>Two kinds of buffers are held in the internal {@link AvailableBufferQueue}:
 *
 * <ul>
 *   <li><b>exclusive</b> buffers, wrapping memory segments requested from {@code globalPool} and
 *       recycled back through this manager (it is their {@link BufferRecycler});
 *   <li><b>floating</b> buffers, requested from the buffer pool of the channel's input gate.
 * </ul>
 *
 * <p>All mutable state is guarded by the monitor of {@code bufferQueue}; only the methods whose
 * name starts with {@code unsynchronized} read it without taking that lock.
 */
public class BufferManager implements BufferListener, BufferRecycler {

    /** Holds the available exclusive and floating buffers; its monitor is the lock of this class. */
    private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();

    /** Provider from which exclusive memory segments are requested and to which they are returned. */
    private final MemorySegmentProvider globalPool;

    /** The input channel this manager serves and notifies. */
    private final InputChannel inputChannel;

    /** Whether this manager is currently registered as a listener on the floating buffer pool. */
    @GuardedBy(value = "bufferQueue")
    private boolean isWaitingForFloatingBuffers;

    /** Number of buffers (exclusive plus floating) the queue is allowed to grow to. */
    @GuardedBy(value = "bufferQueue")
    private int numRequiredBuffers;

    /**
     * Creates a buffer manager.
     *
     * @param globalPool provider of exclusive memory segments, must not be {@code null}
     * @param inputChannel the owning input channel, must not be {@code null}
     * @param numRequiredBuffers initial number of required buffers, must be non-negative
     */
    public BufferManager(MemorySegmentProvider globalPool, InputChannel inputChannel, int numRequiredBuffers) {
        this.globalPool = Preconditions.checkNotNull(globalPool);
        this.inputChannel = Preconditions.checkNotNull(inputChannel);
        Preconditions.checkArgument(numRequiredBuffers >= 0);
        this.numRequiredBuffers = numRequiredBuffers;
    }

    // ------------------------------------------------------------------------
    // Buffer request
    // ------------------------------------------------------------------------

    /**
     * Takes one buffer from the queue without blocking.
     *
     * <p>The number of required buffers is decremented unconditionally, i.e. also when the queue
     * is empty and {@code null} is returned.
     *
     * @return an available buffer, or {@code null} if the queue is empty
     */
    @Nullable
    Buffer requestBuffer() {
        synchronized (bufferQueue) {
            --numRequiredBuffers;
            return bufferQueue.takeBuffer();
        }
    }

    /**
     * Takes one buffer, waiting on the queue monitor until one becomes available.
     *
     * <p>While the queue is empty and no listener is registered yet, a floating buffer is requested
     * directly from the input gate's buffer pool; if that fails the manager registers itself as a
     * listener (see {@link #shouldContinueRequest(BufferPool)}) and waits to be notified.
     *
     * @return a buffer, never {@code null}
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws CancelTaskException if the input channel or the buffer pool has been released
     */
    Buffer requestBufferBlocking() throws InterruptedException {
        synchronized (bufferQueue) {
            Buffer buffer;
            while ((buffer = bufferQueue.takeBuffer()) == null) {
                if (inputChannel.isReleased()) {
                    throw new CancelTaskException("Input channel [" + inputChannel.channelInfo + "] has already been released.");
                }
                if (!isWaitingForFloatingBuffers) {
                    BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
                    buffer = bufferPool.requestBuffer();
                    if (buffer == null && shouldContinueRequest(bufferPool)) {
                        // Neither a buffer nor a listener registration was obtained: retry.
                        continue;
                    }
                }
                if (buffer != null) {
                    return buffer;
                }
                // Woken up by recycle(), notifyBufferAvailable() or releaseAllBuffers().
                bufferQueue.wait();
            }
            return buffer;
        }
    }

    /**
     * Tries to register this manager as a listener on the given pool after a failed request.
     *
     * @return {@code false} if the listener was registered (the caller should wait), {@code true}
     *     if registration failed and the request should be retried
     * @throws CancelTaskException if registration failed because the pool is destroyed
     */
    private boolean shouldContinueRequest(BufferPool bufferPool) {
        if (bufferPool.addBufferListener(this)) {
            isWaitingForFloatingBuffers = true;
            numRequiredBuffers = 1;
            return false;
        }
        if (bufferPool.isDestroyed()) {
            throw new CancelTaskException("Local buffer pool has already been released.");
        }
        return true;
    }

    /**
     * Requests the given number of memory segments from the global pool and adds them to the queue
     * as exclusive buffers recycled by this manager.
     *
     * @param numExclusiveBuffers number of exclusive buffers to request, must be non-negative
     * @throws IOException declared for the segment request on the global pool
     */
    void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
        Preconditions.checkArgument(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative.");
        if (numExclusiveBuffers == 0) {
            return;
        }
        Collection<MemorySegment> segments = globalPool.requestUnpooledMemorySegments(numExclusiveBuffers);
        synchronized (bufferQueue) {
            // addExclusiveBuffer() may hand back a surplus floating buffer that would have to be
            // recycled. Its return value is ignored here, which is only safe while no floating
            // buffer has been queued yet - hence the state check.
            Preconditions.checkState(unsynchronizedGetFloatingBuffersAvailable() == 0, "Bug in buffer allocation logic: floating buffer is allocated before exclusive buffers are initialized.");
            for (MemorySegment segment : segments) {
                bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
            }
        }
    }

    /**
     * Sets the number of required buffers and requests floating buffers until that number is
     * reached or the manager had to register as a pool listener.
     *
     * @param numRequired the new total number of required buffers
     * @return the number of floating buffers actually obtained; {@code 0} if the channel is released
     */
    int requestFloatingBuffers(int numRequired) {
        synchronized (bufferQueue) {
            if (inputChannel.isReleased()) {
                return 0;
            }
            numRequiredBuffers = numRequired;
            return tryRequestBuffers();
        }
    }

    /**
     * Requests floating buffers from the input gate's pool while fewer than
     * {@code numRequiredBuffers} are available and no listener is registered. Must be called while
     * holding the {@code bufferQueue} monitor.
     *
     * @return the number of floating buffers added to the queue
     */
    private int tryRequestBuffers() {
        assert (Thread.holdsLock(bufferQueue));
        int numRequestedBuffers = 0;
        while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
            BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
            Buffer buffer = bufferPool.requestBuffer();
            if (buffer != null) {
                bufferQueue.addFloatingBuffer(buffer);
                ++numRequestedBuffers;
            } else if (bufferPool.addBufferListener(this)) {
                isWaitingForFloatingBuffers = true;
                break;
            }
            // Otherwise neither a buffer nor a registration was obtained: loop and try again.
        }
        return numRequestedBuffers;
    }

    // ------------------------------------------------------------------------
    // Buffer recycle
    // ------------------------------------------------------------------------

    /**
     * Takes back an exclusive memory segment.
     *
     * <p>If the channel is already released the segment goes straight back to the global pool.
     * Otherwise it is re-queued as an exclusive buffer; when that makes the queue exceed the
     * required size, one floating buffer is removed and recycled, otherwise the channel is told
     * that one more buffer is available. Threads waiting on the queue are always woken up.
     *
     * @param segment the exclusive segment being returned
     */
    @Override
    public void recycle(MemorySegment segment) {
        Buffer releasedFloatingBuffer = null;
        synchronized (bufferQueue) {
            try {
                if (inputChannel.isReleased()) {
                    globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment));
                    return;
                }
                releasedFloatingBuffer = bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            } finally {
                bufferQueue.notifyAll();
            }
        }

        // Both follow-up actions run outside the lock.
        if (releasedFloatingBuffer != null) {
            releasedFloatingBuffer.recycleBuffer();
        } else {
            try {
                inputChannel.notifyBufferAvailable(1);
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        }
    }

    /**
     * Resets the number of required buffers to zero and recycles every queued floating buffer.
     * The buffers are recycled after the lock has been released.
     */
    void releaseFloatingBuffers() {
        Queue<Buffer> buffers;
        synchronized (bufferQueue) {
            numRequiredBuffers = 0;
            buffers = bufferQueue.clearFloatingBuffers();
        }
        while (!buffers.isEmpty()) {
            buffers.poll().recycleBuffer();
        }
    }

    /**
     * Releases the given buffers together with everything still held in the queue.
     *
     * <p>Segments of buffers whose recycler is this manager are collected and returned to the
     * global pool in one batch; all other buffers are recycled individually. Failures do not stop
     * the release: the first exception is kept, later ones are attached as suppressed, and the
     * result is thrown at the end.
     *
     * @param buffers buffers to release in addition to the queued ones; drained by this call
     * @throws IOException if any step failed (non-IO exceptions are wrapped)
     */
    void releaseAllBuffers(ArrayDeque<Buffer> buffers) throws IOException {
        List<MemorySegment> exclusiveRecyclingSegments = new ArrayList<MemorySegment>();
        Exception err = null;

        Buffer buffer;
        while ((buffer = buffers.poll()) != null) {
            try {
                if (buffer.getRecycler() == this) {
                    exclusiveRecyclingSegments.add(buffer.getMemorySegment());
                } else {
                    buffer.recycleBuffer();
                }
            } catch (Exception e) {
                err = ExceptionUtils.firstOrSuppressed(e, err);
            }
        }

        try {
            synchronized (bufferQueue) {
                bufferQueue.releaseAll(exclusiveRecyclingSegments);
                // Wake up threads blocked in requestBufferBlocking().
                bufferQueue.notifyAll();
            }
        } catch (Exception e) {
            err = ExceptionUtils.firstOrSuppressed(e, err);
        }

        try {
            if (exclusiveRecyclingSegments.size() > 0) {
                globalPool.recycleUnpooledMemorySegments(exclusiveRecyclingSegments);
            }
        } catch (Exception e) {
            err = ExceptionUtils.firstOrSuppressed(e, err);
        }

        if (err != null) {
            throw err instanceof IOException ? (IOException) err : new IOException(err);
        }
    }

    // ------------------------------------------------------------------------
    // Buffer listener notification
    // ------------------------------------------------------------------------

    /**
     * Callback from the buffer pool offering one floating buffer to this listener.
     *
     * <p>The buffer is accepted only if the channel is not released and fewer than
     * {@code numRequiredBuffers} buffers are available. After accepting it, further floating
     * buffers are requested under the same lock, and the channel is then informed (outside the
     * lock) about the total number of newly available buffers. Any failure is reported through
     * {@code inputChannel.setError}.
     *
     * @param buffer the floating buffer offered by the pool
     * @return {@code true} if the buffer was added to the queue, {@code false} otherwise
     */
    @Override
    public boolean notifyBufferAvailable(Buffer buffer) {
        // First release check is done without the lock; it is repeated under the lock below.
        if (inputChannel.isReleased()) {
            return false;
        }

        boolean isBufferUsed = false;
        try {
            int numBuffers;
            synchronized (bufferQueue) {
                Preconditions.checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
                isWaitingForFloatingBuffers = false;
                if (inputChannel.isReleased() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                    return false;
                }
                bufferQueue.addFloatingBuffer(buffer);
                isBufferUsed = true;
                // tryRequestBuffers() touches state guarded by bufferQueue and asserts that the
                // monitor is held, so it has to run inside this synchronized block.
                numBuffers = 1 + tryRequestBuffers();
                bufferQueue.notifyAll();
            }
            inputChannel.notifyBufferAvailable(numBuffers);
        } catch (Throwable t) {
            inputChannel.setError(t);
        }
        return isBufferUsed;
    }

    /** No action is taken when the buffer pool is destroyed. */
    @Override
    public void notifyBufferDestroyed() {
    }

    // ------------------------------------------------------------------------
    // Getter properties
    // ------------------------------------------------------------------------

    /** Returns the number of required buffers without taking the lock. */
    @VisibleForTesting
    int unsynchronizedGetNumberOfRequiredBuffers() {
        return numRequiredBuffers;
    }

    /** Returns the number of required buffers, read under the {@code bufferQueue} lock. */
    int getNumberOfRequiredBuffers() {
        synchronized (bufferQueue) {
            return numRequiredBuffers;
        }
    }

    /** Returns, without taking the lock, whether a pool listener is currently registered. */
    @VisibleForTesting
    boolean unsynchronizedIsWaitingForFloatingBuffers() {
        return isWaitingForFloatingBuffers;
    }

    /** Returns the number of queued (exclusive plus floating) buffers, read under the lock. */
    @VisibleForTesting
    int getNumberOfAvailableBuffers() {
        synchronized (bufferQueue) {
            return bufferQueue.getAvailableBufferSize();
        }
    }

    /** Returns the number of queued exclusive buffers without taking the lock. */
    int unsynchronizedGetAvailableExclusiveBuffers() {
        return bufferQueue.exclusiveBuffers.size();
    }

    /** Returns the number of queued floating buffers without taking the lock. */
    int unsynchronizedGetFloatingBuffersAvailable() {
        return bufferQueue.floatingBuffers.size();
    }

    /**
     * Container for the available buffers, kept in two separate FIFO queues for floating and
     * exclusive buffers. It performs no synchronization of its own.
     */
    static final class AvailableBufferQueue {

        /** Queued floating buffers. */
        final ArrayDeque<Buffer> floatingBuffers;

        /** Queued exclusive buffers. */
        final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<Buffer>();

        AvailableBufferQueue() {
            this.floatingBuffers = new ArrayDeque<Buffer>();
        }

        /**
         * Adds an exclusive buffer and, if the queue then holds more than
         * {@code numRequiredBuffers} buffers, removes one floating buffer.
         *
         * @param buffer the exclusive buffer to add
         * @param numRequiredBuffers the size the queue should not exceed
         * @return the removed floating buffer (to be recycled by the caller), or {@code null} if
         *     the queue is not over the limit or holds no floating buffer
         */
        @Nullable
        Buffer addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
            exclusiveBuffers.add(buffer);
            if (getAvailableBufferSize() > numRequiredBuffers) {
                return floatingBuffers.poll();
            }
            return null;
        }

        /** Appends a floating buffer to the queue. */
        void addFloatingBuffer(Buffer buffer) {
            floatingBuffers.add(buffer);
        }

        /**
         * Removes and returns the next buffer, preferring floating buffers over exclusive ones.
         *
         * @return the next buffer, or {@code null} if both queues are empty
         */
        @Nullable
        Buffer takeBuffer() {
            if (floatingBuffers.size() > 0) {
                return floatingBuffers.poll();
            }
            return exclusiveBuffers.poll();
        }

        /**
         * Empties both queues: floating buffers are recycled immediately, the memory segments of
         * exclusive buffers are appended to {@code exclusiveSegments} for the caller to recycle.
         *
         * @param exclusiveSegments output list receiving the exclusive segments
         */
        void releaseAll(List<MemorySegment> exclusiveSegments) {
            Buffer buffer;
            while ((buffer = floatingBuffers.poll()) != null) {
                buffer.recycleBuffer();
            }
            while ((buffer = exclusiveBuffers.poll()) != null) {
                exclusiveSegments.add(buffer.getMemorySegment());
            }
        }

        /**
         * Removes all floating buffers from the queue.
         *
         * @return a new queue containing the removed buffers in their original order
         */
        Queue<Buffer> clearFloatingBuffers() {
            ArrayDeque<Buffer> buffers = new ArrayDeque<Buffer>(floatingBuffers);
            floatingBuffers.clear();
            return buffers;
        }

        /** Returns the total number of queued floating and exclusive buffers. */
        int getAvailableBufferSize() {
            return floatingBuffers.size() + exclusiveBuffers.size();
        }
    }
}

