You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

writer.go 5.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package lz4
  2. import (
  3. "io"
  4. "github.com/pierrec/lz4/v4/internal/lz4block"
  5. "github.com/pierrec/lz4/v4/internal/lz4errors"
  6. "github.com/pierrec/lz4/v4/internal/lz4stream"
  7. )
  8. var writerStates = []aState{
  9. noState: newState,
  10. newState: writeState,
  11. writeState: closedState,
  12. closedState: newState,
  13. errorState: newState,
  14. }
  15. // NewWriter returns a new LZ4 frame encoder.
  16. func NewWriter(w io.Writer) *Writer {
  17. zw := &Writer{frame: lz4stream.NewFrame()}
  18. zw.state.init(writerStates)
  19. _ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone)
  20. zw.Reset(w)
  21. return zw
  22. }
  23. // Writer allows writing an LZ4 stream.
  24. type Writer struct {
  25. state _State
  26. src io.Writer // destination writer
  27. level lz4block.CompressionLevel // how hard to try
  28. num int // concurrency level
  29. frame *lz4stream.Frame // frame being built
  30. data []byte // pending data
  31. idx int // size of pending data
  32. handler func(int)
  33. legacy bool
  34. }
  35. func (*Writer) private() {}
  36. func (w *Writer) Apply(options ...Option) (err error) {
  37. defer w.state.check(&err)
  38. switch w.state.state {
  39. case newState:
  40. case errorState:
  41. return w.state.err
  42. default:
  43. return lz4errors.ErrOptionClosedOrError
  44. }
  45. for _, o := range options {
  46. if err = o(w); err != nil {
  47. return
  48. }
  49. }
  50. w.Reset(w.src)
  51. return
  52. }
  53. func (w *Writer) isNotConcurrent() bool {
  54. return w.num == 1
  55. }
  56. // init sets up the Writer when in newState. It does not change the Writer state.
  57. func (w *Writer) init() error {
  58. w.frame.InitW(w.src, w.num, w.legacy)
  59. if true || !w.isNotConcurrent() {
  60. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  61. w.data = size.Get()
  62. }
  63. w.idx = 0
  64. return w.frame.Descriptor.Write(w.frame, w.src)
  65. }
  66. func (w *Writer) Write(buf []byte) (n int, err error) {
  67. defer w.state.check(&err)
  68. switch w.state.state {
  69. case writeState:
  70. case closedState, errorState:
  71. return 0, w.state.err
  72. case newState:
  73. if err = w.init(); w.state.next(err) {
  74. return
  75. }
  76. default:
  77. return 0, w.state.fail()
  78. }
  79. zn := len(w.data)
  80. for len(buf) > 0 {
  81. if w.idx == 0 && len(buf) >= zn {
  82. // Avoid a copy as there is enough data for a block.
  83. if err = w.write(buf[:zn], false); err != nil {
  84. return
  85. }
  86. n += zn
  87. buf = buf[zn:]
  88. continue
  89. }
  90. // Accumulate the data to be compressed.
  91. m := copy(w.data[w.idx:], buf)
  92. n += m
  93. w.idx += m
  94. buf = buf[m:]
  95. if w.idx < len(w.data) {
  96. // Buffer not filled.
  97. return
  98. }
  99. // Buffer full.
  100. if err = w.write(w.data, true); err != nil {
  101. return
  102. }
  103. if !w.isNotConcurrent() {
  104. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  105. w.data = size.Get()
  106. }
  107. w.idx = 0
  108. }
  109. return
  110. }
  111. func (w *Writer) write(data []byte, safe bool) error {
  112. if w.isNotConcurrent() {
  113. block := w.frame.Blocks.Block
  114. err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src)
  115. w.handler(len(block.Data))
  116. return err
  117. }
  118. c := make(chan *lz4stream.FrameDataBlock)
  119. w.frame.Blocks.Blocks <- c
  120. go func(c chan *lz4stream.FrameDataBlock, data []byte, safe bool) {
  121. b := lz4stream.NewFrameDataBlock(w.frame)
  122. c <- b.Compress(w.frame, data, w.level)
  123. <-c
  124. w.handler(len(b.Data))
  125. b.Close(w.frame)
  126. if safe {
  127. // safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed
  128. lz4block.Put(data)
  129. }
  130. }(c, data, safe)
  131. return nil
  132. }
  133. // Close closes the Writer, flushing any unwritten data to the underlying io.Writer,
  134. // but does not close the underlying io.Writer.
  135. func (w *Writer) Close() (err error) {
  136. switch w.state.state {
  137. case writeState:
  138. case errorState:
  139. return w.state.err
  140. default:
  141. return nil
  142. }
  143. defer w.state.nextd(&err)
  144. if w.idx > 0 {
  145. // Flush pending data, disable w.data freeing as it is done later on.
  146. if err = w.write(w.data[:w.idx], false); err != nil {
  147. return err
  148. }
  149. w.idx = 0
  150. }
  151. err = w.frame.CloseW(w.src, w.num)
  152. // It is now safe to free the buffer.
  153. if w.data != nil {
  154. lz4block.Put(w.data)
  155. w.data = nil
  156. }
  157. return
  158. }
  159. // Reset clears the state of the Writer w such that it is equivalent to its
  160. // initial state from NewWriter, but instead writing to writer.
  161. // Reset keeps the previous options unless overwritten by the supplied ones.
  162. // No access to writer is performed.
  163. //
  164. // w.Close must be called before Reset or pending data may be dropped.
  165. func (w *Writer) Reset(writer io.Writer) {
  166. w.frame.Reset(w.num)
  167. w.state.reset()
  168. w.src = writer
  169. }
  170. // ReadFrom efficiently reads from r and compressed into the Writer destination.
  171. func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
  172. switch w.state.state {
  173. case closedState, errorState:
  174. return 0, w.state.err
  175. case newState:
  176. if err = w.init(); w.state.next(err) {
  177. return
  178. }
  179. default:
  180. return 0, w.state.fail()
  181. }
  182. defer w.state.check(&err)
  183. size := w.frame.Descriptor.Flags.BlockSizeIndex()
  184. var done bool
  185. var rn int
  186. data := size.Get()
  187. if w.isNotConcurrent() {
  188. // Keep the same buffer for the whole process.
  189. defer lz4block.Put(data)
  190. }
  191. for !done {
  192. rn, err = io.ReadFull(r, data)
  193. switch err {
  194. case nil:
  195. case io.EOF, io.ErrUnexpectedEOF: // read may be partial
  196. done = true
  197. default:
  198. return
  199. }
  200. n += int64(rn)
  201. err = w.write(data[:rn], true)
  202. if err != nil {
  203. return
  204. }
  205. w.handler(rn)
  206. if !done && !w.isNotConcurrent() {
  207. // The buffer will be returned automatically by go routines (safe=true)
  208. // so get a new one fo the next round.
  209. data = size.Get()
  210. }
  211. }
  212. err = w.Close()
  213. return
  214. }