Need help with kinesis-producer?
Click the “chat” button below for chat support from the developer who created it, or find similar developers for support.

About the developer

a8m
132 Stars 32 Forks Other 64 Commits 12 Opened issues

Description

An aggregated records producer for Amazon Kinesis

Services available

!
?

Need anything else?

Contributors list

# 8,153
Go
mariadb
node
test-fr...
50 commits
# 28,816
Python
Webpack
Django
reactjs
2 commits
# 294,815
Go
Python
aws-lam...
autosca...
1 commit
# 386,766
C
kinesis
C++
unittes...
1 commit
# 195,648
Go
TypeScr...
serverl...
chrome-...
1 commit
# 46,920
Go
ics
ffmpeg
python3
1 commit
# 693,018
Go
kinesis
1 commit
# 352,500
statsd
kinesis
C
Shell
1 commit

Amazon kinesis producer Build status License GoDoc

A KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK
and using the same aggregation format that KPL use.

Useful links

Example

package main

import ( "time"

"github.com/sirupsen/logrus"
"github.com/a8m/kinesis-producer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"

)

func main() { client := kinesis.New(session.New(aws.NewConfig())) pr := producer.New(&producer.Config{ StreamName: "test", BacklogCount: 2000, Client: client })

pr.Start()

// Handle failures
go func() {
    for r := range pr.NotifyFailures() {
        // r contains `Data`, `PartitionKey` and `Error()`
        log.Error(r)
    }
}()

go func() {
    for i := 0; i < 5000; i++ {
        err := pr.Put([]byte("foo"), "bar")
        if err != nil {
            log.WithError(err).Fatal("error producing")
        }
    }
}()

time.Sleep(3 * time.Second)
pr.Stop()

}

Specifying logger implementation

producer.Config
takes an optional
logging.Logger
implementation.
Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{ StreamName: "test", BacklogCount: 2000, Client: client, Logger: customLogger, }

Using logrus

import (
    "github.com/sirupsen/logrus"
    producer "github.com/a8m/kinesis-producer"
    "github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{ StreamName: "test", BacklogCount: 2000, Client: client, Logger: loggers.Logrus(log), }

kinesis-producer ships with three logger implementations.

  • producer.Standard
    used the standard library logger
  • loggers.Logrus
    uses logrus logger
  • loggers.Zap
    uses zap logger

License

MIT

We use cookies. If you continue to browse the site, you agree to the use of cookies. For more information on our use of cookies please see our Privacy Policy.