/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.persistence.ResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for completed-checkpoint stores: resolving the configured number of retained
 * checkpoints and recovering {@link CompletedCheckpoint} instances from a {@link StateHandleStore}.
 *
 * <p>The class is not instantiable.
 */
public class DefaultCompletedCheckpointStoreUtils {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultCompletedCheckpointStoreUtils.class);

    /** Utility class; no instances. */
    private DefaultCompletedCheckpointStoreUtils() {
    }

    /**
     * Reads {@link CheckpointingOptions#MAX_RETAINED_CHECKPOINTS} from the given configuration.
     *
     * <p>A configured value that is zero or negative is treated as invalid: a warning is written
     * to {@code logger} and the option's default value is returned instead.
     *
     * @param config configuration to read the option from
     * @param logger logger that receives the warning about an invalid setting
     * @return the configured value if it is positive, otherwise the option's default value
     */
    public static int getMaximumNumberOfRetainedCheckpoints(Configuration config, Logger logger) {
        int configured = config.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
        if (configured > 0) {
            return configured;
        }

        // Invalid (non-positive) setting: report it and fall back to the option's default.
        logger.warn(
                "The setting for '{} : {}' is invalid. Using default value of {}",
                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
                configured,
                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
        return CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
    }

    /**
     * Recovers every checkpoint that the given state handle store currently holds.
     *
     * <p>All handles are obtained through {@link StateHandleStore#getAllAndLock()}, ordered by
     * their name (natural {@link String} ordering of the tuple's second field) and then resolved
     * one after another. Each resolved checkpoint is passed through
     * {@link Preconditions#checkNotNull(Object)} before being collected.
     *
     * @param checkpointStateHandleStore store holding the checkpoint state handles
     * @param completedCheckpointStoreUtil converts a handle name into a checkpoint id
     * @param <R> resource version type of the state handle store
     * @return an unmodifiable list of the recovered checkpoints, in handle-name order
     * @throws Exception propagated from the handle store or from resolving an individual
     *     checkpoint
     */
    public static <R extends ResourceVersion<R>> Collection<CompletedCheckpoint> retrieveCompletedCheckpoints(StateHandleStore<CompletedCheckpoint, R> checkpointStateHandleStore, CheckpointStoreUtil completedCheckpointStoreUtil) throws Exception {
        LOG.info("Recovering checkpoints from {}.", checkpointStateHandleStore);

        List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> handles =
                checkpointStateHandleStore.getAllAndLock();
        // Order the handles by the name they are stored under.
        handles.sort(Comparator.comparing(handle -> handle.f1));

        int handleCount = handles.size();
        LOG.info("Found {} checkpoints in {}.", handleCount, checkpointStateHandleStore);

        List<CompletedCheckpoint> recovered = new ArrayList<>(handleCount);
        LOG.info("Trying to fetch {} checkpoints from storage.", handleCount);
        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> handle : handles) {
            CompletedCheckpoint checkpoint =
                    retrieveCompletedCheckpoint(completedCheckpointStoreUtil, handle);
            recovered.add(Preconditions.checkNotNull(checkpoint));
        }

        return Collections.unmodifiableList(recovered);
    }

    /**
     * Resolves a single state handle into its {@link CompletedCheckpoint}.
     *
     * @param completedCheckpointStoreUtil converts the handle name into a checkpoint id, which is
     *     used for logging and error messages
     * @param stateHandle pair of the retrievable handle ({@code f0}) and its name ({@code f1})
     * @return whatever the handle's {@code retrieveState()} returns
     * @throws FlinkException wrapping the {@link ClassNotFoundException} or {@link IOException}
     *     raised while retrieving the state
     */
    private static CompletedCheckpoint retrieveCompletedCheckpoint(CheckpointStoreUtil completedCheckpointStoreUtil, Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandle) throws FlinkException {
        String handleName = stateHandle.f1;
        long id = completedCheckpointStoreUtil.nameToCheckpointID(handleName);

        LOG.info("Trying to retrieve checkpoint {}.", id);
        try {
            return stateHandle.f0.retrieveState();
        } catch (ClassNotFoundException cnfe) {
            String message =
                    String.format(
                            "Could not retrieve checkpoint %d from state handle under %s. "
                                    + "This indicates that you are trying to recover from state written by an "
                                    + "older Flink version which is not compatible. "
                                    + "Try cleaning the state handle store.",
                            id,
                            handleName);
            throw new FlinkException(message, cnfe);
        } catch (IOException ioe) {
            String message =
                    String.format(
                            "Could not retrieve checkpoint %d from state handle under %s. "
                                    + "This indicates that the retrieved state handle is broken. "
                                    + "Try cleaning the state handle store.",
                            id,
                            handleName);
            throw new FlinkException(message, ioe);
        }
    }
}

