// Copyright 2023 The Gitea Authors. All rights reserved.// SPDX-License-Identifier: MITpackagequeueimport("context""sync""unsafe""code.gitea.io/gitea/modules/nosql""gitea.com/lunny/levelqueue""github.com/syndtr/goleveldb/leveldb")typebaseLevelQueueUniquestruct{internal*levelqueue.UniqueQueueconnstringcfg*BaseConfigmusync.Mutex// the levelqueue.UniqueQueue is not thread-safe, there is no mutex protecting the underlying queue&set together}var_baseQueue=(*baseLevelQueueUnique)(nil)funcnewBaseLevelQueueUnique(cfg*BaseConfig)(baseQueue,error){conn,db,err:=prepareLevelDB(cfg)iferr!=nil{returnnil,err}q:=&baseLevelQueueUnique{conn:conn,cfg:cfg}q.internal,err=levelqueue.NewUniqueQueue(db,[]byte(cfg.QueueFullName),[]byte(cfg.SetFullName),false)iferr!=nil{returnnil,err}returnq,nil}func(q*baseLevelQueueUnique)PushItem(ctxcontext.Context,data[]byte)error{returnbaseLevelQueueCommon(q.cfg,q.internal,&q.mu).PushItem(ctx,data)}func(q*baseLevelQueueUnique)PopItem(ctxcontext.Context)([]byte,error){returnbaseLevelQueueCommon(q.cfg,q.internal,&q.mu).PopItem(ctx)}func(q*baseLevelQueueUnique)HasItem(ctxcontext.Context,data[]byte)(bool,error){q.mu.Lock()deferq.mu.Unlock()returnq.internal.Has(data)}func(q*baseLevelQueueUnique)Len(ctxcontext.Context)(int,error){q.mu.Lock()deferq.mu.Unlock()returnint(q.internal.Len()),nil}func(q*baseLevelQueueUnique)Close()error{q.mu.Lock()deferq.mu.Unlock()err:=q.internal.Close()_=nosql.GetManager().CloseLevelDB(q.conn)returnerr}func(q*baseLevelQueueUnique)RemoveAll(ctxcontext.Context)error{q.mu.Lock()deferq.mu.Unlock()typelevelUniqueQueuestruct{q*levelqueue.Queueset*levelqueue.Setdb*leveldb.DB}lq:=(*levelUniqueQueue)(unsafe.Pointer(q.internal))members,err:=lq.set.Members()iferr!=nil{returnerr// seriously corrupted}for_,v:=rangemembers{_,_=lq.set.Remove(v)}forlq.q.Len()>0{if_,err=lq.q.LPop();err!=nil{returnerr}}returnnil}