OkHttp使用分析—WebSocket篇

OkHttp使用分析—WebSocket篇

我们先看一下怎么使用OKhtttp完成WebSocket的请求:

  //设置连接超时时间
        mOkHttpClient = new OkHttpClient.Builder().connectTimeout(9 * 10, TimeUnit.SECONDS).build();
        Request request = new Request.Builder().url(BASE_URL).build();
        mWebSocket = mOkHttpClient.newWebSocket(request, this);

重点在这里,打开OkHttpClient.class查找newWebSocket()方法:

  /**
   * Uses {@code request} to connect a new web socket.
   */
  @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
    RealWebSocket webSocket = new RealWebSocket(request, listener, new Random());
    webSocket.connect(this);
    return webSocket;
  }

这里传入request对象和websocket的专用监听WebSocketListener,WebSocketListener 对象稍后再做赘述,主流程还是看RealWebSocket.class的connect()方法:
步骤1:

 client = client.newBuilder()
        .protocols(ONLY_HTTP1)
        .build();

我们都知道普通的请求时client是需要被bulid的,这里拿到OkHttpClient又重新创建了一遍,一开始就创建好了干嘛还要创建创建呢?看这个方法:protocols(ONLY_HTTP1),

 private static final List<Protocol> ONLY_HTTP1 = Collections.singletonList(Protocol.HTTP_1_1);

步骤2:

 final Request request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build();

对request对象的头部加工,

步骤3:

 call = Internal.instance.newWebSocketCall(client, request);

从OkHttpClient中 获取WebSocket的call对象(回调使用),这个Internal.instance虽然是接口方法,其实现是在OkHttpClient中,直接看对应方法:

 @Override public Call newWebSocketCall(OkHttpClient client, Request originalRequest) {
        return new RealCall(client, originalRequest, true);
      }

步骤4:搜嘎 原来enqueue()方法是使用RealCall.class的enqueue()方法,这是一个入队的方法,而且是个异步的方法。这就说明webSocket建立连接后才响应回调。而且如果是长连接那么这个线程就一直在线程池里不会被释放掉。

call.enqueue(new Callback() {
      @Override public void onResponse(Call call, Response response) {
        try {
          checkResponse(response);
        } catch (ProtocolException e) {
          failWebSocket(e, response);
          closeQuietly(response);
          return;
        }

照现在的进度已经到了设置好的回调要开始执行了,那就转战RealCall

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

其实我对okhttp同步请求有几点疑惑:
1一开始我没有创建线程,那么这个请求就是在主线程中吗?
2如果是同步请求那么如果同时多次请求是不是如果前面的请求在执行后面的请求在进入等待的状态了呢?
其实这些问题就需要从dispatcher()的线程池入手了。

这个dispatcher在一开始介绍ok的时候已经介绍过了,我们来看dispatcher中的enqueue()方法:
嘿嘿嘿,又到了OkHttp请求里了 而且 这时候realCall内部创建了AsyncCall(异步的Call),其实看方法名就应该知道的,ok的webSocket都是使用异步的,而且我们要明白现在只是一个最初的socket,之后的通信,都会在该线程池的一个线程中进行。

问题1:ok的websocket是异步的,并不会阻塞主线程,而且也不需要单独开辟一个子线程来创建连接。
问题2:会不会阻塞首先我们再次看看这个executorService的线程池结构。虽然在同步篇对dispatcher的线程池做过介绍,但是在我看来还是很解释不够清晰的地方:
首先 这个是dispatcher线程池的结构

executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));

我在这里做一个详细的说明:首先,SynchronousQueue是一个无缓存的阻塞的队列,什么意思呢?我们可以理解为当这个队列中有元素的时候,这个元素没有被取走(take方法)之前是不允许继续对之后的内容进行操作。

注意1:它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
注意2:它是线程安全的,是阻塞的。
注意3:不允许使用 null 元素。
注意4:公平排序策略是指调用put的线程之间,或take的线程之间。公平排序策略可以查考ArrayBlockingQueue中的公平策略。
所以这又解决了一个困扰我多年的难题:
okhttp的能同时执行多少个请求?
这个线程池的配置其实就是Executors提供的线程池配置方案之一,构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


用一个形象的比喻就是一个传球手,当从主线程传进了任务,就创建一个runnable来接收。


ThreadPoolExecutor.jpg

这里是Dispatcher的异步启动方法:

 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

在这里专门用runningAsyncCalls来记录在执行的Call,每次执行都会记录,当向executor添加call的时候,根据2,将任务放入SynchronousQueue中等待前面的request被取出才能执行之后的request,这里maxRequests 被定为64.超出64的将会被放入readyAsyncCalls。
ready和running之间怎么传递呢?
这就需要我们对比分析下RealCall这个类:
同步的时候是调用RealCall的:@Override public Response execute() throws IOException
异步的时候是调用AsyncCall的:

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

事件的回调已经具备了,回收需要看这里.finished(this)方法,最终会调用这个:

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

那么问题又来了,
请对比分析Ok与Volley的优缺点。

websocket篇:

此前我先声明一点,一个websocket链接的建立是在一个子线程当中,如果链接不关闭这个子线程一直存在,
在链接前 我们创建了一个RealWebSocket.class我们进它的构造里看看也许有个惊喜:

public RealWebSocket(Request request, WebSocketListener listener, Random random) {
//省略部分代码  
this.writerRunnable = new Runnable() {
  @Override public void run() {
try {
  while (writeOneFrame()) {
  }
} catch (IOException e) {
  failWebSocket(e, null);
}
  }
};
  }

在这里创建了一个写的线程,writerRunnable
再看connect()方法:这次只需要看call的回调就可以。根据现在的流程,链接成功,走了成功的回调,Call的onResponse方法:

 try {
  listener.onOpen(RealWebSocket.this, response);
  String name = "OkHttp WebSocket " + request.url().redact();
  initReaderAndWriter(name, pingIntervalMillis, streams);
  streamAllocation.connection().socket().setSoTimeout(0);
  loopReader();
} catch (Exception e) {
  failWebSocket(e, null);
}
  }

