001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 *
017 */
018package org.apache.commons.compress.archivers.zip;
019
020import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
021
022import java.io.Closeable;
023import java.io.DataOutput;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.util.zip.CRC32;
028import java.util.zip.Deflater;
029import java.util.zip.ZipEntry;
030
031/**
032 * Encapsulates a {@link Deflater} and crc calculator, handling multiple types of output streams.
033 * Currently {@link java.util.zip.ZipEntry#DEFLATED} and {@link java.util.zip.ZipEntry#STORED} are the only
034 * supported compression methods.
035 *
036 * @since 1.10
037 */
038public abstract class StreamCompressor implements Closeable {
039
040    /*
041     * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs
042     * when it gets handed a really big buffer.  See
043     * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
044     *
045     * Using a buffer size of 8 kB proved to be a good compromise
046     */
047    private static final int DEFLATER_BLOCK_SIZE = 8192;
048
049    private final Deflater def;
050
051    private final CRC32 crc = new CRC32();
052
053    private long writtenToOutputStreamForLastEntry = 0;
054    private long sourcePayloadLength = 0;
055    private long totalWrittenToOutputStream = 0;
056
057    private static final int bufferSize = 4096;
058    private final byte[] outputBuffer = new byte[bufferSize];
059    private final byte[] readerBuf = new byte[bufferSize];
060
061    StreamCompressor(final Deflater deflater) {
062        this.def = deflater;
063    }
064
065    /**
066     * Create a stream compressor with the given compression level.
067     *
068     * @param os       The stream to receive output
069     * @param deflater The deflater to use
070     * @return A stream compressor
071     */
072    static StreamCompressor create(final OutputStream os, final Deflater deflater) {
073        return new OutputStreamCompressor(deflater, os);
074    }
075
076    /**
077     * Create a stream compressor with the default compression level.
078     *
079     * @param os The stream to receive output
080     * @return A stream compressor
081     */
082    static StreamCompressor create(final OutputStream os) {
083        return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
084    }
085
086    /**
087     * Create a stream compressor with the given compression level.
088     *
089     * @param os       The DataOutput to receive output
090     * @param deflater The deflater to use for the compressor
091     * @return A stream compressor
092     */
093    static StreamCompressor create(final DataOutput os, final Deflater deflater) {
094        return new DataOutputCompressor(deflater, os);
095    }
096
097    /**
098     * Create a stream compressor with the given compression level.
099     *
100     * @param compressionLevel The {@link Deflater}  compression level
101     * @param bs               The ScatterGatherBackingStore to receive output
102     * @return A stream compressor
103     */
104    public static StreamCompressor create(final int compressionLevel, final ScatterGatherBackingStore bs) {
105        final Deflater deflater = new Deflater(compressionLevel, true);
106        return new ScatterGatherBackingStoreCompressor(deflater, bs);
107    }
108
109    /**
110     * Create a stream compressor with the default compression level.
111     *
112     * @param bs The ScatterGatherBackingStore to receive output
113     * @return A stream compressor
114     */
115    public static StreamCompressor create(final ScatterGatherBackingStore bs) {
116        return create(Deflater.DEFAULT_COMPRESSION, bs);
117    }
118
119    /**
120     * The crc32 of the last deflated file
121     *
122     * @return the crc32
123     */
124
125    public long getCrc32() {
126        return crc.getValue();
127    }
128
129    /**
130     * Return the number of bytes read from the source stream
131     *
132     * @return The number of bytes read, never negative
133     */
134    public long getBytesRead() {
135        return sourcePayloadLength;
136    }
137
138    /**
139     * The number of bytes written to the output for the last entry
140     *
141     * @return The number of bytes, never negative
142     */
143    public long getBytesWrittenForLastEntry() {
144        return writtenToOutputStreamForLastEntry;
145    }
146
147    /**
148     * The total number of bytes written to the output for all files
149     *
150     * @return The number of bytes, never negative
151     */
152    public long getTotalBytesWritten() {
153        return totalWrittenToOutputStream;
154    }
155
156
157    /**
158     * Deflate the given source using the supplied compression method
159     *
160     * @param source The source to compress
161     * @param method The #ZipArchiveEntry compression method
162     * @throws IOException When failures happen
163     */
164
165    public void deflate(final InputStream source, final int method) throws IOException {
166        reset();
167        int length;
168
169        while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
170            write(readerBuf, 0, length, method);
171        }
172        if (method == ZipEntry.DEFLATED) {
173            flushDeflater();
174        }
175    }
176
177    /**
178     * Writes bytes to ZIP entry.
179     *
180     * @param b      the byte array to write
181     * @param offset the start position to write from
182     * @param length the number of bytes to write
183     * @param method the comrpession method to use
184     * @return the number of bytes written to the stream this time
185     * @throws IOException on error
186     */
187    long write(final byte[] b, final int offset, final int length, final int method) throws IOException {
188        final long current = writtenToOutputStreamForLastEntry;
189        crc.update(b, offset, length);
190        if (method == ZipEntry.DEFLATED) {
191            writeDeflated(b, offset, length);
192        } else {
193            writeCounted(b, offset, length);
194        }
195        sourcePayloadLength += length;
196        return writtenToOutputStreamForLastEntry - current;
197    }
198
199
200    void reset() {
201        crc.reset();
202        def.reset();
203        sourcePayloadLength = 0;
204        writtenToOutputStreamForLastEntry = 0;
205    }
206
207    @Override
208    public void close() throws IOException {
209        def.end();
210    }
211
212    void flushDeflater() throws IOException {
213        def.finish();
214        while (!def.finished()) {
215            deflate();
216        }
217    }
218
219    private void writeDeflated(final byte[] b, final int offset, final int length)
220            throws IOException {
221        if (length > 0 && !def.finished()) {
222            if (length <= DEFLATER_BLOCK_SIZE) {
223                def.setInput(b, offset, length);
224                deflateUntilInputIsNeeded();
225            } else {
226                final int fullblocks = length / DEFLATER_BLOCK_SIZE;
227                for (int i = 0; i < fullblocks; i++) {
228                    def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE,
229                            DEFLATER_BLOCK_SIZE);
230                    deflateUntilInputIsNeeded();
231                }
232                final int done = fullblocks * DEFLATER_BLOCK_SIZE;
233                if (done < length) {
234                    def.setInput(b, offset + done, length - done);
235                    deflateUntilInputIsNeeded();
236                }
237            }
238        }
239    }
240
241    private void deflateUntilInputIsNeeded() throws IOException {
242        while (!def.needsInput()) {
243            deflate();
244        }
245    }
246
247    void deflate() throws IOException {
248        final int len = def.deflate(outputBuffer, 0, outputBuffer.length);
249        if (len > 0) {
250            writeCounted(outputBuffer, 0, len);
251        }
252    }
253
254    public void writeCounted(final byte[] data) throws IOException {
255        writeCounted(data, 0, data.length);
256    }
257
258    public void writeCounted(final byte[] data, final int offset, final int length) throws IOException {
259        writeOut(data, offset, length);
260        writtenToOutputStreamForLastEntry += length;
261        totalWrittenToOutputStream += length;
262    }
263
264    protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
265
266    private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
267        private final ScatterGatherBackingStore bs;
268
269        public ScatterGatherBackingStoreCompressor(final Deflater deflater, final ScatterGatherBackingStore bs) {
270            super(deflater);
271            this.bs = bs;
272        }
273
274        @Override
275        protected final void writeOut(final byte[] data, final int offset, final int length)
276                throws IOException {
277            bs.writeOut(data, offset, length);
278        }
279    }
280
281    private static final class OutputStreamCompressor extends StreamCompressor {
282        private final OutputStream os;
283
284        public OutputStreamCompressor(final Deflater deflater, final OutputStream os) {
285            super(deflater);
286            this.os = os;
287        }
288
289        @Override
290        protected final void writeOut(final byte[] data, final int offset, final int length)
291                throws IOException {
292            os.write(data, offset, length);
293        }
294    }
295
296    private static final class DataOutputCompressor extends StreamCompressor {
297        private final DataOutput raf;
298
299        public DataOutputCompressor(final Deflater deflater, final DataOutput raf) {
300            super(deflater);
301            this.raf = raf;
302        }
303
304        @Override
305        protected final void writeOut(final byte[] data, final int offset, final int length)
306                throws IOException {
307            raf.write(data, offset, length);
308        }
309    }
310}