/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.function.BiFunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SubtaskCheckpointCoordinatorImpl
implements SubtaskCheckpointCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
    private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
    private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30000;
    private final boolean enableCheckpointAfterTasksFinished;
    private final CachingCheckpointStorageWorkerView checkpointStorage;
    private final String taskName;
    private final ExecutorService asyncOperationsThreadPool;
    private final Environment env;
    private final AsyncExceptionHandler asyncExceptionHandler;
    private final ChannelStateWriter channelStateWriter;
    private final StreamTaskActionExecutor actionExecutor;
    private final BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot;
    private final Set<Long> abortedCheckpointIds;
    private long lastCheckpointId;
    private final Object lock;
    @GuardedBy(value="lock")
    private final Map<Long, AsyncCheckpointRunnable> checkpoints;
    @GuardedBy(value="lock")
    private boolean closed;
    private final BarrierAlignmentUtil.DelayableTimer registerTimer;
    private final Clock clock;
    private BarrierAlignmentUtil.Cancellable alignmentTimer;
    private long alignmentCheckpointId;

    SubtaskCheckpointCoordinatorImpl(CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, boolean unalignedCheckpointEnabled, boolean enableCheckpointAfterTasksFinished, BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot, BarrierAlignmentUtil.DelayableTimer registerTimer) throws IOException {
        this(checkpointStorage, taskName, actionExecutor, asyncOperationsThreadPool, env, asyncExceptionHandler, unalignedCheckpointEnabled, enableCheckpointAfterTasksFinished, prepareInputSnapshot, 128, registerTimer);
    }

    SubtaskCheckpointCoordinatorImpl(CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, boolean unalignedCheckpointEnabled, boolean enableCheckpointAfterTasksFinished, BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot, int maxRecordAbortedCheckpoints, BarrierAlignmentUtil.DelayableTimer registerTimer) throws IOException {
        this(checkpointStorage, taskName, actionExecutor, asyncOperationsThreadPool, env, asyncExceptionHandler, prepareInputSnapshot, maxRecordAbortedCheckpoints, unalignedCheckpointEnabled ? SubtaskCheckpointCoordinatorImpl.openChannelStateWriter(taskName, checkpointStorage, env) : ChannelStateWriter.NO_OP, enableCheckpointAfterTasksFinished, registerTimer);
    }

    @VisibleForTesting
    SubtaskCheckpointCoordinatorImpl(CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot, int maxRecordAbortedCheckpoints, ChannelStateWriter channelStateWriter, boolean enableCheckpointAfterTasksFinished, BarrierAlignmentUtil.DelayableTimer registerTimer) throws IOException {
        this.checkpointStorage = new CachingCheckpointStorageWorkerView(Preconditions.checkNotNull(checkpointStorage));
        this.taskName = Preconditions.checkNotNull(taskName);
        this.checkpoints = new HashMap<Long, AsyncCheckpointRunnable>();
        this.lock = new Object();
        this.asyncOperationsThreadPool = Preconditions.checkNotNull(asyncOperationsThreadPool);
        this.env = Preconditions.checkNotNull(env);
        this.asyncExceptionHandler = Preconditions.checkNotNull(asyncExceptionHandler);
        this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
        this.channelStateWriter = Preconditions.checkNotNull(channelStateWriter);
        this.prepareInputSnapshot = prepareInputSnapshot;
        this.abortedCheckpointIds = this.createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
        this.lastCheckpointId = -1L;
        this.closed = false;
        this.enableCheckpointAfterTasksFinished = enableCheckpointAfterTasksFinished;
        this.registerTimer = registerTimer;
        this.clock = SystemClock.getInstance();
    }

    private static ChannelStateWriter openChannelStateWriter(String taskName, CheckpointStorageWorkerView checkpointStorage, Environment env) {
        ChannelStateWriterImpl writer = new ChannelStateWriterImpl(taskName, env.getTaskInfo().getIndexOfThisSubtask(), checkpointStorage);
        writer.open();
        return writer;
    }

    @Override
    public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause, OperatorChain<?, ?> operatorChain) throws IOException {
        long next;
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", (Object)checkpointId, (Object)this.taskName);
        this.lastCheckpointId = Math.max(this.lastCheckpointId, checkpointId);
        Iterator<Long> iterator = this.abortedCheckpointIds.iterator();
        while (iterator.hasNext() && (next = iterator.next().longValue()) < this.lastCheckpointId) {
            iterator.remove();
        }
        this.checkpointStorage.clearCacheFor(checkpointId);
        this.channelStateWriter.abort(checkpointId, cause, true);
        this.env.declineCheckpoint(checkpointId, cause);
        this.actionExecutor.runThrowing(() -> {
            if (checkpointId == this.alignmentCheckpointId) {
                this.cancelAlignmentTimer();
            }
            operatorChain.abortCheckpoint(checkpointId, cause);
            operatorChain.broadcastEvent(new CancelCheckpointMarker(checkpointId));
        });
    }

    private void cancelAlignmentTimer() {
        if (this.alignmentTimer == null) {
            return;
        }
        this.alignmentTimer.cancel();
        this.alignmentTimer = null;
    }

    @Override
    public CheckpointStorageWorkerView getCheckpointStorage() {
        return this.checkpointStorage;
    }

    @Override
    public ChannelStateWriter getChannelStateWriter() {
        return this.channelStateWriter;
    }

    @Override
    public void checkpointState(CheckpointMetaData metadata, CheckpointOptions options, CheckpointMetricsBuilder metrics, OperatorChain<?, ?> operatorChain, boolean isTaskFinished, Supplier<Boolean> isRunning) throws Exception {
        Preconditions.checkNotNull(options);
        Preconditions.checkNotNull(metrics);
        if (this.lastCheckpointId >= metadata.getCheckpointId()) {
            LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", (Object)this.lastCheckpointId, (Object)metadata.getCheckpointId());
            this.channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
            this.checkAndClearAbortedStatus(metadata.getCheckpointId());
            return;
        }
        SubtaskCheckpointCoordinatorImpl.logCheckpointProcessingDelay(metadata);
        this.lastCheckpointId = metadata.getCheckpointId();
        if (this.checkAndClearAbortedStatus(metadata.getCheckpointId())) {
            operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
            LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", (Object)metadata.getCheckpointId());
            return;
        }
        if (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {
            options = options.withUnalignedSupported();
            this.initInputsCheckpoint(metadata.getCheckpointId(), options);
        }
        operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
        LOG.debug("Task {} broadcastEvent at {}, triggerTime {}, passed time {}", new Object[]{this.taskName, System.currentTimeMillis(), metadata.getTimestamp(), System.currentTimeMillis() - metadata.getTimestamp()});
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);
        operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());
        this.registerAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);
        if (options.needsChannelState()) {
            this.channelStateWriter.finishOutput(metadata.getCheckpointId());
        }
        HashMap<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<OperatorID, OperatorSnapshotFutures>(operatorChain.getNumberOfOperators());
        try {
            if (this.takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
                this.finishAndReportAsync(snapshotFutures, metadata, metrics, operatorChain.isTaskDeployedAsFinished(), isTaskFinished, isRunning);
            } else {
                this.cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
            }
        }
        catch (Exception ex) {
            this.cleanup(snapshotFutures, metadata, metrics, ex);
            throw ex;
        }
    }

    private void registerAlignmentTimer(long checkpointId, OperatorChain<?, ?> operatorChain, CheckpointBarrier checkpointBarrier) {
        this.cancelAlignmentTimer();
        if (!checkpointBarrier.getCheckpointOptions().isTimeoutable()) {
            return;
        }
        long timerDelay = BarrierAlignmentUtil.getTimerDelay(this.clock, checkpointBarrier);
        this.alignmentTimer = this.registerTimer.registerTask(() -> {
            try {
                operatorChain.alignedBarrierTimeout(checkpointId);
            }
            catch (Exception e) {
                ExceptionUtils.rethrowIOException(e);
            }
            this.alignmentTimer = null;
            return null;
        }, Duration.ofMillis(timerDelay));
        this.alignmentCheckpointId = checkpointId;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
        this.notifyCheckpoint(checkpointId, operatorChain, isRunning, Task.NotifyCheckpointOperation.COMPLETE);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
        this.notifyCheckpoint(checkpointId, operatorChain, isRunning, Task.NotifyCheckpointOperation.ABORT);
    }

    @Override
    public void notifyCheckpointSubsumed(long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
        this.notifyCheckpoint(checkpointId, operatorChain, isRunning, Task.NotifyCheckpointOperation.SUBSUME);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyCheckpoint(long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning, Task.NotifyCheckpointOperation notifyCheckpointOperation) throws Exception {
        Exception previousException;
        block24: {
            previousException = null;
            try {
                if (!isRunning.get().booleanValue()) {
                    LOG.debug("Ignoring notification of checkpoint {} {} for not-running task {}", new Object[]{notifyCheckpointOperation, checkpointId, this.taskName});
                    break block24;
                }
                LOG.debug("Notification of checkpoint {} {} for task {}", new Object[]{notifyCheckpointOperation, checkpointId, this.taskName});
                if (notifyCheckpointOperation.equals((Object)Task.NotifyCheckpointOperation.ABORT)) {
                    boolean canceled = this.cancelAsyncCheckpointRunnable(checkpointId);
                    if (!canceled && checkpointId > this.lastCheckpointId) {
                        this.abortedCheckpointIds.add(checkpointId);
                    }
                    this.channelStateWriter.abort(checkpointId, new CancellationException("checkpoint aborted via notification"), false);
                }
                try {
                    switch (notifyCheckpointOperation) {
                        case ABORT: {
                            operatorChain.notifyCheckpointAborted(checkpointId);
                            break;
                        }
                        case COMPLETE: {
                            operatorChain.notifyCheckpointComplete(checkpointId);
                            break;
                        }
                        case SUBSUME: {
                            operatorChain.notifyCheckpointSubsumed(checkpointId);
                        }
                    }
                }
                catch (Exception e) {
                    previousException = ExceptionUtils.firstOrSuppressed(e, previousException);
                }
            }
            catch (Throwable throwable) {
                try {
                    switch (notifyCheckpointOperation) {
                        case ABORT: {
                            this.env.getTaskStateManager().notifyCheckpointAborted(checkpointId);
                            break;
                        }
                        case COMPLETE: {
                            this.env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
                        }
                    }
                }
                catch (Exception e) {
                    previousException = ExceptionUtils.firstOrSuppressed(e, previousException);
                }
                throw throwable;
            }
        }
        try {
            switch (notifyCheckpointOperation) {
                case ABORT: {
                    this.env.getTaskStateManager().notifyCheckpointAborted(checkpointId);
                    break;
                }
                case COMPLETE: {
                    this.env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
                }
            }
        }
        catch (Exception e) {
            previousException = ExceptionUtils.firstOrSuppressed(e, previousException);
        }
        ExceptionUtils.tryRethrowException(previousException);
    }

    @Override
    public void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions) throws CheckpointException {
        if (checkpointOptions.isUnalignedCheckpoint()) {
            this.channelStateWriter.start(id, checkpointOptions);
            this.prepareInflightDataSnapshot(id);
        } else if (checkpointOptions.isTimeoutable()) {
            this.channelStateWriter.start(id, checkpointOptions);
            this.channelStateWriter.finishInput(id);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void waitForPendingCheckpoints() throws Exception {
        ArrayList<AsyncCheckpointRunnable> asyncCheckpointRunnables;
        if (!this.enableCheckpointAfterTasksFinished) {
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            asyncCheckpointRunnables = new ArrayList<AsyncCheckpointRunnable>(this.checkpoints.values());
        }
        asyncCheckpointRunnables.forEach(ar -> {
            try {
                ar.getFinishedFuture().get();
            }
            catch (Exception e) {
                LOG.debug("Async runnable for checkpoint " + ar.getCheckpointId() + " throws exception and exit", (Throwable)e);
            }
        });
    }

    @Override
    public void close() throws IOException {
        this.cancelAlignmentTimer();
        this.cancel();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancel() throws IOException {
        ArrayList<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
        Object object = this.lock;
        synchronized (object) {
            if (!this.closed) {
                this.closed = true;
                asyncCheckpointRunnables = new ArrayList<AsyncCheckpointRunnable>(this.checkpoints.values());
                this.checkpoints.clear();
            }
        }
        IOUtils.closeAllQuietly(asyncCheckpointRunnables);
        this.channelStateWriter.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    int getAsyncCheckpointRunnableSize() {
        Object object = this.lock;
        synchronized (object) {
            return this.checkpoints.size();
        }
    }

    @VisibleForTesting
    int getAbortedCheckpointSize() {
        return this.abortedCheckpointIds.size();
    }

    private boolean checkAndClearAbortedStatus(long checkpointId) {
        return this.abortedCheckpointIds.remove(checkpointId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointRunnable asyncCheckpointRunnable) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                LOG.debug("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
                IOUtils.closeQuietly(asyncCheckpointRunnable);
                Preconditions.checkState(!this.checkpoints.containsKey(checkpointId), "SubtaskCheckpointCoordinator was closed without releasing asyncCheckpointRunnable for checkpoint %s", checkpointId);
            } else {
                if (this.checkpoints.containsKey(checkpointId)) {
                    IOUtils.closeQuietly(asyncCheckpointRunnable);
                    throw new IOException(String.format("Cannot register Closeable, async checkpoint %d runnable has been register. Closing argument.", checkpointId));
                }
                this.checkpoints.put(checkpointId, asyncCheckpointRunnable);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean unregisterAsyncCheckpointRunnable(long checkpointId) {
        Object object = this.lock;
        synchronized (object) {
            return this.checkpoints.remove(checkpointId) != null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean cancelAsyncCheckpointRunnable(long checkpointId) {
        AsyncCheckpointRunnable asyncCheckpointRunnable;
        Object object = this.lock;
        synchronized (object) {
            asyncCheckpointRunnable = this.checkpoints.remove(checkpointId);
        }
        IOUtils.closeQuietly(asyncCheckpointRunnable);
        return asyncCheckpointRunnable != null;
    }

    private void cleanup(Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData metadata, CheckpointMetricsBuilder metrics, Exception ex) {
        this.channelStateWriter.abort(metadata.getCheckpointId(), ex, true);
        for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
            if (operatorSnapshotResult == null) continue;
            try {
                operatorSnapshotResult.cancel();
            }
            catch (Exception e) {
                LOG.warn("Could not properly cancel an operator snapshot result.", (Throwable)e);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.taskName, metadata.getCheckpointId(), metrics.getAlignmentDurationNanosOrDefault() / 1000000L, metrics.getSyncDurationMillis()});
        }
    }

    private void prepareInflightDataSnapshot(long checkpointId) throws CheckpointException {
        this.prepareInputSnapshot.apply(this.channelStateWriter, checkpointId).whenComplete((unused, ex) -> {
            if (ex != null) {
                this.channelStateWriter.abort(checkpointId, (Throwable)ex, false);
            } else {
                this.channelStateWriter.finishInput(checkpointId);
            }
        });
    }

    private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetricsBuilder metrics, boolean isTaskDeployedAsFinished, boolean isTaskFinished, Supplier<Boolean> isRunning) throws IOException {
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(snapshotFutures, metadata, metrics, System.nanoTime(), this.taskName, this.unregisterConsumer(), this.env, this.asyncExceptionHandler, isTaskDeployedAsFinished, isTaskFinished, isRunning);
        this.registerAsyncCheckpointRunnable(asyncCheckpointRunnable.getCheckpointId(), asyncCheckpointRunnable);
        this.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
    }

    private Consumer<AsyncCheckpointRunnable> unregisterConsumer() {
        return asyncCheckpointRunnable -> this.unregisterAsyncCheckpointRunnable(asyncCheckpointRunnable.getCheckpointId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean takeSnapshotSync(Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetricsBuilder checkpointMetrics, CheckpointOptions checkpointOptions, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
        Preconditions.checkState(!operatorChain.isClosed(), "OperatorChain and Task should never be closed at this point");
        long checkpointId = checkpointMetaData.getCheckpointId();
        long started = System.nanoTime();
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = checkpointOptions.needsChannelState() ? this.channelStateWriter.getAndRemoveWriteResult(checkpointId) : ChannelStateWriter.ChannelStateWriteResult.EMPTY;
        CheckpointStreamFactory storage = this.checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());
        try {
            operatorChain.snapshotState(operatorSnapshotsInProgress, checkpointMetaData, checkpointOptions, isRunning, channelStateWriteResult, storage);
        }
        finally {
            this.checkpointStorage.clearCacheFor(checkpointId);
        }
        LOG.debug("{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms, is unaligned checkpoint : {}", new Object[]{this.taskName, checkpointId, checkpointMetrics.getAlignmentDurationNanosOrDefault() / 1000000L, checkpointMetrics.getSyncDurationMillis(), checkpointOptions.isUnalignedCheckpoint()});
        checkpointMetrics.setSyncDurationMillis((System.nanoTime() - started) / 1000000L);
        checkpointMetrics.setUnalignedCheckpoint(checkpointOptions.isUnalignedCheckpoint());
        return true;
    }

    private Set<Long> createAbortedCheckpointSetWithLimitSize(final int maxRecordAbortedCheckpoints) {
        return Collections.newSetFromMap(new LinkedHashMap<Long, Boolean>(){
            private static final long serialVersionUID = 1L;

            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                return this.size() > maxRecordAbortedCheckpoints;
            }
        });
    }

    private static void logCheckpointProcessingDelay(CheckpointMetaData checkpointMetaData) {
        long delay = System.currentTimeMillis() - checkpointMetaData.getReceiveTimestamp();
        if (delay >= 30000L) {
            LOG.warn("Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: {}ms", (Object)delay);
        }
    }

    private static class CachingCheckpointStorageWorkerView
    implements CheckpointStorageWorkerView {
        private final Map<Long, CheckpointStreamFactory> cache = new ConcurrentHashMap<Long, CheckpointStreamFactory>();
        private final CheckpointStorageWorkerView delegate;

        private CachingCheckpointStorageWorkerView(CheckpointStorageWorkerView delegate) {
            this.delegate = delegate;
        }

        void clearCacheFor(long checkpointId) {
            this.cache.remove(checkpointId);
        }

        @Override
        public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) {
            return this.cache.computeIfAbsent(checkpointId, id -> {
                try {
                    return this.delegate.resolveCheckpointStorageLocation(checkpointId, reference);
                }
                catch (IOException e) {
                    throw new FlinkRuntimeException(e);
                }
            });
        }

        @Override
        public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
            return this.delegate.createTaskOwnedStateStream();
        }

        @Override
        public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
            return this.delegate.createTaskOwnedCheckpointStateToolset();
        }
    }
}