核心代码在这里:
1.initReaderAndWriter()初始化读写者。这是为同服务器交互进行准备?

 this.writer = new WebSocketWriter(streams.client, streams.sink, random);
 this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));

准备了Writer,准备了定时任务(心跳链接ping——pong)
runWriter();方法都做了什么呢?
private void runWriter() {
assert (Thread.holdsLock(this));

if (executor != null) {
  executor.execute(writerRunnable);
}
  }

哈哈 原来是为心跳链接做准备啊,定时进行通知服务器 我还在哈。

2.loopReader()开始轮训读取消息(随时准备接受来自服务器的消息)

 public void loopReader() throws IOException {
while (receivedCloseCode == -1) {
  // This method call results in one or more onRead* methods being called on this thread.
  reader.processNextFrame();
}
  }

这不,一直循环调用reader.processNextFrame();

 /**
   * Process the next protocol frame.
   *
   * <ul>
   * <li>If it is a control frame this will result in a single call to {@link FrameCallback}.
   * <li>If it is a message frame this will result in a single call to {@link
   * FrameCallback#onReadMessage}. If the message spans multiple frames, each interleaved
   * control frame will result in a corresponding call to {@link FrameCallback}.
   * </ul>
   */
  void processNextFrame() throws IOException {
readHeader();
if (isControlFrame) {
  readControlFrame();
} else {
  readMessageFrame();
}
  }

没办法 注释写的太好了,我忍不住都粘贴了进来:
1如果是控制帧将会有一个单一的callback:FrameCallback
2如果是消息帧也会有一个单一的callback:FrameCallback#onReadMessage

看到这里websocket基本上已经完了,剩下的就是调用监听了。
~~~~~~~~~~~~~~ 补充部分 ~~~~~~~~~~~~~~~

感谢网友朋友细心指导,因为写这篇文章比较早(细节忘了很多,尴尬)还原问题:
“框架会自动发送ping包吗? 怎么设置发送间隔时间呢?”

真的会,而且在而且OkHttpClient也支持设置心跳间隔:

 // Promote the HTTP streams into web socket streams.
        StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);

还对 ping pong的次数进行了记录:至于怎么发送ping 需要看这个:

  initReaderAndWriter(name, pingIntervalMillis, streams);

