/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions.table.lookup;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
import org.apache.flink.util.Preconditions;

/**
 * A {@link LookupFunction} that consults a {@link LookupCache} before falling back to an
 * optional delegate lookup function.
 *
 * <p>On a cache hit the cached rows are returned directly. On a cache miss the delegate is
 * queried, the result is stored in the cache (an empty list is stored when the delegate returns
 * {@code null} or no rows), and the delegate's result is returned.
 */
@Internal
public class CachingLookupFunction
extends LookupFunction {
    private static final long serialVersionUID = 1L;

    /** Name of the metric group under which the cache metrics are registered. */
    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";

    /** Function queried on cache misses; may be {@code null} when no delegate was supplied. */
    @Nullable
    private final LookupFunction delegate;

    /** Cache in use; replaced in {@link #open} by the instance returned from the cache manager. */
    private LookupCache cache;

    /** Key under which the cache is registered in {@link LookupCacheManager}; set in {@link #open}. */
    private transient String cacheIdentifier;

    private transient CacheMetricGroup cacheMetricGroup;

    /** Counts delegate lookups that completed; only created for non-full caches. */
    private transient Counter loadCounter;

    /** Counts delegate lookups that failed; only created for non-full caches. */
    private transient Counter numLoadFailuresCounter;

    /** Duration in milliseconds of the most recent delegate lookup, or -1 if none happened yet. */
    private volatile long latestLoadTime = -1L;

    /**
     * Creates a caching lookup function.
     *
     * @param cache the cache to consult before the delegate
     * @param delegate the function to query on cache misses, or {@code null} if there is none
     */
    public CachingLookupFunction(LookupCache cache, @Nullable LookupFunction delegate) {
        this.cache = cache;
        this.delegate = delegate;
    }

    /**
     * Registers the cache with the {@link LookupCacheManager}, sets up the cache metrics, opens
     * the cache and finally opens the delegate (if any).
     *
     * @param context the function context providing the metric group
     * @throws Exception if opening the cache or the delegate fails
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        this.cacheIdentifier = this.functionIdentifier();
        // Use whatever instance the manager hands back; it may differ from the one passed in.
        this.cache = LookupCacheManager.getInstance().registerCacheIfAbsent(this.cacheIdentifier, this.cache);
        this.cacheMetricGroup = new InternalCacheMetricGroup(context.getMetricGroup(), LOOKUP_CACHE_METRIC_GROUP_NAME);
        if (!(this.cache instanceof LookupFullCache)) {
            // Load metrics are only maintained by this class for non-full caches.
            this.loadCounter = new SimpleCounter();
            this.cacheMetricGroup.loadCounter(this.loadCounter);
            this.numLoadFailuresCounter = new SimpleCounter();
            this.cacheMetricGroup.numLoadFailuresCounter(this.numLoadFailuresCounter);
        }
        this.cache.open(this.cacheMetricGroup);
        if (this.cache instanceof LookupFullCache) {
            ((LookupFullCache)this.cache).open(new Configuration());
        }
        if (this.delegate != null) {
            this.delegate.open(context);
        }
    }

    /**
     * Looks up the rows for the given key, preferring the cache over the delegate.
     *
     * <p>Note that on a cache miss the delegate's result is returned as-is, so this method may
     * return {@code null} the first time a key without matches is looked up, while subsequent
     * cache hits for the same key return the empty collection that was cached.
     *
     * @param keyRow the lookup key
     * @return the cached rows on a hit, otherwise the delegate's result (possibly {@code null})
     * @throws IOException if the delegate lookup fails
     */
    @Override
    public Collection<RowData> lookup(RowData keyRow) throws IOException {
        Collection<RowData> cachedValues = this.cache.getIfPresent(keyRow);
        if (cachedValues != null) {
            return cachedValues;
        }
        Collection<RowData> lookupValues = this.lookupByDelegate(keyRow);
        if (lookupValues == null || lookupValues.isEmpty()) {
            // Cache the absence of rows too, so the delegate is not queried again for this key.
            this.cache.put(keyRow, Collections.emptyList());
        } else {
            this.cache.put(keyRow, lookupValues);
        }
        return lookupValues;
    }

    /**
     * Closes the delegate (if any) and unregisters the cache from the {@link LookupCacheManager}.
     *
     * <p>The cache is unregistered even when closing the delegate throws, so a failing delegate
     * cannot leave the registration behind.
     *
     * @throws Exception if closing the delegate or unregistering the cache fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.delegate != null) {
                this.delegate.close();
            }
        } finally {
            // cacheIdentifier is only set once open() has run; nothing to unregister otherwise.
            if (this.cacheIdentifier != null) {
                LookupCacheManager.getInstance().unregisterCache(this.cacheIdentifier);
            }
        }
    }

    /** Returns the cache currently used by this function. */
    @VisibleForTesting
    public LookupCache getCache() {
        return this.cache;
    }

    /**
     * Queries the delegate for the given key and records load metrics.
     *
     * @param keyRow the lookup key
     * @return the rows returned by the delegate (possibly {@code null})
     * @throws IOException wrapping any exception raised while looking up, including the state
     *     check failure when no delegate is configured
     */
    private Collection<RowData> lookupByDelegate(RowData keyRow) throws IOException {
        try {
            Preconditions.checkState(this.delegate != null, "User's lookup function can't be null, if there are possible cache misses.");
            long loadStart = System.currentTimeMillis();
            Collection<RowData> lookupValues = this.delegate.lookup(keyRow);
            this.updateLatestLoadTime(System.currentTimeMillis() - loadStart);
            // The counters are not created for full caches (see open()), so guard against null
            // rather than turning a successful lookup into a NullPointerException.
            if (this.loadCounter != null) {
                this.loadCounter.inc();
            }
            return lookupValues;
        }
        catch (Exception e) {
            // Guarded for the same reason: an NPE here would replace the real failure cause.
            if (this.numLoadFailuresCounter != null) {
                this.numLoadFailuresCounter.inc();
            }
            throw new IOException(String.format("Failed to lookup with key '%s'", keyRow), e);
        }
    }

    /**
     * Records the duration of the latest delegate lookup, registering the gauge on first use.
     *
     * @param loadTime duration of the lookup in milliseconds
     */
    private void updateLatestLoadTime(long loadTime) {
        Preconditions.checkNotNull(this.cacheMetricGroup, "Could not register metric '%s' as cache metric group is not initialized", "latestLoadTime");
        // -1 is the initial sentinel, so the gauge is registered when the first load is recorded.
        if (this.latestLoadTime == -1L) {
            this.cacheMetricGroup.latestLoadTimeGauge(() -> this.latestLoadTime);
        }
        this.latestLoadTime = loadTime;
    }
}

