/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.aggregate.RecordCounter;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

public class MiniBatchGlobalGroupAggFunction
extends MapBundleFunction<RowData, RowData, RowData, RowData> {
    private static final long serialVersionUID = 8349579876002001744L;
    private final GeneratedAggsHandleFunction genLocalAggsHandler;
    private final GeneratedAggsHandleFunction genGlobalAggsHandler;
    private final GeneratedRecordEqualiser genRecordEqualiser;
    private final LogicalType[] accTypes;
    private final RecordCounter recordCounter;
    private final boolean generateUpdateBefore;
    private final long stateRetentionTime;
    private transient JoinedRowData resultRow = new JoinedRowData();
    private transient AggsHandleFunction localAgg = null;
    private transient AggsHandleFunction globalAgg = null;
    private transient RecordEqualiser equaliser = null;
    private transient ValueState<RowData> accState = null;

    public MiniBatchGlobalGroupAggFunction(GeneratedAggsHandleFunction genLocalAggsHandler, GeneratedAggsHandleFunction genGlobalAggsHandler, GeneratedRecordEqualiser genRecordEqualiser, LogicalType[] accTypes, int indexOfCountStar, boolean generateUpdateBefore, long stateRetentionTime) {
        this.genLocalAggsHandler = genLocalAggsHandler;
        this.genGlobalAggsHandler = genGlobalAggsHandler;
        this.genRecordEqualiser = genRecordEqualiser;
        this.accTypes = accTypes;
        this.recordCounter = RecordCounter.of(indexOfCountStar);
        this.generateUpdateBefore = generateUpdateBefore;
        this.stateRetentionTime = stateRetentionTime;
    }

    @Override
    public void open(ExecutionContext ctx) throws Exception {
        super.open(ctx);
        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(this.stateRetentionTime);
        this.localAgg = (AggsHandleFunction)this.genLocalAggsHandler.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
        this.localAgg.open(new PerKeyStateDataViewStore(ctx.getRuntimeContext()));
        this.globalAgg = (AggsHandleFunction)this.genGlobalAggsHandler.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
        this.globalAgg.open(new PerKeyStateDataViewStore(ctx.getRuntimeContext(), ttlConfig));
        this.equaliser = (RecordEqualiser)this.genRecordEqualiser.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
        InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(this.accTypes);
        ValueStateDescriptor<RowData> accDesc = new ValueStateDescriptor<RowData>("accState", accTypeInfo);
        if (ttlConfig.isEnabled()) {
            accDesc.enableTimeToLive(ttlConfig);
        }
        this.accState = ctx.getRuntimeContext().getState(accDesc);
        this.resultRow = new JoinedRowData();
    }

    @Override
    public RowData addInput(@Nullable RowData previousAcc, RowData input) throws Exception {
        RowData currentAcc = previousAcc == null ? this.localAgg.createAccumulators() : previousAcc;
        this.localAgg.setAccumulators(currentAcc);
        this.localAgg.merge(input);
        return this.localAgg.getAccumulators();
    }

    @Override
    public void finishBundle(Map<RowData, RowData> buffer, Collector<RowData> out) throws Exception {
        for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {
            RowData currentKey = entry.getKey();
            RowData bufferAcc = entry.getValue();
            boolean firstRow = false;
            this.ctx.setCurrentKey(currentKey);
            RowData stateAcc = this.accState.value();
            if (stateAcc == null) {
                stateAcc = this.globalAgg.createAccumulators();
                firstRow = true;
            }
            this.globalAgg.setAccumulators(stateAcc);
            RowData prevAggValue = this.globalAgg.getValue();
            this.globalAgg.merge(bufferAcc);
            RowData newAggValue = this.globalAgg.getValue();
            stateAcc = this.globalAgg.getAccumulators();
            if (!this.recordCounter.recordCountIsZero(stateAcc)) {
                this.accState.update(stateAcc);
                if (!firstRow) {
                    if (this.equaliser.equals(prevAggValue, newAggValue)) continue;
                    if (this.generateUpdateBefore) {
                        this.resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
                        out.collect(this.resultRow);
                    }
                    this.resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
                    out.collect(this.resultRow);
                    continue;
                }
                this.resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
                out.collect(this.resultRow);
                continue;
            }
            if (!firstRow) {
                this.resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
                out.collect(this.resultRow);
            }
            this.accState.clear();
            this.globalAgg.cleanup();
        }
    }

    @Override
    public void close() throws Exception {
        if (this.localAgg != null) {
            this.localAgg.close();
        }
        if (this.globalAgg != null) {
            this.globalAgg.close();
        }
    }
}

