Need help with arpc?
Click the “chat” button below for chat support from the developer who created it, or find similar developers for support.

About the developer

lesismal
154 Stars 14 Forks MIT License 196 Commits 0 Opened issues

Description

More effective network communication, two-way calling, notify and broadcast supported.

Services available

!
?

Need anything else?

Contributors list

# 264,578
Go
Shell
rpc
microse...
196 commits

ARPC - More Effective Network Communication

Mentioned in Awesome Go

GoDoc MIT licensed Build Status Go Report Card Coverage Statusd

Contents

Features

  • [x] Two-Way Calling
  • [x] Two-Way Notify
  • [x] Sync and Async Calling
  • [x] Sync and Async Response
  • [x] Batch Write | Writev | net.Buffers
  • [x] Broadcast
  • [x] Middleware
  • [x] Pub/Sub
  • [x] Opentracing

| Pattern | Interactive Directions | Description | | ------- | ---------------------------- | ------------------------ | | call | two-way:
c -> s
s -> c | request and response | | notify | two-way:
c -> s
s -> c | request without response |

Performance

  • simple echo load testing

| Framework | Protocol | Codec | Configuration | Connection Num | Number of Goroutines Per Connection | Qps | | --------- | --------------- | ------------- | --------------------------------------------------------- | -------------- | ----------------------------------- | ------- | | arpc | tcp/localhost | encoding/json | os: VMWare Ubuntu 18.04
cpu: AMD 3500U 4c8t
mem: 2G | 8 | 10 | 80-100k | | grpc | http2/localhost | protobuf | os: VMWare Ubuntu 18.04
cpu: AMD 3500U 4c8t
mem: 2G | 8 | 10 | 20-30k |

Header Layout

  • LittleEndian

| bodyLen | reserved | cmd | flag | methodLen | sequence | method | body | | ------- | -------- | ------ | ------- | --------- | -------- | --------------- | ----------------------- | | 4 bytes | 1 byte | 1 byte | 1 bytes | 1 bytes | 8 bytes | methodLen bytes | bodyLen-methodLen bytes |

Installation

  1. Get and install arpc
$ go get -u github.com/lesismal/arpc
  1. Import in your code:
import "github.com/lesismal/arpc"

Quick Start

package main

import "github.com/lesismal/arpc"

func main() { server := arpc.NewServer()

// register router
server.Handler.Handle("/echo", func(ctx *arpc.Context) {
    str := ""
    if err := ctx.Bind(&str); err == nil {
        ctx.Write(str)
    }
})

server.Run("localhost:8888")

}

package main

import ( "log" "net" "time"

"github.com/lesismal/arpc"

)

func main() { client, err := arpc.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", "localhost:8888", time.Second*3) }) if err != nil { panic(err) } defer client.Stop()

req := "hello"
rsp := ""
err = client.Call("/echo", &req, &rsp, time.Second*5)
if err != nil {
    log.Fatalf("Call failed: %v", err)
} else {
    log.Printf("Call Response: \"%v\"", rsp)
}

}

API Examples

Register Routers

var handler arpc.Handler

// package handler = arpc.DefaultHandler // server handler = server.Handler // client handler = client.Handler

// message would be default handled one by one in the same conn reader goroutine handler.Handle("/route", func(ctx *arpc.Context) { ... }) handler.Handle("/route2", func(ctx *arpc.Context) { ... })

// this make message handled by a new goroutine async := true handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)

Router Middleware

See router middleware, it's easy to implement middlewares yourself

import "github.com/lesismal/arpc/extension/middleware/router"

var handler arpc.Handler

// package handler = arpc.DefaultHandler // server handler = server.Handler // client handler = client.Handler

handler.Use(router.Recover()) handler.Use(router.Logger()) handler.Use(func(ctx *arpc.Context) { ... }) handler.Handle("/echo", func(ctx *arpc.Context) { ... }) handler.Use(func(ctx *arpc.Context) { ... })

Coder Middleware

  • Coder Middleware is used for converting a message data to your designed format, e.g encrypt/decrypt and compress/uncompress
import "github.com/lesismal/arpc/extension/middleware/coder/gzip"

var handler arpc.Handler

// package handler = arpc.DefaultHandler // server handler = server.Handler // client handler = client.Handler

handler.UseCoder(gzip.New()) handler.Handle("/echo", func(ctx *arpc.Context) { ... })

Client Call, CallAsync, Notify

  1. Call (Block, with timeout/context)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
  1. CallAsync (Nonblock, with callback and timeout/context)
request := &Echo{...}

timeout := time.Second*5 err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) { response := &Echo{} ctx.Bind(response) ... }, timeout)

  1. Notify (same as CallAsync with timeout/context, without callback)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)

