Need help with bizsocket?
Click the “chat” button below for chat support from the developer who created it, or find similar developers for support.

About the developer

typ0520
474 Stars 145 Forks Apache License 2.0 26 Commits 7 Opened issues

Description

异步socket,对一些业务场景做了支持

Services available

!
?

Need anything else?

Contributors list

# 71,114
Socket....
socket-...
sockets
Dart
9 commits

bizsocket

license

Download

异步socket,对一些业务场景做了支持

  • 断线重连
  • 一对一请求
  • 通知、粘性通知
  • 串行请求合并
  • 包分片处理(AbstractFragmentRequestQueue)
  • 缓存
  • 拦截器
  • 支持rxjava,提供类似于retrofit的支持
  • 提供rxjava和rxjava2两种使用方式

使用方式

RxJava2

Maven

  com.github.typ0520
  bizsocket-rx2
  2.0.0

or Gradle

buildscript {
    repositories {
       jcenter()
    }
}

dependencies { compile 'com.github.typ0520:bizsocket-rx2:2.0.0' }

RxJava

Maven

  com.github.typ0520
  bizsocket-rx1
  2.0.0

or Gradle

buildscript {
    repositories {
       jcenter()
    }
}

dependencies { compile 'com.github.typ0520:bizsocket-rx1:2.0.0' }

适用协议

如果想使用此库,客户端和服务器的通讯协议中必须要有命令号、包序列号这两个字段

  • 命令号代表请求类型,可以想象成http中url的作用
  • 包序列号是数据包的唯一索引,客户端发起请求时为数据包生成一个唯一索引,服务器返回请求对应的结果时把这个包序列号带回去

协议可以类似于下面这种:

| cmd | packetId | contentLength |content| | -----------|:-------------:| ---------------------------:| ---------------------------:| |int |int | int | byte[] |

也可以类似于下面这样的每个数据包都是一段json字符串,包与包之间用换行符分割

{"cmd": xxx , "packetId": xxx , ...... } 

数据包的创建是通过这两个抽象类PacketFactory、Packet,整个库的数据流转都是通过命令号、包序列号这两个字段来完成的,字段名、出现的位置以及形式不限,只要有这两个字段就适用此库

配置BizSocket

sample中client与server之间的通讯协议是

| length(int) | cmd(int) | seq(int) | content(byte[]) | | -------------|:--------:| -----------------:| -------------------------------:| | 数据包的总长度 | 命令号 | 数据包的唯一索引 | 报文体,可以想象成http协议中的body |

下面的代码片段来自sample,建议把代码拉下来看

  • 1、 首先需要创建一个数据包类继承自Packet
public class SamplePacket extends Packet {
    static volatile int currentSeq = 0;
    public int length;
    public int cmd;
    public int seq;
    public String content;

@Override
public int getCommand() {
     //覆盖父类的抽象方法
    return cmd;
}

@Override
public String getPacketID() {
    //覆盖父类的抽象方法
    return String.valueOf(seq);
}

//获取请求数据包byte[],写给服务器
public byte[] toBytes() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    BufferedSink bufferedSink = Okio.buffer(Okio.sink(bos));
    try {
        //包长 = 内容长度 + 包头固定的12个字节
        ByteString byteString = ByteString.encodeUtf8(content);
        bufferedSink.writeInt(byteString.size() + 12);
        bufferedSink.writeInt(cmd);
        bufferedSink.writeInt(seq);
        bufferedSink.write(byteString);
        bufferedSink.flush();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return bos.toByteArray();
}

}

  • 2、创建PacketFactory,主要用来从流中解析出server发给client的数据包
