Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

upsidedown.go 26KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072
  1. // Copyright (c) 2014 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:generate protoc --gofast_out=. upsidedown.proto
  15. package upsidedown
  16. import (
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "math"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. "github.com/blevesearch/bleve/analysis"
  25. "github.com/blevesearch/bleve/document"
  26. "github.com/blevesearch/bleve/index"
  27. "github.com/blevesearch/bleve/index/store"
  28. "github.com/blevesearch/bleve/registry"
  29. "github.com/golang/protobuf/proto"
  30. )
  31. const Name = "upside_down"
  32. // RowBufferSize should ideally this is sized to be the smallest
  33. // size that can contain an index row key and its corresponding
  34. // value. It is not a limit, if need be a larger buffer is
  35. // allocated, but performance will be more optimal if *most*
  36. // rows fit this size.
  37. const RowBufferSize = 4 * 1024
  38. var VersionKey = []byte{'v'}
  39. const Version uint8 = 7
  40. var IncompatibleVersion = fmt.Errorf("incompatible version, %d is supported", Version)
// UpsideDownCouch implements an index on top of a pluggable KV store.
// One instance owns the store, the field cache, and the running
// statistics for a single index.
type UpsideDownCouch struct {
	version uint8 // on-disk format version this instance writes
	path string
	storeName string // registry name used to construct the KV store
	storeConfig map[string]interface{}
	store store.KVStore
	fieldCache *index.FieldCache // maps field names to numeric field indexes
	analysisQueue *index.AnalysisQueue // shared queue where analysis work is submitted
	stats *indexStat
	m sync.RWMutex
	// fields protected by m
	docCount uint64
	writeMutex sync.Mutex // serializes all mutating operations (Update/Delete/Batch/...)
}
// docBackIndexRow pairs a document with its existing back index row, as
// produced by the back-index lookup goroutine and consumed by Batch.
type docBackIndexRow struct {
	docID string
	doc *document.Document // If deletion, doc will be nil.
	backIndexRow *BackIndexRow // nil when the doc was not previously indexed
}
  60. func NewUpsideDownCouch(storeName string, storeConfig map[string]interface{}, analysisQueue *index.AnalysisQueue) (index.Index, error) {
  61. rv := &UpsideDownCouch{
  62. version: Version,
  63. fieldCache: index.NewFieldCache(),
  64. storeName: storeName,
  65. storeConfig: storeConfig,
  66. analysisQueue: analysisQueue,
  67. }
  68. rv.stats = &indexStat{i: rv}
  69. return rv, nil
  70. }
  71. func (udc *UpsideDownCouch) init(kvwriter store.KVWriter) (err error) {
  72. // version marker
  73. rowsAll := [][]UpsideDownCouchRow{
  74. {NewVersionRow(udc.version)},
  75. }
  76. err = udc.batchRows(kvwriter, nil, rowsAll, nil)
  77. return
  78. }
  79. func (udc *UpsideDownCouch) loadSchema(kvreader store.KVReader) (err error) {
  80. it := kvreader.PrefixIterator([]byte{'f'})
  81. defer func() {
  82. if cerr := it.Close(); err == nil && cerr != nil {
  83. err = cerr
  84. }
  85. }()
  86. key, val, valid := it.Current()
  87. for valid {
  88. var fieldRow *FieldRow
  89. fieldRow, err = NewFieldRowKV(key, val)
  90. if err != nil {
  91. return
  92. }
  93. udc.fieldCache.AddExisting(fieldRow.name, fieldRow.index)
  94. it.Next()
  95. key, val, valid = it.Current()
  96. }
  97. val, err = kvreader.Get([]byte{'v'})
  98. if err != nil {
  99. return
  100. }
  101. var vr *VersionRow
  102. vr, err = NewVersionRowKV([]byte{'v'}, val)
  103. if err != nil {
  104. return
  105. }
  106. if vr.version != Version {
  107. err = IncompatibleVersion
  108. return
  109. }
  110. return
  111. }
  112. var rowBufferPool sync.Pool
  113. func GetRowBuffer() []byte {
  114. if rb, ok := rowBufferPool.Get().([]byte); ok {
  115. return rb
  116. } else {
  117. return make([]byte, RowBufferSize)
  118. }
  119. }
  120. func PutRowBuffer(buf []byte) {
  121. rowBufferPool.Put(buf)
  122. }
