1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
|
package unsnap
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"hash/crc32"
snappy "github.com/golang/snappy"
// The C library can be used, but this makes the binary dependent
// on lots of extraneous C libraries; it is no longer stand-alone. Yuck.
//
// Therefore we comment out the "dgryski/go-csnappy" path and use the
// "github.com/golang/snappy" import above instead. If you are
// performance limited and can deal with distributing more libraries,
// then this is easy to swap.
//
// If you swap, note that some of the tests won't pass
// because snappy-go produces slightly different (but still
// conformant) encodings on some data. Here are bindings
// to the C-snappy:
// snappy "github.com/dgryski/go-csnappy"
)
// SnappyFile is a drop-in replacement/wrapper for an *os.File that
// un-snappifies its data on the fly as more is read from it.
// A given SnappyFile is used either for reading or for writing, never
// both; see the Writing field.
type SnappyFile struct {
// Fname is the file name; Open and Create record the path they were given.
Fname string
// Reader is the stream that Read pulls encoded bytes from.
Reader io.Reader
// Writer is the stream wrapped by NewWriter; Close and Sync act on it
// in write mode.
Writer io.Writer
// allow clients to substitute us for an os.File and just switch
// off compression if they don't want it.
SnappyEncodeDecodeOff bool // if true, Read passes straight through to Reader
EncBuf FixedSizeRingBuf // holds any extra that isn't yet returned, encoded
DecBuf FixedSizeRingBuf // holds any extra that isn't yet returned, decoded
// for writing to stream-framed snappy
// NOTE(review): presumably records whether the stream identifier chunk
// has already been emitted; the writing code is not visible here — confirm.
HeaderChunkWritten bool
// Sanity check: we can only read, or only write, to one SnappyFile.
// EncBuf and DecBuf are used differently in each mode. Verify
// that we are consistent with this flag.
// NewWriter sets it to true, NewReader to false; Read panics when it is true.
Writing bool
}
// total is a package-wide running count of the decoded bytes handed back
// by SnappyFile.Read. It is shared by every SnappyFile and is updated
// without any synchronization.
var total int
// Dump prints the length and contents of the encoded and the decoded
// buffer to standard output. It exists purely as a debugging aid.
func (f *SnappyFile) Dump() {
	encoded := f.EncBuf.Bytes()
	fmt.Printf("EncBuf has length %d and contents:\n%s\n", len(encoded), string(encoded))

	decoded := f.DecBuf.Bytes()
	fmt.Printf("DecBuf has length %d and contents:\n%s\n", len(decoded), string(decoded))
}
// Read implements io.Reader, returning decoded (uncompressed) bytes and
// decoding further snappy chunks from f.Reader on demand.
//
// When SnappyEncodeDecodeOff is set, the call is forwarded to f.Reader
// untouched. Calling Read on a SnappyFile that is in write mode is a
// programming error and panics.
//
// Read may return (0, nil) when a decoding step produced no output while
// encoded bytes are still buffered; callers should simply call Read again.
// io.EOF is returned only once both the decoded and the encoded buffer are
// empty, so that nothing is left un-decoded. Any error other than io.EOF
// reported by UnsnapOneFrame is returned to the caller rather than raised
// as a panic.
func (f *SnappyFile) Read(p []byte) (n int, err error) {
	if f.SnappyEncodeDecodeOff {
		return f.Reader.Read(p)
	}
	if f.Writing {
		panic("Reading on a write-only SnappyFile")
	}

	// Before decompressing more, drain what is already decoded. Only the
	// byte count matters here, so DecBuf.Read's error result is ignored.
	if n, _ = f.DecBuf.Read(p); n > 0 {
		total += n
		return n, nil
	}

	_, _, err = UnsnapOneFrame(f.Reader, &f.EncBuf, &f.DecBuf, f.Fname)
	if err != nil && err != io.EOF {
		return 0, err
	}

	if n, _ = f.DecBuf.Read(p); n > 0 {
		total += n
		return n, nil
	}

	// Only now, with EncBuf empty as well, can we give io.EOF.
	// Any earlier, and we would leave stuff un-decoded.
	if f.DecBuf.Readable == 0 && f.EncBuf.Readable == 0 {
		return 0, io.EOF
	}
	return 0, nil
}
// Open opens the named file for reading and wraps it in a SnappyFile
// built by NewReader, recording name in the Fname field. If the file
// cannot be opened, the error from os.Open is returned unchanged.
//
// Snappy encoding can apparently grow beyond the original size, so the
// buffers set up by NewReader are 2 * CHUNK_MAX (65536) bytes each.
func Open(name string) (file *SnappyFile, err error) {
	src, openErr := os.Open(name)
	if openErr != nil {
		return nil, openErr
	}

	file = NewReader(src)
	file.Fname = name
	return file, nil
}
// NewReader returns a SnappyFile in read mode that takes its encoded
// input from r. Both ring buffers hold 2 * CHUNK_MAX bytes.
func NewReader(r io.Reader) *SnappyFile {
	const ringSize = CHUNK_MAX * 2

	sf := new(SnappyFile)
	sf.Reader = r
	sf.EncBuf = *NewFixedSizeRingBuf(ringSize) // buffer of snappy encoded bytes
	sf.DecBuf = *NewFixedSizeRingBuf(ringSize) // buffer of snappy decoded bytes
	sf.Writing = false
	return sf
}
// NewWriter returns a SnappyFile in write mode that wraps w. EncBuf is
// sized at 65536 bytes and DecBuf at twice that.
func NewWriter(w io.Writer) *SnappyFile {
	sf := new(SnappyFile)
	sf.Writer = w
	sf.EncBuf = *NewFixedSizeRingBuf(65536)     // on writing: temp for testing compression
	sf.DecBuf = *NewFixedSizeRingBuf(65536 * 2) // on writing: final buffer of snappy framed and encoded bytes
	sf.Writing = true
	return sf
}
// Create creates (or truncates, as os.Create does) the named file and
// wraps it in a SnappyFile built by NewWriter, recording name in the
// Fname field. If the file cannot be created, the error from os.Create
// is returned unchanged.
func Create(name string) (file *SnappyFile, err error) {
	dst, createErr := os.Create(name)
	if createErr != nil {
		return nil, createErr
	}

	file = NewWriter(dst)
	file.Fname = name
	return file, nil
}
// Close closes the underlying stream when it supports closing: the Writer
// of a SnappyFile in write mode, the Reader otherwise. If that stream has
// no Close method (or is nil), Close does nothing and returns nil.
func (f *SnappyFile) Close() error {
	// Pick the stream for the current mode. Both fields already satisfy
	// io.Reader / io.Writer, so checking for io.Closer is the same test as
	// checking for io.ReadCloser / io.WriteCloser.
	var stream interface{} = f.Reader
	if f.Writing {
		stream = f.Writer
	}

	if closer, ok := stream.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}
