// Package jsonrpc2 implements a minimal JSON-RPC 2.0 server over HTTP. // // This server supports only single JSON-RPC 2.0 requests. // Batch requests (arrays of requests) are explicitly unsupported and // will be rejected with an Invalid Request (-32600) error. // // Author: NiuJiuRu // Email: niujiuru@qq.com // See: https://www.jsonrpc.org/specification package jsonrpc2 import ( "context" "encoding/json" "errors" "fmt" "io" "maps" "net/http" "runtime/debug" "sort" "strings" "sync" "sync/atomic" "github.com/sirupsen/logrus" ) const ( defaultMaxBodySize = 1 << 20 // 1MB ) // MethodHandler defines the execution logic for a JSON-RPC method. type MethodHandler func(ctx context.Context, input *Request) *Response // MethodMap maps JSON-RPC method names to handlers. type MethodMap map[string]MethodHandler // RPCServer implements a JSON-RPC 2.0 HTTP server. // // Request processing rules: // - Only individual (non-batch) requests are supported. // - Batch requests are explicitly disallowed and rejected. // - Notifications (requests without an id) are supported and do not produce a response. // // Handler execution model: // - Each handler runs with a context that is canceled when the server stops. // - No request-level timeout is enforced; handlers may run indefinitely. // - Handlers are expected to respect context cancellation. // // Built-in behavior: // - A built-in "ping" method is provided for health checks. // - The server is thread-safe and supports concurrent request handling. // - Maximum request body size can be configured to prevent abuse. // - Panics in handlers are recovered and converted into internal error responses. // - Incoming requests and outgoing responses are logged for debugging. type RPCServer struct { name string // 服务器名称, 用于日志的打印前缀 ctx context.Context cancel context.CancelFunc mu sync.RWMutex // 保护 handlers的并发访问 handlers MethodMap maxBodySize atomic.Int64 // 允许的最大请求负载字节数 logger *logrus.Logger } // NewRPCServer creates and initializes a JSON-RPC 2.0 server. func NewRPCServer(name string, logger *logrus.Logger) (*RPCServer, error) { if logger == nil { return nil, errors.New("logger cannot be nil") } ctx, cancel := context.WithCancel(context.Background()) s := &RPCServer{ name: name, ctx: ctx, cancel: cancel, handlers: make(MethodMap), logger: logger, } s.maxBodySize.Store(defaultMaxBodySize) s.handlers["ping"] = s.handlePing // 注册内置方法, 例如: ping -> pong return s, nil } // Stop gracefully shuts down the server. // It is safe to call multiple times. func (s *RPCServer) Stop() { s.cancel() } // SetMaxBodySize sets the maximum allowed request body size. func (s *RPCServer) SetMaxBodySize(size int64) { if size > 0 { s.maxBodySize.Store(size) } } // RegisterMethods registers multiple JSON-RPC methods atomically. // Either all methods are registered successfully, or none are registered if validation fails. func (s *RPCServer) RegisterMethods(methods MethodMap) error { if len(methods) == 0 { return errors.New("methods map cannot be empty") } s.mu.Lock() defer s.mu.Unlock() tmp := make(MethodMap, len(s.handlers)+len(methods)) maps.Copy(tmp, s.handlers) for name, handler := range methods { if name == "" { return errors.New("method name cannot be empty, all methods must have a valid name") } if handler == nil { return fmt.Errorf("method %q has nil handler", name) } if _, exists := tmp[name]; exists { return fmt.Errorf("method %q already registered", name) } tmp[name] = handler } s.handlers = tmp return nil } // ListMethods returns all registered JSON-RPC method names. // The returned list is sorted alphabetically for deterministic output. func (s *RPCServer) ListMethods() []string { s.mu.RLock() defer s.mu.RUnlock() methods := make([]string, 0, len(s.handlers)) for m := range s.handlers { methods = append(methods, m) } sort.Strings(methods) return methods } // ServeHTTP handles incoming HTTP requests. // // Example: // // server := jsonrpc2.NewRPCServer("MyRPCServer", baseapp.Logger) // http.Handle("/rpc", server) // http.ListenAndServe(":8080", nil) // // Example usage from a client: // // // Standard request (expects a response) // curl -X POST http://localhost:8080/rpc \ // -H "Content-Type: application/json" \ // -d '{"jsonrpc":"2.0","method":"ping","id":1}' // // // Notification, no response is returned // curl -X POST http://localhost:8080/rpc \ // -H "Content-Type: application/json" \ // -d '{"jsonrpc":"2.0","method":"ping"}' func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() ctx, cancel := context.WithCancel(r.Context()) // per-request context defer cancel() stop := context.AfterFunc(s.ctx, cancel) // tie request lifetime to server shutdown(Go >= 1.21) defer stop() remoteAddr := r.Header.Get("X-Forwarded-For") if remoteAddr != "" { parts := strings.Split(remoteAddr, ",") remoteAddr = strings.TrimSpace(parts[0]) } else { remoteAddr = r.RemoteAddr } if r.Method != http.MethodPost { s.logger.Warnf( "[%s] invalid http method | remote=%s | method=%s", s.name, remoteAddr, r.Method, ) http.Error(w, "Only POST allowed", http.StatusMethodNotAllowed) return } r.Body = http.MaxBytesReader(w, r.Body, s.maxBodySize.Load()) body, err := io.ReadAll(r.Body) if err != nil { s.logger.Warnf( "[%s] failed to read request body | remote=%s | err=%v", s.name, remoteAddr, err, ) http.Error(w, "Failed to read body", http.StatusBadRequest) return } if isBatchRequest(body) { s.logger.Warnf( "[%s] batch request rejected | remote=%s", s.name, remoteAddr, ) resp := BuildError(nil, ErrInvalidRequest, "Batch requests are not supported") s.writeResponse(w, resp, remoteAddr) return } req, err := ParseRequest(body) if err != nil { s.logger.Warnf( "[%s] failed to parse RPC request | remote=%s | err=%v", s.name, remoteAddr, err, ) resp := BuildError(nil, ErrParse, err.Error()) s.writeResponse(w, resp, remoteAddr) return } s.logger.Debugf( "[%s] RPC request received | remote=%s | payload=%s", s.name, remoteAddr, string(body), ) handler, ok := s.getHandler(req.Method) if !ok { s.logger.Warnf( "[%s] method not found | remote=%s | method=%s", s.name, remoteAddr, req.Method, ) resp := BuildError(req, ErrMethodNotFound, "Method not found") s.writeResponse(w, resp, remoteAddr) return } var resp *Response func() { defer func() { if r := recover(); r != nil { stack := string(debug.Stack()) s.logger.Errorf( "[%s] handler panic | remote=%s | method=%s | panic=%v | stack=%s", s.name, remoteAddr, req.Method, r, stack, ) resp = BuildError(req, ErrInternal, "Internal server error") } }() resp = handler(ctx, req) }() // 闭包处理, 捕获handler中的panic, 转换为JSON-RPC错误 if req.IsNotification() { // 如果是通知消息, 则不需要响应 s.logger.Debugf( "[%s] RPC notification received | remote=%s | method=%s", s.name, remoteAddr, req.Method, ) return } if resp == nil { // handler没有返回结果, 则构建一个空响应 s.logger.Warnf( "[%s] handler returned nil response | remote=%s | method=%s", s.name, remoteAddr, req.Method, ) resp = BuildResponse(req, nil, nil) } s.writeResponse(w, resp, remoteAddr) } func (s *RPCServer) handlePing(ctx context.Context, req *Request) *Response { return BuildResponse(req, "pong", nil) } func isBatchRequest(body []byte) bool { var raw any if err := json.Unmarshal(body, &raw); err != nil { return false } _, ok := raw.([]any) return ok } func (s *RPCServer) getHandler(method string) (MethodHandler, bool) { s.mu.RLock() defer s.mu.RUnlock() h, ok := s.handlers[method] return h, ok } func (s *RPCServer) writeResponse(w http.ResponseWriter, resp *Response, remoteAddr string) { if resp == nil { w.WriteHeader(http.StatusNoContent) return } data, err := json.Marshal(resp) if err != nil { s.logger.Errorf( "[%s] failed to marshal json-rpc response | remote=%s | err=%v", s.name, remoteAddr, err, ) http.Error(w, "Failed to encode response", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) if _, err := w.Write(data); err != nil { s.logger.Errorf( "[%s] failed to write response | remote=%s | err=%v", s.name, remoteAddr, err, ) } s.logger.Debugf( "[%s] RPC response sent | remote=%s | payload=%s", s.name, remoteAddr, string(data), ) }