// batchRows serializes all the supplied rows into one pre-sized KV batch
// and executes it: sets for adds and updates, deletes for deletes, and
// merge operations carrying the net +/-1 dictionary count deltas derived
// from the term frequency rows being added or deleted.
func (udc *UpsideDownCouch) batchRows(writer store.KVWriter, addRowsAll [][]UpsideDownCouchRow, updateRowsAll [][]UpsideDownCouchRow, deleteRowsAll [][]UpsideDownCouchRow) (err error) {
	// net change to each dictionary row's count, keyed by dictionary row key
	dictionaryDeltas := make(map[string]int64)

	// count up bytes needed for buffering.
	addNum := 0
	addKeyBytes := 0
	addValBytes := 0
	updateNum := 0
	updateKeyBytes := 0
	updateValBytes := 0
	deleteNum := 0
	deleteKeyBytes := 0

	rowBuf := GetRowBuffer()
	for _, addRows := range addRowsAll {
		for _, row := range addRows {
			tfr, ok := row.(*TermFrequencyRow)
			if ok {
				// an added term occurrence increments its dictionary count
				if tfr.DictionaryRowKeySize() > len(rowBuf) {
					rowBuf = make([]byte, tfr.DictionaryRowKeySize())
				}
				dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
				if err != nil {
					// NOTE(review): early return skips PutRowBuffer below;
					// the pool is best-effort, so only buffer reuse is lost.
					return err
				}
				dictionaryDeltas[string(rowBuf[:dictKeySize])] += 1
			}
			addKeyBytes += row.KeySize()
			addValBytes += row.ValueSize()
		}
		addNum += len(addRows)
	}
	for _, updateRows := range updateRowsAll {
		for _, row := range updateRows {
			updateKeyBytes += row.KeySize()
			updateValBytes += row.ValueSize()
		}
		updateNum += len(updateRows)
	}
	for _, deleteRows := range deleteRowsAll {
		for _, row := range deleteRows {
			tfr, ok := row.(*TermFrequencyRow)
			if ok {
				// need to decrement counter
				if tfr.DictionaryRowKeySize() > len(rowBuf) {
					rowBuf = make([]byte, tfr.DictionaryRowKeySize())
				}
				dictKeySize, err := tfr.DictionaryRowKeyTo(rowBuf)
				if err != nil {
					return err
				}
				dictionaryDeltas[string(rowBuf[:dictKeySize])] -= 1
			}
			deleteKeyBytes += row.KeySize()
		}
		deleteNum += len(deleteRows)
	}
	PutRowBuffer(rowBuf)

	mergeNum := len(dictionaryDeltas)
	mergeKeyBytes := 0
	mergeValBytes := mergeNum * DictionaryRowMaxValueSize
	for dictRowKey := range dictionaryDeltas {
		mergeKeyBytes += len(dictRowKey)
	}

	// prepare batch
	totBytes := addKeyBytes + addValBytes +
		updateKeyBytes + updateValBytes +
		deleteKeyBytes +
		2*(mergeKeyBytes+mergeValBytes)
	buf, wb, err := writer.NewBatchEx(store.KVBatchOptions{
		TotalBytes: totBytes,
		NumSets:    addNum + updateNum,
		NumDeletes: deleteNum,
		NumMerges:  mergeNum,
	})
	if err != nil {
		return err
	}
	defer func() {
		_ = wb.Close()
	}()

	// fill the batch: each row is serialized directly into buf, then buf
	// is advanced past the bytes consumed so the next row appends after it
	for _, addRows := range addRowsAll {
		for _, row := range addRows {
			keySize, err := row.KeyTo(buf)
			if err != nil {
				return err
			}
			valSize, err := row.ValueTo(buf[keySize:])
			if err != nil {
				return err
			}
			wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
			buf = buf[keySize+valSize:]
		}
	}
	for _, updateRows := range updateRowsAll {
		for _, row := range updateRows {
			keySize, err := row.KeyTo(buf)
			if err != nil {
				return err
			}
			valSize, err := row.ValueTo(buf[keySize:])
			if err != nil {
				return err
			}
			wb.Set(buf[:keySize], buf[keySize:keySize+valSize])
			buf = buf[keySize+valSize:]
		}
	}
	for _, deleteRows := range deleteRowsAll {
		for _, row := range deleteRows {
			keySize, err := row.KeyTo(buf)
			if err != nil {
				return err
			}
			wb.Delete(buf[:keySize])
			buf = buf[keySize:]
		}
	}
	// merge ops: dictionary key followed by the little-endian delta
	for dictRowKey, delta := range dictionaryDeltas {
		dictRowKeyLen := copy(buf, dictRowKey)
		binary.LittleEndian.PutUint64(buf[dictRowKeyLen:], uint64(delta))
		wb.Merge(buf[:dictRowKeyLen], buf[dictRowKeyLen:dictRowKeyLen+DictionaryRowMaxValueSize])
		buf = buf[dictRowKeyLen+DictionaryRowMaxValueSize:]
	}

	// write out the batch
	return writer.ExecuteBatch(wb)
}
  250. func (udc *UpsideDownCouch) Open() (err error) {
  251. //acquire the write mutex for the duratin of Open()
  252. udc.writeMutex.Lock()
  253. defer udc.writeMutex.Unlock()
  254. // open the kv store
  255. storeConstructor := registry.KVStoreConstructorByName(udc.storeName)
  256. if storeConstructor == nil {
  257. err = index.ErrorUnknownStorageType
  258. return
  259. }
  260. // now open the store
  261. udc.store, err = storeConstructor(&mergeOperator, udc.storeConfig)
  262. if err != nil {
  263. return
  264. }
  265. // start a reader to look at the index
  266. var kvreader store.KVReader
  267. kvreader, err = udc.store.Reader()
  268. if err != nil {
  269. return
  270. }
  271. var value []byte
  272. value, err = kvreader.Get(VersionKey)
  273. if err != nil {
  274. _ = kvreader.Close()
  275. return
  276. }
  277. if value != nil {
  278. err = udc.loadSchema(kvreader)
  279. if err != nil {
  280. _ = kvreader.Close()
  281. return
  282. }
  283. // set doc count
  284. udc.m.Lock()
  285. udc.docCount, err = udc.countDocs(kvreader)
  286. udc.m.Unlock()
  287. err = kvreader.Close()
  288. } else {
  289. // new index, close the reader and open writer to init
  290. err = kvreader.Close()
  291. if err != nil {
  292. return
  293. }
  294. var kvwriter store.KVWriter
  295. kvwriter, err = udc.store.Writer()
  296. if err != nil {
  297. return
  298. }
  299. defer func() {
  300. if cerr := kvwriter.Close(); err == nil && cerr != nil {
  301. err = cerr
  302. }
  303. }()
  304. // init the index
  305. err = udc.init(kvwriter)
  306. }
  307. return
  308. }
  309. func (udc *UpsideDownCouch) countDocs(kvreader store.KVReader) (count uint64, err error) {
  310. it := kvreader.PrefixIterator([]byte{'b'})
  311. defer func() {
  312. if cerr := it.Close(); err == nil && cerr != nil {
  313. err = cerr
  314. }
  315. }()
  316. _, _, valid := it.Current()
  317. for valid {
  318. count++
  319. it.Next()
  320. _, _, valid = it.Current()
  321. }
  322. return
  323. }
  324. func (udc *UpsideDownCouch) rowCount() (count uint64, err error) {
  325. // start an isolated reader for use during the rowcount
  326. kvreader, err := udc.store.Reader()
  327. if err != nil {
  328. return
  329. }
  330. defer func() {
  331. if cerr := kvreader.Close(); err == nil && cerr != nil {
  332. err = cerr
  333. }
  334. }()
  335. it := kvreader.RangeIterator(nil, nil)
  336. defer func() {
  337. if cerr := it.Close(); err == nil && cerr != nil {
  338. err = cerr
  339. }
  340. }()
  341. _, _, valid := it.Current()
  342. for valid {
  343. count++
  344. it.Next()
  345. _, _, valid = it.Current()
  346. }
  347. return
  348. }
