class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange() val request = realChain.request() val requestBody = request.body val sentRequestMillis = System.currentTimeMillis() // 写入请求头 exchange.writeRequestHeaders(request)
var invokeStartEvent = true var responseBuilder: Response.Builder? = null // 针对支持body的请求,且body不为空的情况进行请求体写入 if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { // 对 “100-continue” 进行处理 if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { exchange.flushRequest() responseBuilder = exchange.readResponseHeaders(expectContinue = true) exchange.responseHeadersStart() invokeStartEvent = false } if (responseBuilder == null) { if (requestBody.isDuplex()) { // 写入请求体 exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() requestBody.writeTo(bufferedRequestBody) } else { val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } } else { exchange.noRequestBody() if (!exchange.connection()!!.isMultiplexed) { exchange.noNewExchangesOnConnection() } } } else { exchange.noRequestBody() }
/**
 * Asks the codec to finish writing the request.
 *
 * If the codec throws an [IOException], the failure is reported to the event listener through
 * `requestFailed`, recorded with `trackFailure`, and then rethrown to the caller.
 *
 * @throws IOException when the codec fails to finish the request.
 */
fun finishRequest() {
    try {
        // Perform the actual request write.
        codec.finishRequest()
    } catch (failure: IOException) {
        eventListener.requestFailed(call, failure)
        trackFailure(failure)
        throw failure
    }
}
读取响应头
Exchange.readResponseHeaders
1 2 3 4 5 6 7 8 9 10 11
/**
 * Reads the response headers through the codec and binds the resulting builder to this exchange.
 *
 * @param expectContinue forwarded unchanged to the codec's `readResponseHeaders`.
 * @return the builder produced by the codec, or `null` when the codec returns `null`.
 * @throws IOException when reading fails; the failure is first reported through
 *   `eventListener.responseFailed` and recorded with `trackFailure`.
 */
fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    return try {
        // Only a non-null builder is attached to this exchange.
        codec.readResponseHeaders(expectContinue)?.also { builder ->
            builder.initExchange(this)
        }
    } catch (failure: IOException) {
        eventListener.responseFailed(call, failure)
        trackFailure(failure)
        throw failure
    }
}
读取响应体
Exchange.openResponseBody
创建响应体封装实例,含有响应体的读取流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/**
 * Builds the [ResponseBody] wrapper for [response], backed by a source obtained from the codec.
 *
 * @param response the response whose body should be opened.
 * @return a [RealResponseBody] carrying the `Content-Type` header value, the content length
 *   reported by the codec, and a buffered [ResponseBodySource] over the codec's raw source.
 * @throws IOException when opening fails; the failure is first reported through
 *   `eventListener.responseFailed` and recorded with `trackFailure`.
 */
fun openResponseBody(response: Response): ResponseBody {
    return try {
        // Media type as declared by the response.
        val mediaType = response.header("Content-Type")
        // Body size as reported by the codec.
        val declaredLength = codec.reportedContentLength(response)
        // The codec supplies the raw stream; wrap it in the exchange's own response-body source.
        val trackedSource = ResponseBodySource(codec.openResponseBodySource(response), declaredLength)
        RealResponseBody(mediaType, declaredLength, trackedSource.buffer())
    } catch (failure: IOException) {
        eventListener.responseFailed(call, failure)
        trackFailure(failure)
        throw failure
    }
}
HTTP1.x对应实现
Http1ExchangeCodec.writeRequestHeaders
创建RequestLine,执行写入headers方法
1 2 3 4 5 6 7
/**
 * Builds the request line for [request] and writes it together with the request headers.
 *
 * The request line is obtained from [RequestLine.get], which is given the proxy type of the
 * current connection's route. `realConnection` must be non-null when this is called.
 *
 * @param request the request whose request line and headers are written.
 */