// Sync calls Sync on the Writer when it is an *os.File and returns that
// result. For any other kind of Writer it is a no-op returning nil.
func (f *SnappyFile) Sync() error {
	if osFile, isFile := f.Writer.(*os.File); isFile {
		return osFile.Sync()
	}
	return nil
}
// UnsnapOneFrame performs one incremental decoding step on a stream in the
// snappy framing format
// (http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt).
//
// It first copies up to 65536 further bytes from r into encBuf (encBuf
// holds data that is still encoded, hence the name). It then decodes the
// complete chunks buffered in encBuf, appending the decoded bytes to
// outDecodedBuf and advancing encBuf past every consumed chunk, until this
// call has produced at least 65536 decoded bytes or no complete chunk is
// left. A chunk that is only partly buffered is left in encBuf for a later
// call instead of being decoded from incomplete data.
//
// Return values:
//   - nEnc is the number of bytes read from r plus the number of bytes
//     consumed from encBuf during this call (the two are summed).
//   - nDec is the number of decoded bytes appended to outDecodedBuf.
//   - err is io.EOF when r supplied nothing and encBuf is empty;
//     io.ErrUnexpectedEOF when encBuf starts with an incomplete chunk and
//     this call obtained no further bytes, so that calling again cannot
//     help; otherwise a non-nil error describes a read failure, a bad
//     stream identifier, an undecodable or checksum-failing chunk, a short
//     write to outDecodedBuf, or an unsupported chunk type.
//
// Only compressed chunks have their checksum verified; the checksum of an
// uncompressed chunk is skipped over without being checked.
//
// fname is only used to label error messages.
func UnsnapOneFrame(r io.Reader, encBuf *FixedSizeRingBuf, outDecodedBuf *FixedSizeRingBuf, fname string) (nEnc int64, nDec int64, err error) {
	const (
		snappyStreamHeaderSz = 10    // the complete stream identifier chunk
		headerSz             = 4     // chunk type byte + 3 length bytes
		crc32Sz              = 4     // masked CRC-32C in front of the chunk data
		stepSz               = 65536 // bytes read, and decoded bytes aimed for, per call
	)

	// Top up encBuf. io.CopyN reports io.EOF whenever fewer than stepSz
	// bytes were copied, so io.EOF alone does not mean there is nothing
	// left to decode.
	nread, err := io.CopyN(encBuf, r, stepSz)
	nEnc += nread
	if err != nil {
		switch {
		case err == io.EOF:
			if nread == 0 && encBuf.Readable == 0 {
				return nEnc, nDec, io.EOF
			}
			// Otherwise there are buffered bytes left, so decode them.
		case strings.Contains(err.Error(), "file already closed"):
			// May be an odd "already closed" report; carry on with
			// whatever is buffered.
		default:
			return nEnc, nDec, err
		}
	}

	chunk := encBuf.Bytes()
	consumed := 0 // bytes of encBuf processed by this call

	// take marks the first n buffered bytes as processed.
	take := func(n int) {
		chunk = chunk[n:]
		encBuf.Advance(n)
		nEnc += int64(n)
		consumed += n
	}

	// incomplete yields the result for buffered bytes that stop in the
	// middle of a chunk. If this call neither obtained new bytes nor
	// consumed any chunk, calling again cannot make progress, so the
	// truncation is reported; otherwise the caller is expected to call
	// again once it has drained the decoded output.
	incomplete := func() (int64, int64, error) {
		if nread == 0 && consumed == 0 {
			return nEnc, nDec, io.ErrUnexpectedEOF
		}
		return nEnc, nDec, nil
	}

	for nDec < stepSz && len(chunk) > 0 {
		if len(chunk) < headerSz {
			return incomplete()
		}

		chunkType := chunk[0]

		// The chunk length is a 24-bit little-endian integer; widen it to
		// four bytes so it can be read as a uint32.
		var lenBytes [4]byte
		copy(lenBytes[:3], chunk[1:headerSz])
		chunksz := int(binary.LittleEndian.Uint32(lenBytes[:]))
		frameSz := headerSz + chunksz

		switch {
		case chunkType == 0xff: // stream identifier
			if len(chunk) < snappyStreamHeaderSz {
				return incomplete()
			}
			if !bytes.Equal(chunk[:snappyStreamHeaderSz], []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) {
				return nEnc, nDec, fmt.Errorf("file had chunk starting with 0xff but then no magic snappy streaming protocol bytes, aborting.")
			}
			take(snappyStreamHeaderSz)

		case chunkType == 0x00: // compressed data
			if chunksz < crc32Sz {
				return nEnc, nDec, fmt.Errorf("corrupt snappy stream: '%s': compressed chunk of %d bytes cannot hold a checksum", fname, chunksz)
			}
			if len(chunk) < frameSz {
				return incomplete()
			}

			crc := binary.LittleEndian.Uint32(chunk[headerSz : headerSz+crc32Sz])
			dec, decErr := snappy.Decode(nil, chunk[headerSz+crc32Sz:frameSz])
			if decErr != nil {
				return nEnc, nDec, fmt.Errorf("could not decode snappy stream: '%s' and len dec=%d and ok=%v", fname, len(dec), decErr)
			}

			// Verify the crc32 rotated checksum before handing the data
			// out, so that corrupt bytes never reach outDecodedBuf.
			if m32 := masked_crc32c(dec); m32 != crc {
				return nEnc, nDec, fmt.Errorf("crc32 masked failiure. expected: %v but got: %v", crc, m32)
			}

			n, copyErr := io.Copy(outDecodedBuf, bytes.NewBuffer(dec))
			nDec += n
			if copyErr != nil {
				return nEnc, nDec, copyErr
			}
			if n != int64(len(dec)) {
				return nEnc, nDec, io.ErrShortWrite
			}
			take(frameSz)

		case chunkType == 0x01: // uncompressed data
			if chunksz < crc32Sz {
				return nEnc, nDec, fmt.Errorf("corrupt snappy stream: '%s': uncompressed chunk of %d bytes cannot hold a checksum", fname, chunksz)
			}
			if len(chunk) < frameSz {
				return incomplete()
			}

			n, copyErr := io.Copy(outDecodedBuf, bytes.NewBuffer(chunk[headerSz+crc32Sz:frameSz]))
			nDec += n
			if copyErr != nil {
				return nEnc, nDec, copyErr
			}
			if n != int64(chunksz-crc32Sz) {
				return nEnc, nDec, io.ErrShortWrite
			}
			take(frameSz)

		case chunkType >= 0x80 && chunkType <= 0xfe:
			// Padding (0xfe) and reserved skippable chunks: just skip.
			if len(chunk) < frameSz {
				return incomplete()
			}
			take(frameSz)

		default:
			return nEnc, nDec, fmt.Errorf("unrecognized/unsupported chunk type %#v", chunkType)
		}
	}

	return nEnc, nDec, nil
}
// for whole file at once:
//
// receive on stdin a stream of bytes in the snappy-streaming framed
// format, defined here: http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
// Grab each frame, run it through the snappy decoder, and spit out
// each frame all joined back-to-back on stdout.
//
func Unsnappy(r io.Reader, w io.Writer) (err error) {
b, err := ioutil.ReadAll(r)
if err != nil {
panic(err)
}
// flag for printing chunk size alignment messages
verbose := false
const snappyStreamHeaderSz = 10
const headerSz = 4
const crc32Sz = 4
// the magic 18 bytes accounts for the snappy streaming header and the first chunks size and checksum
// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
chunk := b[:]
// 65536 is the max size of a snappy framed chunk. See
// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt:91
//buf := make([]byte, 65536)
// fmt.Printf("read from file, b is len:%d with value: %#v\n", len(b), b)
// fmt.Printf("read from file, bcut is len:%d with value: %#v\n", len(bcut), bcut)
//fmt.Printf("raw bytes of chunksz are: %v\n", b[11:14])
fourbytes := make([]byte, 4)
chunkCount := 0
for {
if len(chunk) == 0 {
break
}
chunkCount++
fourbytes[3] = 0
copy(fourbytes, chunk[1:4])
chunksz := binary.LittleEndian.Uint32(fourbytes)
chunk_type := chunk[0]
switch true {
case chunk_type == 0xff:
{ // stream identifier
streamHeader := chunk[:snappyStreamHeaderSz]
if 0 != bytes.Compare(streamHeader, []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) {
panic("file had chunk starting with 0xff but then no magic snappy streaming protocol bytes, aborting.")
} else {
//fmt.Printf("got streaming snappy magic header just fine.\n")
}
chunk = chunk[snappyStreamHeaderSz:]
continue
}
case chunk_type == 0x00:
{ // compressed data
if verbose {
fmt.Fprintf(os.Stderr, "chunksz is %d while total bytes avail are: %d\n", int(chunksz), len(chunk)-4)
}
//crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)])
section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)]
dec, ok := snappy.Decode(nil, section)
if ok != nil {
panic("could not decode snappy stream")
}
// fmt.Printf("ok, b is %#v , %#v\n", ok, dec)
// spit out decoded text
n, err := w.Write(dec)
if err != nil {
panic(err)
}
if n != len(dec) {
panic("could not write all bytes to stdout")
}
// TODO: verify the crc32 rotated checksum?
// move to next header
chunk = chunk[(headerSz + int(chunksz)):]
continue
}
case chunk_type == 0x01:
{ // uncompressed data
//crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)])
section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)]
n, err := w.Write(section)
if err != nil {
panic(err)
}
if n != int(chunksz-crc32Sz) {
panic("could not write all bytes to stdout")
}
chunk = chunk[(headerSz + int(chunksz)):]
continue
}
case chunk_type == 0xfe:
fallthrough // padding, just skip it
case chunk_type >= 0x80 && chunk_type <= 0xfd:
{ // Reserved skippable chunks
chunk = chunk[(headerSz + int(chunksz)):]
continue
}
default:
panic(fmt.Sprintf("unrecognized/unsupported chunk type %#v", chunk_type))
}
} // end for{}
return nil
}
// SnappyStreamHeaderMagic is the ten-byte stream identifier chunk:
// 0xff 0x06 0x00 0x00 followed by the ASCII text sNaPpY.
var SnappyStreamHeaderMagic = []byte("\xff\x06\x00\x00sNaPpY")

