You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

decoder.go 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. // Copyright 2019+ Klaus Post. All rights reserved.
  2. // License information can be found in the LICENSE file.
  3. // Based on work by Yann Collet, released under BSD License.
  4. package zstd
  5. import (
  6. "bytes"
  7. "errors"
  8. "io"
  9. "sync"
  10. )
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	// o holds the options the decoder was configured with in NewReader.
	o decoderOptions

	// Unreferenced decoders, ready for use.
	// NewReader creates it with capacity o.concurrent and fills it with that many
	// block decoders; users (DecodeAll, the stream goroutine) take one and send it back.
	decoders chan *blockDec

	// Streams ready to be decoded.
	// Created lazily by the first Reset, which also starts startStreamDecoder to consume it.
	stream chan decodeStream

	// Current read position used for Reader functionality.
	current decoderState

	// Custom dictionaries, keyed by dictionary ID.
	// Always uses copies.
	dicts map[uint32]dict

	// streamWg is the waitgroup for all streams.
	// Close closes stream and then waits on it so the stream goroutine has exited.
	streamWg sync.WaitGroup
}
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// Embedded, so d.current.b, d.current.d and d.current.err refer to this block.
	decodeOutput

	// output in order to be written to stream.
	// Buffered with capacity o.concurrent (created in NewReader) and shared by all streams.
	output chan decodeOutput

	// cancel remaining output.
	// Created per asynchronous stream by Reset and closed (then set to nil) by drainOutput.
	cancel chan struct{}

	// flushed is true when no asynchronous stream output is pending: set after
	// drainOutput has received errEndOfStream, for synchronous decodes, and initially.
	flushed bool
}
  42. var (
  43. // Check the interfaces we want to support.
  44. _ = io.WriterTo(&Decoder{})
  45. _ = io.Reader(&Decoder{})
  46. )
  47. // NewReader creates a new decoder.
  48. // A nil Reader can be provided in which case Reset can be used to start a decode.
  49. //
  50. // A Decoder can be used in two modes:
  51. //
  52. // 1) As a stream, or
  53. // 2) For stateless decoding using DecodeAll.
  54. //
  55. // Only a single stream can be decoded concurrently, but the same decoder
  56. // can run multiple concurrent stateless decodes. It is even possible to
  57. // use stateless decodes while a stream is being decoded.
  58. //
  59. // The Reset function can be used to initiate a new stream, which is will considerably
  60. // reduce the allocations normally caused by NewReader.
  61. func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  62. initPredefined()
  63. var d Decoder
  64. d.o.setDefault()
  65. for _, o := range opts {
  66. err := o(&d.o)
  67. if err != nil {
  68. return nil, err
  69. }
  70. }
  71. d.current.output = make(chan decodeOutput, d.o.concurrent)
  72. d.current.flushed = true
  73. // Transfer option dicts.
  74. d.dicts = make(map[uint32]dict, len(d.o.dicts))
  75. for _, dc := range d.o.dicts {
  76. d.dicts[dc.id] = dc
  77. }
  78. d.o.dicts = nil
  79. // Create decoders
  80. d.decoders = make(chan *blockDec, d.o.concurrent)
  81. for i := 0; i < d.o.concurrent; i++ {
  82. dec := newBlockDec(d.o.lowMem)
  83. dec.localFrame = newFrameDec(d.o)
  84. d.decoders <- dec
  85. }
  86. if r == nil {
  87. return &d, nil
  88. }
  89. return &d, d.Reset(r)
  90. }
  91. // Read bytes from the decompressed stream into p.
  92. // Returns the number of bytes written and any error that occurred.
  93. // When the stream is done, io.EOF will be returned.
  94. func (d *Decoder) Read(p []byte) (int, error) {
  95. if d.stream == nil {
  96. return 0, errors.New("no input has been initialized")
  97. }
  98. var n int
  99. for {
  100. if len(d.current.b) > 0 {
  101. filled := copy(p, d.current.b)
  102. p = p[filled:]
  103. d.current.b = d.current.b[filled:]
  104. n += filled
  105. }
  106. if len(p) == 0 {
  107. break
  108. }
  109. if len(d.current.b) == 0 {
  110. // We have an error and no more data
  111. if d.current.err != nil {
  112. break
  113. }
  114. if !d.nextBlock(n == 0) {
  115. return n, nil
  116. }
  117. }
  118. }
  119. if len(d.current.b) > 0 {
  120. if debug {
  121. println("returning", n, "still bytes left:", len(d.current.b))
  122. }
  123. // Only return error at end of block
  124. return n, nil
  125. }
  126. if d.current.err != nil {
  127. d.drainOutput()
  128. }
  129. if debug {
  130. println("returning", n, d.current.err, len(d.decoders))
  131. }
  132. return n, d.current.err
  133. }
  134. // Reset will reset the decoder the supplied stream after the current has finished processing.
  135. // Note that this functionality cannot be used after Close has been called.
  136. func (d *Decoder) Reset(r io.Reader) error {
  137. if d.current.err == ErrDecoderClosed {
  138. return d.current.err
  139. }
  140. if r == nil {
  141. return errors.New("nil Reader sent as input")
  142. }
  143. if d.stream == nil {
  144. d.stream = make(chan decodeStream, 1)
  145. d.streamWg.Add(1)
  146. go d.startStreamDecoder(d.stream)
  147. }
  148. d.drainOutput()
  149. // If bytes buffer and < 1MB, do sync decoding anyway.
  150. if bb, ok := r.(*bytes.Buffer); ok && bb.Len() < 1<<20 {
  151. if debug {
  152. println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
  153. }
  154. b := bb.Bytes()
  155. var dst []byte
  156. if cap(d.current.b) > 0 {
  157. dst = d.current.b
  158. }
  159. dst, err := d.DecodeAll(b, dst[:0])
  160. if err == nil {
  161. err = io.EOF
  162. }
  163. d.current.b = dst
  164. d.current.err = err
  165. d.current.flushed = true
  166. if debug {
  167. println("sync decode to", len(dst), "bytes, err:", err)
  168. }
  169. return nil
  170. }
  171. // Remove current block.
  172. d.current.decodeOutput = decodeOutput{}
  173. d.current.err = nil
  174. d.current.cancel = make(chan struct{})
  175. d.current.flushed = false
  176. d.current.d = nil
  177. d.stream <- decodeStream{
  178. r: r,
  179. output: d.current.output,
  180. cancel: d.current.cancel,
  181. }
  182. return nil
  183. }
  184. // drainOutput will drain the output until errEndOfStream is sent.
  185. func (d *Decoder) drainOutput() {
  186. if d.current.cancel != nil {
  187. println("cancelling current")
  188. close(d.current.cancel)
  189. d.current.cancel = nil
  190. }
  191. if d.current.d != nil {
  192. if debug {
  193. printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
  194. }
  195. d.decoders <- d.current.d
  196. d.current.d = nil
  197. d.current.b = nil
  198. }
  199. if d.current.output == nil || d.current.flushed {
  200. println("current already flushed")
  201. return
  202. }
  203. for {
  204. select {
  205. case v := <-d.current.output:
  206. if v.d != nil {
  207. if debug {
  208. printf("re-adding decoder %p", v.d)
  209. }
  210. d.decoders <- v.d
  211. }
  212. if v.err == errEndOfStream {
  213. println("current flushed")
  214. d.current.flushed = true
  215. return
  216. }
  217. }
  218. }
  219. }
  220. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  221. // The return value n is the number of bytes written.
  222. // Any error encountered during the write is also returned.
  223. func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
  224. if d.stream == nil {
  225. return 0, errors.New("no input has been initialized")
  226. }
  227. var n int64
  228. for {
  229. if len(d.current.b) > 0 {
  230. n2, err2 := w.Write(d.current.b)
  231. n += int64(n2)
  232. if err2 != nil && d.current.err == nil {
  233. d.current.err = err2
  234. break
  235. }
  236. }
  237. if d.current.err != nil {
  238. break
  239. }
  240. d.nextBlock(true)
  241. }
  242. err := d.current.err
  243. if err != nil {
  244. d.drainOutput()
  245. }
  246. if err == io.EOF {
  247. err = nil
  248. }
  249. return n, err
  250. }
  251. // DecodeAll allows stateless decoding of a blob of bytes.
  252. // Output will be appended to dst, so if the destination size is known
  253. // you can pre-allocate the destination slice to avoid allocations.
  254. // DecodeAll can be used concurrently.
  255. // The Decoder concurrency limits will be respected.
  256. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
  257. if d.current.err == ErrDecoderClosed {
  258. return dst, ErrDecoderClosed
  259. }
  260. // Grab a block decoder and frame decoder.
  261. block := <-d.decoders
  262. frame := block.localFrame
  263. defer func() {
  264. if debug {
  265. printf("re-adding decoder: %p", block)
  266. }
  267. frame.rawInput = nil
  268. frame.bBuf = nil
  269. d.decoders <- block
  270. }()
  271. frame.bBuf = input
  272. for {
  273. frame.history.reset()
  274. err := frame.reset(&frame.bBuf)
  275. if err == io.EOF {
  276. if debug {
  277. println("frame reset return EOF")
  278. }
  279. return dst, nil
  280. }
  281. if frame.DictionaryID != nil {
  282. dict, ok := d.dicts[*frame.DictionaryID]
  283. if !ok {
  284. return nil, ErrUnknownDictionary
  285. }
  286. frame.history.setDict(&dict)
  287. }
  288. if err != nil {
  289. return dst, err
  290. }
  291. if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
  292. return dst, ErrDecoderSizeExceeded
  293. }
  294. if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
  295. // Never preallocate moe than 1 GB up front.
  296. if cap(dst)-len(dst) < int(frame.FrameContentSize) {
  297. dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
  298. copy(dst2, dst)
  299. dst = dst2
  300. }
  301. }
  302. if cap(dst) == 0 {
  303. // Allocate len(input) * 2 by default if nothing is provided
  304. // and we didn't get frame content size.
  305. size := len(input) * 2
  306. // Cap to 1 MB.
  307. if size > 1<<20 {
  308. size = 1 << 20
  309. }
  310. if uint64(size) > d.o.maxDecodedSize {
  311. size = int(d.o.maxDecodedSize)
  312. }
  313. dst = make([]byte, 0, size)
  314. }
  315. dst, err = frame.runDecoder(dst, block)
  316. if err != nil {
  317. return dst, err
  318. }
  319. if len(frame.bBuf) == 0 {
  320. if debug {
  321. println("frame dbuf empty")
  322. }
  323. break
  324. }
  325. }
  326. return dst, nil
  327. }
  328. // nextBlock returns the next block.
  329. // If an error occurs d.err will be set.
  330. // Optionally the function can block for new output.
  331. // If non-blocking mode is used the returned boolean will be false
  332. // if no data was available without blocking.
  333. func (d *Decoder) nextBlock(blocking bool) (ok bool) {
  334. if d.current.d != nil {
  335. if debug {
  336. printf("re-adding current decoder %p", d.current.d)
  337. }
  338. d.decoders <- d.current.d
  339. d.current.d = nil
  340. }
  341. if d.current.err != nil {
  342. // Keep error state.
  343. return blocking
  344. }
  345. if blocking {
  346. d.current.decodeOutput = <-d.current.output
  347. } else {
  348. select {
  349. case d.current.decodeOutput = <-d.current.output:
  350. default:
  351. return false
  352. }
  353. }
  354. if debug {
  355. println("got", len(d.current.b), "bytes, error:", d.current.err)
  356. }
  357. return true
  358. }
  359. // Close will release all resources.
  360. // It is NOT possible to reuse the decoder after this.
  361. func (d *Decoder) Close() {
  362. if d.current.err == ErrDecoderClosed {
  363. return
  364. }
  365. d.drainOutput()
  366. if d.stream != nil {
  367. close(d.stream)
  368. d.streamWg.Wait()
  369. d.stream = nil
  370. }
  371. if d.decoders != nil {
  372. close(d.decoders)
  373. for dec := range d.decoders {
  374. dec.Close()
  375. }
  376. d.decoders = nil
  377. }
  378. if d.current.d != nil {
  379. d.current.d.Close()
  380. d.current.d = nil
  381. }
  382. d.current.err = ErrDecoderClosed
  383. }
  384. // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
  385. // Any changes to the decoder will be reflected, so the returned ReadCloser
  386. // can be reused along with the decoder.
  387. // io.WriterTo is also supported by the returned ReadCloser.
  388. func (d *Decoder) IOReadCloser() io.ReadCloser {
  389. return closeWrapper{d: d}
  390. }
  391. // closeWrapper wraps a function call as a closer.
  392. type closeWrapper struct {
  393. d *Decoder
  394. }
  395. // WriteTo forwards WriteTo calls to the decoder.
  396. func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
  397. return c.d.WriteTo(w)
  398. }
  399. // Read forwards read calls to the decoder.
  400. func (c closeWrapper) Read(p []byte) (n int, err error) {
  401. return c.d.Read(p)
  402. }
  403. // Close closes the decoder.
  404. func (c closeWrapper) Close() error {
  405. c.d.Close()
  406. return nil
  407. }
