/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.io;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.io.SinkUtils;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.FutureCallback;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for output formats that write records asynchronously while bounding the number of
 * requests that are in flight at the same time.
 *
 * <p>Every call to {@link #writeRecord(Object)} takes one permit from a {@link Semaphore} sized
 * to {@code maxConcurrentRequests} before handing the record to {@link #send(Object)}. The permit
 * is given back when the returned {@link CompletionStage} completes, successfully or not. The
 * first asynchronous failure is remembered and rethrown as an {@link IOException} by the next
 * {@link #writeRecord(Object)} or {@link #close()} call.
 *
 * <p>Subclasses implement {@link #send(Object)} and may hook into the lifecycle through
 * {@link #postOpen()} and {@link #postClose()}.
 *
 * @param <OUT> type of the records accepted by this format
 * @param <V> result type of the asynchronous write returned by {@link #send(Object)}
 */
@Experimental
public abstract class OutputFormatBase<OUT, V>
extends RichOutputFormat<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputFormatBase.class);

    /** Bounds the number of in-flight requests; created in {@link #open(int, int)}. */
    private Semaphore semaphore;
    /** Timeout handed to {@code SinkUtils.tryAcquire} when waiting for permits. */
    private Duration maxConcurrentRequestsTimeout = Duration.ofMillis(Long.MAX_VALUE);
    /** Total number of permits, i.e. the maximum number of concurrent requests. */
    private int maxConcurrentRequests = Integer.MAX_VALUE;
    /** Completion handler shared by all requests; created in {@link #open(int, int)}. */
    private transient FutureCallback<V> callback;
    /** Holds the first asynchronous failure that has not been reported yet. */
    private AtomicReference<Throwable> throwable;

    /**
     * Creates the format with the given concurrency limits.
     *
     * @param maxConcurrentRequests maximum number of requests in flight; must be greater than zero
     * @param maxConcurrentRequestsTimeout timeout used when waiting for permits; must not be
     *     {@code null} or negative (a zero duration passes the check)
     * @throws IllegalArgumentException presumably raised by {@code Preconditions.checkArgument}
     *     when a numeric constraint is violated
     * @throws NullPointerException presumably raised by {@code Preconditions.checkNotNull} when
     *     the timeout is {@code null}
     */
    protected OutputFormatBase(int maxConcurrentRequests, Duration maxConcurrentRequestsTimeout) {
        Preconditions.checkArgument(maxConcurrentRequests > 0, "Max concurrent requests is expected to be positive");
        this.maxConcurrentRequests = maxConcurrentRequests;
        Preconditions.checkNotNull(maxConcurrentRequestsTimeout, "Max concurrent requests timeout cannot be null");
        Preconditions.checkArgument(!maxConcurrentRequestsTimeout.isNegative(), "Max concurrent requests timeout is expected to be positive");
        this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
    }

    /**
     * Initialises the error holder, the semaphore and the completion callback, then invokes
     * {@link #postOpen()}.
     *
     * @param taskNumber not used by this base class
     * @param numTasks not used by this base class
     */
    @Override
    public final void open(int taskNumber, int numTasks) {
        this.throwable = new AtomicReference<>();
        this.semaphore = new Semaphore(this.maxConcurrentRequests);
        this.callback = new FutureCallback<V>() {

            @Override
            public void onSuccess(V ignored) {
                OutputFormatBase.this.semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                // Only the first failure is kept; later ones are logged but not stored.
                OutputFormatBase.this.throwable.compareAndSet(null, t);
                LOG.error("Error while writing value.", t);
                OutputFormatBase.this.semaphore.release();
            }
        };
        this.postOpen();
    }

    /** Hook invoked at the end of {@link #open(int, int)}; does nothing by default. */
    protected void postOpen() {
    }

    /**
     * Waits for outstanding requests by taking every permit and immediately returning them.
     *
     * @throws IOException if the permits could not be acquired
     */
    private void flush() throws IOException {
        this.tryAcquire(this.maxConcurrentRequests);
        // Reached only when the acquisition did not throw, so all permits are given back.
        this.semaphore.release(this.maxConcurrentRequests);
    }

    /**
     * Delegates to {@code SinkUtils.tryAcquire} and converts any failure into an
     * {@link IOException}.
     *
     * @param permits number of permits to take
     * @throws IOException wrapping whatever exception the helper raised
     */
    private void tryAcquire(int permits) throws IOException {
        try {
            SinkUtils.tryAcquire(permits, this.maxConcurrentRequests, this.maxConcurrentRequestsTimeout, this.semaphore);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Reports any pending asynchronous failure, takes one permit and submits the record through
     * {@link #send(Object)}. The permit is returned when the resulting stage completes.
     *
     * @param record the record to write
     * @throws IOException if an earlier asynchronous write failed or no permit could be acquired
     */
    @Override
    public final void writeRecord(OUT record) throws IOException {
        this.checkAsyncErrors();
        this.tryAcquire(1);
        final CompletionStage<V> completionStage;
        try {
            // The null check lives inside the guarded region so that a misbehaving send()
            // implementation cannot leak the permit acquired above.
            completionStage = Preconditions.checkNotNull(this.send(record), "send(record) must not return null");
        }
        catch (Throwable e) {
            this.semaphore.release();
            throw e;
        }
        completionStage.whenComplete((result, error) -> {
            if (error == null) {
                this.callback.onSuccess(result);
            } else {
                this.callback.onFailure(error);
            }
        });
    }

    /**
     * Starts the asynchronous write of a single record.
     *
     * @param var1 the record to write
     * @return the stage that completes when the write has finished; must not be {@code null}
     */
    protected abstract CompletionStage<V> send(OUT var1);

    /**
     * Throws if an asynchronous write has failed since the last check. The stored failure is
     * cleared so it is reported only once.
     *
     * @throws IOException wrapping the recorded failure
     */
    private void checkAsyncErrors() throws IOException {
        Throwable currentError = this.throwable.getAndSet(null);
        if (currentError != null) {
            throw new IOException("Write record failed", currentError);
        }
    }

    /**
     * Reports pending failures, flushes outstanding requests, reports failures raised during the
     * flush and finally invokes {@link #postClose()}.
     *
     * <p>{@link #postClose()} runs even when reporting or flushing throws, so that subclasses
     * always get the chance to release their resources. If {@link #postClose()} itself throws
     * while another failure is already propagating, its exception is attached to that failure as
     * a suppressed exception.
     *
     * @throws IOException if an asynchronous write failed or the flush could not complete
     */
    @Override
    public final void close() throws IOException {
        Throwable primary = null;
        try {
            this.checkAsyncErrors();
            this.flush();
            this.checkAsyncErrors();
        }
        catch (IOException | RuntimeException | Error e) {
            primary = e;
            throw e;
        }
        finally {
            try {
                this.postClose();
            }
            catch (RuntimeException | Error e) {
                if (primary == null) {
                    throw e;
                }
                // Keep the original failure as the one callers see.
                primary.addSuppressed(e);
            }
        }
    }

    /** Hook invoked at the end of {@link #close()}; does nothing by default. */
    protected void postClose() {
    }

    /** Returns the number of permits currently available in the semaphore. */
    @VisibleForTesting
    int getAvailablePermits() {
        return this.semaphore.availablePermits();
    }

    /** Returns the number of permits currently held, i.e. the total minus the available ones. */
    @VisibleForTesting
    int getAcquiredPermits() {
        return this.maxConcurrentRequests - this.semaphore.availablePermits();
    }
}

