Reduce unnecessary DB queries for Actions tasks #219

Merged
wolfogre merged 12 commits from sillyguodong/act_runner:feature/fetch_task_with_index into main 2023-07-25 03:25:52 +00:00
3 changed files with 26 additions and 8 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner
go 1.20
require (
code.gitea.io/actions-proto-go v0.3.0
code.gitea.io/actions-proto-go v0.3.1
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5
github.com/avast/retry-go/v4 v4.3.1
github.com/bufbuild/connect-go v1.3.1

4
go.sum
View File

@ -1,5 +1,5 @@
code.gitea.io/actions-proto-go v0.3.0 h1:9Tvg8+TaaCXPKi6EnWl9vVgs2VZsj1Cs5afnsHa4AmM=
code.gitea.io/actions-proto-go v0.3.0/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A=
code.gitea.io/actions-proto-go v0.3.1 h1:PMyiQtBKb8dNnpEO2R5rcZdXSis+UQZVo/SciMtR1aU=
code.gitea.io/actions-proto-go v0.3.1/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A=
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5 h1:daBEK2GQeqGikJESctP5Cu1i33z5ztAD4kyQWiw185M=
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE=
gitea.com/gitea/act v0.246.2-0.20230717034634-cdc6d4bc6a38 h1:whUEO/qPkYfpbL1he9TuIIzz2P4v6xEwb2lT6E/4F7A=

View File

@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"github.com/bufbuild/connect-go"
@ -20,9 +21,10 @@ import (
)
type Poller struct {
client client.Client
runner *run.Runner
cfg *config.Config
client client.Client
runner *run.Runner
cfg *config.Config
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
}
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
@ -77,7 +79,11 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
defer cancel()
resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{}))
// Load the version value that was in the cache when the request was sent.
v := p.tasksVersion.Load()
resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{
TasksVersion: v,
}))
if errors.Is(err, context.DeadlineExceeded) {
err = nil
}
@ -86,8 +92,20 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
return nil, false
}
if resp == nil || resp.Msg == nil || resp.Msg.Task == nil {
if resp == nil || resp.Msg == nil {
return nil, false
}
if resp.Msg.TasksVersion > v {
p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
}
if resp.Msg.Task == nil {
return nil, false
}
// got a task, set `tasksVersion` to zero to focre query db in next request.
p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)
return resp.Msg.Task, true
}