You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

reindex.go 20KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "context"
  7. "errors"
  8. "fmt"
  9. "net/http"
  10. "net/url"
  11. "strings"
  12. )
  13. // ReindexService is a method to copy documents from one index to another.
  14. // It is documented at https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-reindex.html.
  15. type ReindexService struct {
  16. client *Client
  17. pretty *bool // pretty format the returned JSON response
  18. human *bool // return human readable values for statistics
  19. errorTrace *bool // include the stack trace of returned errors
  20. filterPath []string // list of filters used to reduce the response
  21. headers http.Header // custom request-level HTTP headers
  22. refresh string
  23. timeout string
  24. waitForActiveShards string
  25. waitForCompletion *bool
  26. requestsPerSecond *int
  27. slices interface{}
  28. body interface{}
  29. source *ReindexSource
  30. destination *ReindexDestination
  31. conflicts string
  32. size *int
  33. script *Script
  34. }
  35. // NewReindexService creates a new ReindexService.
  36. func NewReindexService(client *Client) *ReindexService {
  37. return &ReindexService{
  38. client: client,
  39. }
  40. }
  41. // Pretty tells Elasticsearch whether to return a formatted JSON response.
  42. func (s *ReindexService) Pretty(pretty bool) *ReindexService {
  43. s.pretty = &pretty
  44. return s
  45. }
  46. // Human specifies whether human readable values should be returned in
  47. // the JSON response, e.g. "7.5mb".
  48. func (s *ReindexService) Human(human bool) *ReindexService {
  49. s.human = &human
  50. return s
  51. }
  52. // ErrorTrace specifies whether to include the stack trace of returned errors.
  53. func (s *ReindexService) ErrorTrace(errorTrace bool) *ReindexService {
  54. s.errorTrace = &errorTrace
  55. return s
  56. }
  57. // FilterPath specifies a list of filters used to reduce the response.
  58. func (s *ReindexService) FilterPath(filterPath ...string) *ReindexService {
  59. s.filterPath = filterPath
  60. return s
  61. }
  62. // Header adds a header to the request.
  63. func (s *ReindexService) Header(name string, value string) *ReindexService {
  64. if s.headers == nil {
  65. s.headers = http.Header{}
  66. }
  67. s.headers.Add(name, value)
  68. return s
  69. }
  70. // Headers specifies the headers of the request.
  71. func (s *ReindexService) Headers(headers http.Header) *ReindexService {
  72. s.headers = headers
  73. return s
  74. }
  75. // WaitForActiveShards sets the number of shard copies that must be active before
  76. // proceeding with the reindex operation. Defaults to 1, meaning the primary shard only.
  77. // Set to `all` for all shard copies, otherwise set to any non-negative value less than or
  78. // equal to the total number of copies for the shard (number of replicas + 1).
  79. func (s *ReindexService) WaitForActiveShards(waitForActiveShards string) *ReindexService {
  80. s.waitForActiveShards = waitForActiveShards
  81. return s
  82. }
  83. // RequestsPerSecond specifies the throttle to set on this request in sub-requests per second.
  84. // -1 means set no throttle as does "unlimited" which is the only non-float this accepts.
  85. func (s *ReindexService) RequestsPerSecond(requestsPerSecond int) *ReindexService {
  86. s.requestsPerSecond = &requestsPerSecond
  87. return s
  88. }
  89. // Slices specifies the number of slices this task should be divided into. Defaults to 1.
  90. // It used to be a number, but can be set to "auto" as of 6.7.
  91. //
  92. // See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-reindex.html#docs-reindex-slice
  93. // for details.
  94. func (s *ReindexService) Slices(slices interface{}) *ReindexService {
  95. s.slices = slices
  96. return s
  97. }
  98. // Refresh indicates whether Elasticsearch should refresh the effected indexes
  99. // immediately.
  100. //
  101. // See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-refresh.html
  102. // for details.
  103. func (s *ReindexService) Refresh(refresh string) *ReindexService {
  104. s.refresh = refresh
  105. return s
  106. }
  107. // Timeout is the time each individual bulk request should wait for shards
  108. // that are unavailable.
  109. func (s *ReindexService) Timeout(timeout string) *ReindexService {
  110. s.timeout = timeout
  111. return s
  112. }
  113. // WaitForCompletion indicates whether Elasticsearch should block until the
  114. // reindex is complete.
  115. func (s *ReindexService) WaitForCompletion(waitForCompletion bool) *ReindexService {
  116. s.waitForCompletion = &waitForCompletion
  117. return s
  118. }
  119. // Source specifies the source of the reindexing process.
  120. func (s *ReindexService) Source(source *ReindexSource) *ReindexService {
  121. s.source = source
  122. return s
  123. }
  124. // SourceIndex specifies the source index of the reindexing process.
  125. func (s *ReindexService) SourceIndex(index string) *ReindexService {
  126. if s.source == nil {
  127. s.source = NewReindexSource()
  128. }
  129. s.source = s.source.Index(index)
  130. return s
  131. }
  132. // Destination specifies the destination of the reindexing process.
  133. func (s *ReindexService) Destination(destination *ReindexDestination) *ReindexService {
  134. s.destination = destination
  135. return s
  136. }
  137. // DestinationIndex specifies the destination index of the reindexing process.
  138. func (s *ReindexService) DestinationIndex(index string) *ReindexService {
  139. if s.destination == nil {
  140. s.destination = NewReindexDestination()
  141. }
  142. s.destination = s.destination.Index(index)
  143. return s
  144. }
  145. // DestinationIndexAndType specifies both the destination index and type
  146. // of the reindexing process.
  147. func (s *ReindexService) DestinationIndexAndType(index, typ string) *ReindexService {
  148. if s.destination == nil {
  149. s.destination = NewReindexDestination()
  150. }
  151. s.destination = s.destination.Index(index)
  152. s.destination = s.destination.Type(typ)
  153. return s
  154. }
  155. // Conflicts indicates what to do when the process detects version conflicts.
  156. // Possible values are "proceed" and "abort".
  157. func (s *ReindexService) Conflicts(conflicts string) *ReindexService {
  158. s.conflicts = conflicts
  159. return s
  160. }
  161. // AbortOnVersionConflict aborts the request on version conflicts.
  162. // It is an alias to setting Conflicts("abort").
  163. func (s *ReindexService) AbortOnVersionConflict() *ReindexService {
  164. s.conflicts = "abort"
  165. return s
  166. }
  167. // ProceedOnVersionConflict aborts the request on version conflicts.
  168. // It is an alias to setting Conflicts("proceed").
  169. func (s *ReindexService) ProceedOnVersionConflict() *ReindexService {
  170. s.conflicts = "proceed"
  171. return s
  172. }
  173. // Size sets an upper limit for the number of processed documents.
  174. func (s *ReindexService) Size(size int) *ReindexService {
  175. s.size = &size
  176. return s
  177. }
  178. // Script allows for modification of the documents as they are reindexed
  179. // from source to destination.
  180. func (s *ReindexService) Script(script *Script) *ReindexService {
  181. s.script = script
  182. return s
  183. }
  184. // Body specifies the body of the request to send to Elasticsearch.
  185. // It overrides settings specified with other setters, e.g. Query.
  186. func (s *ReindexService) Body(body interface{}) *ReindexService {
  187. s.body = body
  188. return s
  189. }
  190. // buildURL builds the URL for the operation.
  191. func (s *ReindexService) buildURL() (string, url.Values, error) {
  192. // Build URL path
  193. path := "/_reindex"
  194. // Add query string parameters
  195. params := url.Values{}
  196. if v := s.pretty; v != nil {
  197. params.Set("pretty", fmt.Sprint(*v))
  198. }
  199. if v := s.human; v != nil {
  200. params.Set("human", fmt.Sprint(*v))
  201. }
  202. if v := s.errorTrace; v != nil {
  203. params.Set("error_trace", fmt.Sprint(*v))
  204. }
  205. if len(s.filterPath) > 0 {
  206. params.Set("filter_path", strings.Join(s.filterPath, ","))
  207. }
  208. if s.refresh != "" {
  209. params.Set("refresh", s.refresh)
  210. }
  211. if s.timeout != "" {
  212. params.Set("timeout", s.timeout)
  213. }
  214. if s.requestsPerSecond != nil {
  215. params.Set("requests_per_second", fmt.Sprintf("%v", *s.requestsPerSecond))
  216. }
  217. if s.slices != nil {
  218. params.Set("slices", fmt.Sprintf("%v", s.slices))
  219. }
  220. if s.waitForActiveShards != "" {
  221. params.Set("wait_for_active_shards", s.waitForActiveShards)
  222. }
  223. if s.waitForCompletion != nil {
  224. params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
  225. }
  226. return path, params, nil
  227. }
  228. // Validate checks if the operation is valid.
  229. func (s *ReindexService) Validate() error {
  230. var invalid []string
  231. if s.body != nil {
  232. return nil
  233. }
  234. if s.source == nil {
  235. invalid = append(invalid, "Source")
  236. } else {
  237. if len(s.source.request.indices) == 0 {
  238. invalid = append(invalid, "Source.Index")
  239. }
  240. }
  241. if s.destination == nil {
  242. invalid = append(invalid, "Destination")
  243. }
  244. if len(invalid) > 0 {
  245. return fmt.Errorf("missing required fields: %v", invalid)
  246. }
  247. return nil
  248. }
  249. // getBody returns the body part of the document request.
  250. func (s *ReindexService) getBody() (interface{}, error) {
  251. if s.body != nil {
  252. return s.body, nil
  253. }
  254. body := make(map[string]interface{})
  255. if s.conflicts != "" {
  256. body["conflicts"] = s.conflicts
  257. }
  258. if s.size != nil {
  259. body["size"] = *s.size
  260. }
  261. if s.script != nil {
  262. out, err := s.script.Source()
  263. if err != nil {
  264. return nil, err
  265. }
  266. body["script"] = out
  267. }
  268. src, err := s.source.Source()
  269. if err != nil {
  270. return nil, err
  271. }
  272. body["source"] = src
  273. dst, err := s.destination.Source()
  274. if err != nil {
  275. return nil, err
  276. }
  277. body["dest"] = dst
  278. return body, nil
  279. }
  280. // Do executes the operation.
  281. func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error) {
  282. // Check pre-conditions
  283. if err := s.Validate(); err != nil {
  284. return nil, err
  285. }
  286. // Get URL for request
  287. path, params, err := s.buildURL()
  288. if err != nil {
  289. return nil, err
  290. }
  291. // Setup HTTP request body
  292. body, err := s.getBody()
  293. if err != nil {
  294. return nil, err
  295. }
  296. // Get HTTP response
  297. res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
  298. Method: "POST",
  299. Path: path,
  300. Params: params,
  301. Body: body,
  302. Headers: s.headers,
  303. })
  304. if err != nil {
  305. return nil, err
  306. }
  307. // Return operation response
  308. ret := new(BulkIndexByScrollResponse)
  309. if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  310. return nil, err
  311. }
  312. ret.Header = res.Header
  313. return ret, nil
  314. }
  315. // DoAsync executes the reindexing operation asynchronously by starting a new task.
  316. // Callers need to use the Task Management API to watch the outcome of the reindexing
  317. // operation.
  318. func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error) {
  319. // Check pre-conditions
  320. if err := s.Validate(); err != nil {
  321. return nil, err
  322. }
  323. // DoAsync only makes sense with WaitForCompletion set to false
  324. if s.waitForCompletion != nil && *s.waitForCompletion {
  325. return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true")
  326. }
  327. f := false
  328. s.waitForCompletion = &f
  329. // Get URL for request
  330. path, params, err := s.buildURL()
  331. if err != nil {
  332. return nil, err
  333. }
  334. // Setup HTTP request body
  335. body, err := s.getBody()
  336. if err != nil {
  337. return nil, err
  338. }
  339. // Get HTTP response
  340. res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
  341. Method: "POST",
  342. Path: path,
  343. Params: params,
  344. Body: body,
  345. Headers: s.headers,
  346. })
  347. if err != nil {
  348. return nil, err
  349. }
  350. // Return operation response
  351. ret := new(StartTaskResult)
  352. if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  353. return nil, err
  354. }
  355. ret.Header = res.Header
  356. return ret, nil
  357. }
  358. // -- Source of Reindex --
  359. // ReindexSource specifies the source of a Reindex process.
  360. type ReindexSource struct {
  361. request *SearchRequest
  362. remoteInfo *ReindexRemoteInfo
  363. }
  364. // NewReindexSource creates a new ReindexSource.
  365. func NewReindexSource() *ReindexSource {
  366. return &ReindexSource{
  367. request: NewSearchRequest(),
  368. }
  369. }
  370. // Request specifies the search request used for source.
  371. func (r *ReindexSource) Request(request *SearchRequest) *ReindexSource {
  372. if request == nil {
  373. r.request = NewSearchRequest()
  374. } else {
  375. r.request = request
  376. }
  377. return r
  378. }
  379. // SearchType is the search operation type. Possible values are
  380. // "query_then_fetch" and "dfs_query_then_fetch".
  381. func (r *ReindexSource) SearchType(searchType string) *ReindexSource {
  382. r.request = r.request.SearchType(searchType)
  383. return r
  384. }
  385. func (r *ReindexSource) SearchTypeDfsQueryThenFetch() *ReindexSource {
  386. r.request = r.request.SearchType("dfs_query_then_fetch")
  387. return r
  388. }
  389. func (r *ReindexSource) SearchTypeQueryThenFetch() *ReindexSource {
  390. r.request = r.request.SearchType("query_then_fetch")
  391. return r
  392. }
  393. func (r *ReindexSource) Index(indices ...string) *ReindexSource {
  394. r.request = r.request.Index(indices...)
  395. return r
  396. }
  397. func (r *ReindexSource) Type(types ...string) *ReindexSource {
  398. r.request = r.request.Type(types...)
  399. return r
  400. }
  401. func (r *ReindexSource) Preference(preference string) *ReindexSource {
  402. r.request = r.request.Preference(preference)
  403. return r
  404. }
  405. func (r *ReindexSource) RequestCache(requestCache bool) *ReindexSource {
  406. r.request = r.request.RequestCache(requestCache)
  407. return r
  408. }
  409. func (r *ReindexSource) Scroll(scroll string) *ReindexSource {
  410. r.request = r.request.Scroll(scroll)
  411. return r
  412. }
  413. func (r *ReindexSource) Query(query Query) *ReindexSource {
  414. r.request = r.request.Query(query)
  415. return r
  416. }
  417. // Sort adds a sort order.
  418. func (r *ReindexSource) Sort(field string, ascending bool) *ReindexSource {
  419. r.request = r.request.Sort(field, ascending)
  420. return r
  421. }
  422. // SortWithInfo adds a sort order.
  423. func (r *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource {
  424. r.request = r.request.SortWithInfo(info)
  425. return r
  426. }
  427. // SortBy adds a sort order.
  428. func (r *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource {
  429. r.request = r.request.SortBy(sorter...)
  430. return r
  431. }
  432. // FetchSource indicates whether the response should contain the stored
  433. // _source for every hit.
  434. func (r *ReindexSource) FetchSource(fetchSource bool) *ReindexSource {
  435. r.request = r.request.FetchSource(fetchSource)
  436. return r
  437. }
  438. // FetchSourceIncludeExclude specifies that _source should be returned
  439. // with each hit, where "include" and "exclude" serve as a simple wildcard
  440. // matcher that gets applied to its fields
  441. // (e.g. include := []string{"obj1.*","obj2.*"}, exclude := []string{"description.*"}).
  442. func (r *ReindexSource) FetchSourceIncludeExclude(include, exclude []string) *ReindexSource {
  443. r.request = r.request.FetchSourceIncludeExclude(include, exclude)
  444. return r
  445. }
  446. // FetchSourceContext indicates how the _source should be fetched.
  447. func (r *ReindexSource) FetchSourceContext(fsc *FetchSourceContext) *ReindexSource {
  448. r.request = r.request.FetchSourceContext(fsc)
  449. return r
  450. }
  451. // RemoteInfo sets up reindexing from a remote cluster.
  452. func (r *ReindexSource) RemoteInfo(ri *ReindexRemoteInfo) *ReindexSource {
  453. r.remoteInfo = ri
  454. return r
  455. }
  456. // Source returns a serializable JSON request for the request.
  457. func (r *ReindexSource) Source() (interface{}, error) {
  458. src, err := r.request.sourceAsMap()
  459. if err != nil {
  460. return nil, err
  461. }
  462. source, ok := src.(map[string]interface{})
  463. if !ok {
  464. return nil, errors.New("unable to use SearchRequest as map[string]interface{}")
  465. }
  466. switch len(r.request.indices) {
  467. case 1:
  468. source["index"] = r.request.indices[0]
  469. default:
  470. source["index"] = r.request.indices
  471. }
  472. switch len(r.request.types) {
  473. case 0:
  474. case 1:
  475. source["type"] = r.request.types[0]
  476. default:
  477. source["type"] = r.request.types
  478. }
  479. if r.remoteInfo != nil {
  480. src, err := r.remoteInfo.Source()
  481. if err != nil {
  482. return nil, err
  483. }
  484. source["remote"] = src
  485. }
  486. return source, nil
  487. }
  488. // ReindexRemoteInfo contains information for reindexing from a remote cluster.
  489. type ReindexRemoteInfo struct {
  490. host string
  491. username string
  492. password string
  493. socketTimeout string // e.g. "1m" or "30s"
  494. connectTimeout string // e.g. "1m" or "30s"
  495. }
  496. // NewReindexRemoteInfo creates a new ReindexRemoteInfo.
  497. func NewReindexRemoteInfo() *ReindexRemoteInfo {
  498. return &ReindexRemoteInfo{}
  499. }
  500. // Host sets the host information of the remote cluster.
  501. // It must be of the form "http(s)://<hostname>:<port>"
  502. func (ri *ReindexRemoteInfo) Host(host string) *ReindexRemoteInfo {
  503. ri.host = host
  504. return ri
  505. }
  506. // Username sets the username to authenticate with the remote cluster.
  507. func (ri *ReindexRemoteInfo) Username(username string) *ReindexRemoteInfo {
  508. ri.username = username
  509. return ri
  510. }
  511. // Password sets the password to authenticate with the remote cluster.
  512. func (ri *ReindexRemoteInfo) Password(password string) *ReindexRemoteInfo {
  513. ri.password = password
  514. return ri
  515. }
  516. // SocketTimeout sets the socket timeout to connect with the remote cluster.
  517. // Use ES compatible values like e.g. "30s" or "1m".
  518. func (ri *ReindexRemoteInfo) SocketTimeout(timeout string) *ReindexRemoteInfo {
  519. ri.socketTimeout = timeout
  520. return ri
  521. }
  522. // ConnectTimeout sets the connection timeout to connect with the remote cluster.
  523. // Use ES compatible values like e.g. "30s" or "1m".
  524. func (ri *ReindexRemoteInfo) ConnectTimeout(timeout string) *ReindexRemoteInfo {
  525. ri.connectTimeout = timeout
  526. return ri
  527. }
  528. // Source returns the serializable JSON data for the request.
  529. func (ri *ReindexRemoteInfo) Source() (interface{}, error) {
  530. res := make(map[string]interface{})
  531. res["host"] = ri.host
  532. if len(ri.username) > 0 {
  533. res["username"] = ri.username
  534. }
  535. if len(ri.password) > 0 {
  536. res["password"] = ri.password
  537. }
  538. if len(ri.socketTimeout) > 0 {
  539. res["socket_timeout"] = ri.socketTimeout
  540. }
  541. if len(ri.connectTimeout) > 0 {
  542. res["connect_timeout"] = ri.connectTimeout
  543. }
  544. return res, nil
  545. }
  546. // -- Destination of Reindex --
  547. // ReindexDestination is the destination of a Reindex API call.
  548. // It is basically the meta data of a BulkIndexRequest.
  549. //
  550. // See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-reindex.html
  551. // for details.
  552. type ReindexDestination struct {
  553. index string
  554. typ string
  555. routing string
  556. parent string
  557. opType string
  558. version int64 // default is MATCH_ANY
  559. versionType string // default is "internal"
  560. pipeline string
  561. }
  562. // NewReindexDestination returns a new ReindexDestination.
  563. func NewReindexDestination() *ReindexDestination {
  564. return &ReindexDestination{}
  565. }
  566. // Index specifies name of the Elasticsearch index to use as the destination
  567. // of a reindexing process.
  568. func (r *ReindexDestination) Index(index string) *ReindexDestination {
  569. r.index = index
  570. return r
  571. }
  572. // Type specifies the Elasticsearch type to use for reindexing.
  573. func (r *ReindexDestination) Type(typ string) *ReindexDestination {
  574. r.typ = typ
  575. return r
  576. }
  577. // Routing specifies a routing value for the reindexing request.
  578. // It can be "keep", "discard", or start with "=". The latter specifies
  579. // the routing on the bulk request.
  580. func (r *ReindexDestination) Routing(routing string) *ReindexDestination {
  581. r.routing = routing
  582. return r
  583. }
  584. // Keep sets the routing on the bulk request sent for each match to the routing
  585. // of the match (the default).
  586. func (r *ReindexDestination) Keep() *ReindexDestination {
  587. r.routing = "keep"
  588. return r
  589. }
  590. // Discard sets the routing on the bulk request sent for each match to null.
  591. func (r *ReindexDestination) Discard() *ReindexDestination {
  592. r.routing = "discard"
  593. return r
  594. }
  595. // Parent specifies the identifier of the parent document (if available).
  596. func (r *ReindexDestination) Parent(parent string) *ReindexDestination {
  597. r.parent = parent
  598. return r
  599. }
  600. // OpType specifies if this request should follow create-only or upsert
  601. // behavior. This follows the OpType of the standard document index API.
  602. // See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-index_.html#operation-type
  603. // for details.
  604. func (r *ReindexDestination) OpType(opType string) *ReindexDestination {
  605. r.opType = opType
  606. return r
  607. }
  608. // Version indicates the version of the document as part of an optimistic
  609. // concurrency model.
  610. func (r *ReindexDestination) Version(version int64) *ReindexDestination {
  611. r.version = version
  612. return r
  613. }
  614. // VersionType specifies how versions are created.
  615. func (r *ReindexDestination) VersionType(versionType string) *ReindexDestination {
  616. r.versionType = versionType
  617. return r
  618. }
  619. // Pipeline specifies the pipeline to use for reindexing.
  620. func (r *ReindexDestination) Pipeline(pipeline string) *ReindexDestination {
  621. r.pipeline = pipeline
  622. return r
  623. }
  624. // Source returns a serializable JSON request for the request.
  625. func (r *ReindexDestination) Source() (interface{}, error) {
  626. source := make(map[string]interface{})
  627. if r.index != "" {
  628. source["index"] = r.index
  629. }
  630. if r.typ != "" {
  631. source["type"] = r.typ
  632. }
  633. if r.routing != "" {
  634. source["routing"] = r.routing
  635. }
  636. if r.opType != "" {
  637. source["op_type"] = r.opType
  638. }
  639. if r.parent != "" {
  640. source["parent"] = r.parent
  641. }
  642. if r.version > 0 {
  643. source["version"] = r.version
  644. }
  645. if r.versionType != "" {
  646. source["version_type"] = r.versionType
  647. }
  648. if r.pipeline != "" {
  649. source["pipeline"] = r.pipeline
  650. }
  651. return source, nil
  652. }