Skip to content

fix(devti): 优化 SSE 数据解析逻辑 #368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,68 +59,70 @@ class ResponseBodyCallback(private val emitter: FlowableEmitter<SSE>, private va
reader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
var line: String? = null
var sse: SSE? = null
while (!emitter.isCancelled && reader.readLine().also { line = it } != null) {
sse = when {
line!!.startsWith("data:") -> {
val data = line!!.substring(5).trim { it <= ' ' }
SSE(data)
}

line == "" && sse != null -> {
if (sse.isDone) {
if (emitDone) {
emitter.onNext(sse)
}
break
//用于兼容非标准格式的SSE:一条数据拆成了多行,每行都以data:开头
val dataBuilder = StringBuilder() // 用于合并多行 data 内容
while (!emitter.isCancelled && reader.readLine().also { line = it } != null) {
if (line!!.startsWith("data:")) {
val dataPart = line!!.substring(5).trim { it <= ' ' }
dataBuilder.append(dataPart) // 追加 data 内容
} else if (line == "" && dataBuilder.isNotEmpty()) {
// 遇到空行且有累积的 data 内容,创建 SSE 对象
val data = dataBuilder.toString()
sse = SSE(data)
if (sse.isDone) {
if (emitDone) {
emitter.onNext(sse)
}
emitter.onNext(sse)
null
break
}
// starts with event:
line!!.startsWith("event:") -> {
// https://github.com./sysid/sse-starlette/issues/16
val eventName = line!!.substring(6).trim { it <= ' ' }
if (eventName == "ping") {
// skip ping event and data
emitter.onNext(sse ?: SSE(""))
emitter.onNext(sse ?: SSE(""))
}
emitter.onNext(sse)
dataBuilder.clear() // 清空 data 内容
sse = null
} else {
// 其他情况,按照原逻辑处理
sse = when {
// starts with event:
line!!.startsWith("event:") -> {
// https://github.com./sysid/sse-starlette/issues/16
val eventName = line!!.substring(6).trim { it <= ' ' }
if (eventName == "ping") {
// skip ping event and data
emitter.onNext(sse ?: SSE(""))
emitter.onNext(sse ?: SSE(""))
}

null
}
null
}

// skip `: ping` comments for: https://github.com./sysid/sse-starlette/issues/16
line!!.startsWith(": ping") -> {
null
}
// skip `: ping` comments for: https://github.com./sysid/sse-starlette/issues/16
line!!.startsWith(": ping") -> {
null
}

else -> {
when {
// sometimes the server maybe returns empty line
line == "" -> {
null
}

// : is comment
// https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
line!!.startsWith(":") -> {
null
}
// : is comment
// https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
line!!.startsWith(":") -> {
null
}

line.startsWith("{") && line.endsWith("}") -> {
emitter.onNext(SSE(line))
emitter.onComplete()
return
}
line.startsWith("{") && line.endsWith("}") -> {
emitter.onNext(SSE(line))
emitter.onComplete()
return
}

else -> {
throw AutoDevHttpException("Invalid sse format! '$line'", response.code)
}
else -> {
throw AutoDevHttpException("Invalid sse format! '$line'", response.code)
}
}
}
}

emitter.onComplete()
} catch (t: Throwable) {
logger<ResponseBodyCallback>().error("Error while reading SSE", t)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cc.unitmesh.devti.language.highlight

import cc.unitmesh.devti.language.psi.DevInTypes
import com.intellij.lang.annotation.AnnotationHolder
import com.intellij.lang.annotation.Annotator
import com.intellij.lang.annotation.HighlightSeverity
import com.intellij.openapi.editor.DefaultLanguageHighlighterColors
import com.intellij.psi.PsiElement
import com.intellij.psi.util.PsiUtilCore

class DevInHighlightingAnnotator : Annotator {
override fun annotate(element: PsiElement, holder: AnnotationHolder) {
when (PsiUtilCore.getElementType(element)) {
DevInTypes.IDENTIFIER -> {
holder.newSilentAnnotation(HighlightSeverity.INFORMATION)
.textAttributes(DefaultLanguageHighlighterColors.IDENTIFIER)
.create()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
<lang.syntaxHighlighterFactory language="DevIn"
implementationClass="cc.unitmesh.devti.language.highlight.DevInSyntaxHighlighterFactory"/>

<annotator language="Shire" implementationClass="cc.unitmesh.devti.language.highlight.DevInHighlightingAnnotator"/>

<lang.ast.factory language="DevIn"
implementationClass="cc.unitmesh.devti.language.DevInAstFactory"/>

Expand Down