/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.eventtime;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.CombinedWatermarkStatus;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.util.Preconditions;

/**
 * Fans several per-id {@link WatermarkOutput}s into a single underlying {@link WatermarkOutput}.
 *
 * <p>Each registered id owns a {@link CombinedWatermarkStatus.PartialWatermark}. Outputs handed
 * out by this class write into that per-id state; the aggregate kept in a
 * {@link CombinedWatermarkStatus} decides what is forwarded to the underlying output.
 *
 * <p>Two flavours of output are available per id:
 *
 * <ul>
 *   <li><b>immediate</b> outputs may forward to the underlying output as part of the call that
 *       changed their state;
 *   <li><b>deferred</b> outputs only record their state, which is forwarded on the next
 *       {@link #onPeriodicEmit()} (or the next forwarding triggered by an immediate output).
 * </ul>
 */
@Internal
public class WatermarkOutputMultiplexer {
    /** Receives the combined watermark and the combined idleness signal. */
    private final WatermarkOutput underlyingOutput;

    /** Per-output state, keyed by the id the output was registered under. */
    private final Map<String, CombinedWatermarkStatus.PartialWatermark> watermarkPerOutputId;

    /** Aggregate over the state of every currently registered output. */
    private final CombinedWatermarkStatus combinedWatermarkStatus;

    /**
     * Creates a multiplexer with no registered outputs.
     *
     * @param underlyingOutput the output that combined watermarks and idleness are forwarded to
     */
    public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
        this.underlyingOutput = underlyingOutput;
        this.watermarkPerOutputId = new HashMap<>();
        this.combinedWatermarkStatus = new CombinedWatermarkStatus();
    }

    /**
     * Registers a new output under the given id and adds its state to the combined status.
     *
     * @param id the id to register; must not already be registered
     * @throws IllegalStateException if an output is already registered under {@code id}
     */
    public void registerNewOutput(String id) {
        final CombinedWatermarkStatus.PartialWatermark freshState =
                new CombinedWatermarkStatus.PartialWatermark();
        // putIfAbsent leaves an existing registration untouched, so a rejected duplicate
        // does not disturb the state of the output that is already registered.
        final boolean idWasFree = watermarkPerOutputId.putIfAbsent(id, freshState) == null;
        Preconditions.checkState(idWasFree, "Already contains an output for ID %s", id);
        combinedWatermarkStatus.add(freshState);
    }

    /**
     * Removes the output registered under the given id, if any, from both the id lookup and the
     * combined status.
     *
     * @param id the id of the output to remove
     * @return {@code true} if an output was registered under {@code id} and has been removed,
     *     {@code false} if nothing was registered under that id
     */
    public boolean unregisterOutput(String id) {
        final CombinedWatermarkStatus.PartialWatermark removed = watermarkPerOutputId.remove(id);
        if (removed == null) {
            return false;
        }
        combinedWatermarkStatus.remove(removed);
        return true;
    }

    /**
     * Returns a new immediate output backed by the state registered under the given id.
     *
     * @param outputId the id of a previously registered output
     * @return an output whose calls may forward to the underlying output right away
     * @throws IllegalArgumentException if no output is registered under {@code outputId}
     */
    public WatermarkOutput getImmediateOutput(String outputId) {
        return new ImmediateOutput(requireRegisteredState(outputId));
    }

    /**
     * Returns a new deferred output backed by the state registered under the given id.
     *
     * @param outputId the id of a previously registered output
     * @return an output whose calls only update the per-output state
     * @throws IllegalArgumentException if no output is registered under {@code outputId}
     */
    public WatermarkOutput getDeferredOutput(String outputId) {
        return new DeferredOutput(requireRegisteredState(outputId));
    }

    /** Re-evaluates the combined status and forwards the result to the underlying output. */
    public void onPeriodicEmit() {
        updateCombinedWatermark();
    }

    /** Looks up the state registered under {@code outputId}, rejecting unknown ids. */
    private CombinedWatermarkStatus.PartialWatermark requireRegisteredState(String outputId) {
        final CombinedWatermarkStatus.PartialWatermark registered =
                watermarkPerOutputId.get(outputId);
        Preconditions.checkArgument(
                registered != null, "no output registered under id %s", outputId);
        return registered;
    }

    /**
     * Asks the combined status to update itself. If it reports an update, the combined watermark
     * is emitted downstream; otherwise, if the combined status is idle, the underlying output is
     * marked idle.
     */
    private void updateCombinedWatermark() {
        if (combinedWatermarkStatus.updateCombinedWatermark()) {
            final long combined = combinedWatermarkStatus.getCombinedWatermark();
            underlyingOutput.emitWatermark(new Watermark(combined));
            return;
        }
        if (combinedWatermarkStatus.isIdle()) {
            underlyingOutput.markIdle();
        }
    }

    /**
     * Output that only records watermark and idleness in its per-output state; it never forwards
     * to the underlying output itself.
     */
    private static class DeferredOutput
    implements WatermarkOutput {
        private final CombinedWatermarkStatus.PartialWatermark outputState;

        public DeferredOutput(CombinedWatermarkStatus.PartialWatermark state) {
            this.outputState = state;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            outputState.setWatermark(watermark.getTimestamp());
        }

        @Override
        public void markIdle() {
            outputState.setIdle(true);
        }

        @Override
        public void markActive() {
            outputState.setIdle(false);
        }
    }

    /**
     * Output that records watermark and idleness in its per-output state and then re-evaluates
     * the combined status of the enclosing multiplexer as part of the same call.
     */
    private class ImmediateOutput
    implements WatermarkOutput {
        private final CombinedWatermarkStatus.PartialWatermark outputState;

        public ImmediateOutput(CombinedWatermarkStatus.PartialWatermark state) {
            this.outputState = state;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            final long emittedTimestamp = watermark.getTimestamp();
            if (!outputState.setWatermark(emittedTimestamp)) {
                // The per-output state reported no change, so there is nothing to re-evaluate.
                return;
            }
            // Only a timestamp beyond the current combined watermark triggers a re-evaluation.
            if (emittedTimestamp > combinedWatermarkStatus.getCombinedWatermark()) {
                updateCombinedWatermark();
            }
        }

        @Override
        public void markIdle() {
            outputState.setIdle(true);
            updateCombinedWatermark();
        }

        @Override
        public void markActive() {
            outputState.setIdle(false);
            updateCombinedWatermark();
        }
    }
}

