/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.compactor.operator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor;
import org.apache.flink.connector.file.sink.compactor.operator.CompactService;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.flink.util.Preconditions;

/**
 * Operator that drains the compaction state written by a {@link CompactorOperator} and otherwise
 * forwards committable messages.
 *
 * <p>The operator reads the list state registered under {@link
 * CompactorOperator#REMAINING_REQUESTS_RAW_STATES_DESC}, re-submits every restored request to a
 * {@link CompactService} backed by an {@link IdenticalFileCompactor}, and appends the results to
 * the next {@link CommittableSummary} it receives. Committables whose pending file name starts
 * with {@code "."} are run through the same service before being emitted.
 *
 * <p>NOTE(review): presumably this operator is used when compaction was enabled in a previous run
 * and is disabled now - confirm against the sink topology builder.
 */
@Internal
public class CompactorOperatorStateHandler
        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
        implements OneInputStreamOperator<
                        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>,
                        CommittableMessage<FileSinkCommittable>>,
                BoundedOneInput,
                CheckpointListener {

    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
    private final BucketWriter<?, String> bucketWriter;

    /** Created in {@link #open()} and released in {@link #close()}. */
    private transient CompactService compactService;

    /** Submitted requests paired with the future that will carry their result. */
    private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
            compactingRequests = new LinkedList<>();

    /** State holding the requests whose results have not been emitted yet. */
    private SimpleVersionedListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;

    /** Restored state content; consumed and reset to {@code null} by {@link #open()}. */
    private Iterable<Map<Long, List<CompactorRequest>>> stateRemaining;

    /**
     * Creates the handler.
     *
     * @param committableSerializer serializer used to read and write the committables stored
     *     inside the remaining requests
     * @param bucketWriter bucket writer handed to the {@link CompactService}
     */
    public CompactorOperatorStateHandler(
            SimpleVersionedSerializer<FileSinkCommittable> committableSerializer,
            BucketWriter<?, String> bucketWriter) {
        this.committableSerializer = committableSerializer;
        this.bucketWriter = bucketWriter;
    }

    /**
     * Binds the remaining-requests list state and remembers its restored content for {@link
     * #open()}.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        remainingRequestsState =
                new SimpleVersionedListState<>(
                        context.getOperatorStateStore()
                                .getListState(CompactorOperator.REMAINING_REQUESTS_RAW_STATES_DESC),
                        new CompactorOperator.RemainingRequestsSerializer(
                                new CompactorRequestSerializer(committableSerializer)));
        stateRemaining = remainingRequestsState.get();
    }

    /**
     * Starts the compact service and re-submits every restored request.
     *
     * <p>Each committable to compact is submitted as its own single-file request; the pass-through
     * committables of a restored request are submitted together as one request.
     */
    @Override
    public void open() throws Exception {
        super.open();
        // NOTE(review): the first argument is presumably the number of compaction threads.
        compactService = new CompactService(1, new IdenticalFileCompactor(), bucketWriter);
        compactService.open();

        if (stateRemaining != null) {
            for (Map<Long, List<CompactorRequest>> requestsByKey : stateRemaining) {
                for (List<CompactorRequest> restoredRequests : requestsByKey.values()) {
                    for (CompactorRequest restored : restoredRequests) {
                        resubmitRestoredRequest(restored);
                    }
                }
            }
        }
        stateRemaining = null;
    }

    /** Splits one restored request into per-file requests plus one pass-through request. */
    private void resubmitRestoredRequest(CompactorRequest restored) {
        // Read both lists and the bucket id up front, before anything is submitted.
        List<FileSinkCommittable> toCompactList = restored.getCommittableToCompact();
        List<FileSinkCommittable> toPassThrough = restored.getCommittableToPassthrough();
        String bucketId = restored.getBucketId();

        for (FileSinkCommittable toCompact : toCompactList) {
            CompactorRequest compactRequest = new CompactorRequest(bucketId);
            compactRequest.addToCompact(toCompact);
            submitAndTrack(compactRequest);
        }

        CompactorRequest passThroughRequest = new CompactorRequest(bucketId);
        toPassThrough.forEach(passThroughRequest::addToPassthrough);
        submitAndTrack(passThroughRequest);
    }

    /**
     * Routes an incoming element.
     *
     * <ul>
     *   <li>{@link CompactorRequest}: submitted to the compact service and tracked.
     *   <li>{@link CommittableWithLineage}: forwarded, unless it is a hidden committable, which is
     *       handled by {@link #handleHiddenCommittable}.
     *   <li>any other message (treated as a {@link CommittableSummary}): forwarded unchanged when
     *       no request is tracked, otherwise enlarged by the results of the tracked requests.
     * </ul>
     */
    @Override
    public void processElement(
            StreamRecord<Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>> element)
            throws Exception {
        Either<CommittableMessage<FileSinkCommittable>, CompactorRequest> record =
                element.getValue();

        if (!record.isLeft()) {
            submitAndTrack(record.right());
            return;
        }

        CommittableMessage<FileSinkCommittable> message = record.left();
        if (message instanceof CommittableWithLineage) {
            CommittableWithLineage<FileSinkCommittable> withLineage =
                    (CommittableWithLineage<FileSinkCommittable>) message;
            if (isHiddenCommittable(withLineage)) {
                handleHiddenCommittable(withLineage);
            } else {
                output.collect(new StreamRecord<>(message));
            }
        } else if (compactingRequests.isEmpty()) {
            output.collect(new StreamRecord<>(message));
        } else {
            appendCompactingResultsToSummary((CommittableSummary<FileSinkCommittable>) message);
        }
    }

    /**
     * Waits for all tracked requests, then emits a summary whose total and pending counts are
     * increased by the number of results, followed by one {@link CommittableWithLineage} per
     * result carrying the checkpoint id and subtask id of {@code summary}.
     *
     * @throws ExecutionException if a tracked request completed exceptionally
     * @throws InterruptedException if interrupted while waiting for a result
     */
    private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommittable> summary)
            throws ExecutionException, InterruptedException {
        List<FileSinkCommittable> results = new ArrayList<>();
        for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> tracked :
                compactingRequests) {
            tracked.f1.get().forEach(results::add);
        }
        compactingRequests.clear();

        CommittableMessage<FileSinkCommittable> enlargedSummary =
                new CommittableSummary<>(
                        summary.getSubtaskId(),
                        summary.getNumberOfSubtasks(),
                        getCheckpointId(summary),
                        summary.getNumberOfCommittables() + results.size(),
                        summary.getNumberOfPendingCommittables() + results.size(),
                        summary.getNumberOfFailedCommittables());
        output.collect(new StreamRecord<>(enlargedSummary));

        for (FileSinkCommittable committable : results) {
            CommittableMessage<FileSinkCommittable> lineage =
                    new CommittableWithLineage<>(
                            committable, getCheckpointId(summary), summary.getSubtaskId());
            output.collect(new StreamRecord<>(lineage));
        }
    }

    /**
     * Returns whether the committable carries a pending file with a non-null path whose file name
     * starts with {@code "."}.
     */
    private boolean isHiddenCommittable(CommittableWithLineage<FileSinkCommittable> message) {
        FileSinkCommittable committable = message.getCommittable();
        return committable.hasPendingFile()
                && committable.getPendingFile().getPath() != null
                && committable.getPendingFile().getPath().getName().startsWith(".");
    }

    /**
     * Runs a hidden committable through the compact service and waits for the result.
     *
     * <p>A resulting committable with a pending file is emitted immediately with the checkpoint id
     * and subtask id of {@code message}; at most one such result is allowed. Every other result is
     * submitted again as a pass-through request and tracked, so it is emitted with the next
     * summary.
     *
     * @throws ExecutionException if the request completed exceptionally
     * @throws InterruptedException if interrupted while waiting for the result
     * @throws IllegalStateException if more than one result carries a pending file
     */
    private void handleHiddenCommittable(CommittableWithLineage<FileSinkCommittable> message)
            throws ExecutionException, InterruptedException {
        FileSinkCommittable committable = message.getCommittable();
        CompactorRequest request = new CompactorRequest(committable.getBucketId());
        request.addToCompact(committable);

        // Blocks the caller until the service has produced the result.
        Iterable<FileSinkCommittable> result = submit(request).get();
        Long checkpointId = getCheckpointId(message);

        boolean pendingFileSent = false;
        for (FileSinkCommittable c : result) {
            if (c.hasPendingFile()) {
                Preconditions.checkState(
                        !pendingFileSent,
                        "A in-progress file should not be converted to multiple pending files");
                pendingFileSent = true;
                CommittableMessage<FileSinkCommittable> lineage =
                        new CommittableWithLineage<>(c, checkpointId, message.getSubtaskId());
                output.collect(new StreamRecord<>(lineage));
                continue;
            }
            CompactorRequest passThroughRequest = new CompactorRequest(c.getBucketId());
            passThroughRequest.addToPassthrough(c);
            submitAndTrack(passThroughRequest);
        }
    }

    /** Nothing to do at end of input. */
    @Override
    public void endInput() throws Exception {}

    /** Closes the operator and then the compact service, even if closing the operator fails. */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // The service may be null if open() was never reached.
            if (compactService != null) {
                compactService.close();
            }
        }
    }

    /**
     * Stores the requests whose results have not been emitted yet.
     *
     * <p>A request whose future is already done is replaced by a pass-through request holding its
     * results (or dropped when it produced nothing); an unfinished request is stored as is. When
     * no request is tracked the state is cleared, so that requests which were already emitted are
     * not written into this checkpoint and re-submitted after a restore.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        if (compactingRequests.isEmpty()) {
            // Drop whatever an earlier snapshot or the restore left behind.
            remainingRequestsState.clear();
            return;
        }

        List<CompactorRequest> remainingRequests = new ArrayList<>();
        for (Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>> tracked :
                compactingRequests) {
            if (!tracked.f1.isDone()) {
                remainingRequests.add(tracked.f0);
                continue;
            }
            Iterable<FileSinkCommittable> result = tracked.f1.get();
            if (!result.iterator().hasNext()) {
                continue;
            }
            String bucketId = result.iterator().next().getBucketId();
            CompactorRequest passThroughRequest = new CompactorRequest(bucketId);
            result.forEach(passThroughRequest::addToPassthrough);
            remainingRequests.add(passThroughRequest);
        }

        // NOTE(review): -1L is presumably a placeholder checkpoint id; open() ignores the key.
        Map<Long, List<CompactorRequest>> requestsMap = new HashMap<>();
        requestsMap.put(-1L, remainingRequests);
        remainingRequestsState.update(Collections.singletonList(requestsMap));
    }

    /** Returns the checkpoint id of the message, or {@code null} when it has none. */
    private Long getCheckpointId(CommittableMessage<FileSinkCommittable> message) {
        return message.getCheckpointId().isPresent()
                ? Long.valueOf(message.getCheckpointId().getAsLong())
                : null;
    }

    /** Submits the request and remembers it together with its result future. */
    private void submitAndTrack(CompactorRequest request) {
        compactingRequests.add(new Tuple2<>(request, submit(request)));
    }

    /** Hands the request to the compact service and returns the future it will complete. */
    private CompletableFuture<Iterable<FileSinkCommittable>> submit(CompactorRequest request) {
        CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>();
        compactService.submit(request, resultFuture);
        return resultFuture;
    }
}

