/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdActions;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultJobLeaderIdService
implements JobLeaderIdService {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobLeaderIdService.class);
    private final HighAvailabilityServices highAvailabilityServices;
    private final ScheduledExecutor scheduledExecutor;
    private final Time jobTimeout;
    private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners;
    private JobLeaderIdActions jobLeaderIdActions;

    public DefaultJobLeaderIdService(HighAvailabilityServices highAvailabilityServices, ScheduledExecutor scheduledExecutor, Time jobTimeout) {
        this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices");
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor");
        this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout");
        this.jobLeaderIdListeners = new HashMap<JobID, JobLeaderIdListener>(4);
        this.jobLeaderIdActions = null;
    }

    @Override
    public void start(JobLeaderIdActions initialJobLeaderIdActions) throws Exception {
        if (this.isStarted()) {
            this.clear();
        }
        this.jobLeaderIdActions = Preconditions.checkNotNull(initialJobLeaderIdActions);
    }

    @Override
    public void stop() throws Exception {
        this.clear();
        this.jobLeaderIdActions = null;
    }

    public boolean isStarted() {
        return this.jobLeaderIdActions != null;
    }

    @Override
    public void clear() throws Exception {
        Exception exception = null;
        for (JobLeaderIdListener listener : this.jobLeaderIdListeners.values()) {
            try {
                listener.stop();
            }
            catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }
        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Could not properly stop the " + DefaultJobLeaderIdService.class.getSimpleName() + '.');
        }
        this.jobLeaderIdListeners.clear();
    }

    @Override
    public void addJob(JobID jobId) throws Exception {
        Preconditions.checkNotNull(this.jobLeaderIdActions);
        LOG.debug("Add job {} to job leader id monitoring.", (Object)jobId);
        if (!this.jobLeaderIdListeners.containsKey(jobId)) {
            LeaderRetrievalService leaderRetrievalService = this.highAvailabilityServices.getJobManagerLeaderRetriever(jobId);
            JobLeaderIdListener jobIdListener = new JobLeaderIdListener(jobId, this.jobLeaderIdActions, leaderRetrievalService);
            this.jobLeaderIdListeners.put(jobId, jobIdListener);
        }
    }

    @Override
    public void removeJob(JobID jobId) throws Exception {
        LOG.debug("Remove job {} from job leader id monitoring.", (Object)jobId);
        JobLeaderIdListener listener = this.jobLeaderIdListeners.remove(jobId);
        if (listener != null) {
            listener.stop();
        }
    }

    @Override
    public boolean containsJob(JobID jobId) {
        return this.jobLeaderIdListeners.containsKey(jobId);
    }

    @Override
    public CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws Exception {
        if (!this.jobLeaderIdListeners.containsKey(jobId)) {
            this.addJob(jobId);
        }
        JobLeaderIdListener listener = this.jobLeaderIdListeners.get(jobId);
        return listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull);
    }

    @Override
    public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
        JobLeaderIdListener jobLeaderIdListener = this.jobLeaderIdListeners.get(jobId);
        if (null != jobLeaderIdListener) {
            return Objects.equals(timeoutId, jobLeaderIdListener.getTimeoutId());
        }
        return false;
    }

    private final class JobLeaderIdListener
    implements LeaderRetrievalListener {
        private final Object timeoutLock = new Object();
        private final JobID jobId;
        private final JobLeaderIdActions listenerJobLeaderIdActions;
        private final LeaderRetrievalService leaderRetrievalService;
        private volatile CompletableFuture<UUID> leaderIdFuture;
        private volatile boolean running = true;
        @Nullable
        private volatile ScheduledFuture<?> timeoutFuture;
        @Nullable
        private volatile UUID timeoutId;

        private JobLeaderIdListener(JobID jobId, JobLeaderIdActions listenerJobLeaderIdActions, LeaderRetrievalService leaderRetrievalService) throws Exception {
            this.jobId = Preconditions.checkNotNull(jobId);
            this.listenerJobLeaderIdActions = Preconditions.checkNotNull(listenerJobLeaderIdActions);
            this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
            this.leaderIdFuture = new CompletableFuture();
            this.activateTimeout();
            leaderRetrievalService.start(this);
        }

        public CompletableFuture<UUID> getLeaderIdFuture() {
            return this.leaderIdFuture;
        }

        @Nullable
        public UUID getTimeoutId() {
            return this.timeoutId;
        }

        public void stop() throws Exception {
            this.running = false;
            this.leaderRetrievalService.stop();
            this.cancelTimeout();
            this.leaderIdFuture.completeExceptionally(new Exception("Job leader id service has been stopped."));
        }

        @Override
        public void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionId) {
            if (this.running) {
                UUID previousJobLeaderId = null;
                if (this.leaderIdFuture.isDone()) {
                    try {
                        previousJobLeaderId = this.leaderIdFuture.getNow(null);
                    }
                    catch (CompletionException e) {
                        this.handleError(e);
                    }
                    if (leaderSessionId == null) {
                        LOG.debug("Job {} no longer has a job leader.", (Object)this.jobId);
                        this.leaderIdFuture = new CompletableFuture();
                    } else {
                        LOG.debug("Job {} has a new job leader {}@{}.", new Object[]{this.jobId, leaderSessionId, leaderAddress});
                        this.leaderIdFuture = CompletableFuture.completedFuture(leaderSessionId);
                    }
                } else if (leaderSessionId != null) {
                    LOG.debug("Job {} has a new job leader {}@{}.", new Object[]{this.jobId, leaderSessionId, leaderAddress});
                    this.leaderIdFuture.complete(leaderSessionId);
                }
                if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) {
                    this.listenerJobLeaderIdActions.jobLeaderLostLeadership(this.jobId, new JobMasterId(previousJobLeaderId));
                    if (null == leaderSessionId) {
                        this.activateTimeout();
                        if (!this.running) {
                            this.cancelTimeout();
                        }
                    }
                } else if (null != leaderSessionId) {
                    this.cancelTimeout();
                }
            } else {
                LOG.debug("A leader id change {}@{} has been detected after the listener has been stopped.", (Object)leaderSessionId, (Object)leaderAddress);
            }
        }

        @Override
        public void handleError(Exception exception) {
            if (this.running) {
                this.listenerJobLeaderIdActions.handleError(exception);
            } else {
                LOG.debug("An error occurred in the {} after the listener has been stopped.", (Object)JobLeaderIdListener.class.getSimpleName(), (Object)exception);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void activateTimeout() {
            Object object = this.timeoutLock;
            synchronized (object) {
                UUID newTimeoutId;
                this.cancelTimeout();
                this.timeoutId = newTimeoutId = UUID.randomUUID();
                this.timeoutFuture = DefaultJobLeaderIdService.this.scheduledExecutor.schedule(new Runnable(){

                    @Override
                    public void run() {
                        JobLeaderIdListener.this.listenerJobLeaderIdActions.notifyJobTimeout(JobLeaderIdListener.this.jobId, newTimeoutId);
                    }
                }, DefaultJobLeaderIdService.this.jobTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void cancelTimeout() {
            Object object = this.timeoutLock;
            synchronized (object) {
                if (this.timeoutFuture != null) {
                    this.timeoutFuture.cancel(true);
                }
                this.timeoutFuture = null;
                this.timeoutId = null;
            }
        }
    }
}

