More effective network communication, two-way calling, notify and broadcast supported.
| Pattern | Interactive Directions | Description |
| ------- | ---------------------------- | ------------------------ |
| call | two-way:
c -> s
s -> c | request and response |
| notify | two-way:
c -> s
s -> c | request without response |
| Framework | Protocol | Codec | Configuration | Connection Num | Number of Goroutines Per Connection | Qps |
| --------- | --------------- | ------------- | --------------------------------------------------------- | -------------- | ----------------------------------- | ------- |
| arpc | tcp/localhost | encoding/json | os: VMWare Ubuntu 18.04
cpu: AMD 3500U 4c8t
mem: 2G | 8 | 10 | 80-100k |
| grpc | http2/localhost | protobuf | os: VMWare Ubuntu 18.04
cpu: AMD 3500U 4c8t
mem: 2G | 8 | 10 | 20-30k |
| bodyLen | reserved | cmd | flag | methodLen | sequence | method | body | | ------- | -------- | ------ | ------- | --------- | -------- | --------------- | ----------------------- | | 4 bytes | 1 byte | 1 byte | 1 bytes | 1 bytes | 8 bytes | methodLen bytes | bodyLen-methodLen bytes |
$ go get -u github.com/lesismal/arpc
import "github.com/lesismal/arpc"
package mainimport "github.com/lesismal/arpc"
func main() { server := arpc.NewServer()
// register router server.Handler.Handle("/echo", func(ctx *arpc.Context) { str := "" if err := ctx.Bind(&str); err == nil { ctx.Write(str) } }) server.Run("localhost:8888")
}
package mainimport ( "log" "net" "time"
"github.com/lesismal/arpc"
)
func main() { client, err := arpc.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", "localhost:8888", time.Second*3) }) if err != nil { panic(err) } defer client.Stop()
req := "hello" rsp := "" err = client.Call("/echo", &req, &rsp, time.Second*5) if err != nil { log.Fatalf("Call failed: %v", err) } else { log.Printf("Call Response: \"%v\"", rsp) }
}
var handler arpc.Handler// package handler = arpc.DefaultHandler // server handler = server.Handler // client handler = client.Handler
// message would be default handled one by one in the same conn reader goroutine handler.Handle("/route", func(ctx *arpc.Context) { ... }) handler.Handle("/route2", func(ctx *arpc.Context) { ... })
// this make message handled by a new goroutine async := true handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)
See router middleware, it's easy to implement middlewares yourself
import "github.com/lesismal/arpc/extension/middleware/router"var handler arpc.Handler
// package handler = arpc.DefaultHandler // server handler = server.Handler // client handler = client.Handler
handler.Use(router.Recover()) handler.Use(router.Logger()) handler.Use(func(ctx *arpc.Context) { ... }) handler.Handle("/echo", func(ctx *arpc.Context) { ... }) handler.Use(func(ctx *arpc.Context) { ... })
import "github.com/lesismal/arpc/extension/middleware/coder/gzip"var handler arpc.Handler
// package handler = arpc.DefaultHandler // server handler = server.Handler // client handler = client.Handler
handler.UseCoder(gzip.New()) handler.Handle("/echo", func(ctx *arpc.Context) { ... })
request := &Echo{...} response := &Echo{} timeout := time.Second*5 err := client.Call("/call/echo", request, response, timeout) // ctx, cancel := context.WithTimeout(context.Background(), time.Second) // defer cancel() // err := client.CallWith(ctx, "/call/echo", request, response)
request := &Echo{...}timeout := time.Second*5 err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) { response := &Echo{} ctx.Bind(response) ... }, timeout)
data := &Notify{...} client.Notify("/notify", data, time.Second) // ctx, cancel := context.WithTimeout(context.Background(), time.Second) // defer cancel() // client.NotifyWith(ctx, "/notify", data)
var client *arpc.Client server.Handler.Handle("/route", func(ctx *arpc.Context) { client = ctx.Client // release client client.OnDisconnected(func(c *arpc.Client){ client = nil }) })go func() { for { time.Sleep(time.Second) if client != nil { client.Call(...) client.CallAsync(...) client.Notify(...) } } }()
var mux = sync.RWMutex{} var clientMap = make(map[*arpc.Client]struct{})func broadcast() { var svr *arpc.Server = ... msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i)) mux.RLock() for client := range clientMap { client.PushMsg(msg, arpc.TimeZero) } mux.RUnlock() }
var handler arpc.Handler// package handler = arpc.DefaultHandler // server handler = server.Handler // client handler = client.Handler
asyncResponse := true // default is true, or set false handler.Handle("/echo", func(ctx *arpc.Context) { req := ... err := ctx.Bind(req) if err == nil { ctx.Write(data) } }, asyncResponse)
// package arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) { ... })// server svr := arpc.NewServer() svr.Handler.HandleConnected(func(c *arpc.Client) { ... })
// client client, err := arpc.NewClient(...) client.Handler.HandleConnected(func(c *arpc.Client) { ... })
// package arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) { ... })// server svr := arpc.NewServer() svr.Handler.HandleDisconnected(func(c *arpc.Client) { ... })
// client client, err := arpc.NewClient(...) client.Handler.HandleDisconnected(func(c *arpc.Client) { ... })
// package arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) { ... })// server svr := arpc.NewServer() svr.Handler.HandleOverstock(func(c *arpc.Client) { ... })
// client client, err := arpc.NewClient(...) client.Handler.HandleOverstock(func(c *arpc.Client) { ... })
// server var ln net.Listener = ... svr := arpc.NewServer() svr.Serve(ln)// client dialer := func() (net.Conn, error) { return ... } client, err := arpc.NewClient(dialer)
import "github.com/lesismal/arpc/codec"var codec arpc.Codec = ...
// package codec.Defaultcodec = codec
// server svr := arpc.NewServer() svr.Codec = codec
// client client, err := arpc.NewClient(...) client.Codec = codec
import "github.com/lesismal/arpc/log"var logger arpc.Logger = ... log.SetLogger(logger) // log.DefaultLogger = logger
arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error) { // ... })arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error) { // ... })
arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader) { // ... })
arpc.DefaultHandler.SetSendQueueSize(4096)
var ( address = "localhost:8888"
password = "123qwe"topicName = "Broadcast"
)
func main() { s := pubsub.NewServer() s.Password = password
// server publish to all clients go func() { for i := 0; true; i++ { time.Sleep(time.Second) s.Publish(topicName, fmt.Sprintf("message from server %v", i)) } }()s.Run(address)
} ```
var ( address = "localhost:8888"
password = "123qwe"topicName = "Broadcast"
)
func onTopic(topic *pubsub.Topic) { log.Info("[OnTopic] [%v] \"%v\", [%v]", topic.Name, string(topic.Data), time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000")) }
func main() { client, err := pubsub.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", address, time.Second*3) }) if err != nil { panic(err) } client.Password = password
// authentication err = client.Authenticate() if err != nil { panic(err) }// subscribe topic if err := client.Subscribe(topicName, onTopic, time.Second); err != nil { panic(err) }
} ```
var ( address = "localhost:8888"
password = "123qwe"topicName = "Broadcast"
)
func main() { client, err := pubsub.NewClient(func() (net.Conn, error) { return net.DialTimeout("tcp", address, time.Second*3) }) if err != nil { panic(err) } client.Password = password
// authentication err = client.Authenticate() if err != nil { panic(err) }for i := 0; true; i++ { if i%5 == 0 { // publish msg to all clients client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second) } else { // publish msg to only one client client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second) } time.Sleep(time.Second) }
} ```