Need help with kinesis-producer?
Click the “chat” button below for chat support from the developer who created it, or find similar developers for support.

About the developer

a8m
135 Stars 40 Forks Other 65 Commits 13 Opened issues

Description

An aggregated records producer for Amazon Kinesis

Services available

!
?

Need anything else?

Contributors list

# 7,788
Go
mariadb
node
test-fr...
50 commits
# 38,126
Python
Webpack
Django
reactjs
2 commits
# 303,998
Go
aws-lam...
autosca...
Amazon ...
1 commit
# 374,468
C
kinesis
sed
C++
1 commit
# 201,336
Go
TypeScr...
serverl...
serverl...
1 commit
# 48,478
Go
ics
ffmpeg
python3
1 commit
# 677,947
Go
kinesis
1 commit
# 357,034
statsd
kinesis
Shell
C
1 commit

Amazon kinesis producer Build status License GoDoc

A KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK
and using the same aggregation format that KPL use.

Useful links

Example

package main

import ( "time"

"github.com/sirupsen/logrus"
"github.com/a8m/kinesis-producer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"

)

func main() { client := kinesis.New(session.New(aws.NewConfig())) pr := producer.New(&producer.Config{ StreamName: "test", BacklogCount: 2000, Client: client })

pr.Start()

// Handle failures
go func() {
    for r := range pr.NotifyFailures() {
        // r contains `Data`, `PartitionKey` and `Error()`
        log.Error(r)
    }
}()

go func() {
    for i := 0; i < 5000; i++ {
        err := pr.Put([]byte("foo"), "bar")
        if err != nil {
            log.WithError(err).Fatal("error producing")
        }
    }
}()

time.Sleep(3 * time.Second)
pr.Stop()

}

Specifying logger implementation

producer.Config
takes an optional
logging.Logger
implementation.
Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{ StreamName: "test", BacklogCount: 2000, Client: client, Logger: customLogger, }

Using logrus

import (
    "github.com/sirupsen/logrus"
    producer "github.com/a8m/kinesis-producer"
    "github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{ StreamName: "test", BacklogCount: 2000, Client: client, Logger: loggers.Logrus(log), }

kinesis-producer ships with three logger implementations.

  • producer.Standard
    used the standard library logger
  • loggers.Logrus
    uses logrus logger
  • loggers.Zap
    uses zap logger

License

MIT

We use cookies. If you continue to browse the site, you agree to the use of cookies. For more information on our use of cookies please see our Privacy Policy.