// Sizes and the identifier text of the framing format.
const (
	CHUNK_MAX                    = 65536
	_STREAM_TO_STREAM_BLOCK_SIZE = CHUNK_MAX
	_STREAM_IDENTIFIER           = `sNaPpY`
)

// Chunk type bytes. The reserved ranges are [inclusive, exclusive):
// each ...0 constant is the first value of its range and the matching
// ...1 constant is one past the last.
const (
	_COMPRESSED_CHUNK      = 0x00
	_UNCOMPRESSED_CHUNK    = 0x01
	_IDENTIFIER_CHUNK      = 0xff
	_RESERVED_UNSKIPPABLE0 = 0x02
	_RESERVED_UNSKIPPABLE1 = 0x80
	_RESERVED_SKIPPABLE0   = 0x80
	_RESERVED_SKIPPABLE1   = 0xff
)

// _COMPRESSION_THRESHOLD is the minimum percent of bytes compression must
// save to be enabled in automatic mode.
const _COMPRESSION_THRESHOLD = .125
// crctab is the CRC-32 table for the Castagnoli polynomial. This is the
// correct table: it matches the crc32c.c code used by python.
var crctab *crc32.Table = crc32.MakeTable(crc32.Castagnoli)

// masked_crc32c returns the masked CRC-32C checksum of data as described
// in the framing format specification,
// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt:
// the checksum is rotated right by 15 bits and the constant 0xa282ead8 is
// added, wrapping around on overflow.
func masked_crc32c(data []byte) uint32 {
	sum := crc32.Checksum(data, crctab)
	rotated := sum>>15 | sum<<17
	return rotated + 0xa282ead8
}
// ReadSnappyStreamCompressedFile opens the snappy-framed file filename,
// decodes it completely and returns the decoded bytes.
//
// If the file cannot be opened, or reading/decoding it reports an error,
// an empty slice and that error are returned. The file is closed before
// the function returns.
func ReadSnappyStreamCompressedFile(filename string) ([]byte, error) {
	snappyFile, err := Open(filename)
	if err != nil {
		return []byte{}, err
	}
	// The file was opened read-only and is fully consumed (or abandoned)
	// by the time this runs, so an error from Close carries no information
	// the caller could act on; it is deliberately dropped.
	defer snappyFile.Close()

	var bb bytes.Buffer
	if _, err = bb.ReadFrom(snappyFile); err != nil && err != io.EOF {
		return []byte{}, err
	}
	return bb.Bytes(), nil
}
|