001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.eclipse.aether.spi.connector.transport;
020
021import java.io.FilterInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InterruptedIOException;
025import java.nio.ByteBuffer;
026
027import org.eclipse.aether.transfer.TransferCancelledException;
028
029/**
030 * An {@code InputStream} wrapper that notifies a {@link TransportListener} about progress when data is read.
031 * It throws {@link InterruptedIOException} with a cause of {@link TransferCancelledException} when the transfer is cancelled in the transport listener.
032 * The start notification is sent lazily on the first read.
033 */
034public class TransportListenerNotifyingInputStream extends FilterInputStream {
035
036    private final TransportListener transportListener;
037    private final long size;
038    private boolean isStarted = false;
039
040    public TransportListenerNotifyingInputStream(InputStream in, TransportListener transportListener, long size) {
041        super(in);
042        this.transportListener = transportListener;
043        this.size = size;
044    }
045
046    @Override
047    public int read() throws IOException {
048        int byteRead = super.read();
049        if (byteRead != -1) {
050            if (!isStarted) {
051                notifyStarted();
052            }
053            notifyProgress(new byte[] {(byte) byteRead}, 0, 1);
054        }
055        return byteRead;
056    }
057
058    @Override
059    public int read(byte[] b) throws IOException {
060        int numBytesRead = super.read(b);
061        if (numBytesRead != -1) {
062            if (!isStarted) {
063                notifyStarted();
064            }
065            notifyProgress(b, 0, numBytesRead);
066        }
067        return numBytesRead;
068    }
069
070    @Override
071    public int read(byte[] b, int off, int len) throws IOException {
072        int numBytesRead = super.read(b, off, len);
073        if (numBytesRead != -1) {
074            if (!isStarted) {
075                notifyStarted();
076            }
077            notifyProgress(b, off, numBytesRead);
078        }
079        return numBytesRead;
080    }
081
082    private void notifyProgress(byte[] buffer, int offset, int numBytesRead) throws IOException {
083        try {
084            transportListener.transportProgressed(ByteBuffer.wrap(buffer, offset, numBytesRead));
085        } catch (TransferCancelledException e) {
086            throw (IOException) new InterruptedIOException().initCause(e);
087        }
088    }
089
090    private void notifyStarted() throws IOException {
091        try {
092            transportListener.transportStarted(0, size);
093        } catch (TransferCancelledException e) {
094            throw (IOException) new InterruptedIOException().initCause(e);
095        }
096        isStarted = true;
097    }
098}