// Close closes the underlying KV store.
func (udc *UpsideDownCouch) Close() error {
	return udc.store.Close()
}
// Update indexes a single document, replacing any previous version.
// Analysis runs before the write lock is taken; the document's existing
// back index row (if any) is then merged with the freshly analyzed rows
// to compute the minimal set of adds, updates and deletes.
func (udc *UpsideDownCouch) Update(doc *document.Document) (err error) {
	// do analysis before acquiring write lock
	analysisStart := time.Now()
	numPlainTextBytes := doc.NumPlainTextBytes()
	resultChan := make(chan *index.AnalysisResult)
	aw := index.NewAnalysisWork(udc, doc, resultChan)
	// put the work on the queue
	udc.analysisQueue.Queue(aw)
	// wait for the result
	result := <-resultChan
	close(resultChan)
	atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))

	udc.writeMutex.Lock()
	defer udc.writeMutex.Unlock()

	// open a reader for backindex lookup
	var kvreader store.KVReader
	kvreader, err = udc.store.Reader()
	if err != nil {
		return
	}
	// first we lookup the backindex row for the doc id if it exists
	// lookup the back index row
	var backIndexRow *BackIndexRow
	backIndexRow, err = backIndexRowForDoc(kvreader, index.IndexInternalID(doc.ID))
	if err != nil {
		_ = kvreader.Close()
		atomic.AddUint64(&udc.stats.errors, 1)
		return
	}
	err = kvreader.Close()
	if err != nil {
		return
	}

	// start a writer for this update
	indexStart := time.Now()
	var kvwriter store.KVWriter
	kvwriter, err = udc.store.Writer()
	if err != nil {
		return
	}
	defer func() {
		if cerr := kvwriter.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	// prepare a list of rows
	var addRowsAll [][]UpsideDownCouchRow
	var updateRowsAll [][]UpsideDownCouchRow
	var deleteRowsAll [][]UpsideDownCouchRow
	addRows, updateRows, deleteRows := udc.mergeOldAndNew(backIndexRow, result.Rows)
	if len(addRows) > 0 {
		addRowsAll = append(addRowsAll, addRows)
	}
	if len(updateRows) > 0 {
		updateRowsAll = append(updateRowsAll, updateRows)
	}
	if len(deleteRows) > 0 {
		deleteRowsAll = append(deleteRowsAll, deleteRows)
	}
	err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll)
	// a nil back index row means this doc was not previously indexed
	if err == nil && backIndexRow == nil {
		udc.m.Lock()
		udc.docCount++
		udc.m.Unlock()
	}
	atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
	if err == nil {
		atomic.AddUint64(&udc.stats.updates, 1)
		atomic.AddUint64(&udc.stats.numPlainTextBytesIndexed, numPlainTextBytes)
	} else {
		atomic.AddUint64(&udc.stats.errors, 1)
	}
	return
}
// mergeOldAndNew partitions the freshly analyzed rows against the
// document's existing back index row: rows whose keys already exist
// become updates, new keys become adds, and previously existing
// term/stored keys that are not seen again become deletes. When
// backIndexRow is nil (new document) every row is an add.
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) {
	addRows = make([]UpsideDownCouchRow, 0, len(rows))
	if backIndexRow == nil {
		// brand new document: everything is an add
		addRows = addRows[0:len(rows)]
		for i, row := range rows {
			addRows[i] = row
		}
		return addRows, nil, nil
	}
	updateRows = make([]UpsideDownCouchRow, 0, len(rows))
	deleteRows = make([]UpsideDownCouchRow, 0, len(rows))

	// build lookup sets of the document's existing term and stored keys
	var existingTermKeys map[string]struct{}
	backIndexTermKeys := backIndexRow.AllTermKeys()
	if len(backIndexTermKeys) > 0 {
		existingTermKeys = make(map[string]struct{}, len(backIndexTermKeys))
		for _, key := range backIndexTermKeys {
			existingTermKeys[string(key)] = struct{}{}
		}
	}
	var existingStoredKeys map[string]struct{}
	backIndexStoredKeys := backIndexRow.AllStoredKeys()
	if len(backIndexStoredKeys) > 0 {
		existingStoredKeys = make(map[string]struct{}, len(backIndexStoredKeys))
		for _, key := range backIndexStoredKeys {
			existingStoredKeys[string(key)] = struct{}{}
		}
	}

	keyBuf := GetRowBuffer()
	for _, row := range rows {
		switch row := row.(type) {
		case *TermFrequencyRow:
			if existingTermKeys != nil {
				// grow the scratch buffer if this key won't fit
				if row.KeySize() > len(keyBuf) {
					keyBuf = make([]byte, row.KeySize())
				}
				keySize, _ := row.KeyTo(keyBuf)
				if _, ok := existingTermKeys[string(keyBuf[:keySize])]; ok {
					updateRows = append(updateRows, row)
					// consume the key so it is not treated as a delete below
					delete(existingTermKeys, string(keyBuf[:keySize]))
					continue
				}
			}
			addRows = append(addRows, row)
		case *StoredRow:
			if existingStoredKeys != nil {
				if row.KeySize() > len(keyBuf) {
					keyBuf = make([]byte, row.KeySize())
				}
				keySize, _ := row.KeyTo(keyBuf)
				if _, ok := existingStoredKeys[string(keyBuf[:keySize])]; ok {
					updateRows = append(updateRows, row)
					delete(existingStoredKeys, string(keyBuf[:keySize]))
					continue
				}
			}
			addRows = append(addRows, row)
		default:
			// all other row types are blind updates
			updateRows = append(updateRows, row)
		}
	}
	PutRowBuffer(keyBuf)

	// any of the existing rows that weren't updated need to be deleted
	for existingTermKey := range existingTermKeys {
		termFreqRow, err := NewTermFrequencyRowK([]byte(existingTermKey))
		if err == nil {
			deleteRows = append(deleteRows, termFreqRow)
		}
	}
	// any of the existing stored fields that weren't updated need to be deleted
	for existingStoredKey := range existingStoredKeys {
		storedRow, err := NewStoredRowK([]byte(existingStoredKey))
		if err == nil {
			deleteRows = append(deleteRows, storedRow)
		}
	}
	return addRows, updateRows, deleteRows
}
  503. func (udc *UpsideDownCouch) storeField(docID []byte, field document.Field, fieldIndex uint16, rows []index.IndexRow, backIndexStoredEntries []*BackIndexStoreEntry) ([]index.IndexRow, []*BackIndexStoreEntry) {
  504. fieldType := encodeFieldType(field)
  505. storedRow := NewStoredRow(docID, fieldIndex, field.ArrayPositions(), fieldType, field.Value())
  506. // record the back index entry
  507. backIndexStoredEntry := BackIndexStoreEntry{Field: proto.Uint32(uint32(fieldIndex)), ArrayPositions: field.ArrayPositions()}
  508. return append(rows, storedRow), append(backIndexStoredEntries, &backIndexStoredEntry)
  509. }
  510. func encodeFieldType(f document.Field) byte {
  511. fieldType := byte('x')
  512. switch f.(type) {
  513. case *document.TextField:
  514. fieldType = 't'
  515. case *document.NumericField:
  516. fieldType = 'n'
  517. case *document.DateTimeField:
  518. fieldType = 'd'
  519. case *document.BooleanField:
  520. fieldType = 'b'
  521. case *document.GeoPointField:
  522. fieldType = 'g'
  523. case *document.CompositeField:
  524. fieldType = 'c'
  525. }
  526. return fieldType
  527. }