Server Call, CallAsync, Notify

  1. Get client and keep it in your application
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
    client = ctx.Client
    // release client
    client.OnDisconnected(func(c *arpc.Client){
        client = nil
    })
})

go func() { for { time.Sleep(time.Second) if client != nil { client.Call(...) client.CallAsync(...) client.Notify(...) } } }()

  1. Then Call/CallAsync/Notify

Broadcast - Notify

var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})

func broadcast() { var svr *arpc.Server = ... msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i)) mux.RLock() for client := range clientMap { client.PushMsg(msg, arpc.TimeZero) } mux.RUnlock() }

Async Response

var handler arpc.Handler

// package handler = arpc.DefaultHandler // server handler = server.Handler // client handler = client.Handler

asyncResponse := true // default is true, or set false handler.Handle("/echo", func(ctx *arpc.Context) { req := ... err := ctx.Bind(req) if err == nil { ctx.Write(data) } }, asyncResponse)

Handle New Connection

// package
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
    ...
})

// server svr := arpc.NewServer() svr.Handler.HandleConnected(func(c *arpc.Client) { ... })

// client client, err := arpc.NewClient(...) client.Handler.HandleConnected(func(c *arpc.Client) { ... })

Handle Disconnected

// package
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
    ...
})

// server svr := arpc.NewServer() svr.Handler.HandleDisconnected(func(c *arpc.Client) { ... })

// client client, err := arpc.NewClient(...) client.Handler.HandleDisconnected(func(c *arpc.Client) { ... })

Handle Client's send queue overstock

// package
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
    ...
})

// server svr := arpc.NewServer() svr.Handler.HandleOverstock(func(c *arpc.Client) { ... })

// client client, err := arpc.NewClient(...) client.Handler.HandleOverstock(func(c *arpc.Client) { ... })

Custom Net Protocol

// server
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)

// client dialer := func() (net.Conn, error) { return ... } client, err := arpc.NewClient(dialer)

Custom Codec

import "github.com/lesismal/arpc/codec"

var codec arpc.Codec = ...

// package codec.Defaultcodec = codec

// server svr := arpc.NewServer() svr.Codec = codec

// client client, err := arpc.NewClient(...) client.Codec = codec

Custom Logger

import "github.com/lesismal/arpc/log"

var logger arpc.Logger = ... log.SetLogger(logger) // log.DefaultLogger = logger

Custom operations before conn's recv and send

arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error) {
    // ...
})

arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error) { // ... })

Custom arpc.Client's Reader by wrapping net.Conn

arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader) {
    // ...
})

Custom arpc.Client's send queue capacity

arpc.DefaultHandler.SetSendQueueSize(4096)

JS Client

Web Chat Examples

Pub/Sub Examples

  • start a server ```golang import "github.com/lesismal/arpc/extension/pubsub"

var ( address = "localhost:8888"

password = "123qwe"

topicName = "Broadcast"

)

func main() { s := pubsub.NewServer() s.Password = password

// server publish to all clients
go func() {
    for i := 0; true; i++ {
        time.Sleep(time.Second)
        s.Publish(topicName, fmt.Sprintf("message from server %v", i))
    }
}()

s.Run(address)

} ```

  • start a subscribe client ```golang import "github.com/lesismal/arpc/log" import "github.com/lesismal/arpc/extension/pubsub"

var ( address = "localhost:8888"

password = "123qwe"

topicName = "Broadcast"

)

func onTopic(topic *pubsub.Topic) { log.Info("[OnTopic] [%v] \"%v\", [%v]", topic.Name, string(topic.Data), time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000")) }

func main() { client, err := pubsub.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", address, time.Second*3) }) if err != nil { panic(err) } client.Password = password

// authentication
err = client.Authenticate()
if err != nil {
    panic(err)
}

// subscribe topic if err := client.Subscribe(topicName, onTopic, time.Second); err != nil { panic(err) }

} ```

  • start a publish client ```golang import "github.com/lesismal/arpc/extension/pubsub"

var ( address = "localhost:8888"

password = "123qwe"

topicName = "Broadcast"

)

func main() { client, err := pubsub.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", address, time.Second*3) }) if err != nil { panic(err) } client.Password = password

// authentication
err = client.Authenticate()
if err != nil {
    panic(err)
}

for i := 0; true; i++ { if i%5 == 0 { // publish msg to all clients client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second) } else { // publish msg to only one client client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second) } time.Sleep(time.Second) }

} ```

More Examples

We use cookies. If you continue to browse the site, you agree to the use of cookies. For more information on our use of cookies please see our Privacy Policy.