/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window.buffers;

import java.io.EOFException;
import java.time.ZoneId;
import java.util.Iterator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.state.WindowState;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.table.runtime.util.collections.binary.BytesMap;
import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
import org.apache.flink.util.Collector;

/**
 * A {@link WindowBuffer} that collects input records in a {@link WindowBytesMultiMap}, grouped by
 * a {@link WindowKey} built from the record key and its slice end, and passes every buffered
 * group to a {@link RecordsCombiner} when the buffer is flushed.
 *
 * <p>The buffer is flushed in three situations: when appending a record raises an
 * {@link EOFException} (treated here as "buffer is full"), when {@link #advanceProgress(long)}
 * finds that the smallest buffered slice end has fired, and when {@link #flush()} is called
 * directly.
 */
public final class RecordsWindowBuffer implements WindowBuffer {
    /** Receives each (window key, records) group when the buffer is flushed. */
    private final RecordsCombiner combineFunction;
    /** Backing multi-map holding the buffered records per window key. */
    private final WindowBytesMultiMap recordsBuffer;
    /** Reused key object; overwritten on every append to avoid allocating a key per record. */
    private final WindowKey reuseWindowKey;
    /** Converts incoming rows to the binary row form appended to {@link #recordsBuffer}. */
    private final AbstractRowDataSerializer<RowData> recordSerializer;
    /** Time zone handed to {@link TimeWindowUtil#isWindowFired} when checking progress. */
    private final ZoneId shiftTimeZone;
    /** Forwarded to {@code recordsBuffer.getEntryIterator(...)} on every flush. */
    private final boolean requiresCopy;
    /** Smallest slice end added since the last effective flush; {@code Long.MAX_VALUE} if none. */
    private long minSliceEnd = Long.MAX_VALUE;

    /**
     * Creates a buffer backed by a new {@link WindowBytesMultiMap}.
     *
     * @param operatorOwner passed through to the multi-map constructor
     * @param memoryManager passed through to the multi-map constructor
     * @param memorySize passed through to the multi-map constructor
     * @param combineFunction combiner invoked for every buffered group on flush
     * @param keySer serializer of the record key; also used to create the reusable window key
     * @param inputSer serializer of the input records; its arity sizes the multi-map values
     * @param requiresCopy flag forwarded to the multi-map's entry iterator on flush
     * @param shiftTimeZone time zone used when checking whether a window has fired
     */
    public RecordsWindowBuffer(Object operatorOwner, MemoryManager memoryManager, long memorySize, RecordsCombiner combineFunction, PagedTypeSerializer<RowData> keySer, AbstractRowDataSerializer<RowData> inputSer, boolean requiresCopy, ZoneId shiftTimeZone) {
        this.combineFunction = combineFunction;
        this.recordsBuffer = new WindowBytesMultiMap(operatorOwner, memoryManager, memorySize, keySer, inputSer.getArity());
        this.recordSerializer = inputSer;
        this.reuseWindowKey = new WindowKeySerializer(keySer).createInstance();
        this.requiresCopy = requiresCopy;
        this.shiftTimeZone = shiftTimeZone;
    }

    /**
     * Buffers {@code element} under the window key {@code (sliceEnd, key)}.
     *
     * <p>If the append raises an {@link EOFException}, the buffer is flushed and the append is
     * retried exactly once. A second failure is reported instead of retrying again, because
     * another flush cannot make more room and unbounded retries would only end in a
     * {@link StackOverflowError}.
     *
     * @param key key of the record
     * @param sliceEnd slice end the record is assigned to
     * @param element record to buffer
     * @throws EOFException if the record still cannot be appended right after a flush
     * @throws Exception if flushing or appending fails for any other reason
     */
    @Override
    public void addElement(RowData key, long sliceEnd, RowData element) throws Exception {
        this.minSliceEnd = Math.min(sliceEnd, this.minSliceEnd);
        try {
            this.appendRecord(key, sliceEnd, element);
        } catch (EOFException bufferFull) {
            this.flush();
            // flush() resets minSliceEnd when it drained entries, so track this slice end again.
            this.minSliceEnd = Math.min(sliceEnd, this.minSliceEnd);
            try {
                this.appendRecord(key, sliceEnd, element);
            } catch (EOFException stillFull) {
                EOFException failure = new EOFException("Failed to append the record to the window buffer even after flushing it; the record may be too large for the configured buffer memory.");
                failure.initCause(stillFull);
                failure.addSuppressed(bufferFull);
                throw failure;
            }
        }
    }

    /** Looks up the (sliceEnd, key) entry and appends the binary form of {@code element} to it. */
    private void appendRecord(RowData key, long sliceEnd, RowData element) throws Exception {
        this.reuseWindowKey.replace(sliceEnd, key);
        // The lookup must be redone for every attempt: a flush resets the underlying map.
        BytesMap.LookupInfo<WindowKey, Iterator<RowData>> lookup = this.recordsBuffer.lookup(this.reuseWindowKey);
        this.recordsBuffer.append(lookup, this.recordSerializer.toBinaryRow(element));
    }

