Pārlūkot izejas kodu

1, 优化json rpc 2.0的原有代码封装; 2,新增json rpc 2.0的server和client基础封装; 3, 重新编译打包基础模块

niujiuru 1 nedēļu atpakaļ
vecāks
revīzija
dfa788f859

BIN
package/rtu_linux_modules_1.0.0.1.tar.gz


BIN
package/yfkj_ssh_client_1.0.0.1.tar.gz


+ 0 - 1
readme.txt

@@ -57,5 +57,4 @@ SHELL脚本封装库, 后台守护软件运行等功能, 可以平替硬件看
 2, 判断生成的armv7hf可执行文件依赖哪些依赖库,可以在上位机上运行以下命令:
    arm-linux-gnueabihf-readelf -d ./a.out | grep NEEDED
 
-
 Written by niujiuru 2025年于郑州

+ 5 - 5
sshd/client/coupler.go

@@ -36,9 +36,9 @@ type MQTTCoupler struct {
 	cwd      string // 当前工作目录
 
 	// 交互式命令高频执行, 为了性能上的考虑-这里不使用"sync.Map"
-	cmdMu     sync.Mutex                      // 串行执行
-	pending   map[int]chan *jsonrpc2.Response // 等命令结果
-	pendingMu sync.Mutex                      // 等待结果
+	cmdMu     sync.Mutex                        // 串行执行锁
+	pending   map[int64]chan *jsonrpc2.Response // 等命令结果
+	pendingMu sync.Mutex                        // 等待结果锁
 
 	interrupted chan struct{} // Ctrl+C 通知当前命令取消的信号
 }
@@ -78,7 +78,7 @@ func (c *MQTTCoupler) init2() error {
 		}
 	}
 
-	c.pending = make(map[int]chan *jsonrpc2.Response)
+	c.pending = make(map[int64]chan *jsonrpc2.Response)
 
 	c.client = mqtt.NewClient(opts)
 	go c.keepOnline()
@@ -127,7 +127,7 @@ func (c *MQTTCoupler) connect() error {
 	return token.Error()
 }
 