override fun writeRequestHeaders(request: Request) {
    // Proxy type of the route this connection is using.
    val proxyType = realConnection!!.route().proxy.type()
    val firstLine = RequestLine.get(request, proxyType)
    // Emit the request line followed by all headers.
    writeRequest(request.headers, firstLine)
}
Http1ExchangeCodec.writeRequest
通过Okio的Sink实例 写入headers
1 2 3 4 5 6 7 8 9 10 11 12 13
/**
 * Writes [requestLine] and every entry of [headers] to the sink as UTF-8, each terminated by
 * CRLF, followed by a blank CRLF line.
 *
 * Requires `state` to be `STATE_IDLE` on entry and leaves it at `STATE_OPEN_REQUEST_BODY`.
 *
 * @param headers the headers to write, in index order, as `name: value` lines.
 * @param requestLine the first line of the request, written before any header.
 * @throws IllegalStateException when `state` is not `STATE_IDLE`.
 */
fun writeRequest(headers: Headers, requestLine: String) {
    check(state == STATE_IDLE) { "state: $state" }

    sink.writeUtf8(requestLine).writeUtf8("\r\n")

    // One "name: value" line per header.
    repeat(headers.size) { index ->
        sink.writeUtf8(headers.name(index))
            .writeUtf8(": ")
            .writeUtf8(headers.value(index))
            .writeUtf8("\r\n")
    }

    // Blank line that ends the header section.
    sink.writeUtf8("\r\n")
    state = STATE_OPEN_REQUEST_BODY
}
Http1ExchangeCodec.createRequestBody
1 2 3 4 5 6 7 8 9 10 11
/**
 * Chooses the sink used to write the body of [request].
 *
 * @param request the request whose body will be written.
 * @param contentLength the body length, where `-1L` means the length is not known.
 * @return a chunked sink when the request is chunked, otherwise a known-length sink when
 *   [contentLength] is not `-1L`.
 * @throws ProtocolException when the request body is duplex.
 * @throws IllegalStateException when the request is neither chunked nor of known length.
 */
