/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.ArrayResultIterator;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.UserCodeClassLoader;

@Internal
public class DeserializationSchemaAdapter
implements BulkFormat<RowData, FileSourceSplit> {
    private static final int BATCH_SIZE = 100;
    private final DeserializationSchema<RowData> deserializationSchema;

    public DeserializationSchemaAdapter(DeserializationSchema<RowData> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    private DeserializationSchema<RowData> createDeserialization() throws IOException {
        try {
            DeserializationSchema<RowData> deserialization = InstantiationUtil.clone(this.deserializationSchema);
            deserialization.open(new DeserializationSchema.InitializationContext(){

                @Override
                public MetricGroup getMetricGroup() {
                    return new UnregisteredMetricsGroup();
                }

                @Override
                public UserCodeClassLoader getUserCodeClassLoader() {
                    return (UserCodeClassLoader)((Object)Thread.currentThread().getContextClassLoader());
                }
            });
            return deserialization;
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    public Reader createReader(Configuration config, FileSourceSplit split) throws IOException {
        return new Reader(config, split);
    }

    public Reader restoreReader(Configuration config, FileSourceSplit split) throws IOException {
        Reader reader = new Reader(config, split);
        reader.seek(split.getReaderPosition().get().getRecordsAfterOffset());
        return reader;
    }

    @Override
    public boolean isSplittable() {
        return true;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    private class LineBytesInputFormat
    extends DelimitedInputFormat<RowData> {
        private static final long serialVersionUID = 1L;
        private static final byte CARRIAGE_RETURN = 13;
        private static final byte NEW_LINE = 10;
        private final DeserializationSchema<RowData> deserializationSchema;
        private transient boolean end;
        private transient RecordCollector collector;

        public LineBytesInputFormat(Path path, Configuration config) throws IOException {
            super(path, config);
            this.deserializationSchema = DeserializationSchemaAdapter.this.createDeserialization();
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
            super.open(split);
            this.end = false;
            this.collector = new RecordCollector();
        }

        @Override
        public boolean reachedEnd() {
            return this.end;
        }

        @Override
        public RowData readRecord(RowData reuse, byte[] bytes, int offset, int numBytes) throws IOException {
            if (this.getDelimiter() != null && this.getDelimiter().length == 1 && this.getDelimiter()[0] == 10 && offset + numBytes >= 1 && bytes[offset + numBytes - 1] == 13) {
                --numBytes;
            }
            byte[] trimBytes = Arrays.copyOfRange(bytes, offset, offset + numBytes);
            this.deserializationSchema.deserialize(trimBytes, this.collector);
            return null;
        }

        @Override
        public RowData nextRecord(RowData reuse) throws IOException {
            while (true) {
                RowData record;
                if ((record = (RowData)this.collector.records.poll()) != null) {
                    return record;
                }
                if (!this.readLine()) break;
                this.readRecord(reuse, this.currBuffer, this.currOffset, this.currLen);
            }
            this.end = true;
            return null;
        }

        private class RecordCollector
        implements Collector<RowData> {
            private final Queue<RowData> records = new ArrayDeque<RowData>();

            private RecordCollector() {
            }

            @Override
            public void collect(RowData record) {
                this.records.add(record);
            }

            @Override
            public void close() {
            }
        }
    }

    private class Reader
    implements BulkFormat.Reader<RowData> {
        private final LineBytesInputFormat inputFormat;
        private long numRead = 0L;

        private Reader(Configuration config, FileSourceSplit split) throws IOException {
            this.inputFormat = new LineBytesInputFormat(split.path(), config);
            this.inputFormat.open(new FileInputSplit(0, split.path(), split.offset(), split.length(), null));
        }

        @Override
        @Nullable
        public BulkFormat.RecordIterator<RowData> readBatch() throws IOException {
            RowData record;
            RowData[] records = new RowData[2048];
            int num = 0;
            long skipCount = this.numRead;
            for (int i = 0; i < 100 && (record = this.inputFormat.nextRecord(null)) != null; ++i) {
                records[num++] = record;
            }
            if (num == 0) {
                return null;
            }
            this.numRead += (long)num;
            ArrayResultIterator<RowData> iterator = new ArrayResultIterator<RowData>();
            iterator.set(records, num, -1L, skipCount);
            return iterator;
        }

        private void seek(long toSkip) throws IOException {
            while (toSkip > 0L) {
                this.inputFormat.nextRecord(null);
                --toSkip;
            }
        }

        @Override
        public void close() throws IOException {
            this.inputFormat.close();
        }
    }
}

