myntra
456 Stars 48 Forks MIT License 23 Commits 5 Opened issues

Pipeline

A package to build multi-staged concurrent workflows with a centralized logging output.


The package can be used to define and execute CI/CD tasks (either sequential or concurrent). A tool with similar goals is Jenkins Pipeline. However, compared to Jenkins Pipeline, this package has fewer constructs, since the logic is specified in code rather than in a Jenkinsfile.

It's tiny by design and is most valuable when used as glue rather than as a container.

go get

$ go get gopkg.in/myntra/pipeline.v1

Concepts

The package has three building blocks to create workflows: Pipeline, Stage and Step. A pipeline is a collection of stages and a stage is a collection of steps. A stage can have either concurrent or sequential steps, while stages are always sequential.
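The relationship between the three blocks can be sketched with a toy model. The sketch below does not use the package and is not its implementation; the step, stage and runPipeline names are hypothetical stand-ins that only illustrate "steps may be concurrent, stages are always sequential".

```go
package main

import (
	"fmt"
	"sync"
)

// step is a hypothetical stand-in for a unit of work; it is not the
// package's Step interface.
type step func() string

// stage groups steps and records whether they may run concurrently.
type stage struct {
	name       string
	concurrent bool
	steps      []step
}

// run executes the steps of one stage and returns their outputs in
// step order, regardless of the order in which they finish.
func (s stage) run() []string {
	out := make([]string, len(s.steps))
	if !s.concurrent {
		for i, st := range s.steps {
			out[i] = st()
		}
		return out
	}
	var wg sync.WaitGroup
	for i, st := range s.steps {
		wg.Add(1)
		go func(i int, st step) {
			defer wg.Done()
			out[i] = st() // each goroutine writes only its own slot
		}(i, st)
	}
	wg.Wait()
	return out
}

// runPipeline runs the stages strictly one after another.
func runPipeline(stages []stage) []string {
	var all []string
	for _, s := range stages {
		all = append(all, s.run()...)
	}
	return all
}

func main() {
	say := func(msg string) step { return func() string { return msg } }
	stages := []stage{
		{name: "build", concurrent: false, steps: []step{say("compile"), say("link")}},
		{name: "test", concurrent: true, steps: []step{say("unit"), say("lint")}},
	}
	fmt.Println(runPipeline(stages)) // prints [compile link unit lint]
}
```

In this toy model the second stage starts only after every step of the first stage has returned, which is the ordering guarantee described above.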

Pipeline

The step block is where the actual work is done. Stage and pipeline act as flow governors.

The Step Interface

Step is the unit of work which can be concurrently or sequentially staged with other steps. To do that, we need to implement the Step interface:

type Step interface {
    Out
    Exec(*Request) *Result
    Cancel() error
}

To satisfy the interface we need to embed pipeline.StepContext and implement the Exec(*Request) *Result and Cancel() error methods in the target type. For example:

type work struct {
    pipeline.StepContext
}

func (w work) Exec(request *pipeline.Request) *pipeline.Result { return &pipeline.Result{} }

func (w work) Cancel() error { return nil }

The pipeline.StepContext type provides a Status method which can be used to log to the out channel. The current step receives a Request value passed on by the previous step. Internally, data (Request.Data and Request.KeyVal) is copied from the previous step's Result.
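The hand-off from one step's Result to the next step's Request can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not the package's code: the lowercase request and result types, nextRequest and runChain are hypothetical, and they mirror only the Data and KeyVal fields named above.

```go
package main

import "fmt"

// request and result are hypothetical stand-ins that mirror only the
// Data and KeyVal fields mentioned in the text.
type request struct {
	Data   interface{}
	KeyVal map[string]interface{}
}

type result struct {
	Data   interface{}
	KeyVal map[string]interface{}
}

// nextRequest builds the request for the following step from the
// previous step's result. The first step gets an empty request.
func nextRequest(prev *result) *request {
	if prev == nil {
		return &request{}
	}
	return &request{Data: prev.Data, KeyVal: prev.KeyVal}
}

// runChain runs the steps in order, feeding each result into the next
// step's request, and returns the last result.
func runChain(steps []func(*request) *result) *result {
	var last *result
	for _, st := range steps {
		last = st(nextRequest(last))
	}
	return last
}

func main() {
	first := func(r *request) *result {
		return &result{KeyVal: map[string]interface{}{"msg": "work 1"}}
	}
	second := func(r *request) *result {
		return &result{Data: fmt.Sprintf("saw %v", r.KeyVal["msg"])}
	}
	fmt.Println(runChain([]func(*request) *result{first, second}).Data) // prints saw work 1
}
```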

Usage

The API NewStage(name string, concurrent bool, disableStrictMode bool) is used to stage work either sequentially or concurrently. In terms of the pipeline package, a unit of work is any type that implements the Step interface.

The following example shows a sequential stage. For a more complex example, please see: examples/advanced.go

package main

import (
    "fmt"
    "time"

    "github.com/myntra/pipeline"
)

type work struct {
    pipeline.StepContext
    id int
}

func (w work) Exec(request *pipeline.Request) *pipeline.Result {
    w.Status(fmt.Sprintf("%+v", request))

    duration := time.Duration(1000 * w.id)
    time.Sleep(time.Millisecond * duration)
    msg := fmt.Sprintf("work %d", w.id)

    return &pipeline.Result{
        Error:  nil,
        Data:   struct{ msg string }{msg: msg},
        KeyVal: map[string]interface{}{"msg": msg},
    }
}

func (w work) Cancel() error {
    w.Status("cancel step")
    return nil
}

func readPipeline(pipe *pipeline.Pipeline) {
    out, err := pipe.Out()
    if err != nil {
        return
    }

    progress, err := pipe.GetProgressPercent()
    if err != nil {
        return
    }

    for {
        select {
        case line := 

Check the examples directory for more.

Logging and Progress

  • pipeline.Out(): Get all statuses/logs.
  • pipeline.Progress: Get progress in percentage.

Output of the above example:

Example Output
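For the Logging and Progress section, consuming a log stream and a progress stream side by side can be sketched with plain channels. The sketch below does not call the package: drain is a hypothetical helper, the channel element types (string and int64) are assumptions, and it assumes both channels are eventually closed, which the package may or may not do.

```go
package main

import "fmt"

// drain reads log lines and progress updates until both channels are
// closed. It returns the lines it saw and the last progress value.
func drain(logs <-chan string, progress <-chan int64) ([]string, int64) {
	var lines []string
	var last int64
	for logs != nil || progress != nil {
		select {
		case line, ok := <-logs:
			if !ok {
				logs = nil // a nil channel is never selected again
				continue
			}
			lines = append(lines, line)
		case p, ok := <-progress:
			if !ok {
				progress = nil
				continue
			}
			last = p
		}
	}
	return lines, last
}

func main() {
	logs := make(chan string, 2)
	progress := make(chan int64, 2)
	logs <- "step 1 done"
	logs <- "step 2 done"
	progress <- 50
	progress <- 100
	close(logs)
	close(progress)

	lines, last := drain(logs, progress)
	fmt.Println(len(lines), last) // prints 2 100
}
```

Setting a closed channel to nil keeps the select from spinning on it, so the loop ends once both streams are exhausted.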
