/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for acknowledging sources that track two kinds of identifiers: the unique ids
 * handled by {@link MessageAcknowledgingSourceBase} and an additional list of session ids that
 * is acknowledged per checkpoint through {@link #acknowledgeSessionIDs(List)}.
 *
 * <p>Session ids accumulate in {@link #sessionIds}. On every {@link #snapshotState} call the
 * current list is stored together with the checkpoint id in {@link #sessionIdsPerSnapshot} and a
 * fresh list is started. When ids are acknowledged for a checkpoint, every stored list whose
 * checkpoint id is less than or equal to that checkpoint is handed to
 * {@link #acknowledgeSessionIDs(List)} and then dropped.
 *
 * <p>NOTE(review): nothing in this class adds elements to {@link #sessionIds}; presumably
 * subclasses do so while emitting records - confirm against the implementations.
 *
 * @param <Type> type of the records produced by the source
 * @param <UId> type of the unique ids tracked by the parent class
 * @param <SessionId> type of the ids passed to {@link #acknowledgeSessionIDs(List)}
 */
@PublicEvolving
public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, SessionId>
        extends MessageAcknowledgingSourceBase<Type, UId> {
    private static final long serialVersionUID = 42L;
    private static final Logger LOG = LoggerFactory.getLogger(MultipleIdsMessageAcknowledgingSourceBase.class);

    /** Initial capacity of each list that collects session ids between two snapshots. */
    private static final int INITIAL_SESSION_IDS_CAPACITY = 64;

    /** Session id lists of past snapshots, each paired with the id of the checkpoint that captured it. */
    protected transient Deque<Tuple2<Long, List<SessionId>>> sessionIdsPerSnapshot;

    /** Session ids collected since the last snapshot; replaced by a fresh list on every snapshot. */
    protected transient List<SessionId> sessionIds;

    /**
     * Creates the source, forwarding the class of the unique ids to the parent constructor.
     *
     * @param idClass class of the unique ids
     */
    protected MultipleIdsMessageAcknowledgingSourceBase(Class<UId> idClass) {
        super(idClass);
    }

    /**
     * Creates the source, forwarding the type information of the unique ids to the parent
     * constructor.
     *
     * @param idTypeInfo type information of the unique ids
     */
    protected MultipleIdsMessageAcknowledgingSourceBase(TypeInformation<UId> idTypeInfo) {
        super(idTypeInfo);
    }

    /**
     * Opens the parent and then initializes the (transient) session id containers.
     *
     * @param parameters configuration forwarded to the parent's {@code open}
     * @throws Exception if the parent's {@code open} fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.sessionIds = new ArrayList<>(INITIAL_SESSION_IDS_CAPACITY);
        this.sessionIdsPerSnapshot = new ArrayDeque<>();
    }

    /**
     * Closes the parent and then discards all session ids that have not been acknowledged.
     *
     * @throws Exception if the parent's {@code close} fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        // Both containers are only assigned in open(); guard against close() being invoked
        // when open() never ran or failed before reaching the assignments.
        if (this.sessionIds != null) {
            this.sessionIds.clear();
        }
        if (this.sessionIdsPerSnapshot != null) {
            this.sessionIdsPerSnapshot.clear();
        }
    }

    /**
     * Acknowledges the session ids of every stored snapshot whose checkpoint id is less than or
     * equal to {@code checkpointId} and removes those snapshots from
     * {@link #sessionIdsPerSnapshot}.
     *
     * @param checkpointId id of the checkpoint being acknowledged
     * @param uniqueIds unique ids belonging to the checkpoint; not used by this implementation
     */
    @Override
    protected final void acknowledgeIDs(long checkpointId, Set<UId> uniqueIds) {
        LOG.debug("Acknowledging ids for checkpoint {}", checkpointId);
        Iterator<Tuple2<Long, List<SessionId>>> iterator = this.sessionIdsPerSnapshot.iterator();
        while (iterator.hasNext()) {
            Tuple2<Long, List<SessionId>> snapshot = iterator.next();
            long snapshotCheckpointId = snapshot.f0;
            if (snapshotCheckpointId <= checkpointId) {
                this.acknowledgeSessionIDs(snapshot.f1);
                // Acknowledged lists are dropped so they are not acknowledged a second time.
                iterator.remove();
            }
        }
    }

    /**
     * Acknowledges the given session ids. Called from {@link #acknowledgeIDs(long, Set)} once
     * for each stored snapshot that is covered by the acknowledged checkpoint.
     *
     * @param sessionIds session ids recorded for one snapshot
     */
    protected abstract void acknowledgeSessionIDs(List<SessionId> sessionIds);

    /**
     * Stores the session ids collected so far under the id of the current checkpoint, starts a
     * fresh list for subsequent ids, and then delegates to the parent's {@code snapshotState}.
     *
     * @param context context providing the id of the checkpoint being taken
     * @throws Exception if the parent's {@code snapshotState} fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.sessionIdsPerSnapshot.add(new Tuple2<>(context.getCheckpointId(), this.sessionIds));
        this.sessionIds = new ArrayList<>(INITIAL_SESSION_IDS_CAPACITY);
        super.snapshotState(context);
    }
}