override fun createRequestBody(request: Request, contentLength: Long): Sink {
    val body = request.body
    if (body != null && body.isDuplex()) {
        throw ProtocolException("Duplex connections are not supported for HTTP/1")
    }

    // Stream a request body of unknown length.
    if (request.isChunked()) {
        return newChunkedSink()
    }

    // Stream a request body of a known length.
    if (contentLength != -1L) {
        return newKnownLengthSink()
    }

    throw IllegalStateException(
        "Cannot stream a request body without chunked encoding or a known content length!")
}
private fun newStream( associatedStreamId: Int, requestHeaders: List<Header>, out: Boolean ): Http2Stream { val outFinished = !out val inFinished = false val flushHeaders: Boolean val stream: Http2Stream val streamId: Int
Emit a single data frame to the connection. The frame’s size will be limited by this stream’s write window. This method will block until the write window is nonempty.
/**
 * Reads one frame from the source and hands it to the reader method matching its type.
 *
 * @param requireSettings when true, any frame whose type is not `TYPE_SETTINGS` is rejected.
 * @param handler passed through to the type-specific reader.
 * @return `false` when the 9-byte frame header cannot be read because the source hit EOF,
 *   `true` after a frame has been processed or skipped.
 * @throws IOException when the frame length exceeds `INITIAL_MAX_FRAME_SIZE`, or when
 *   [requireSettings] is set and the frame is not a SETTINGS frame.
 */
fun nextFrame(requireSettings: Boolean, handler: Handler): Boolean {
    val headerAvailable = try {
        source.require(9) // Frame header size.
        true
    } catch (eof: EOFException) {
        false // This might be a normal socket close.
    }
    if (!headerAvailable) {
        return false
    }

    val payloadLength = source.readMedium()
    if (payloadLength > INITIAL_MAX_FRAME_SIZE) {
        throw IOException("FRAME_SIZE_ERROR: $payloadLength")
    }

    val frameType = source.readByte() and 0xff
    if (requireSettings && frameType != TYPE_SETTINGS) {
        throw IOException("Expected a SETTINGS frame but was $frameType")
    }

    val frameFlags = source.readByte() and 0xff
    val stream = source.readInt() and 0x7fffffff // Ignore reserved bit.
    if (logger.isLoggable(FINE)) {
        logger.fine(frameLog(true, stream, payloadLength, frameType, frameFlags))
    }

    // Dispatch on the frame type.
    when (frameType) {
        TYPE_DATA -> readData(handler, payloadLength, frameFlags, stream)
        TYPE_HEADERS -> readHeaders(handler, payloadLength, frameFlags, stream)
        TYPE_PRIORITY -> readPriority(handler, payloadLength, frameFlags, stream)
        TYPE_RST_STREAM -> readRstStream(handler, payloadLength, frameFlags, stream)
        TYPE_SETTINGS -> readSettings(handler, payloadLength, frameFlags, stream)
        TYPE_PUSH_PROMISE -> readPushPromise(handler, payloadLength, frameFlags, stream)
        TYPE_PING -> readPing(handler, payloadLength, frameFlags, stream)
        TYPE_GOAWAY -> readGoAway(handler, payloadLength, frameFlags, stream)
        TYPE_WINDOW_UPDATE -> readWindowUpdate(handler, payloadLength, frameFlags, stream)
        // Implementations MUST discard frames of unknown types.
        else -> source.skip(payloadLength.toLong())
    }

    return true
}
1 2 3 4
```
#### FramingSource.read
/**
 * Moves up to [byteCount] bytes from this stream's read buffer into [sink].
 *
 * Each pass first decides what to do while holding the stream's monitor and the read timeout,
 * then acts on that decision outside of both. When the buffer is empty, the stream is not
 * finished and no error is pending, it waits for I/O and tries again.
 *
 * @param sink the buffer that receives the bytes.
 * @param byteCount the maximum number of bytes to move; must be non-negative.
 * @return the number of bytes moved into [sink], or `-1L` when this source is exhausted.
 * @throws IllegalArgumentException when [byteCount] is negative.
 * @throws IOException when the stream is closed, or when the stream has an error code and no
 *   buffered bytes were delivered on this pass.
 */
override fun read(sink: Buffer, byteCount: Long): Long {
    require(byteCount >= 0L) { "byteCount < 0: $byteCount" }

    while (true) {
        var tryAgain = false
        var readBytesDelivered = -1L
        var errorExceptionToDeliver: IOException? = null

        // 1. Decide what to do in a synchronized block.
        synchronized(this@Http2Stream) {
            readTimeout.enter()
            try {
                if (errorCode != null) {
                    // Prepare to deliver an error.
                    errorExceptionToDeliver = errorException ?: StreamResetException(errorCode!!)
                }

                if (closed) {
                    throw IOException("stream closed")
                } else if (readBuffer.size > 0L) {
                    // Prepare to read bytes. Start by moving them to the caller's buffer.
                    readBytesDelivered = readBuffer.read(sink, minOf(byteCount, readBuffer.size))
                    readBytesTotal += readBytesDelivered

                    val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged
                    if (errorExceptionToDeliver == null &&
                        unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) {
                        // Flow control: notify the peer that we're ready for more data! Only send a
                        // WINDOW_UPDATE if the stream isn't in error.
                        connection.writeWindowUpdateLater(id, unacknowledgedBytesRead)
                        readBytesAcknowledged = readBytesTotal
                    }
                } else if (!finished && errorExceptionToDeliver == null) {
                    // Nothing to do. Wait until that changes then try again.
                    waitForIo()
                    tryAgain = true
                }
            } finally {
                readTimeout.exitAndThrowIfTimedOut()
            }
        }

        // 2. Do it outside of the synchronized block and timeout.
        if (tryAgain) {
            continue
        }

        if (readBytesDelivered != -1L) {
            // Update connection.unacknowledgedBytesRead outside the synchronized block.
            updateConnectionFlowControl(readBytesDelivered)
            return readBytesDelivered
        }

        // The error prepared above is thrown only here, after the byte-delivery path has had its
        // chance to run, so bytes that were already buffered are handed to the caller first.
        errorExceptionToDeliver?.let { throw it }

        return -1L // This source is exhausted.
    }
}