/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest;
import org.apache.flink.runtime.checkpoint.channel.CheckpointStartRequest;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

interface ChannelStateWriteRequest {
    public static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequest.class);

    public long getCheckpointId();

    public void cancel(Throwable var1) throws Exception;

    public static CheckpointInProgressRequest completeInput(long checkpointId) {
        return new CheckpointInProgressRequest("completeInput", checkpointId, ChannelStateCheckpointWriter::completeInput, false);
    }

    public static CheckpointInProgressRequest completeOutput(long checkpointId) {
        return new CheckpointInProgressRequest("completeOutput", checkpointId, ChannelStateCheckpointWriter::completeOutput, false);
    }

    public static ChannelStateWriteRequest write(long checkpointId, InputChannelInfo info, CloseableIterator<Buffer> iterator) {
        return ChannelStateWriteRequest.buildWriteRequest(checkpointId, "writeInput", iterator, (writer, buffer) -> writer.writeInput(info, (Buffer)buffer));
    }

    public static ChannelStateWriteRequest write(long checkpointId, ResultSubpartitionInfo info, Buffer ... buffers) {
        return ChannelStateWriteRequest.buildWriteRequest(checkpointId, "writeOutput", CloseableIterator.ofElements(Buffer::recycleBuffer, buffers), (writer, buffer) -> writer.writeOutput(info, (Buffer)buffer));
    }

    public static ChannelStateWriteRequest write(long checkpointId, ResultSubpartitionInfo info, CompletableFuture<List<Buffer>> dataFuture) {
        return ChannelStateWriteRequest.buildFutureWriteRequest(checkpointId, "writeOutputFuture", dataFuture, (writer, buffer) -> writer.writeOutput(info, (Buffer)buffer));
    }

    public static ChannelStateWriteRequest buildFutureWriteRequest(long checkpointId, String name, CompletableFuture<List<Buffer>> dataFuture, BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
        return new CheckpointInProgressRequest(name, checkpointId, writer -> {
            List buffers;
            try {
                buffers = (List)dataFuture.get();
            }
            catch (ExecutionException e) {
                writer.fail(e);
                return;
            }
            for (Buffer buffer : buffers) {
                ChannelStateWriteRequest.checkBufferIsBuffer(buffer);
                bufferConsumer.accept((ChannelStateCheckpointWriter)writer, buffer);
            }
        }, throwable -> dataFuture.thenAccept(buffers -> {
            try {
                CloseableIterator.fromList(buffers, Buffer::recycleBuffer).close();
            }
            catch (Exception e) {
                LOG.error("Failed to recycle the output buffer of channel state.", (Throwable)e);
            }
        }), false);
    }

    public static ChannelStateWriteRequest buildWriteRequest(long checkpointId, String name, CloseableIterator<Buffer> iterator, BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
        return new CheckpointInProgressRequest(name, checkpointId, writer -> {
            while (iterator.hasNext()) {
                Buffer buffer = (Buffer)iterator.next();
                ChannelStateWriteRequest.checkBufferIsBuffer(buffer);
                bufferConsumer.accept((ChannelStateCheckpointWriter)writer, buffer);
            }
        }, throwable -> iterator.close(), false);
    }

    public static void checkBufferIsBuffer(Buffer buffer) {
        try {
            Preconditions.checkArgument(buffer.isBuffer());
        }
        catch (Exception e) {
            buffer.recycleBuffer();
            throw e;
        }
    }

    public static ChannelStateWriteRequest start(long checkpointId, ChannelStateWriter.ChannelStateWriteResult targetResult, CheckpointStorageLocationReference locationReference) {
        return new CheckpointStartRequest(checkpointId, targetResult, locationReference);
    }

    public static ChannelStateWriteRequest abort(long checkpointId, Throwable cause) {
        return new CheckpointInProgressRequest("abort", checkpointId, writer -> writer.fail(cause), true);
    }

    public static ThrowingConsumer<Throwable, Exception> recycle(Buffer[] flinkBuffers) {
        return unused -> {
            for (Buffer b : flinkBuffers) {
                b.recycleBuffer();
            }
        };
    }
}

