/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.eclipse.aether.spi.connector.transport;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;

import org.eclipse.aether.transfer.TransferCancelledException;

/**
 * An {@code InputStream} wrapper that notifies a {@link TransportListener} about progress when data is read.
 * It throws {@link InterruptedIOException} with a cause of {@link TransferCancelledException} when the transfer is
 * cancelled in the transport listener.
 * <p>
 * The start notification is sent lazily, immediately before the first progress notification, i.e. on the first read
 * that does not report end of stream. A stream that is already at end of stream on its first read therefore triggers
 * no notification at all.
 * <p>
 * Every chunk of data read is reported to the listener exactly once, regardless of which {@code read} overload the
 * caller uses.
 */
public class TransportListenerNotifyingInputStream extends FilterInputStream {

    private final TransportListener transportListener;

    /** Value forwarded as the second argument of {@code transportStarted(0, size)}. */
    private final long size;

    /** Becomes {@code true} once the listener has accepted the start notification. */
    private boolean isStarted = false;

    /**
     * Creates a notifying wrapper around the given stream.
     *
     * @param in the underlying stream to read from
     * @param transportListener the listener that receives the start and progress notifications
     * @param size the value passed to the listener's start notification as {@code transportStarted(0, size)}
     */
    public TransportListenerNotifyingInputStream(InputStream in, TransportListener transportListener, long size) {
        super(in);
        this.transportListener = transportListener;
        this.size = size;
    }

    /**
     * Reads a single byte from the underlying stream and reports it to the listener.
     *
     * @return the byte read, or {@code -1} at end of stream (in which case nothing is reported)
     * @throws InterruptedIOException if the listener cancels the transfer; the cause is the
     *         {@link TransferCancelledException} thrown by the listener
     * @throws IOException if the underlying stream fails
     */
    @Override
    public int read() throws IOException {
        int byteRead = super.read();
        if (byteRead != -1) {
            notifyDataRead(new byte[] {(byte) byteRead}, 0, 1);
        }
        return byteRead;
    }

    /**
     * Reads up to {@code b.length} bytes into {@code b} and reports them to the listener.
     *
     * @param b the destination buffer
     * @return the number of bytes read, or {@code -1} at end of stream
     * @throws InterruptedIOException if the listener cancels the transfer; the cause is the
     *         {@link TransferCancelledException} thrown by the listener
     * @throws IOException if the underlying stream fails
     */
    @Override
    public int read(byte[] b) throws IOException {
        // Delegate to the three-argument overload, which does the notification. Do NOT notify here as well:
        // FilterInputStream.read(byte[]) is itself specified as read(b, 0, b.length) and would dispatch to our
        // override below, so notifying in both places reports every chunk to the listener twice.
        return read(b, 0, b.length);
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off} and reports them to the listener.
     *
     * @param b the destination buffer
     * @param off the offset in {@code b} at which the data is stored
     * @param len the maximum number of bytes to read
     * @return the number of bytes read, or {@code -1} at end of stream (in which case nothing is reported)
     * @throws InterruptedIOException if the listener cancels the transfer; the cause is the
     *         {@link TransferCancelledException} thrown by the listener
     * @throws IOException if the underlying stream fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int numBytesRead = super.read(b, off, len);
        if (numBytesRead != -1) {
            notifyDataRead(b, off, numBytesRead);
        }
        return numBytesRead;
    }

    /**
     * Sends the pending start notification (if not yet accepted by the listener) followed by the progress
     * notification for the given region of {@code buffer}.
     */
    private void notifyDataRead(byte[] buffer, int offset, int numBytesRead) throws IOException {
        if (!isStarted) {
            notifyStarted();
        }
        notifyProgress(buffer, offset, numBytesRead);
    }

    /** Reports the given buffer region to the listener, translating a cancellation into an I/O exception. */
    private void notifyProgress(byte[] buffer, int offset, int numBytesRead) throws IOException {
        try {
            transportListener.transportProgressed(ByteBuffer.wrap(buffer, offset, numBytesRead));
        } catch (TransferCancelledException e) {
            throw cancelled(e);
        }
    }

    /**
     * Sends the start notification. The stream is only marked as started when the listener did not cancel, so a
     * cancelled start is attempted again on the next successful read.
     */
    private void notifyStarted() throws IOException {
        try {
            transportListener.transportStarted(0, size);
        } catch (TransferCancelledException e) {
            throw cancelled(e);
        }
        isStarted = true;
    }

    /** Wraps a listener cancellation into an {@link InterruptedIOException} that carries it as its cause. */
    private static IOException cancelled(TransferCancelledException e) {
        InterruptedIOException interrupted = new InterruptedIOException();
        interrupted.initCause(e);
        return interrupted;
    }
}