/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window.processors;

import java.io.Serializable;
import java.time.ZoneId;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners;
import org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
import org.apache.flink.util.Preconditions;

public final class SliceSharedWindowAggProcessor
extends AbstractWindowAggProcessor
implements SliceSharedAssigner.MergeCallback {
    private static final long serialVersionUID = 1L;
    private final SliceSharedAssigner sliceSharedAssigner;
    private final WindowIsEmptySupplier emptySupplier;
    private final SliceMergeTargetHelper mergeTargetHelper;

    public SliceSharedWindowAggProcessor(GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler, WindowBuffer.Factory bufferFactory, SliceSharedAssigner sliceAssigner, TypeSerializer<RowData> accSerializer, int indexOfCountStar, ZoneId shiftTimeZone) {
        super(genAggsHandler, bufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
        this.sliceSharedAssigner = sliceAssigner;
        this.mergeTargetHelper = new SliceMergeTargetHelper();
        this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, sliceAssigner);
    }

    @Override
    public void fireWindow(Long windowEnd) throws Exception {
        Optional<Long> nextWindowEndOptional;
        this.sliceSharedAssigner.mergeSlices(windowEnd, this);
        RowData aggResult = this.aggregator.getValue(windowEnd);
        if (!this.isWindowEmpty()) {
            this.collect(aggResult);
        }
        if ((nextWindowEndOptional = this.sliceSharedAssigner.nextTriggerWindow(windowEnd, this.emptySupplier)).isPresent()) {
            long nextWindowEnd = nextWindowEndOptional.get();
            if (this.sliceSharedAssigner.isEventTime()) {
                this.windowTimerService.registerEventTimeWindowTimer(nextWindowEnd);
            } else {
                this.windowTimerService.registerProcessingTimeWindowTimer(nextWindowEnd);
            }
        }
    }

    @Override
    public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
        RowData stateAcc;
        RowData acc = mergeResult == null ? this.aggregator.createAccumulators() : ((stateAcc = this.windowState.value(mergeResult)) == null ? this.aggregator.createAccumulators() : stateAcc);
        this.aggregator.setAccumulators(mergeResult, acc);
        for (Long slice : toBeMerged) {
            RowData sliceAcc = this.windowState.value(slice);
            if (sliceAcc == null) continue;
            this.aggregator.merge(slice, sliceAcc);
        }
        if (mergeResult != null) {
            this.windowState.update(mergeResult, this.aggregator.getAccumulators());
        }
    }

    @Override
    protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
        this.mergeTargetHelper.setMergeTarget(null);
        this.sliceSharedAssigner.mergeSlices(sliceToMerge, this.mergeTargetHelper);
        if (this.mergeTargetHelper.getMergeTarget() != null) {
            return this.mergeTargetHelper.getMergeTarget();
        }
        return sliceToMerge;
    }

    private boolean isWindowEmpty() {
        if (this.emptySupplier.indexOfCountStar < 0) {
            return false;
        }
        return this.emptySupplier.get();
    }

    private static final class SliceMergeTargetHelper
    implements SliceSharedAssigner.MergeCallback,
    Serializable {
        private static final long serialVersionUID = 1L;
        private Long mergeTarget = null;

        private SliceMergeTargetHelper() {
        }

        @Override
        public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
            this.mergeTarget = mergeResult;
        }

        public Long getMergeTarget() {
            return this.mergeTarget;
        }

        public void setMergeTarget(Long mergeTarget) {
            this.mergeTarget = mergeTarget;
        }
    }

    private final class WindowIsEmptySupplier
    implements Supplier<Boolean>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private final int indexOfCountStar;

        private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner assigner) {
            if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
                Preconditions.checkArgument(indexOfCountStar >= 0, "Hopping window requires a COUNT(*) in the aggregate functions.");
            }
            this.indexOfCountStar = indexOfCountStar;
        }

        @Override
        public Boolean get() {
            if (this.indexOfCountStar < 0) {
                return false;
            }
            try {
                RowData acc = SliceSharedWindowAggProcessor.this.aggregator.getAccumulators();
                return acc == null || acc.getLong(this.indexOfCountStar) == 0L;
            }
            catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}