// indexField appends one term frequency row per unique term of the
// analyzed field (plus term vector rows when includeTermVectors is set)
// and records a back index terms entry listing the field's terms.
// Returns the extended rows and backIndexTermsEntries slices.
func (udc *UpsideDownCouch) indexField(docID []byte, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs analysis.TokenFrequencies, rows []index.IndexRow, backIndexTermsEntries []*BackIndexTermsEntry) ([]index.IndexRow, []*BackIndexTermsEntry) {
	// 1/sqrt(length) field length normalization factor
	fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength)))
	// allocate all rows up front and hand out pointers into the backing
	// slice, avoiding one heap allocation per term
	termFreqRows := make([]TermFrequencyRow, len(tokenFreqs))
	termFreqRowsUsed := 0
	terms := make([]string, 0, len(tokenFreqs))
	for k, tf := range tokenFreqs {
		termFreqRow := &termFreqRows[termFreqRowsUsed]
		termFreqRowsUsed++
		InitTermFrequencyRow(termFreqRow, tf.Term, fieldIndex, docID,
			uint64(frequencyFromTokenFreq(tf)), fieldNorm)
		if includeTermVectors {
			termFreqRow.vectors, rows = udc.termVectorsFromTokenFreq(fieldIndex, tf, rows)
		}
		// record the back index entry
		terms = append(terms, k)
		rows = append(rows, termFreqRow)
	}
	backIndexTermsEntry := BackIndexTermsEntry{Field: proto.Uint32(uint32(fieldIndex)), Terms: terms}
	backIndexTermsEntries = append(backIndexTermsEntries, &backIndexTermsEntry)
	return rows, backIndexTermsEntries
}
// Delete removes the document with the given id, deriving the rows to
// delete from its back index row. Deleting a document that is not in the
// index is a no-op that still increments the deletes stat.
func (udc *UpsideDownCouch) Delete(id string) (err error) {
	indexStart := time.Now()
	udc.writeMutex.Lock()
	defer udc.writeMutex.Unlock()
	// open a reader for backindex lookup
	var kvreader store.KVReader
	kvreader, err = udc.store.Reader()
	if err != nil {
		return
	}
	// first we lookup the backindex row for the doc id if it exists
	// lookup the back index row
	var backIndexRow *BackIndexRow
	backIndexRow, err = backIndexRowForDoc(kvreader, index.IndexInternalID(id))
	if err != nil {
		_ = kvreader.Close()
		atomic.AddUint64(&udc.stats.errors, 1)
		return
	}
	err = kvreader.Close()
	if err != nil {
		return
	}
	// doc was never indexed: nothing to delete
	if backIndexRow == nil {
		atomic.AddUint64(&udc.stats.deletes, 1)
		return
	}
	// start a writer for this delete
	var kvwriter store.KVWriter
	kvwriter, err = udc.store.Writer()
	if err != nil {
		return
	}
	defer func() {
		if cerr := kvwriter.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()
	var deleteRowsAll [][]UpsideDownCouchRow
	deleteRows := udc.deleteSingle(id, backIndexRow, nil)
	if len(deleteRows) > 0 {
		deleteRowsAll = append(deleteRowsAll, deleteRows)
	}
	err = udc.batchRows(kvwriter, nil, nil, deleteRowsAll)
	if err == nil {
		udc.m.Lock()
		udc.docCount--
		udc.m.Unlock()
	}
	atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
	if err == nil {
		atomic.AddUint64(&udc.stats.deletes, 1)
	} else {
		atomic.AddUint64(&udc.stats.errors, 1)
	}
	return
}
  606. func (udc *UpsideDownCouch) deleteSingle(id string, backIndexRow *BackIndexRow, deleteRows []UpsideDownCouchRow) []UpsideDownCouchRow {
  607. idBytes := []byte(id)
  608. for _, backIndexEntry := range backIndexRow.termsEntries {
  609. for i := range backIndexEntry.Terms {
  610. tfr := NewTermFrequencyRow([]byte(backIndexEntry.Terms[i]), uint16(*backIndexEntry.Field), idBytes, 0, 0)
  611. deleteRows = append(deleteRows, tfr)
  612. }
  613. }
  614. for _, se := range backIndexRow.storedEntries {
  615. sf := NewStoredRow(idBytes, uint16(*se.Field), se.ArrayPositions, 'x', nil)
  616. deleteRows = append(deleteRows, sf)
  617. }
  618. // also delete the back entry itself
  619. deleteRows = append(deleteRows, backIndexRow)
  620. return deleteRows
  621. }
  622. func decodeFieldType(typ byte, name string, pos []uint64, value []byte) document.Field {
  623. switch typ {
  624. case 't':
  625. return document.NewTextField(name, pos, value)
  626. case 'n':
  627. return document.NewNumericFieldFromBytes(name, pos, value)
  628. case 'd':
  629. return document.NewDateTimeFieldFromBytes(name, pos, value)
  630. case 'b':
  631. return document.NewBooleanFieldFromBytes(name, pos, value)
  632. case 'g':
  633. return document.NewGeoPointFieldFromBytes(name, pos, value)
  634. }
  635. return nil
  636. }
// frequencyFromTokenFreq returns the occurrence count for a token.
func frequencyFromTokenFreq(tf *analysis.TokenFreq) int {
	return tf.Frequency()
}
  640. func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *analysis.TokenFreq, rows []index.IndexRow) ([]*TermVector, []index.IndexRow) {
  641. a := make([]TermVector, len(tf.Locations))
  642. rv := make([]*TermVector, len(tf.Locations))
  643. for i, l := range tf.Locations {
  644. var newFieldRow *FieldRow
  645. fieldIndex := field
  646. if l.Field != "" {
  647. // lookup correct field
  648. fieldIndex, newFieldRow = udc.fieldIndexOrNewRow(l.Field)
  649. if newFieldRow != nil {
  650. rows = append(rows, newFieldRow)
  651. }
  652. }
  653. a[i] = TermVector{
  654. field: fieldIndex,
  655. arrayPositions: l.ArrayPositions,
  656. pos: uint64(l.Position),
  657. start: uint64(l.Start),
  658. end: uint64(l.End),
  659. }
  660. rv[i] = &a[i]
  661. }
  662. return rv, rows
  663. }
  664. func (udc *UpsideDownCouch) termFieldVectorsFromTermVectors(in []*TermVector) []*index.TermFieldVector {
  665. if len(in) <= 0 {
  666. return nil
  667. }
  668. a := make([]index.TermFieldVector, len(in))
  669. rv := make([]*index.TermFieldVector, len(in))
  670. for i, tv := range in {
  671. fieldName := udc.fieldCache.FieldIndexed(tv.field)
  672. a[i] = index.TermFieldVector{
  673. Field: fieldName,
  674. ArrayPositions: tv.arrayPositions,
  675. Pos: tv.pos,
  676. Start: tv.start,
  677. End: tv.end,
  678. }
  679. rv[i] = &a[i]
  680. }
  681. return rv
  682. }
  683. func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
  684. analysisStart := time.Now()
  685. resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
  686. var numUpdates uint64
  687. var numPlainTextBytes uint64
  688. for _, doc := range batch.IndexOps {
  689. if doc != nil {
  690. numUpdates++
  691. numPlainTextBytes += doc.NumPlainTextBytes()
  692. }
  693. }
  694. go func() {
  695. for _, doc := range batch.IndexOps {
  696. if doc != nil {
  697. aw := index.NewAnalysisWork(udc, doc, resultChan)
  698. // put the work on the queue
  699. udc.analysisQueue.Queue(aw)
  700. }
  701. }
  702. }()
  703. // retrieve back index rows concurrent with analysis
  704. docBackIndexRowErr := error(nil)
  705. docBackIndexRowCh := make(chan *docBackIndexRow, len(batch.IndexOps))
  706. udc.writeMutex.Lock()
  707. defer udc.writeMutex.Unlock()
  708. go func() {
  709. defer close(docBackIndexRowCh)
  710. // open a reader for backindex lookup
  711. var kvreader store.KVReader
  712. kvreader, err = udc.store.Reader()
  713. if err != nil {
  714. docBackIndexRowErr = err
  715. return
  716. }
  717. for docID, doc := range batch.IndexOps {
  718. backIndexRow, err := backIndexRowForDoc(kvreader, index.IndexInternalID(docID))
  719. if err != nil {
  720. docBackIndexRowErr = err
  721. return
  722. }
  723. docBackIndexRowCh <- &docBackIndexRow{docID, doc, backIndexRow}
  724. }
  725. err = kvreader.Close()
  726. if err != nil {
  727. docBackIndexRowErr = err
  728. return
  729. }
  730. }()
  731. // wait for analysis result
  732. newRowsMap := make(map[string][]index.IndexRow)
  733. var itemsDeQueued uint64
  734. for itemsDeQueued < numUpdates {
  735. result := <-resultChan
  736. newRowsMap[result.DocID] = result.Rows
  737. itemsDeQueued++
  738. }
  739. close(resultChan)
  740. atomic.AddUint64(&udc.stats.analysisTime, uint64(time.Since(analysisStart)))
  741. docsAdded := uint64(0)
  742. docsDeleted := uint64(0)
  743. indexStart := time.Now()
  744. // prepare a list of rows
  745. var addRowsAll [][]UpsideDownCouchRow
  746. var updateRowsAll [][]UpsideDownCouchRow
  747. var deleteRowsAll [][]UpsideDownCouchRow
  748. // add the internal ops
  749. var updateRows []UpsideDownCouchRow
  750. var deleteRows []UpsideDownCouchRow
  751. for internalKey, internalValue := range batch.InternalOps {
  752. if internalValue == nil {
  753. // delete
  754. deleteInternalRow := NewInternalRow([]byte(internalKey), nil)
  755. deleteRows = append(deleteRows, deleteInternalRow)
  756. } else {
  757. updateInternalRow := NewInternalRow([]byte(internalKey), internalValue)
  758. updateRows = append(updateRows, updateInternalRow)
  759. }
  760. }
  761. if len(updateRows) > 0 {
  762. updateRowsAll = append(updateRowsAll, updateRows)
  763. }
  764. if len(deleteRows) > 0 {
  765. deleteRowsAll = append(deleteRowsAll, deleteRows)
  766. }
  767. // process back index rows as they arrive
  768. for dbir := range docBackIndexRowCh {
  769. if dbir.doc == nil && dbir.backIndexRow != nil {
  770. // delete
  771. deleteRows := udc.deleteSingle(dbir.docID, dbir.backIndexRow, nil)
  772. if len(deleteRows) > 0 {
  773. deleteRowsAll = append(deleteRowsAll, deleteRows)
  774. }
  775. docsDeleted++
  776. } else if dbir.doc != nil {
  777. addRows, updateRows, deleteRows := udc.mergeOldAndNew(dbir.backIndexRow, newRowsMap[dbir.docID])
  778. if len(addRows) > 0 {
  779. addRowsAll = append(addRowsAll, addRows)
  780. }
  781. if len(updateRows) > 0 {
  782. updateRowsAll = append(updateRowsAll, updateRows)
  783. }
  784. if len(deleteRows) > 0 {
  785. deleteRowsAll = append(deleteRowsAll, deleteRows)
  786. }
  787. if dbir.backIndexRow == nil {
  788. docsAdded++
  789. }
  790. }
  791. }
  792. if docBackIndexRowErr != nil {
  793. return docBackIndexRowErr
  794. }
  795. // start a writer for this batch
  796. var kvwriter store.KVWriter
  797. kvwriter, err = udc.store.Writer()
  798. if err != nil {
  799. return
  800. }
  801. err = udc.batchRows(kvwriter, addRowsAll, updateRowsAll, deleteRowsAll)
  802. if err != nil {
  803. _ = kvwriter.Close()
  804. atomic.AddUint64(&udc.stats.errors, 1)
  805. return
  806. }
  807. err = kvwriter.Close()
  808. atomic.AddUint64(&udc.stats.indexTime, uint64(time.Since(indexStart)))
  809. if err == nil {
  810. udc.m.Lock()
  811. udc.docCount += docsAdded
  812. udc.docCount -= docsDeleted
  813. udc.m.Unlock()
  814. atomic.AddUint64(&udc.stats.updates, numUpdates)
  815. atomic.AddUint64(&udc.stats.deletes, docsDeleted)
  816. atomic.AddUint64(&udc.stats.batches, 1)
  817. atomic.AddUint64(&udc.stats.numPlainTextBytesIndexed, numPlainTextBytes)
  818. } else {
  819. atomic.AddUint64(&udc.stats.errors, 1)
  820. }
  821. return
  822. }
  823. func (udc *UpsideDownCouch) SetInternal(key, val []byte) (err error) {
  824. internalRow := NewInternalRow(key, val)
  825. udc.writeMutex.Lock()
  826. defer udc.writeMutex.Unlock()
  827. var writer store.KVWriter
  828. writer, err = udc.store.Writer()
  829. if err != nil {
  830. return
  831. }
  832. defer func() {
  833. if cerr := writer.Close(); err == nil && cerr != nil {
  834. err = cerr
  835. }
  836. }()
  837. batch := writer.NewBatch()
  838. batch.Set(internalRow.Key(), internalRow.Value())
  839. return writer.ExecuteBatch(batch)
  840. }
  841. func (udc *UpsideDownCouch) DeleteInternal(key []byte) (err error) {
  842. internalRow := NewInternalRow(key, nil)
  843. udc.writeMutex.Lock()
  844. defer udc.writeMutex.Unlock()
  845. var writer store.KVWriter
  846. writer, err = udc.store.Writer()
  847. if err != nil {
  848. return
  849. }
  850. defer func() {
  851. if cerr := writer.Close(); err == nil && cerr != nil {
  852. err = cerr
  853. }
  854. }()
  855. batch := writer.NewBatch()
  856. batch.Delete(internalRow.Key())
  857. return writer.ExecuteBatch(batch)
  858. }
  859. func (udc *UpsideDownCouch) Reader() (index.IndexReader, error) {
  860. kvr, err := udc.store.Reader()
  861. if err != nil {
  862. return nil, fmt.Errorf("error opening store reader: %v", err)
  863. }
  864. udc.m.RLock()
  865. defer udc.m.RUnlock()
  866. return &IndexReader{
  867. index: udc,
  868. kvreader: kvr,
  869. docCount: udc.docCount,
  870. }, nil
  871. }
