/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.spark;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.spark.SparkRowData;
import org.apache.flink.table.store.table.SupportsWrite;
import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.SerializableCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.types.logical.RowType;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.connector.write.V1Write;
import org.apache.spark.sql.sources.InsertableRelation;

public class SparkWrite
implements V1Write {
    private final SupportsWrite table;
    private final String queryId;
    private final Lock.Factory lockFactory;
    private final Configuration conf;

    public SparkWrite(SupportsWrite table, String queryId, Lock.Factory lockFactory, Configuration conf) {
        this.table = table;
        this.queryId = queryId;
        this.lockFactory = lockFactory;
        this.conf = conf;
    }

    public InsertableRelation toInsertableRelation() {
        return (data, overwrite) -> {
            if (overwrite) {
                throw new UnsupportedOperationException("Overwrite is unsupported.");
            }
            long identifier = 0L;
            List committables = (List)data.toJavaRDD().groupBy((Function)new ComputeBucket(this.table, this.conf)).mapValues((Function)new WriteRecords(this.table, this.queryId, identifier, this.conf)).values().reduce(new ListConcat());
            try (TableCommit tableCommit = this.table.newCommit(this.queryId).withLock(this.lockFactory.create());){
                tableCommit.commit(identifier, committables.stream().map(SerializableCommittable::delegate).collect(Collectors.toList()));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private static class ListConcat<T>
    implements Function2<List<T>, List<T>, List<T>> {
        private ListConcat() {
        }

        public List<T> call(List<T> v1, List<T> v2) {
            ArrayList<T> ret = new ArrayList<T>();
            ret.addAll(v1);
            ret.addAll(v2);
            return ret;
        }
    }

    private static class WriteRecords
    implements Function<Iterable<Row>, List<SerializableCommittable>> {
        private final SupportsWrite table;
        private final RowType type;
        private final String queryId;
        private final long commitIdentifier;
        private final Configuration conf;

        private WriteRecords(SupportsWrite table, String queryId, long commitIdentifier, Configuration conf) {
            this.table = table;
            this.type = table.rowType();
            this.queryId = queryId;
            this.commitIdentifier = commitIdentifier;
            this.conf = conf;
        }

        public List<SerializableCommittable> call(Iterable<Row> iterables) throws Exception {
            try (TableWrite write = this.table.newWrite(this.queryId);){
                for (Row row : iterables) {
                    write.write(new SparkRowData(this.type, row));
                }
                List<FileCommittable> committables = write.prepareCommit(true, this.commitIdentifier);
                List<SerializableCommittable> list = committables.stream().map(SerializableCommittable::wrap).collect(Collectors.toList());
                return list;
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            FileSystems.initialize(this.table.location(), this.conf);
        }
    }

    private static class ComputeBucket
    implements Function<Row, Integer> {
        private final SupportsWrite table;
        private final RowType type;
        private final Configuration conf;
        private transient BucketComputer lazyComputer;

        private ComputeBucket(SupportsWrite table, Configuration conf) {
            this.table = table;
            this.type = table.rowType();
            this.conf = conf;
        }

        private BucketComputer computer() {
            if (this.lazyComputer == null) {
                this.lazyComputer = this.table.bucketComputer();
            }
            return this.lazyComputer;
        }

        public Integer call(Row row) {
            return this.computer().bucket(new SparkRowData(this.type, row));
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            FileSystems.initialize(this.table.location(), this.conf);
        }
    }
}