public class SamplePacketFactory extends PacketFactory {
    @Override
    public Packet getRequestPacket(Packet reusable,Request request) {
        return new SamplePacket(request.command(),request.body());
    }

@Override
public Packet getHeartBeatPacket(Packet reusable) {
    return new SamplePacket(SampleCmd.HEARTBEAT.getValue(), ByteString.encodeUtf8("{}"));
}

@Override
public Packet getRemotePacket(Packet reusable,BufferedSource source) throws IOException {
     SamplePacket packet = new SamplePacket();
    packet.length = reader.readInt();
    packet.cmd = reader.readInt();
    packet.seq = reader.readInt();
    //减去协议头的12个字节长度
    packet.content = reader.readString(packet.length - 12, Charset.forName("utf-8"));
    return packet;
}

}

  • 3、配置client
public class SampleClient extends AbstractBizSocket {
    public SampleClient(Configuration configuration) {
        super(configuration);
    }

@Override
protected PacketFactory createPacketFactory() {
    return new SamplePacketFactory();
}

}

  • 3、启动client,以j2se为例写一个main方法
public static void main(String[] args) {
        SampleClient client = new SampleClient(new Configuration.Builder()
                .host("127.0.0.1")
                .port(9103)
                .readTimeout(TimeUnit.SECONDS,30)
                .heartbeat(60)
                .build());

    client.getInterceptorChain().addInterceptor(new Interceptor() {
        @Override
        public boolean postRequestHandle(RequestContext context) throws Exception {
            System.out.println("发现一个请求postRequestHandle: " + context);
            return false;
        }

        @Override
        public boolean postResponseHandle(int command, Packet responsePacket) throws Exception {
            System.out.println("收到一个包postResponseHandle: " + responsePacket);
            return false;
        }
    });

    try {
        //连接
        client.connect();
        //启动断线重连
        client.getSocketConnection().bindReconnectionManager();
        //开启心跳
        client.getSocketConnection().startHeartBeat();
    } catch (Exception e) {
        e.printStackTrace();
    }

    //注册通知,接收服务端的推送
    client.subscribe(client, SampleCmd.NOTIFY_PRICE.getValue(), new ResponseHandler() {
        @Override
        public void sendSuccessMessage(int command, ByteString requestBody, Packet responsePacket) {
            System.out.println("cmd: " + command + " ,requestBody: " + requestBody + " responsePacket: " + responsePacket);
        }

        @Override
        public void sendFailureMessage(int command, Throwable error) {
            System.out.println(command + " ,err: " + error);
        }
    });

//发起一对一请求
    String json = "{\"productId\" : \"1\",\"isJuan\" : \"0\",\"type\" : \"2\",\"sl\" : \"1\"}";
    client.request(new Request.Builder().command(SampleCmd.CREATE_ORDER.getValue()).utf8body(json).build(), new ResponseHandler() {
        @Override
        public void sendSuccessMessage(int command, ByteString requestBody, Packet responsePacket) {
            System.out.println("cmd: " + command + " ,requestBody: " + requestBody + " attach: " + " responsePacket: " + responsePacket);
        }

        @Override
        public void sendFailureMessage(int command, Throwable error) {
            System.out.println(command + " ,err: " + error);
        }
    });

//如果想用rxjava的形式调用也是支持的,提供了类似于retrofit通过动态代理创建的service类来调用
    BizSocketRxSupport rxSupport = new BizSocketRxSupport.Builder()
            .requestConverter(new JSONRequestConverter())
            .responseConverter(new JSONResponseConverter())
            .bizSocket(client)
            .build();
    SampleService service = rxSupport.create(SampleService.class);

    JSONObject params = new JSONObject();
    try {
        params.put("pageSize","10000");
    } catch (JSONException e) {
        e.printStackTrace();
    }
    //rxjava范例,使用rxjava2需修改Subscriber
    service.queryOrderList(params).subscribe(new Subscriber<jsonobject>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(JSONObject jsonObject) {
            System.out.println("rx response: " + jsonObject);
        }
    });

//阻塞主线程,防止程序退出,可以想象成android中的Looper类
    while (true) {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

We use cookies. If you continue to browse the site, you agree to the use of cookies. For more information on our use of cookies please see our Privacy Policy.