没错 又追踪到了初始化读写者,在初始化读写者的时候有这样一句(多看一句就能回答 读者的问题了 甚是惭愧):

      if (pingIntervalMillis != 0) {
        executor.scheduleAtFixedRate(
            new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
      }

由此可见:
1 如果pingIntervalMillis 设置为0的时候 心跳executor是不会执行的。
2 executor 原来也负责心跳包的定时任务

让我们看看 pingrunnable里都做了什么吧:

  private final class PingRunnable implements Runnable {
    PingRunnable() {
    }

    @Override public void run() {
      writePingFrame();
    }
  }

  void writePingFrame() {
    WebSocketWriter writer;
    synchronized (this) {
      if (failed) return;
      writer = this.writer;
    }

    try {
      writer.writePing(ByteString.EMPTY);
    } catch (IOException e) {
      failWebSocket(e, null);
    }
  }

果然简单实用:
一个runnable 调用writer的writePing方法。想一想还是很合理啊,毕竟发送消息就是需要 writer来做,所以 writer有这些方法也不足为其。具体writer怎么写 我们看下:

 /** Send a ping with the supplied {@code payload}. */
  void writePing(ByteString payload) throws IOException {
    synchronized (this) {
      writeControlFrameSynchronized(OPCODE_CONTROL_PING, payload);
    }
  }

  /** Send a pong with the supplied {@code payload}. */
  void writePong(ByteString payload) throws IOException {
    synchronized (this) {
      writeControlFrameSynchronized(OPCODE_CONTROL_PONG, payload);
    }
  }

顺便一瞅 就在下边有个pong的发送方法,分析一下:
1 入参payload 是ByteString.EMPTY 就是一个空的字节,
2 最终都是相同的方法writeControlFrameSynchronized,
3 对于消息的区分:依靠writeControlFrameSynchronized的第一个入参opcode,
4 writeControlFrameSynchronized这个方法虽然没有注释 但是 即然写消息都需要调用这个方法,相比这个方法才是writer的实力担当:

  private void writeControlFrameSynchronized(int opcode, ByteString payload) throws IOException {
    assert Thread.holdsLock(this);

    if (writerClosed) throw new IOException("closed");

    int length = payload.size();
    if (length > PAYLOAD_BYTE_MAX) {
      throw new IllegalArgumentException(
          "Payload size must be less than or equal to " + PAYLOAD_BYTE_MAX);
    }

    int b0 = B0_FLAG_FIN | opcode;
    sink.writeByte(b0);

    int b1 = length;
    if (isClient) {
      b1 |= B1_FLAG_MASK;
      sink.writeByte(b1);

      random.nextBytes(maskKey);
      sink.write(maskKey);

      byte[] bytes = payload.toByteArray();
      toggleMask(bytes, bytes.length, maskKey, 0);
      sink.write(bytes);
    } else {
      sink.writeByte(b1);
      sink.write(payload);
    }

    sink.flush();
  }

操作太6 ,表示职能看懂个大概 , 都被写入这个sink中了?。?!

问题来了:sink是什么东西?

 /** Writes must be guarded(被守护的) by synchronizing on 'this'. */
  final BufferedSink sink;

没有交代,但是有这样一个提醒,对sink写的时候必须是被synchronizing?;さ?这样我算是明白为嘛ping和pong的方法都会加锁了(他说咋做就咋做 嘻嘻 稍后看)。

我们先从单词上理解这个变量的意义吧:sink,水槽,洗涤池,什么鬼?看不懂。。。我还是看BufferedSink吧:

  • A sink that keeps a buffer internally so that callers can do small writes
  • 在内部保留缓冲区的接收器,以便调用方可以执行小的写入操作。
  • without a performance penalty.

都说了是个小型的缓冲池,因此在写的时候会对大小进行限制:
static final long PAYLOAD_BYTE_MAX = 125L;

虽然是个接口但是已经给了我们足够多的有效信息,让我们看看在创建的时候是怎么实现这个BufferedSink,回到最初writer创建的地方:

  this.writer = new WebSocketWriter(streams.client, streams.sink, random);

哦?在初始化的时候从Stream中获取的。在向上找当初的stream是怎么创建的:
当链接成功后就会 返回一个Call:

   @Override public void onResponse(Call call, Response response) 

  // Promote the HTTP streams into web socket streams.
  // 促进 http流初始化这个socket流
  StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
   // Prevent connection pooling!
   // 防止连接共用
        streamAllocation.noNewStreams(); 
  //创建 Stream
   Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

看来一切的谜底都在 RealConnection的newWebSockerStreams里:

 public RealWebSocket.Streams newWebSocketStreams(final StreamAllocation streamAllocation) {
    return new RealWebSocket.Streams(true, source, sink) {
      @Override public void close() throws IOException {
        streamAllocation.streamFinished(true, streamAllocation.codec());
      }
    };
  }

呵呵,看到真相我有点想放弃, new RealWebSocket.Streams(true, source, sink) sink就是这样被赋予的,让我回想一下,RealConnection还是挺熟悉的,是在什么时候创建的呢?
今天先研究到这里我容我仔细研究一番。。。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容

  • 参考资源 官网 国内博客 GitHub官网 鉴于一些关于OKHttp3源码的解析文档过于碎片化,本文系统的,由浅入...
    风骨依存阅读 12,497评论 11 82
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • OkHttp源码的samples的简单使用的示例: public static void main(String....
    _warren阅读 739评论 0 1
  • OkHttp解析系列 OkHttp解析(一)从用法看清原理OkHttp解析(二)网络连接OkHttp解析(三)关于...
    Hohohong阅读 20,971评论 4 58
  • OkHttp源码分析-同步篇 很早就想拿okhttp开刀了,这次就记一次使用OKhttp的网络请求。首先需要说明的...
    埃赛尔阅读 978评论 1 2