aboutsummaryrefslogtreecommitdiffstats
path: root/modules/queue/base.go
blob: 102e79e5416dd8d2ec8b92b8a809869cd0439081 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"context"
	"time"
)

var pushBlockTime = 5 * time.Second

type baseQueue interface {
	PushItem(ctx context.Context, data []byte) error
	PopItem(ctx context.Context) ([]byte, error)
	HasItem(ctx context.Context, data []byte) (bool, error)
	Len(ctx context.Context) (int, error)
	Close() error
	RemoveAll(ctx context.Context) error
}

func popItemByChan(ctx context.Context, popItemFn func(ctx context.Context) ([]byte, error)) (chanItem chan []byte, chanErr chan error) {
	chanItem = make(chan []byte)
	chanErr = make(chan error)
	go func() {
		for {
			it, err := popItemFn(ctx)
			if err != nil {
				close(chanItem)
				chanErr <- err
				return
			}
			if it == nil {
				close(chanItem)
				close(chanErr)
				return
			}
			chanItem <- it
		}
	}()
	return chanItem, chanErr
}