diff options
Diffstat (limited to 'vendor/github.com/philhofer/fwd/reader.go')
-rw-r--r-- | vendor/github.com/philhofer/fwd/reader.go | 383 |
1 files changed, 383 insertions, 0 deletions
diff --git a/vendor/github.com/philhofer/fwd/reader.go b/vendor/github.com/philhofer/fwd/reader.go new file mode 100644 index 0000000000..75be62ab09 --- /dev/null +++ b/vendor/github.com/philhofer/fwd/reader.go @@ -0,0 +1,383 @@ +// The `fwd` package provides a buffered reader +// and writer. Each has methods that help improve +// the encoding/decoding performance of some binary +// protocols. +// +// The `fwd.Writer` and `fwd.Reader` type provide similar +// functionality to their counterparts in `bufio`, plus +// a few extra utility methods that simplify read-ahead +// and write-ahead. I wrote this package to improve serialization +// performance for http://github.com/tinylib/msgp, +// where it provided about a 2x speedup over `bufio` for certain +// workloads. However, care must be taken to understand the semantics of the +// extra methods provided by this package, as they allow +// the user to access and manipulate the buffer memory +// directly. +// +// The extra methods for `fwd.Reader` are `Peek`, `Skip` +// and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`, +// will re-allocate the read buffer in order to accommodate arbitrarily +// large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes +// in the stream, and uses the `io.Seeker` interface if the underlying +// stream implements it. `(*fwd.Reader).Next` returns a slice pointing +// to the next `n` bytes in the read buffer (like `Peek`), but also +// increments the read position. This allows users to process streams +// in arbitrary block sizes without having to manage appropriately-sized +// slices. Additionally, obviating the need to copy the data from the +// buffer to another location in memory can improve performance dramatically +// in CPU-bound applications. +// +// `fwd.Writer` only has one extra method, which is `(*fwd.Writer).Next`, which +// returns a slice pointing to the next `n` bytes of the writer, and increments +// the write position by the length of the returned slice. This allows users +// to write directly to the end of the buffer. +// +package fwd + +import "io" + +const ( + // DefaultReaderSize is the default size of the read buffer + DefaultReaderSize = 2048 + + // minimum read buffer; straight from bufio + minReaderSize = 16 +) + +// NewReader returns a new *Reader that reads from 'r' +func NewReader(r io.Reader) *Reader { + return NewReaderSize(r, DefaultReaderSize) +} + +// NewReaderSize returns a new *Reader that +// reads from 'r' and has a buffer size 'n' +func NewReaderSize(r io.Reader, n int) *Reader { + rd := &Reader{ + r: r, + data: make([]byte, 0, max(minReaderSize, n)), + } + if s, ok := r.(io.Seeker); ok { + rd.rs = s + } + return rd +} + +// Reader is a buffered look-ahead reader +type Reader struct { + r io.Reader // underlying reader + + // data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space + data []byte // data + n int // read offset + state error // last read error + + // if the reader past to NewReader was + // also an io.Seeker, this is non-nil + rs io.Seeker +} + +// Reset resets the underlying reader +// and the read buffer. +func (r *Reader) Reset(rd io.Reader) { + r.r = rd + r.data = r.data[0:0] + r.n = 0 + r.state = nil + if s, ok := rd.(io.Seeker); ok { + r.rs = s + } else { + r.rs = nil + } +} + +// more() does one read on the underlying reader +func (r *Reader) more() { + // move data backwards so that + // the read offset is 0; this way + // we can supply the maximum number of + // bytes to the reader + if r.n != 0 { + if r.n < len(r.data) { + r.data = r.data[:copy(r.data[0:], r.data[r.n:])] + } else { + r.data = r.data[:0] + } + r.n = 0 + } + var a int + a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)]) + if a == 0 && r.state == nil { + r.state = io.ErrNoProgress + return + } else if a > 0 && r.state == io.EOF { + // discard the io.EOF if we read more than 0 bytes. + // the next call to Read should return io.EOF again. + r.state = nil + } + r.data = r.data[:len(r.data)+a] +} + +// pop error +func (r *Reader) err() (e error) { + e, r.state = r.state, nil + return +} + +// pop error; EOF -> io.ErrUnexpectedEOF +func (r *Reader) noEOF() (e error) { + e, r.state = r.state, nil + if e == io.EOF { + e = io.ErrUnexpectedEOF + } + return +} + +// buffered bytes +func (r *Reader) buffered() int { return len(r.data) - r.n } + +// Buffered returns the number of bytes currently in the buffer +func (r *Reader) Buffered() int { return len(r.data) - r.n } + +// BufferSize returns the total size of the buffer +func (r *Reader) BufferSize() int { return cap(r.data) } + +// Peek returns the next 'n' buffered bytes, +// reading from the underlying reader if necessary. +// It will only return a slice shorter than 'n' bytes +// if it also returns an error. Peek does not advance +// the reader. EOF errors are *not* returned as +// io.ErrUnexpectedEOF. +func (r *Reader) Peek(n int) ([]byte, error) { + // in the degenerate case, + // we may need to realloc + // (the caller asked for more + // bytes than the size of the buffer) + if cap(r.data) < n { + old := r.data[r.n:] + r.data = make([]byte, n+r.buffered()) + r.data = r.data[:copy(r.data, old)] + r.n = 0 + } + + // keep filling until + // we hit an error or + // read enough bytes + for r.buffered() < n && r.state == nil { + r.more() + } + + // we must have hit an error + if r.buffered() < n { + return r.data[r.n:], r.err() + } + + return r.data[r.n : r.n+n], nil +} + +// Skip moves the reader forward 'n' bytes. +// Returns the number of bytes skipped and any +// errors encountered. It is analogous to Seek(n, 1). +// If the underlying reader implements io.Seeker, then +// that method will be used to skip forward. +// +// If the reader encounters +// an EOF before skipping 'n' bytes, it +// returns io.ErrUnexpectedEOF. If the +// underlying reader implements io.Seeker, then +// those rules apply instead. (Many implementations +// will not return `io.EOF` until the next call +// to Read.) +func (r *Reader) Skip(n int) (int, error) { + + // fast path + if r.buffered() >= n { + r.n += n + return n, nil + } + + // use seeker implementation + // if we can + if r.rs != nil { + return r.skipSeek(n) + } + + // loop on filling + // and then erasing + o := n + for r.buffered() < n && r.state == nil { + r.more() + // we can skip forward + // up to r.buffered() bytes + step := min(r.buffered(), n) + r.n += step + n -= step + } + // at this point, n should be + // 0 if everything went smoothly + return o - n, r.noEOF() +} + +// Next returns the next 'n' bytes in the stream. +// Unlike Peek, Next advances the reader position. +// The returned bytes point to the same +// data as the buffer, so the slice is +// only valid until the next reader method call. +// An EOF is considered an unexpected error. +// If an the returned slice is less than the +// length asked for, an error will be returned, +// and the reader position will not be incremented. +func (r *Reader) Next(n int) ([]byte, error) { + + // in case the buffer is too small + if cap(r.data) < n { + old := r.data[r.n:] + r.data = make([]byte, n+r.buffered()) + r.data = r.data[:copy(r.data, old)] + r.n = 0 + } + + // fill at least 'n' bytes + for r.buffered() < n && r.state == nil { + r.more() + } + + if r.buffered() < n { + return r.data[r.n:], r.noEOF() + } + out := r.data[r.n : r.n+n] + r.n += n + return out, nil +} + +// skipSeek uses the io.Seeker to seek forward. +// only call this function when n > r.buffered() +func (r *Reader) skipSeek(n int) (int, error) { + o := r.buffered() + // first, clear buffer + n -= o + r.n = 0 + r.data = r.data[:0] + + // then seek forward remaning bytes + i, err := r.rs.Seek(int64(n), 1) + return int(i) + o, err +} + +// Read implements `io.Reader` +func (r *Reader) Read(b []byte) (int, error) { + // if we have data in the buffer, just + // return that. + if r.buffered() != 0 { + x := copy(b, r.data[r.n:]) + r.n += x + return x, nil + } + var n int + // we have no buffered data; determine + // whether or not to buffer or call + // the underlying reader directly + if len(b) >= cap(r.data) { + n, r.state = r.r.Read(b) + } else { + r.more() + n = copy(b, r.data) + r.n = n + } + if n == 0 { + return 0, r.err() + } + return n, nil +} + +// ReadFull attempts to read len(b) bytes into +// 'b'. It returns the number of bytes read into +// 'b', and an error if it does not return len(b). +// EOF is considered an unexpected error. +func (r *Reader) ReadFull(b []byte) (int, error) { + var n int // read into b + var nn int // scratch + l := len(b) + // either read buffered data, + // or read directly for the underlying + // buffer, or fetch more buffered data. + for n < l && r.state == nil { + if r.buffered() != 0 { + nn = copy(b[n:], r.data[r.n:]) + n += nn + r.n += nn + } else if l-n > cap(r.data) { + nn, r.state = r.r.Read(b[n:]) + n += nn + } else { + r.more() + } + } + if n < l { + return n, r.noEOF() + } + return n, nil +} + +// ReadByte implements `io.ByteReader` +func (r *Reader) ReadByte() (byte, error) { + for r.buffered() < 1 && r.state == nil { + r.more() + } + if r.buffered() < 1 { + return 0, r.err() + } + b := r.data[r.n] + r.n++ + return b, nil +} + +// WriteTo implements `io.WriterTo` +func (r *Reader) WriteTo(w io.Writer) (int64, error) { + var ( + i int64 + ii int + err error + ) + // first, clear buffer + if r.buffered() > 0 { + ii, err = w.Write(r.data[r.n:]) + i += int64(ii) + if err != nil { + return i, err + } + r.data = r.data[0:0] + r.n = 0 + } + for r.state == nil { + // here we just do + // 1:1 reads and writes + r.more() + if r.buffered() > 0 { + ii, err = w.Write(r.data) + i += int64(ii) + if err != nil { + return i, err + } + r.data = r.data[0:0] + r.n = 0 + } + } + if r.state != io.EOF { + return i, r.err() + } + return i, nil +} + +func min(a int, b int) int { + if a < b { + return a + } + return b +} + +func max(a int, b int) int { + if a < b { + return b + } + return a +} |