/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.InitContextInitializationContextAdapter;
import org.apache.flink.streaming.runtime.operators.sink.SinkV1WriterCommittableSerializer;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterStateHandler;
import org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler;
import org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

/**
 * Stream operator that drives a {@link SinkWriter}.
 *
 * <p>Incoming records and watermarks are handed to the writer. Before each checkpoint barrier and
 * at end of input the writer is flushed. If the sink is a {@link TwoPhaseCommittingSink}, the
 * committables returned by the writer's {@code prepareCommit()} are forwarded downstream as a
 * {@link CommittableSummary} followed by one {@link CommittableWithLineage} per committable.
 *
 * @param <InputT> type of the records written by the sink writer
 * @param <CommT> type of the committables produced by the sink writer
 */
class SinkWriterOperator<InputT, CommT>
        extends AbstractStreamOperator<CommittableMessage<CommT>>
        implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput {

    /** Descriptor of the raw byte list state from which legacy committer state is restored. */
    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
            new ListStateDescriptor<>(
                    "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

    /**
     * Checkpoint id under which committables restored from the legacy committer state are
     * emitted. The regular checkpoint id must be strictly greater than this value whenever legacy
     * committables are pending.
     */
    private static final long LEGACY_COMMITTABLES_CHECKPOINT_ID = 1L;

    /** Serializer of the sink's committables; {@code null} if the sink is not two-phase. */
    @Nullable private final SimpleVersionedSerializer<CommT> committableSerializer;

    /** Committables restored from legacy committer state, emitted with the next committables. */
    private final List<CommT> legacyCommittables = new ArrayList<>();

    /** Per-record context handed to the writer; updated on every {@link #processElement}. */
    private final Context<InputT> context;

    /** Whether committables are forwarded downstream (sink is a two-phase committing sink). */
    private final boolean emitDownstream;

    /** Timestamp of the last watermark seen by this operator. */
    private long currentWatermark = Long.MIN_VALUE;

    /** The writer; created in {@link #initializeState}. */
    private SinkWriter<InputT> sinkWriter;

    /** Creates the writer and snapshots its state, depending on whether the sink is stateful. */
    private final SinkWriterStateHandler<InputT> writerStateHandler;

    private final MailboxExecutor mailboxExecutor;

    /** Set once {@link #endInput()} ran; later pre-barrier calls must not flush or emit again. */
    private boolean endOfInput = false;

    /**
     * Creates the operator for the given sink.
     *
     * @param sink the sink whose writer this operator runs
     * @param processingTimeService processing time service exposed to the writer; must not be null
     * @param mailboxExecutor mailbox executor exposed to the writer; must not be null
     */
    SinkWriterOperator(
            Sink<InputT> sink,
            ProcessingTimeService processingTimeService,
            MailboxExecutor mailboxExecutor) {
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor);
        this.context = new Context<>();
        this.emitDownstream = sink instanceof TwoPhaseCommittingSink;

        if (sink instanceof StatefulSink) {
            this.writerStateHandler =
                    new StatefulSinkWriterStateHandler<>((StatefulSink<InputT, ?>) sink);
        } else {
            this.writerStateHandler = new StatelessSinkWriterStateHandler<>(sink);
        }

        if (sink instanceof TwoPhaseCommittingSink) {
            // The committable type is erased at runtime; CommT is fixed by whoever builds this
            // operator for the sink, so the cast cannot be checked here.
            @SuppressWarnings("unchecked")
            TwoPhaseCommittingSink<InputT, CommT> committingSink =
                    (TwoPhaseCommittingSink<InputT, CommT>) sink;
            this.committableSerializer = committingSink.getCommittableSerializer();
        } else {
            this.committableSerializer = null;
        }
    }

    /**
     * Restores legacy committables (if any) and creates the sink writer.
     *
     * <p>Legacy committer state is only read when the context is restored and the sink provides a
     * committable serializer.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        OptionalLong checkpointId = context.getRestoredCheckpointId();
        Sink.InitContext initContext =
                createInitContext(
                        checkpointId.isPresent() ? Long.valueOf(checkpointId.getAsLong()) : null);
        if (context.isRestored() && committableSerializer != null) {
            // Each state entry is a whole list of committables, hence List<CommT> as element type.
            SimpleVersionedListState<List<CommT>> legacyCommitterState =
                    new SimpleVersionedListState<>(
                            context.getOperatorStateStore()
                                    .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
                            new SinkV1WriterCommittableSerializer<>(committableSerializer));
            legacyCommitterState.get().forEach(legacyCommittables::addAll);
        }
        sinkWriter = writerStateHandler.createWriter(initContext, context);
    }

    /** Snapshots the writer state through the state handler for the given checkpoint. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        writerStateHandler.snapshotState(context.getCheckpointId());
    }

    /** Records the element in the writer context and passes its value to the writer. */
    @Override
    public void processElement(StreamRecord<InputT> element) throws Exception {
        context.element = element;
        sinkWriter.write(element.getValue(), context);
    }

    /**
     * Flushes the writer and emits its committables for the given checkpoint.
     *
     * <p>Does nothing once the input has ended, because {@link #endInput()} already performed the
     * final flush and emission.
     */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        super.prepareSnapshotPreBarrier(checkpointId);
        if (!endOfInput) {
            sinkWriter.flush(false);
            emitCommittables(checkpointId);
        }
    }

    /** Remembers the watermark timestamp and forwards the watermark to the writer. */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        currentWatermark = mark.getTimestamp();
        sinkWriter.writeWatermark(
                new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
    }

    /**
     * Performs the final flush and emits the remaining committables under checkpoint id {@link
     * Long#MAX_VALUE}.
     */
    @Override
    public void endInput() throws Exception {
        endOfInput = true;
        sinkWriter.flush(true);
        emitCommittables(Long.MAX_VALUE);
    }

    /**
     * Calls {@code prepareCommit()} on the writer and, if committables are forwarded downstream,
     * emits them for {@code checkpointId}. Pending legacy committables are emitted first under
     * {@link #LEGACY_COMMITTABLES_CHECKPOINT_ID}.
     *
     * @param checkpointId checkpoint id to attach to the emitted committables
     */
    private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
        if (!emitDownstream) {
            // Nothing is forwarded, but a pre-committing writer still gets its prepareCommit call.
            if (sinkWriter instanceof TwoPhaseCommittingSink.PrecommittingSinkWriter) {
                ((TwoPhaseCommittingSink.PrecommittingSinkWriter<?, ?>) sinkWriter)
                        .prepareCommit();
            }
            return;
        }
        @SuppressWarnings("unchecked")
        Collection<CommT> committables =
                ((TwoPhaseCommittingSink.PrecommittingSinkWriter<?, CommT>) sinkWriter)
                        .prepareCommit();
        StreamingRuntimeContext runtimeContext = getRuntimeContext();
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
        if (!legacyCommittables.isEmpty()) {
            Preconditions.checkState(checkpointId > LEGACY_COMMITTABLES_CHECKPOINT_ID);
            emit(
                    indexOfThisSubtask,
                    numberOfParallelSubtasks,
                    LEGACY_COMMITTABLES_CHECKPOINT_ID,
                    legacyCommittables);
            legacyCommittables.clear();
        }
        emit(indexOfThisSubtask, numberOfParallelSubtasks, checkpointId, committables);
    }

    /** Closes the sink writer and then the operator itself. */
    @Override
    public void close() throws Exception {
        IOUtils.closeAll(sinkWriter, super::close);
    }

    /**
     * Emits one {@link CommittableSummary} followed by one {@link CommittableWithLineage} per
     * committable. The summary receives the collection size for both of its count arguments and
     * {@code 0} for its last argument.
     */
    private void emit(
            int indexOfThisSubtask,
            int numberOfParallelSubtasks,
            long checkpointId,
            Collection<CommT> committables) {
        output.collect(
                new StreamRecord<>(
                        new CommittableSummary<>(
                                indexOfThisSubtask,
                                numberOfParallelSubtasks,
                                checkpointId,
                                committables.size(),
                                committables.size(),
                                0)));
        for (CommT committable : committables) {
            output.collect(
                    new StreamRecord<>(
                            new CommittableWithLineage<>(
                                    committable, checkpointId, indexOfThisSubtask)));
        }
    }

    /**
     * Builds the init context handed to the writer factory.
     *
     * @param restoredCheckpointId id of the restored checkpoint, or {@code null} if there is none
     */
    private Sink.InitContext createInitContext(@Nullable Long restoredCheckpointId) {
        return new InitContextImpl(
                getRuntimeContext(),
                processingTimeService,
                mailboxExecutor,
                InternalSinkWriterMetricGroup.wrap(getMetricGroup()),
                restoredCheckpointId);
    }

    /** {@link Sink.InitContext} backed by this operator's runtime context and services. */
    private static class InitContextImpl implements Sink.InitContext {
        private final ProcessingTimeService processingTimeService;
        private final MailboxExecutor mailboxExecutor;
        private final SinkWriterMetricGroup metricGroup;
        @Nullable private final Long restoredCheckpointId;
        private final StreamingRuntimeContext runtimeContext;

        public InitContextImpl(
                StreamingRuntimeContext runtimeContext,
                ProcessingTimeService processingTimeService,
                MailboxExecutor mailboxExecutor,
                SinkWriterMetricGroup metricGroup,
                @Nullable Long restoredCheckpointId) {
            this.runtimeContext = Preconditions.checkNotNull(runtimeContext);
            this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor);
            this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
            this.metricGroup = Preconditions.checkNotNull(metricGroup);
            this.restoredCheckpointId = restoredCheckpointId;
        }

        /** Returns a view that delegates to the runtime context's user-code class loader. */
        @Override
        public UserCodeClassLoader getUserCodeClassLoader() {
            return new UserCodeClassLoader() {
                @Override
                public ClassLoader asClassLoader() {
                    return runtimeContext.getUserCodeClassLoader();
                }

                @Override
                public void registerReleaseHookIfAbsent(
                        String releaseHookName, Runnable releaseHook) {
                    runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(
                            releaseHookName, releaseHook);
                }
            };
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return runtimeContext.getNumberOfParallelSubtasks();
        }

        @Override
        public MailboxExecutor getMailboxExecutor() {
            return mailboxExecutor;
        }

        @Override
        public org.apache.flink.api.common.operators.ProcessingTimeService
                getProcessingTimeService() {
            return processingTimeService;
        }

        @Override
        public int getSubtaskId() {
            return runtimeContext.getIndexOfThisSubtask();
        }

        @Override
        public SinkWriterMetricGroup metricGroup() {
            return metricGroup;
        }

        /** Returns the restored checkpoint id, or an empty optional if none was provided. */
        @Override
        public OptionalLong getRestoredCheckpointId() {
            return restoredCheckpointId == null
                    ? OptionalLong.empty()
                    : OptionalLong.of(restoredCheckpointId);
        }

        /** Adapts this context; metrics for the schema live in the {@code "user"} sub-group. */
        @Override
        public SerializationSchema.InitializationContext
                asSerializationSchemaInitializationContext() {
            return new InitContextInitializationContextAdapter(
                    getUserCodeClassLoader(), () -> metricGroup.addGroup("user"));
        }
    }

    /**
     * Writer context exposing the operator's current watermark and the timestamp of the most
     * recently processed element.
     */
    private class Context<IN> implements SinkWriter.Context {
        /** Most recently processed element; {@code null} until the first element arrives. */
        private StreamRecord<IN> element;

        @Override
        public long currentWatermark() {
            return SinkWriterOperator.this.currentWatermark;
        }

        /**
         * Returns the timestamp of the current element, or {@code null} if no element has been
         * processed yet, the element carries no timestamp, or its timestamp is {@link
         * Long#MIN_VALUE}.
         */
        @Override
        public Long timestamp() {
            // Guard against use before the first element instead of failing with an NPE.
            if (element != null
                    && element.hasTimestamp()
                    && element.getTimestamp() != Long.MIN_VALUE) {
                return element.getTimestamp();
            }
            return null;
        }
    }
}

