/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package taskrun
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"
"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/entrypoint"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/sidecars"
"github.com/tektoncd/pipeline/pkg/status"
"go.uber.org/zap"
"golang.org/x/xerrors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/tracker"
)
const (
// taskRunAgentName defines the logging agent name for the TaskRun controller
taskRunAgentName = "taskrun-controller"
// taskRunControllerName defines the name of the TaskRun controller
taskRunControllerName = "TaskRun"
)
// Reconciler implements controller.Reconciler for TaskRun resources.
type Reconciler struct {
*reconciler.Base
// listers index properties about resources
taskRunLister listers.TaskRunLister
taskLister listers.TaskLister
clusterTaskLister listers.ClusterTaskLister
resourceLister listers.PipelineResourceLister
cloudEventClient cloudevent.CEClient
tracker tracker.Interface
cache *entrypoint.Cache
timeoutHandler *reconciler.TimeoutSet
metrics *Recorder
}
// Check that our Reconciler implements controller.Reconciler
var _ controller.Reconciler = (*Reconciler)(nil)
// Reconcile compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the TaskRun
// resource with the current status of the resource.
func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.Logger.Errorf("invalid resource key: %s", key)
return nil
}
// Get the TaskRun resource with this namespace/name
original, err := c.taskRunLister.TaskRuns(namespace).Get(name)
if errors.IsNotFound(err) {
// The resource no longer exists, in which case we stop processing.
c.Logger.Infof("task run %q in work queue no longer exists", key)
return nil
} else if err != nil {
c.Logger.Errorf("Error retrieving TaskRun %q: %s", name, err)
return err
}
// Don't modify the informer's copy.
tr := original.DeepCopy()
// If the TaskRun is just starting, this will also set the start time,
// from which the timeout will immediately begin counting down.
tr.Status.InitializeConditions()
// Guard against node clock skew: if the controller was scheduled to a node whose
// clock lags the one that created the TaskRun, the recorded start time can precede
// the creation timestamp, so clamp it to the creation timestamp.
if tr.Status.StartTime.Sub(tr.CreationTimestamp.Time) < 0 {
c.Logger.Warnf("TaskRun %s createTimestamp %s is after the TaskRun started %s", tr.GetRunKey(), tr.CreationTimestamp, tr.Status.StartTime)
tr.Status.StartTime = &tr.CreationTimestamp
}
if tr.IsDone() {
c.Logger.Infof("taskrun done : %s \n", tr.Name)
var merr *multierror.Error
// Try to send cloud events first
cloudEventErr := cloudevent.SendCloudEvents(tr, c.cloudEventClient, c.Logger)
// Regardless of `cloudEventErr`, we must write back any status update that may have
// been generated by `SendCloudEvents`.
updateErr := c.updateStatusLabelsAndAnnotations(tr, original)
merr = multierror.Append(cloudEventErr, updateErr)
if cloudEventErr != nil {
// Let's keep timeouts and sidecars running as long as we're trying to
// send cloud events, so we stop here and return the errors encountered so far.
return merr.ErrorOrNil()
}
c.timeoutHandler.Release(tr)
pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
if err == nil {
err = sidecars.Stop(pod, c.Images.NopImage, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Update)
} else if errors.IsNotFound(err) {
return merr.ErrorOrNil()
}
if err != nil {
c.Logger.Errorf("Error stopping sidecars for TaskRun %q: %v", name, err)
merr = multierror.Append(merr, err)
}
go func(metrics *Recorder) {
err := metrics.DurationAndCount(tr)
if err != nil {
c.Logger.Warnf("Failed to log the metrics: %v", err)
}
err = metrics.RecordPodLatency(pod, tr)
if err != nil {
c.Logger.Warnf("Failed to log the metrics: %v", err)
}
}(c.metrics)
return merr.ErrorOrNil()
}
// Reconcile this copy of the task run and then write back any status
// updates regardless of whether the reconciliation errored out.
if err := c.reconcile(ctx, tr); err != nil {
c.Logger.Errorf("Reconcile error: %v", err.Error())
return err
}
return c.updateStatusLabelsAndAnnotations(tr, original)
}
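// updateStatusLabelsAndAnnotations writes back any changes that reconciliation made to the
// TaskRun's status, labels, or annotations, comparing against the original copy from the
// informer cache, and refreshes the running-TaskRuns metric when an update occurred.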
func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.TaskRun) error {
var updated bool
if !equality.Semantic.DeepEqual(original.Status, tr.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
if _, err := c.updateStatus(tr); err != nil {
c.Logger.Warn("Failed to update taskRun status", zap.Error(err))
return err
}
updated = true
}
// Since we are using the status subresource, it is not possible to update
// the status and labels/annotations simultaneously.
if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, tr.ObjectMeta.Annotations) {
if _, err := c.updateLabelsAndAnnotations(tr); err != nil {
c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err))
return err
}
updated = true
}
if updated {
go func(metrics *Recorder) {
err := metrics.RunningTaskRuns(c.taskRunLister)
if err != nil {
c.Logger.Warnf("Failed to log the metrics: %v", err)
}
}(c.metrics)
}
return nil
}
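// getTaskFunc returns a function for fetching the Task (or ClusterTask) referenced by the
// TaskRun, together with the kind of task that function resolves.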
func (c *Reconciler) getTaskFunc(tr *v1alpha1.TaskRun) (resources.GetTask, v1alpha1.TaskKind) {
var gtFunc resources.GetTask
kind := v1alpha1.NamespacedTaskKind
if tr.Spec.TaskRef != nil && tr.Spec.TaskRef.Kind == v1alpha1.ClusterTaskKind {
gtFunc = func(name string) (v1alpha1.TaskInterface, error) {
t, err := c.clusterTaskLister.Get(name)
if err != nil {
return nil, err
}
return t, nil
}
kind = v1alpha1.ClusterTaskKind
} else {
gtFunc = func(name string) (v1alpha1.TaskInterface, error) {
t, err := c.taskLister.Tasks(tr.Namespace).Get(name)
if err != nil {
return nil, err
}
return t, nil
}
}
return gtFunc, kind
}
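// reconcile handles a TaskRun that is not yet done: it applies defaults, handles cancellation
// and timeouts, resolves and validates the referenced Task and its resources, fetches or
// creates the underlying Pod, and updates the TaskRun's status from the Pod's state.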
func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error {
// We may be reading a version of the object that was stored at an older version
// and may not have had all of the assumed defaults applied.
tr.SetDefaults(v1alpha1.WithUpgradeViaDefaulting(ctx))
// If the taskrun is cancelled, kill resources and update status
if tr.IsCancelled() {
before := tr.Status.GetCondition(apis.ConditionSucceeded)
err := cancelTaskRun(tr, c.KubeClientSet, c.Logger)
after := tr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, before, after, tr)
return err
}
getTaskFunc, kind := c.getTaskFunc(tr)
taskMeta, taskSpec, err := resources.GetTaskData(tr, getTaskFunc)
if err != nil {
c.Logger.Errorf("Failed to determine Task spec to use for taskrun %s: %v", tr.Name, err)
tr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: status.ReasonFailedResolution,
Message: err.Error(),
})
return nil
}
// Propagate labels from Task to TaskRun.
if tr.ObjectMeta.Labels == nil {
tr.ObjectMeta.Labels = make(map[string]string, len(taskMeta.Labels)+1)
}
for key, value := range taskMeta.Labels {
tr.ObjectMeta.Labels[key] = value
}
if tr.Spec.TaskRef != nil {
tr.ObjectMeta.Labels[pipeline.GroupName+pipeline.TaskLabelKey] = taskMeta.Name
}
// Propagate annotations from Task to TaskRun.
if tr.ObjectMeta.Annotations == nil {
tr.ObjectMeta.Annotations = make(map[string]string, len(taskMeta.Annotations))
}
for key, value := range taskMeta.Annotations {
tr.ObjectMeta.Annotations[key] = value
}
if tr.Spec.Timeout == nil {
tr.Spec.Timeout = &metav1.Duration{Duration: config.DefaultTimeoutMinutes * time.Minute}
}
// Check if the TaskRun has timed out; if it has, this will set its status
// accordingly.
if CheckTimeout(tr) {
if err := c.updateTaskRunStatusForTimeout(tr, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Delete); err != nil {
return err
}
return nil
}
rtr, err := resources.ResolveTaskResources(taskSpec, taskMeta.Name, kind, tr.Spec.Inputs.Resources, tr.Spec.Outputs.Resources, c.resourceLister.PipelineResources(tr.Namespace).Get)
if err != nil {
c.Logger.Errorf("Failed to resolve references for taskrun %s: %v", tr.Name, err)
tr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: status.ReasonFailedResolution,
Message: err.Error(),
})
return nil
}
if err := ValidateResolvedTaskResources(tr.Spec.Inputs.Params, rtr); err != nil {
c.Logger.Errorf("Failed to validate taskrun %q: %v", tr.Name, err)
tr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: status.ReasonFailedValidation,
Message: err.Error(),
})
return nil
}
// Initialize the cloud events if at least one CloudEventResource is defined
// and they have not been initialized yet.
// FIXME(afrittoli) This resource specific logic will have to be replaced
// once we have a custom PipelineResource framework in place.
c.Logger.Infof("Cloud Events: %s", tr.Status.CloudEvents)
prs := make([]*v1alpha1.PipelineResource, 0, len(rtr.Outputs))
for _, pr := range rtr.Outputs {
prs = append(prs, pr)
}
cloudevent.InitializeCloudEvents(tr, prs)
// Get the TaskRun's Pod if it should have one. Otherwise, create the Pod.
pod, err := resources.TryGetPod(tr.Status, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get)
if err != nil {
c.Logger.Errorf("Error getting pod %q: %v", tr.Status.PodName, err)
return err
}
if pod == nil {
pod, err = c.createPod(tr, rtr)
if err != nil {
c.handlePodCreationError(tr, err)
return nil
}
go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime)
}
if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil {
c.Logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
return err
}
if status.IsPodExceedingNodeResources(pod) {
c.Recorder.Eventf(tr, corev1.EventTypeWarning, status.ReasonExceededNodeResources, "Insufficient resources to schedule pod %q", pod.Name)
}
before := tr.Status.GetCondition(apis.ConditionSucceeded)
addReady := status.UpdateStatusFromPod(tr, pod, c.resourceLister, c.KubeClientSet, c.Logger)
status.SortTaskRunStepOrder(tr.Status.Steps, taskSpec.Steps)
updateTaskRunResourceResult(tr, pod, c.Logger)
after := tr.Status.GetCondition(apis.ConditionSucceeded)
if addReady {
if err := c.updateReady(pod); err != nil {
return err
}
}
reconciler.EmitEvent(c.Recorder, before, after, tr)
c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after)
return nil
}
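// handlePodCreationError sets the TaskRun's Succeeded condition to reflect a failed Pod
// creation, treating an exceeded ResourceQuota as a transient condition that is retried
// with a backoff timer rather than as a terminal failure.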
func (c *Reconciler) handlePodCreationError(tr *v1alpha1.TaskRun, err error) {
var reason, msg string
var succeededStatus corev1.ConditionStatus
if isExceededResourceQuotaError(err) {
succeededStatus = corev1.ConditionUnknown
reason = status.ReasonExceededResourceQuota
backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
if !currentlyBackingOff {
go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
}
msg = fmt.Sprintf("%s, reattempted %d times", status.GetExceededResourcesMessage(tr), backoff.NumAttempts)
} else {
succeededStatus = corev1.ConditionFalse
reason = status.ReasonCouldntGetTask
if tr.Spec.TaskRef != nil {
msg = fmt.Sprintf("Missing or invalid Task %s/%s", tr.Namespace, tr.Spec.TaskRef.Name)
} else {
msg = "Invalid TaskSpec"
}
}
tr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: succeededStatus,
Reason: reason,
Message: fmt.Sprintf("%s: %v", msg, err),
})
c.Recorder.Eventf(tr, corev1.EventTypeWarning, "BuildCreationFailed", "Failed to create build pod %q: %v", tr.Name, err)
c.Logger.Errorf("Failed to create build pod for task %q: %v", tr.Name, err)
}
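// updateTaskRunResourceResult reads the termination message of each container in a
// successful TaskRun's Pod and records any resource results it contains (such as built
// image digests) in the TaskRun's status.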
func updateTaskRunResourceResult(taskRun *v1alpha1.TaskRun, pod *corev1.Pod, logger *zap.SugaredLogger) {
if taskRun.IsSuccessful() {
for _, cs := range pod.Status.ContainerStatuses {
if cs.State.Terminated != nil {
msg := cs.State.Terminated.Message
if msg != "" {
if err := updateTaskRunStatusWithResourceResult(taskRun, []byte(msg)); err != nil {
logger.Infof("No resource result from %s for %s/%s: %s", cs.Name, taskRun.Name, taskRun.Namespace, err)
}
}
}
}
}
}
// updateTaskRunStatusWithResourceResult unmarshals resource results from a container's
// termination message and appends them to the TaskRun's status.
func updateTaskRunStatusWithResourceResult(taskRun *v1alpha1.TaskRun, logContent []byte) error {
results := []v1alpha1.PipelineResourceResult{}
if err := json.Unmarshal(logContent, &results); err != nil {
return xerrors.Errorf("Failed to unmarshal output image exporter JSON output: %w", err)
}
taskRun.Status.ResourcesResult = append(taskRun.Status.ResourcesResult, results...)
return nil
}
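// updateStatus fetches the latest copy of the TaskRun and, if the reconciled status differs,
// writes it back through the status subresource.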
func (c *Reconciler) updateStatus(taskrun *v1alpha1.TaskRun) (*v1alpha1.TaskRun, error) {
newtaskrun, err := c.taskRunLister.TaskRuns(taskrun.Namespace).Get(taskrun.Name)
if err != nil {
return nil, xerrors.Errorf("Error getting TaskRun %s when updating status: %w", taskrun.Name, err)
}
if !reflect.DeepEqual(taskrun.Status, newtaskrun.Status) {
newtaskrun.Status = taskrun.Status
return c.PipelineClientSet.TektonV1alpha1().TaskRuns(taskrun.Namespace).UpdateStatus(newtaskrun)
}
return newtaskrun, nil
}
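// updateLabelsAndAnnotations fetches the latest copy of the TaskRun and, if the reconciled
// labels or annotations differ, writes them back to the API server.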
func (c *Reconciler) updateLabelsAndAnnotations(tr *v1alpha1.TaskRun) (*v1alpha1.TaskRun, error) {
newTr, err := c.taskRunLister.TaskRuns(tr.Namespace).Get(tr.Name)
if err != nil {
return nil, xerrors.Errorf("Error getting TaskRun %s when updating labels/annotations: %w", tr.Name, err)
}
if !reflect.DeepEqual(tr.ObjectMeta.Labels, newTr.ObjectMeta.Labels) || !reflect.DeepEqual(tr.ObjectMeta.Annotations, newTr.ObjectMeta.Annotations) {
newTr.ObjectMeta.Labels = tr.ObjectMeta.Labels
newTr.ObjectMeta.Annotations = tr.ObjectMeta.Annotations
return c.PipelineClientSet.TektonV1alpha1().TaskRuns(tr.Namespace).Update(newTr)
}
return newTr, nil
}
// updateReady updates a Pod to include the "ready" annotation, which will be projected by
// the Downward API into a volume mounted by the entrypoint container. This will signal to
// the entrypoint that the TaskRun can proceed.
func (c *Reconciler) updateReady(pod *corev1.Pod) error {
newPod, err := c.KubeClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
return xerrors.Errorf("Error getting Pod %q when updating ready annotation: %w", pod.Name, err)
}
updatePod := c.KubeClientSet.CoreV1().Pods(newPod.Namespace).Update
if err := resources.AddReadyAnnotation(newPod, updatePod); err != nil {
c.Logger.Errorf("Failed to update ready annotation for pod %q for taskrun %q: %v", pod.Name, pod.Name, err)
return xerrors.Errorf("Error adding ready annotation to Pod %q: %w", pod.Name, err)
}
return nil
}
// createPod creates a Pod based on the Task's configuration and the TaskRun's resolved resources.
// TODO(dibyom): Refactor resource setup/substitution logic to its own function in the resources package
func (c *Reconciler) createPod(tr *v1alpha1.TaskRun, rtr *resources.ResolvedTaskResources) (*corev1.Pod, error) {
ts := rtr.TaskSpec.DeepCopy()
inputResources, err := resourceImplBinding(rtr.Inputs, c.Images)
if err != nil {
c.Logger.Errorf("Failed to initialize input resources: %v", err)
return nil, err
}
outputResources, err := resourceImplBinding(rtr.Outputs, c.Images)
if err != nil {
c.Logger.Errorf("Failed to initialize output resources: %v", err)
return nil, err
}
// Add the output image digest exporter step so that digests of built images are recorded.
err = resources.AddOutputImageDigestExporter(c.Images.ImageDigestExporterImage, tr, ts, c.resourceLister.PipelineResources(tr.Namespace).Get)
if err != nil {
c.Logger.Errorf("Failed to create a build for taskrun: %s due to output image resource error %v", tr.Name, err)
return nil, err
}
ts, err = resources.AddInputResource(c.KubeClientSet, c.Images, rtr.TaskName, ts, tr, inputResources, c.Logger)
if err != nil {
c.Logger.Errorf("Failed to create a build for taskrun: %s due to input resource error %v", tr.Name, err)
return nil, err
}
ts, err = resources.AddOutputResources(c.KubeClientSet, c.Images, rtr.TaskName, ts, tr, outputResources, c.Logger)
if err != nil {
c.Logger.Errorf("Failed to create a build for taskrun: %s due to output resource error %v", tr.Name, err)
return nil, err
}
ts, err = createRedirectedTaskSpec(c.KubeClientSet, c.Images.EntryPointImage, ts, tr, c.cache, c.Logger)
if err != nil {
return nil, xerrors.Errorf("couldn't create redirected TaskSpec: %w", err)
}
var defaults []v1alpha1.ParamSpec
if ts.Inputs != nil {
defaults = append(defaults, ts.Inputs.Params...)
}
// Apply parameter substitution from the taskrun.
ts = resources.ApplyParameters(ts, tr, defaults...)
// Apply bound resource substitution from the taskrun.
ts = resources.ApplyResources(ts, inputResources, "inputs")
ts = resources.ApplyResources(ts, outputResources, "outputs")
pod, err := resources.MakePod(c.Images, tr, *ts, c.KubeClientSet)
if err != nil {
return nil, xerrors.Errorf("translating Build to Pod: %w", err)
}
return c.KubeClientSet.CoreV1().Pods(tr.Namespace).Create(pod)
}
// createRedirectedTaskSpec takes a TaskSpec, an entrypoint image, a TaskRun and an
// entrypoint cache, and returns a TaskSpec where every step's entrypoint is switched to
// the entrypoint redirector binary. This function assumes that it receives its own copy
// of the TaskSpec and modifies it freely.
func createRedirectedTaskSpec(kubeclient kubernetes.Interface, entrypointImage string, ts *v1alpha1.TaskSpec, tr *v1alpha1.TaskRun, cache *entrypoint.Cache, logger *zap.SugaredLogger) (*v1alpha1.TaskSpec, error) {
// Redirect the entrypoint of each step container so that our custom entrypoint,
// which copies logs to the shared volume, runs instead.
err := entrypoint.RedirectSteps(cache, ts.Steps, kubeclient, tr, logger)
if err != nil {
return nil, xerrors.Errorf("failed to add entrypoint to steps of TaskRun %s: %w", tr.Name, err)
}
// Add the step which will copy the entrypoint into the volume
// we are going to be using, so that all of the steps will have
// access to it.
entrypoint.AddCopyStep(entrypointImage, ts)
// Add the volume used for storing the binary and logs
ts.Volumes = append(ts.Volumes, corev1.Volume{
Name: entrypoint.MountName,
VolumeSource: corev1.VolumeSource{
// TODO(#107) we need to actually stream these logs somewhere, probably via sidecar.
// Currently these logs will be lost when the pod is unscheduled.
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
})
ts.Volumes = append(ts.Volumes, corev1.Volume{
Name: entrypoint.DownwardMountName,
VolumeSource: corev1.VolumeSource{
DownwardAPI: &corev1.DownwardAPIVolumeSource{
Items: []corev1.DownwardAPIVolumeFile{
{
Path: entrypoint.DownwardMountReadyFile,
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.annotations['%s']", resources.ReadyAnnotation),
},
},
},
},
},
})
return ts, nil
}
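// DeletePod is a function that deletes a Pod by name; it matches the signature of the
// corev1 Pods client's Delete method so that method can be passed in directly.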
type DeletePod func(podName string, options *metav1.DeleteOptions) error
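// updateTaskRunStatusForTimeout deletes the TaskRun's Pod, if one was created, and marks
// the TaskRun as failed with a timed-out condition and a completion time.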
func (c *Reconciler) updateTaskRunStatusForTimeout(tr *v1alpha1.TaskRun, dp DeletePod) error {
c.Logger.Infof("TaskRun %q has timed out, deleting pod", tr.Name)
// tr.Status.PodName will be empty if the pod was never successfully created. This condition
// can be reached, for example, by the pod never being schedulable due to limits imposed by
// a namespace's ResourceQuota.
if tr.Status.PodName != "" {
if err := dp(tr.Status.PodName, &metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
c.Logger.Errorf("Failed to terminate pod: %v", err)
return err
}
}
timeout := tr.Spec.Timeout.Duration
timeoutMsg := fmt.Sprintf("TaskRun %q failed to finish within %q", tr.Name, timeout.String())
tr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: status.ReasonTimedOut,
Message: timeoutMsg,
})
// Update the TaskRun's completion time.
tr.Status.CompletionTime = &metav1.Time{Time: time.Now()}
return nil
}
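// isExceededResourceQuotaError reports whether err indicates that Pod creation was forbidden
// because a namespace ResourceQuota was exceeded.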
func isExceededResourceQuotaError(err error) bool {
return err != nil && errors.IsForbidden(err) && strings.Contains(err.Error(), "exceeded quota")
}
// resourceImplBinding maps pipeline resource names to the actual resource type implementations
func resourceImplBinding(resources map[string]*v1alpha1.PipelineResource, images pipeline.Images) (map[string]v1alpha1.PipelineResourceInterface, error) {
p := make(map[string]v1alpha1.PipelineResourceInterface)
for rName, r := range resources {
i, err := v1alpha1.ResourceFromType(r, images)
if err != nil {
return nil, xerrors.Errorf("failed to create resource %s : %v with error: %w", rName, r, err)
}
p[rName] = i
}
return p, nil
}