using System; using System.IO; using System.IO.Compression; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace Brotli { public class BrotliStream : Stream { const int BufferSize = 64 * 1024; protected Stream _stream = null; protected MemoryStream _intermediateStream = new MemoryStream(); protected CompressionMode _mode = CompressionMode.Compress; protected IntPtr _state = IntPtr.Zero; protected IntPtr _ptrInputBuffer = IntPtr.Zero; protected IntPtr _ptrOutputBuffer = IntPtr.Zero; protected IntPtr _ptrNextInput = IntPtr.Zero; protected IntPtr _ptrNextOutput = IntPtr.Zero; protected UInt32 _availableIn = 0; protected UInt32 _availableOut = BufferSize; protected Byte[] _managedBuffer; protected Boolean _endOfStream = false; protected int _readOffset = 0; protected BrotliDecoderResult _lastDecodeResult = BrotliDecoderResult.NeedsMoreInput; protected Boolean _leaveOpen = false; public BrotliStream(Stream baseStream, CompressionMode mode,bool leaveOpen) { if (baseStream == null) throw new ArgumentNullException("baseStream"); _mode = mode; _stream = baseStream; _leaveOpen = leaveOpen; if (_mode == CompressionMode.Compress) { _state = Brolib.BrotliEncoderCreateInstance(); if (_state == IntPtr.Zero) { throw new BrotliException("Unable to create brotli encoder instance"); } Brolib.BrotliEncoderSetParameter(_state, BrotliEncoderParameter.Quality, 5); Brolib.BrotliEncoderSetParameter(_state, BrotliEncoderParameter.LGWin, 22); } else { _state = Brolib.BrotliDecoderCreateInstance(); if (_state == IntPtr.Zero) { throw new BrotliException("Unable to create brotli decoder instance"); } //follow the brotli default standard var succ = Brolib.BrotliDecoderSetParameter(_state, BrotliDecoderParameter.LargeWindow, 1U); if (!succ) { throw new BrotliException("failed to set decoder parameter to large window"); } } _ptrInputBuffer = Marshal.AllocHGlobal(BufferSize); _ptrOutputBuffer = Marshal.AllocHGlobal(BufferSize); _ptrNextInput = _ptrInputBuffer; _ptrNextOutput = _ptrOutputBuffer; _managedBuffer = new Byte[BufferSize]; } public BrotliStream(Stream baseStream, CompressionMode mode):this(baseStream,mode,false) { } /// /// Set the compress quality(0~11) /// /// compress quality public void SetQuality(uint quality) { if (quality < 0 || quality > 11) { throw new ArgumentException("quality", "the range of quality is 0~11"); } Brolib.BrotliEncoderSetParameter(_state, BrotliEncoderParameter.Quality, quality); } /// /// Set the compress LGWin(10~24) /// /// the window size public void SetWindow(uint window) { if (window < 10 || window > 24) { throw new ArgumentException("window", "the range of window is 10~24"); } Brolib.BrotliEncoderSetParameter(_state, BrotliEncoderParameter.LGWin, window); } public override bool CanRead { get { if (_stream == null) { return false; } return (_mode == System.IO.Compression.CompressionMode.Decompress && _stream.CanRead); } } public override bool CanSeek { get { return false; } } public override bool CanWrite { get { if (_stream == null) { return false; } return (_mode == System.IO.Compression.CompressionMode.Compress && _stream.CanWrite); } } public override long Length { get { throw new NotImplementedException(); } } public override long Position { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } public override async Task FlushAsync(CancellationToken cancellationToken) { if (_stream == null) { throw new ObjectDisposedException(null, "Underlying stream is disposed"); } if (_mode == CompressionMode.Compress) { await FlushBrotliStreamAsync(false).ConfigureAwait(false); } } public override void Flush() { AsyncHelper.RunSync(() => FlushAsync()); } protected virtual async Task FlushBrotliStreamAsync(Boolean finished) { //test if the resource has been freed if (_state == IntPtr.Zero) return; if (Brolib.BrotliEncoderIsFinished(_state)) return; BrotliEncoderOperation op = finished ? BrotliEncoderOperation.Finish : BrotliEncoderOperation.Flush; UInt32 totalOut = 0; while (true) { var compressOK = Brolib.BrotliEncoderCompressStream(_state, op, ref _availableIn, ref _ptrNextInput, ref _availableOut, ref _ptrNextOutput, out totalOut); if (!compressOK) throw new BrotliException("Unable to finish encode stream"); var extraData = _availableOut != BufferSize; if (extraData) { var bytesWrote = (int)(BufferSize - _availableOut); Marshal.Copy(_ptrOutputBuffer, _managedBuffer, 0, bytesWrote); await _stream.WriteAsync(_managedBuffer, 0, bytesWrote).ConfigureAwait(false); _availableOut = BufferSize; _ptrNextOutput = _ptrOutputBuffer; } if (Brolib.BrotliEncoderIsFinished(_state)) break; if (!extraData) break; } } protected virtual void FlushBrotliStream(Boolean finished) { AsyncHelper.RunSync(() => FlushBrotliStreamAsync(finished)); } protected override void Dispose(bool disposing) { if (_mode == CompressionMode.Compress) { FlushBrotliStream(true); } base.Dispose(disposing); if (!_leaveOpen) _stream.Dispose(); _intermediateStream.Dispose(); if (_ptrInputBuffer!=IntPtr.Zero) Marshal.FreeHGlobal(_ptrInputBuffer); if (_ptrOutputBuffer != IntPtr.Zero) Marshal.FreeHGlobal(_ptrOutputBuffer); _managedBuffer = null; _ptrInputBuffer = IntPtr.Zero; _ptrOutputBuffer = IntPtr.Zero; if (_state != IntPtr.Zero) { if (_mode == CompressionMode.Compress) { Brolib.BrotliEncoderDestroyInstance(_state); } else { Brolib.BrotliDecoderDestroyInstance(_state); } _state = IntPtr.Zero; } } public void TruncateBeginning(MemoryStream ms, int numberOfBytesToRemove) { #if NETSTANDARD2_0 ArraySegment buf; if(ms.TryGetBuffer(out buf)) { Buffer.BlockCopy(buf.Array, numberOfBytesToRemove, buf.Array, 0, (int)ms.Length - numberOfBytesToRemove); ms.SetLength(ms.Length - numberOfBytesToRemove); } else { throw new UnauthorizedAccessException(); } #else byte[] buf = ms.GetBuffer(); Buffer.BlockCopy(buf, numberOfBytesToRemove, buf, 0, (int)ms.Length - numberOfBytesToRemove); ms.SetLength(ms.Length - numberOfBytesToRemove); #endif } public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { if (_mode != CompressionMode.Decompress) throw new BrotliException("Can't read on this stream"); int bytesRead = (int)(_intermediateStream.Length - _readOffset); uint totalCount = 0; Boolean endOfStream = false; Boolean errorDetected = false; while (bytesRead < count) { while (true) { if (_lastDecodeResult == BrotliDecoderResult.NeedsMoreInput) { _availableIn = (UInt32) await _stream.ReadAsync(_managedBuffer, 0, (int)BufferSize).ConfigureAwait(false); _ptrNextInput = _ptrInputBuffer; if (_availableIn <= 0) { endOfStream = true; break; } Marshal.Copy(_managedBuffer, 0, _ptrInputBuffer, (int)_availableIn); } else if (_lastDecodeResult == BrotliDecoderResult.NeedsMoreOutput) { Marshal.Copy(_ptrOutputBuffer, _managedBuffer, 0, BufferSize); await _intermediateStream.WriteAsync(_managedBuffer, 0, BufferSize).ConfigureAwait(false); bytesRead += BufferSize; _availableOut = BufferSize; _ptrNextOutput = _ptrOutputBuffer; } else { //Error or OK endOfStream = true; break; } _lastDecodeResult = Brolib.BrotliDecoderDecompressStream(_state, ref _availableIn, ref _ptrNextInput, ref _availableOut, ref _ptrNextOutput, out totalCount); if (bytesRead >= count) break; } if (endOfStream && !Brolib.BrotliDecoderIsFinished(_state)) { errorDetected = true; } if (_lastDecodeResult == BrotliDecoderResult.Error || errorDetected) { var error = Brolib.BrotliDecoderGetErrorCode(_state); var text = Brolib.BrotliDecoderErrorString(error); throw new BrotliDecodeException(String.Format("Unable to decode stream,possibly corrupt data.Code={0}({1})", error, text), error, text); } if (endOfStream && !Brolib.BrotliDecoderIsFinished(_state) && _lastDecodeResult == BrotliDecoderResult.NeedsMoreInput) { throw new BrotliException("Unable to decode stream,unexpected EOF"); } if (endOfStream && _ptrNextOutput != _ptrOutputBuffer) { int remainBytes = (int)(_ptrNextOutput.ToInt64() - _ptrOutputBuffer.ToInt64()); bytesRead += remainBytes; Marshal.Copy(_ptrOutputBuffer, _managedBuffer, 0, remainBytes); await _intermediateStream.WriteAsync(_managedBuffer, 0, remainBytes).ConfigureAwait(false); _ptrNextOutput = _ptrOutputBuffer; } if (endOfStream) break; } if (_intermediateStream.Length - _readOffset >= count || endOfStream) { _intermediateStream.Seek(_readOffset, SeekOrigin.Begin); var bytesToRead = (int)(_intermediateStream.Length - _readOffset); if (bytesToRead > count) bytesToRead = count; await _intermediateStream.ReadAsync(buffer, offset, bytesToRead).ConfigureAwait(false); TruncateBeginning(_intermediateStream, _readOffset + bytesToRead); _readOffset = 0; return bytesToRead; } return 0; } public override int Read(byte[] buffer, int offset, int count) { async Task task() { return await ReadAsync(buffer,offset,count).ConfigureAwait(false); } return AsyncHelper.RunSync(task); } public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); } public override void SetLength(long value) { throw new NotImplementedException(); } static int totalWrote = 0; public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { if (_mode != CompressionMode.Compress) throw new BrotliException("Can't write on this stream"); totalWrote += count; //Console.WriteLine(String.Format("Write {0} bytes,total={1} bytes.", count, totalWrote)); UInt32 totalOut = 0; int bytesRemain = count; int currentOffset = offset; Boolean compressOK = true; while (bytesRemain > 0) { int copyLen = bytesRemain > BufferSize ? BufferSize : bytesRemain; Marshal.Copy(buffer, currentOffset, _ptrInputBuffer, copyLen); bytesRemain -= copyLen; currentOffset += copyLen; _availableIn = (UInt32)copyLen; _ptrNextInput = _ptrInputBuffer; while (_availableIn > 0) { compressOK = Brolib.BrotliEncoderCompressStream(_state, BrotliEncoderOperation.Process, ref _availableIn, ref _ptrNextInput, ref _availableOut, ref _ptrNextOutput, out totalOut); if (!compressOK) throw new BrotliException("Unable to compress stream"); if (_availableOut != BufferSize) { var bytesWrote = (int)(BufferSize - _availableOut); //Byte[] localBuffer = new Byte[bytesWrote]; Marshal.Copy(_ptrOutputBuffer, _managedBuffer, 0, bytesWrote); await _stream.WriteAsync(_managedBuffer, 0, bytesWrote).ConfigureAwait(false); _availableOut = BufferSize; _ptrNextOutput = _ptrOutputBuffer; } } if (Brolib.BrotliEncoderIsFinished(_state)) break; } } public override void Write(byte[] buffer, int offset, int count) { async Task task() { await WriteAsync(buffer,offset,count).ConfigureAwait(false); } AsyncHelper.RunSync(task); } } #if NET35 /// /// Improve compability issue on FX35 /// public static class StreamCopyExtension { public static void CopyTo(this Stream source,Stream destination, int bufferSize=4*1024) { if (source==null) { throw new ArgumentNullException(nameof(source)); } if (destination==null) { throw new ArgumentNullException(nameof(destination)); } if (!source.CanRead) { throw new InvalidOperationException("source stream is not readable"); } if (!destination.CanWrite) { throw new InvalidOperationException("destination stream is not writeable"); } if (bufferSize<=0) { throw new InvalidOperationException("buffer size should be greate than zero"); } byte[] buffer = new byte[bufferSize]; int read; while ((read = source.Read(buffer, 0, buffer.Length)) > 0) destination.Write(buffer, 0, read); } } #endif }