// decodeOutput is one unit of output delivered on a stream's output channel.
type decodeOutput struct {
	// d is the block decoder associated with this output, or nil. The consumer
	// (nextBlock / drainOutput) sends it back to Decoder.decoders when done.
	// NOTE(review): b presumably references memory owned by d, which is why
	// drainOutput drops b when it recycles d — confirm against blockDec.
	d *blockDec
	// b holds decoded bytes that Read / WriteTo hand to the caller.
	b []byte
	// err is set when decoding stopped; errEndOfStream marks the end of a stream.
	err error
}

// decodeStream is a request for the stream goroutine (startStreamDecoder)
// to decode one input stream.
type decodeStream struct {
	// r is the compressed input.
	r io.Reader
	// Blocks ready to be written to output.
	output chan decodeOutput
	// cancel reading from the input; closed by drainOutput to abort the stream.
	cancel chan struct{}
}

// errEndOfStream indicates that everything from the stream was read.
// startStreamDecoder sends it as the last output of every stream.
var errEndOfStream = errors.New("end-of-stream")
// startStreamDecoder is the stream goroutine started by Reset. It decodes each
// decodeStream received on inStream, in order, and returns once inStream is
// closed (Close closes it and then waits on d.streamWg).
//
// Design notes:
// Create Decoder:
// Spawn n block decoders. These accept tasks to decode a block.
// Create goroutine that handles stream processing, this will send history to decoders as they are available.
// Decoders update the history as they decode.
// When a block is returned:
// a) history is sent to the next decoder,
// b) content written to CRC.
// c) return data to WRITER.
// d) wait for next block to return data.
// Once written, the block decoders are sent back to the pool by the consumer for re-use.
//
// For every stream, errEndOfStream is sent on stream.output after all other
// output, so the consumer (nextBlock / drainOutput) knows the stream is finished.
//
// NOTE(review): several println calls below are not wrapped in `if debug`;
// presumably this package's println is itself debug-gated — confirm.
func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
	defer d.streamWg.Done()
	// A single frame decoder is reused for every frame of every stream.
	frame := newFrameDec(d.o)
	for stream := range inStream {
		if debug {
			println("got new stream")
		}
		br := readerWrapper{r: stream.r}
	decodeStream:
		for {
			// Each frame starts with a fresh history; then parse the next frame header.
			frame.history.reset()
			err := frame.reset(&br)
			if debug && err != nil {
				println("Frame decoder returned", err)
			}
			// Apply the dictionary the frame header asks for, if any.
			if err == nil && frame.DictionaryID != nil {
				dict, ok := d.dicts[*frame.DictionaryID]
				if !ok {
					err = ErrUnknownDictionary
				} else {
					frame.history.setDict(&dict)
				}
			}
			if err != nil {
				// Report the error and finish the stream. This includes io.EOF,
				// which frame.reset returns when no frames remain (DecodeAll
				// treats it the same way).
				stream.output <- decodeOutput{
					err: err,
				}
				break
			}
			if debug {
				println("starting frame decoder")
			}

			// This goroutine will forward history between frames.
			frame.frameDone.Add(1)
			frame.initAsync()
			// NOTE(review): presumably startDecoder delivers decoded blocks to
			// stream.output and calls frameDone.Done when the frame is complete.
			go frame.startDecoder(stream.output)
		decodeFrame:
			// Go through all blocks of the frame.
			for {
				// Wait for a free block decoder, then check for cancellation.
				dec := <-d.decoders
				select {
				case <-stream.cancel:
					// Pass io.EOF along with dec; if the frame decoder did not take
					// it, route dec through output so the consumer returns it to the pool.
					if !frame.sendErr(dec, io.EOF) {
						// To not let the decoder dangle, send it back.
						stream.output <- decodeOutput{d: dec}
					}
					break decodeStream
				default:
				}
				err := frame.next(dec)
				switch err {
				case io.EOF:
					// End of current frame, no error
					println("EOF on next block")
					break decodeFrame
				case nil:
					continue
				default:
					// NOTE(review): the error is not sent on stream.output here;
					// presumably frame.next / the frame decoder reports it — confirm.
					println("block decoder returned", err)
					break decodeStream
				}
			}
			// All blocks have started decoding, check if there are more frames.
			println("waiting for done")
			frame.frameDone.Wait()
			println("done waiting...")
		}
		// Make sure the last frame's decoder has finished before signalling the end.
		frame.frameDone.Wait()
		println("Sending EOS")
		stream.output <- decodeOutput{err: errEndOfStream}
	}
}