    /**
     * Flushes the buffer if {@link TimeWindowUtil#isWindowFired} reports that the smallest
     * buffered slice end has fired for the given progress.
     *
     * @param progress current progress value to compare against the smallest buffered slice end
     * @throws Exception if flushing fails
     */
    @Override
    public void advanceProgress(long progress) throws Exception {
        if (TimeWindowUtil.isWindowFired(this.minSliceEnd, progress, this.shiftTimeZone)) {
            this.flush();
        }
    }

    /**
     * Passes every buffered (window key, records) group to the combiner, then resets the buffer
     * and the tracked minimum slice end. Does nothing when the buffer holds no keys.
     *
     * @throws Exception if the combiner fails
     */
    @Override
    public void flush() throws Exception {
        if (this.recordsBuffer.getNumKeys() <= 0L) {
            return;
        }
        KeyValueIterator<WindowKey, Iterator<RowData>> entries = this.recordsBuffer.getEntryIterator(this.requiresCopy);
        while (entries.advanceNext()) {
            this.combineFunction.combine(entries.getKey(), entries.getValue());
        }
        this.recordsBuffer.reset();
        this.minSliceEnd = Long.MAX_VALUE;
    }

    /**
     * Frees the buffer memory and closes the combiner. The combiner is closed even when freeing
     * the buffer fails; the first failure is rethrown with any later one attached as suppressed.
     *
     * @throws Exception the first failure raised while releasing either resource
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            this.recordsBuffer.free();
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.combineFunction.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * {@link WindowBuffer.LocalFactory} producing {@link RecordsWindowBuffer}s whose combiner is
     * created from a {@link RecordsCombiner.LocalFactory}. Buffers are created with
     * {@code requiresCopy == false}.
     */
    public static final class LocalFactory implements WindowBuffer.LocalFactory {
        private static final long serialVersionUID = 1L;
        private final PagedTypeSerializer<RowData> keySer;
        private final AbstractRowDataSerializer<RowData> inputSer;
        private final RecordsCombiner.LocalFactory localFactory;

        /**
         * @param keySer serializer of the record key
         * @param inputSer serializer of the input records
         * @param localFactory factory of the combiner used by created buffers
         */
        public LocalFactory(PagedTypeSerializer<RowData> keySer, AbstractRowDataSerializer<RowData> inputSer, RecordsCombiner.LocalFactory localFactory) {
            this.keySer = keySer;
            this.inputSer = inputSer;
            this.localFactory = localFactory;
        }

        /**
         * Creates the combiner from {@code runtimeContext} and {@code collector} and wraps it in
         * a new {@link RecordsWindowBuffer}.
         */
        @Override
        public WindowBuffer create(Object operatorOwner, MemoryManager memoryManager, long memorySize, RuntimeContext runtimeContext, Collector<RowData> collector, ZoneId shiftTimeZone) throws Exception {
            RecordsCombiner combiner = this.localFactory.createRecordsCombiner(runtimeContext, collector);
            return new RecordsWindowBuffer(operatorOwner, memoryManager, memorySize, combiner, this.keySer, this.inputSer, false, shiftTimeZone);
        }
    }

    /**
     * {@link WindowBuffer.Factory} producing {@link RecordsWindowBuffer}s whose combiner is
     * created from a {@link RecordsCombiner.Factory}.
     */
    public static final class Factory implements WindowBuffer.Factory {
        private static final long serialVersionUID = 1L;
        private final PagedTypeSerializer<RowData> keySer;
        private final AbstractRowDataSerializer<RowData> inputSer;
        private final RecordsCombiner.Factory factory;

        /**
         * @param keySer serializer of the record key
         * @param inputSer serializer of the input records
         * @param combinerFactory factory of the combiner used by created buffers
         */
        public Factory(PagedTypeSerializer<RowData> keySer, AbstractRowDataSerializer<RowData> inputSer, RecordsCombiner.Factory combinerFactory) {
            this.keySer = keySer;
            this.inputSer = inputSer;
            this.factory = combinerFactory;
        }

        /**
         * Creates the combiner and wraps it in a new {@link RecordsWindowBuffer}. The buffer's
         * {@code requiresCopy} flag is set when the state backend reports that it is not safe to
         * reuse key/value state objects.
         */
        @Override
        public WindowBuffer create(Object operatorOwner, MemoryManager memoryManager, long memorySize, RuntimeContext runtimeContext, WindowTimerService<Long> timerService, KeyedStateBackend<RowData> stateBackend, WindowState<Long> windowState, boolean isEventTime, ZoneId shiftTimeZone) throws Exception {
            RecordsCombiner combiner = this.factory.createRecordsCombiner(runtimeContext, timerService, stateBackend, windowState, isEventTime);
            boolean requiresCopy = !stateBackend.isSafeToReuseKVState();
            return new RecordsWindowBuffer(operatorOwner, memoryManager, memorySize, combiner, this.keySer, this.inputSer, requiresCopy, shiftTimeZone);
        }
    }
}