// Stats returns the index statistics as a JSON-marshalable value.
func (udc *UpsideDownCouch) Stats() json.Marshaler {
	return udc.stats
}
// StatsMap returns the index statistics as a plain map.
func (udc *UpsideDownCouch) StatsMap() map[string]interface{} {
	return udc.stats.statsMap()
}
// Advanced exposes the underlying KV store for advanced callers.
func (udc *UpsideDownCouch) Advanced() (store.KVStore, error) {
	return udc.store, nil
}
  881. func (udc *UpsideDownCouch) fieldIndexOrNewRow(name string) (uint16, *FieldRow) {
  882. index, existed := udc.fieldCache.FieldNamed(name, true)
  883. if !existed {
  884. return index, NewFieldRow(index, name)
  885. }
  886. return index, nil
  887. }
// init registers this index type with the bleve registry at load time.
func init() {
	registry.RegisterIndexType(Name, NewUpsideDownCouch)
}
  891. func backIndexRowForDoc(kvreader store.KVReader, docID index.IndexInternalID) (*BackIndexRow, error) {
  892. // use a temporary row structure to build key
  893. tempRow := BackIndexRow{
  894. doc: docID,
  895. }
  896. keyBuf := GetRowBuffer()
  897. if tempRow.KeySize() > len(keyBuf) {
  898. keyBuf = make([]byte, 2*tempRow.KeySize())
  899. }
  900. defer PutRowBuffer(keyBuf)
  901. keySize, err := tempRow.KeyTo(keyBuf)
  902. if err != nil {
  903. return nil, err
  904. }
  905. value, err := kvreader.Get(keyBuf[:keySize])
  906. if err != nil {
  907. return nil, err
  908. }
  909. if value == nil {
  910. return nil, nil
  911. }
  912. backIndexRow, err := NewBackIndexRowKV(keyBuf[:keySize], value)
  913. if err != nil {
  914. return nil, err
  915. }
  916. return backIndexRow, nil
  917. }