-func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (*jsonrpc2.Response, error) {
+func (c *MQTTCoupler) doCmd(method string, params any, id ...int64) (*jsonrpc2.Response, error) {
 	zero := &jsonrpc2.Response{}
 
 	if c.needSerialize(method) {

+ 0 - 14
sshd/protocol.go

@@ -5,23 +5,9 @@ import (
 	"fmt"
 	"strings"
 
-	"hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
 	"hnyfkj.com.cn/rtu/linux/utils/shell"
 )
 
-func buildResp(req *jsonrpc2.Request, result any, err error) *jsonrpc2.Response {
-	if err != nil {
-		return jsonrpc2.BuildError(req, jsonrpc2.ErrParse, err.Error())
-	}
-
-	resp, err := jsonrpc2.BuildResult(req, result)
-	if err != nil {
-		return jsonrpc2.BuildError(req, jsonrpc2.ErrInternal, "")
-	}
-
-	return resp
-}
-
 func extractShellExecuteParams(params json.RawMessage) (shell.ExecuteParams, error) {
 	if len(params) == 0 {
 		return shell.ExecuteParams{}, fmt.Errorf("missing params")

+ 5 - 5
sshd/sshd.go

@@ -267,7 +267,7 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 	switch req.Method {
 	// Call-1: 心跳, 链路检测,"ping-pong"测试
 	case "executor.ping":
-		resp = buildResp(req, "pong", nil)
+		resp = jsonrpc2.BuildResponse(req, "pong", nil)
 		goto retp
 	// Call-2:在本地shell中执行远程下发的指令
 	case "executor.exec":
@@ -276,7 +276,7 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 		params, err := extractShellExecuteParams(req.Params)
 		if err != nil {
 			ce.mu.Unlock()
-			resp = jsonrpc2.BuildError(req, jsonrpc2.ErrParse, err.Error())
+			resp = jsonrpc2.BuildError(req, jsonrpc2.ErrInvalidParams, err.Error())
 			goto retp
 		}
 
@@ -337,7 +337,7 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 		}
 		ce.mu.Unlock()
 
-		resp = buildResp(req, result, err)
+		resp = jsonrpc2.BuildResponse(req, result, err)
 		goto retp
 	// Call-3:中断本地shell的执行,等价Ctrl+C
 	case "executor.interrupt":
@@ -351,7 +351,7 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 		}
 
 		err := ce.executor.Interrupt()
-		resp = buildResp(req, "interrupted", err)
+		resp = jsonrpc2.BuildResponse(req, "interrupted", err)
 		goto retp
 	// Call-4:客户端安全退出, 释放本地的执行器
 	case "executor.close":
@@ -362,7 +362,7 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 		baseapp.Logger.Infof("[%s] 客户端 %s 退出成功", MODULE_NAME, clientID)
 		c.executorMapMu.Unlock()
 
-		resp = buildResp(req, "closed", err)
+		resp = jsonrpc2.BuildResponse(req, "closed", err)
 		goto retp
 	// Call-?:无效, 远端调用了还不支持的-方法
 	default:

+ 192 - 0
utils/jsonrpc2/client.go

@@ -0,0 +1,192 @@
+// Package jsonrpc2 provides a minimal JSON-RPC 2.0 HTTP client.
+//
+// The client is designed to work with the jsonrpc2 server implementation.
+// It supports only single JSON-RPC 2.0 requests and notifications.
+// Batch requests are explicitly unsupported.
+//
+// All request lifetimes are controlled by context.Context.
+//
+// Author: NiuJiuRu
+// Email: niujiuru@qq.com
+//
+// See: https://www.jsonrpc.org/specification
+
+package jsonrpc2
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"math"
+	"net/http"
+	"net/url"
+	"sync/atomic"
+)
+
+// ClientOption configures an RPCClient.
+type ClientOption func(*RPCClient)
+
+// WithHTTPClient sets a custom http.Client.
+func WithHTTPClient(hc *http.Client) ClientOption {
+	return func(c *RPCClient) {
+		if hc != nil {
+			c.http = hc
+		}
+	}
+}
+
+// SetHeader sets a custom HTTP header for all requests.
+func SetHeader(key, value string) ClientOption {
+	return func(c *RPCClient) {
+		if key == "" {
+			return
+		}
+		if c.headers == nil {
+			c.headers = make(http.Header)
+		}
+		c.headers.Set(key, value)
+	}
+}
+
+// RPCClient is a minimal JSON-RPC 2.0 HTTP client.
+type RPCClient struct {
+	baseURL string
+	http    *http.Client
+	headers http.Header
+	seq     atomic.Int64
+}
+
+// NewRPCClient creates a new JSON-RPC 2.0 client.
+//
+// The client is minimal, safe for concurrent use, and designed to work
+// with standard JSON-RPC 2.0 HTTP servers.
+//
+// Example:
+//
+//	// Create a client with default settings.
+//	client := jsonrpc2.NewRPCClient("http://localhost:8080/rpc")
+//
+//	// Create a client with a custom HTTP client and headers.
+//	hc := &http.Client{
+//		Timeout: 5 * time.Second,
+//	}
+//
+//	client := jsonrpc2.NewRPCClient(
+//		"http://localhost:8080/rpc",
+//		jsonrpc2.WithHTTPClient(hc),
+//		jsonrpc2.SetHeader("Authorization", "Bearer token"),
+//		jsonrpc2.SetHeader("X-Request-ID", "req-001"),
+//		jsonrpc2.SetHeader("User-Agent", "jsonrpc2-client/1.0"),
+//	)
+//
+//	ctx := context.Background()
+//	resp, err := client.Call(ctx, "add", []int{1, 2})
+//
+// Options are applied in order.
+// If no http.Client is provided, http.DefaultClient is used.
+func NewRPCClient(baseURL string, opts ...ClientOption) (*RPCClient, error) {
+	if baseURL == "" {
+		return nil, fmt.Errorf("baseURL cannot be empty")
+	}
+
+	_, err := url.Parse(baseURL)
+	if err != nil {
+		return nil, fmt.Errorf("invalid baseURL: %v", baseURL)
+	}
+
+	c := &RPCClient{
+		baseURL: baseURL,
+		http:    http.DefaultClient,
+		headers: http.Header{},
+	}
+
+	for _, opt := range opts {
+		opt(c)
+	}
+	return c, nil
+}
+
+// Call invokes a JSON-RPC method and returns the response.
+func (c *RPCClient) Call(ctx context.Context, method string, params any) (*Response, error) {
+	id := c.nextID()
+
+	req, err := BuildRequest(method, params, id)
+	if err != nil {
+		return nil, fmt.Errorf("build request: %w", err)
+	}
+
+	return c.do(ctx, req)
+}
+
+// Notify sends a notification (no response expected).
+func (c *RPCClient) Notify(ctx context.Context, method string, params any) error {
+	req, err := BuildNotification(method, params)
+	if err != nil {
+		return fmt.Errorf("build notification: %w", err)
+	}
+
+	_, err = c.do(ctx, req)
+	return err
+}
+
+func (c *RPCClient) nextID() int64 {
+	for {
+		id := c.seq.Add(1)
+		if id > 0 && id <= math.MaxInt32 {
+			return id
+		}
+		if c.seq.CompareAndSwap(id, 1) {
+			return 1
+		}
+	}
+}
+
+func (c *RPCClient) do(ctx context.Context, req *Request) (*Response, error) {
+	body, err := json.Marshal(req)
+	if err != nil {
+		return nil, fmt.Errorf("marshal request: %w", err)
+	}
+
+	httpReq, err := http.NewRequestWithContext(
+		ctx,
+		http.MethodPost,
+		c.baseURL,
+		bytes.NewReader(body),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("create request: %w", err)
+	}
+
+	httpReq.Header = c.headers.Clone()
+	httpReq.Header.Set("Content-Type", "application/json")
+	httpReq.Header.Set("Accept", "application/json")
+
+	resp, err := c.http.Do(httpReq)
+	if err != nil {
+		return nil, fmt.Errorf("http request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+
+	if req.IsNotification() {
+		_, _ = io.Copy(io.Discard, resp.Body)
+		return nil, nil
+	}
+
+	data, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("read response: %w", err)
+	}
+
+	var rpcResp Response
+	if err := json.Unmarshal(data, &rpcResp); err != nil {
+		return nil, fmt.Errorf("unmarshal response: %w", err)
+	}
+
+	return &rpcResp, nil
+}

+ 18 - 12
utils/jsonrpc2/id.go

@@ -1,21 +1,27 @@
 package jsonrpc2
 
 import (
-	"sync"
+	"math"
+	"sync/atomic"
 )
 
-var (
-	idCounter int
-	idMu      sync.Mutex
-)
+var idCounter atomic.Int64
+
+const maxID = math.MaxInt32 // Ensures compatibility with 32-bit systems and JSON-RPC implementations
+
+// nextID() returns a positive integer for JSON-RPC request correlation.
+// The counter wraps to 1 on overflow.
+// IDs are not guaranteed to be globally unique.
+func nextID() int64 {
+	for {
+		id := idCounter.Add(1)
 
-func nextID() int {
-	idMu.Lock()
-	defer idMu.Unlock()
+		if id > 0 && id <= maxID {
+			return id
+		}
 
-	idCounter++
-	if idCounter < 0 {
-		idCounter = 1
+		if idCounter.CompareAndSwap(id, 1) {
+			return 1
+		}
 	}
-	return idCounter
 }

+ 42 - 15
utils/jsonrpc2/rpc.go

@@ -40,14 +40,14 @@ type Request struct {
 	JSONRPC string          `json:"jsonrpc"`          // 版本号, 固定: "2.0"
 	Method  string          `json:"method"`           // 调用方法, 执行函数名
 	Params  json.RawMessage `json:"params,omitempty"` // 请求参数, 可选可为空
-	ID      *int            `json:"id,omitempty"`     // 请求ID, 用于标识请求
+	ID      *int64          `json:"id,omitempty"`     // 请求ID, 用于标识请求
 }
 
 type Response struct {
 	JSONRPC string          `json:"jsonrpc"`          // 版本号, 固定: "2.0"
 	Result  json.RawMessage `json:"result,omitempty"` // 响应结果, 可选可为空
 	Error   *Error          `json:"error,omitempty"`  // 错误信息, 可选可为空
-	ID      *int            `json:"id"`               // 应答ID, 响应匹配请求
+	ID      *int64          `json:"id"`               // 应答ID, 响应匹配请求
 }
 
 // 解析请求数据
@@ -56,13 +56,13 @@ func ParseRequest(input any) (*Request, error) {
 
 	switch v := input.(type) {
 	case nil:
-		return nil, errors.New("input is nil")
+		return nil, errors.New("request data is nil")
 	case string:
 		raw = []byte(v)
 	case []byte:
 		raw = v
 	default:
-		return nil, errors.New("unsupported input type: must be string or []byte")
+		return nil, errors.New("unsupported request data type: must be string or []byte")
 	}
 
 	var req Request
@@ -71,11 +71,11 @@ func ParseRequest(input any) (*Request, error) {
 	}
 
 	if req.JSONRPC != "2.0" {
-		return nil, errors.New(`"jsonrpc" must be "2.0"`)
+		return nil, errors.New("invalid JSON-RPC version")
 	}
 
 	if req.Method == "" {
-		return nil, errors.New("method must be non-empty")
+		return nil, errors.New("method cannot be empty")
 	}
 
 	return &req, nil
@@ -87,13 +87,13 @@ func ParseResponse(input any) (*Response, error) {
 
 	switch v := input.(type) {
 	case nil:
-		return nil, errors.New("input is nil")
+		return nil, errors.New("response data is nil")
 	case string:
 		raw = []byte(v)
 	case []byte:
 		raw = v
 	default:
-		return nil, errors.New("unsupported input type: must be string or []byte")
+		return nil, errors.New("unsupported response data type: must be string or []byte")
 	}
 
 	var resp Response
@@ -102,7 +102,7 @@ func ParseResponse(input any) (*Response, error) {
 	}
 
 	if resp.JSONRPC != "2.0" {
-		return nil, errors.New(`"jsonrpc" must be "2.0"`)
+		return nil, errors.New("invalid JSON-RPC version")
 	}
 
 	// 情况 1:result 和 error 同时为空 → 错误
@@ -124,7 +124,10 @@ func ParseResponse(input any) (*Response, error) {
 	}
 
 	// 情况 4:错误时的应答 → 必须有id&且不能空
-	if resp.Error.Code == ErrParse { // 特例: Parse error (-32700)时
+	if resp.Error != nil && resp.Error.Code == ErrParse { // 当发生解析错误时, "id"必须为: nil
+		if resp.ID != nil {
+			return nil, errors.New(`"id" must be null for parse error`)
+		}
 		return &resp, nil
 	}
 
@@ -154,9 +157,9 @@ func marshalJSON(input any) (json.RawMessage, error) {
 }
 
 // 构建一个请求
-func BuildRequest(method string, params any, id ...int) (*Request, error) {
+func BuildRequest(method string, params any, id ...int64) (*Request, error) {
 	if method == "" {
-		return nil, errors.New("method must be non-empty")
+		return nil, errors.New("method cannot be empty")
 	}
 
 	raw, err := marshalJSON(params)
@@ -164,7 +167,7 @@ func BuildRequest(method string, params any, id ...int) (*Request, error) {
 		return nil, err
 	}
 
-	var rid int
+	var rid int64
 	if len(id) > 0 {
 		rid = id[0]
 	} else {
@@ -187,7 +190,7 @@ func BuildRequest(method string, params any, id ...int) (*Request, error) {
 // 构建一个通知
 func BuildNotification(method string, params any) (*Request, error) {
 	if method == "" {
-		return nil, errors.New("method must be non-empty")
+		return nil, errors.New("method cannot be empty")
 	}
 
 	raw, err := marshalJSON(params)
@@ -231,12 +234,16 @@ func BuildResult(req *Request, result any) (*Response, error) {
 
 // 构建错误应答
 func BuildError(req *Request, code ErrCode, message string) *Response {
-	id := (*int)(nil)
+	id := (*int64)(nil)
 
 	if req != nil {
 		id = req.ID
 	}
 
+	if code == ErrParse {
+		id = nil
+	}
+
 	emsg := message
 	if emsg == "" {
 		emsg = errMessages[code]
@@ -252,6 +259,26 @@ func BuildError(req *Request, code ErrCode, message string) *Response {
 	}
 }
 
+// 构建一个应答
+// 有错误时, result会被忽略; 没有错误时, error会被忽略
+func BuildResponse(req *Request, result any, err error) *Response {
+	if err != nil {
+		return BuildError(req, ErrInternal, err.Error())
+	}
+
+	resp, err := BuildResult(req, result)
+	if err != nil {
+		return BuildError(req, ErrInternal, "")
+	}
+
+	return resp
+}
+
+// 是否通知消息
+func (req *Request) IsNotification() bool {
+	return req.ID == nil
+}
+
 // 请求转字符串
 func (req *Request) String() (string, error) {
 	b, err := json.Marshal(req)

+ 328 - 0
utils/jsonrpc2/server.go

@@ -0,0 +1,328 @@
+// Package jsonrpc2 implements a minimal JSON-RPC 2.0 server over HTTP.
+//
+// This server supports only single JSON-RPC 2.0 requests.
+// Batch requests (arrays of requests) are explicitly unsupported and
+// will be rejected with an Invalid Request (-32600) error.
+//
+// Author: NiuJiuRu
+// Email: niujiuru@qq.com
+
+// See: https://www.jsonrpc.org/specification
+
+package jsonrpc2
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"maps"
+	"net/http"
+	"runtime/debug"
+	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
+
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	defaultMaxBodySize = 1 << 20 // 1MB
+)
+
+// MethodHandler defines the execution logic for a JSON-RPC method.
+type MethodHandler func(ctx context.Context, input *Request) *Response
+
+// MethodMap maps JSON-RPC method names to handlers.
+type MethodMap map[string]MethodHandler
+
+// RPCServer implements a JSON-RPC 2.0 HTTP server.
+//
+// Request processing rules:
+//   - Only individual (non-batch) requests are supported.
+//   - Batch requests are explicitly disallowed and rejected.
+//   - Notifications (requests without an id) are supported and do not produce a response.
+//
+// Handler execution model:
+//   - Each handler runs with a context that is canceled when the server stops.
+//   - No request-level timeout is enforced; handlers may run indefinitely.
+//   - Handlers are expected to respect context cancellation.
+//
+// Built-in behavior:
+//   - A built-in "ping" method is provided for health checks.
+//   - The server is thread-safe and supports concurrent request handling.
+//   - Maximum request body size can be configured to prevent abuse.
+//   - Panics in handlers are recovered and converted into internal error responses.
+//   - Incoming requests and outgoing responses are logged for debugging.
+type RPCServer struct {
+	name        string // 服务器名称, 用于日志的打印前缀
+	ctx         context.Context
+	cancel      context.CancelFunc
+	mu          sync.RWMutex // 保护 handlers的并发访问
+	handlers    MethodMap
+	maxBodySize atomic.Int64 // 允许的最大请求负载字节数
+	logger      *logrus.Logger
+}
+
+// NewRPCServer creates and initializes a JSON-RPC 2.0 server.
+func NewRPCServer(name string, logger *logrus.Logger) (*RPCServer, error) {
+	if logger == nil {
+		return nil, errors.New("logger cannot be nil")
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	s := &RPCServer{
+		name:     name,
+		ctx:      ctx,
+		cancel:   cancel,
+		handlers: make(MethodMap),
+		logger:   logger,
+	}
+
+	s.maxBodySize.Store(defaultMaxBodySize)
+
+	s.handlers["ping"] = s.handlePing // 注册内置方法, 例如: ping -> pong
+
+	return s, nil
+}
+
+// Stop gracefully shuts down the server.
+// It is safe to call multiple times.
+func (s *RPCServer) Stop() {
+	s.cancel()
+}
+
+// SetMaxBodySize sets the maximum allowed request body size.
+func (s *RPCServer) SetMaxBodySize(size int64) {
+	if size > 0 {
+		s.maxBodySize.Store(size)
+	}
+}
+
+// RegisterMethods registers multiple JSON-RPC methods atomically.
+// Either all methods are registered successfully, or none are registered if validation fails.
+func (s *RPCServer) RegisterMethods(methods MethodMap) error {
+	if len(methods) == 0 {
+		return errors.New("methods map cannot be empty")
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	tmp := make(MethodMap, len(s.handlers)+len(methods))
+	maps.Copy(tmp, s.handlers)
+
+	for name, handler := range methods {
+		if name == "" {
+			return errors.New("method name cannot be empty, all methods must have a valid name")
+		}
+		if handler == nil {
+			return fmt.Errorf("method %q has nil handler", name)
+		}
+		if _, exists := tmp[name]; exists {
+			return fmt.Errorf("method %q already registered", name)
+		}
+		tmp[name] = handler
+	}
+
+	s.handlers = tmp
+	return nil
+}
+
+// ListMethods returns all registered JSON-RPC method names.
+// The returned list is sorted alphabetically for deterministic output.
+func (s *RPCServer) ListMethods() []string {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	methods := make([]string, 0, len(s.handlers))
+	for m := range s.handlers {
+		methods = append(methods, m)
+	}
+
+	sort.Strings(methods)
+	return methods
+}
+
+// ServeHTTP handles incoming HTTP requests.
+//
+// Example:
+//
+//	server := jsonrpc2.NewRPCServer("MyRPCServer", baseapp.Logger)
+//	http.Handle("/rpc", server)
+//	http.ListenAndServe(":8080", nil)
+//
+// Example usage from a client:
+//
+//	// Standard request (expects a response)
+//	curl -X POST http://localhost:8080/rpc \
+//	  -H "Content-Type: application/json" \
+//	  -d '{"jsonrpc":"2.0","method":"ping","id":1}'
+//
+//	// Notification, no response is returned
+//	curl -X POST http://localhost:8080/rpc \
+//	  -H "Content-Type: application/json" \
+//	  -d '{"jsonrpc":"2.0","method":"ping"}'
+func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	ctx, cancel := context.WithCancel(r.Context()) // per-request context
+	defer cancel()
+
+	stop := context.AfterFunc(s.ctx, cancel) // tie request lifetime to server shutdown(Go >= 1.21)
+	defer stop()
+
+	remoteAddr := r.Header.Get("X-Forwarded-For")
+	if remoteAddr != "" {
+		parts := strings.Split(remoteAddr, ",")
+		remoteAddr = strings.TrimSpace(parts[0])
+	} else {
+		remoteAddr = r.RemoteAddr
+	}
+
+	if r.Method != http.MethodPost {
+		s.logger.Warnf(
+			"[%s] invalid http method | remote=%s | method=%s",
+			s.name, remoteAddr, r.Method,
+		)
+		http.Error(w, "Only POST allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	r.Body = http.MaxBytesReader(w, r.Body, s.maxBodySize.Load())
+	body, err := io.ReadAll(r.Body)
+	if err != nil {
+		s.logger.Warnf(
+			"[%s] failed to read request body | remote=%s | err=%v",
+			s.name, remoteAddr, err,
+		)
+		http.Error(w, "Failed to read body", http.StatusBadRequest)
+		return
+	}
+
+	if isBatchRequest(body) {
+		s.logger.Warnf(
+			"[%s] batch request rejected | remote=%s",
+			s.name, remoteAddr,
+		)
+		resp := BuildError(nil, ErrInvalidRequest, "Batch requests are not supported")
+		s.writeResponse(w, resp, remoteAddr)
+		return
+	}
+
+	req, err := ParseRequest(body)
+	if err != nil {
+		s.logger.Warnf(
+			"[%s] failed to parse RPC request | remote=%s | err=%v",
+			s.name, remoteAddr, err,
+		)
+		resp := BuildError(nil, ErrParse, err.Error())
+		s.writeResponse(w, resp, remoteAddr)
+		return
+	}
+
+	s.logger.Debugf(
+		"[%s] RPC request received | remote=%s | payload=%s",
+		s.name, remoteAddr, string(body),
+	)
+
+	handler, ok := s.getHandler(req.Method)
+	if !ok {
+		s.logger.Warnf(
+			"[%s] method not found | remote=%s | method=%s",
+			s.name, remoteAddr, req.Method,
+		)
+		resp := BuildError(req, ErrMethodNotFound, "Method not found")
+		s.writeResponse(w, resp, remoteAddr)
+		return
+	}
+
+	var resp *Response
+	func() {
+		defer func() {
+			if r := recover(); r != nil {
+				stack := string(debug.Stack())
+				s.logger.Errorf(
+					"[%s] handler panic | remote=%s | method=%s | panic=%v | stack=%s",
+					s.name, remoteAddr, req.Method, r, stack,
+				)
+				resp = BuildError(req, ErrInternal, "Internal server error")
+			}
+		}()
+
+		resp = handler(ctx, req)
+	}() // 闭包处理, 捕获handler中的panic, 转换为JSON-RPC错误
+
+	if req.IsNotification() { // 如果是通知消息, 则不需要响应
+		s.logger.Debugf(
+			"[%s] RPC notification received | remote=%s | method=%s",
+			s.name, remoteAddr, req.Method,
+		)
+		return
+	}
+
+	if resp == nil { // handler没有返回结果, 则构建一个空响应
+		s.logger.Warnf(
+			"[%s] handler returned nil response | remote=%s | method=%s",
+			s.name, remoteAddr, req.Method,
+		)
+		resp = BuildResponse(req, nil, nil)
+	}
+
+	s.writeResponse(w, resp, remoteAddr)
+}
+
+func (s *RPCServer) handlePing(ctx context.Context, req *Request) *Response {
+	return BuildResponse(req, "pong", nil)
+}
+
+func isBatchRequest(body []byte) bool {
+	var raw any
+	if err := json.Unmarshal(body, &raw); err != nil {
+		return false
+	}
+	_, ok := raw.([]any)
+	return ok
+}
+
+func (s *RPCServer) getHandler(method string) (MethodHandler, bool) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	h, ok := s.handlers[method]
+	return h, ok
+}
+
+func (s *RPCServer) writeResponse(w http.ResponseWriter, resp *Response, remoteAddr string) {
+	if resp == nil {
+		w.WriteHeader(http.StatusNoContent)
+		return
+	}
+
+	data, err := json.Marshal(resp)
+	if err != nil {
+		s.logger.Errorf(
+			"[%s] failed to marshal json-rpc response | remote=%s | err=%v",
+			s.name, remoteAddr, err,
+		)
+		http.Error(w, "Failed to encode response", http.StatusInternalServerError)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+
+	if _, err := w.Write(data); err != nil {
+		s.logger.Errorf(
+			"[%s] failed to write response | remote=%s | err=%v",
+			s.name, remoteAddr, err,
+		)
+	}
+
+	s.logger.Debugf(
+		"[%s] RPC response sent | remote=%s | payload=%s",
+		s.name, remoteAddr, string(data),
+	)
+}