| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- // Package jsonrpc2 implements a minimal JSON-RPC 2.0 server over HTTP.
- //
- // This server supports only single JSON-RPC 2.0 requests.
- // Batch requests (arrays of requests) are explicitly unsupported and
- // will be rejected with an Invalid Request (-32600) error.
- //
- // Author: NiuJiuRu
- // Email: niujiuru@qq.com
- // See: https://www.jsonrpc.org/specification
- package jsonrpc2
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "maps"
- "net/http"
- "runtime/debug"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "github.com/sirupsen/logrus"
- )
- const (
- defaultMaxBodySize = 1 << 20 // 1MB
- )
- // MethodHandler defines the execution logic for a JSON-RPC method.
- type MethodHandler func(ctx context.Context, input *Request) *Response
- // MethodMap maps JSON-RPC method names to handlers.
- type MethodMap map[string]MethodHandler
- // RPCServer implements a JSON-RPC 2.0 HTTP server.
- //
- // Request processing rules:
- // - Only individual (non-batch) requests are supported.
- // - Batch requests are explicitly disallowed and rejected.
- // - Notifications (requests without an id) are supported and do not produce a response.
- //
- // Handler execution model:
- // - Each handler runs with a context that is canceled when the server stops.
- // - No request-level timeout is enforced; handlers may run indefinitely.
- // - Handlers are expected to respect context cancellation.
- //
- // Built-in behavior:
- // - A built-in "ping" method is provided for health checks.
- // - The server is thread-safe and supports concurrent request handling.
- // - Maximum request body size can be configured to prevent abuse.
- // - Panics in handlers are recovered and converted into internal error responses.
- // - Incoming requests and outgoing responses are logged for debugging.
- type RPCServer struct {
- name string // 服务器名称, 用于日志的打印前缀
- ctx context.Context
- cancel context.CancelFunc
- mu sync.RWMutex // 保护 handlers的并发访问
- handlers MethodMap
- maxBodySize atomic.Int64 // 允许的最大请求负载字节数
- logger *logrus.Logger
- }
- // NewRPCServer creates and initializes a JSON-RPC 2.0 server.
- func NewRPCServer(name string, logger *logrus.Logger) (*RPCServer, error) {
- if logger == nil {
- return nil, errors.New("logger cannot be nil")
- }
- ctx, cancel := context.WithCancel(context.Background())
- s := &RPCServer{
- name: name,
- ctx: ctx,
- cancel: cancel,
- handlers: make(MethodMap),
- logger: logger,
- }
- s.maxBodySize.Store(defaultMaxBodySize)
- s.handlers["ping"] = s.handlePing // 注册内置方法, 例如: ping -> pong
- return s, nil
- }
- // Stop gracefully shuts down the server.
- // It is safe to call multiple times.
- func (s *RPCServer) Stop() {
- s.cancel()
- }
- // SetMaxBodySize sets the maximum allowed request body size.
- func (s *RPCServer) SetMaxBodySize(size int64) {
- if size > 0 {
- s.maxBodySize.Store(size)
- }
- }
- // RegisterMethods registers multiple JSON-RPC methods atomically.
- // Either all methods are registered successfully, or none are registered if validation fails.
- func (s *RPCServer) RegisterMethods(methods MethodMap) error {
- if len(methods) == 0 {
- return errors.New("methods map cannot be empty")
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- tmp := make(MethodMap, len(s.handlers)+len(methods))
- maps.Copy(tmp, s.handlers)
- for name, handler := range methods {
- if name == "" {
- return errors.New("method name cannot be empty, all methods must have a valid name")
- }
- if handler == nil {
- return fmt.Errorf("method %q has nil handler", name)
- }
- if _, exists := tmp[name]; exists {
- return fmt.Errorf("method %q already registered", name)
- }
- tmp[name] = handler
- }
- s.handlers = tmp
- return nil
- }
- // ListMethods returns all registered JSON-RPC method names.
- // The returned list is sorted alphabetically for deterministic output.
- func (s *RPCServer) ListMethods() []string {
- s.mu.RLock()
- defer s.mu.RUnlock()
- methods := make([]string, 0, len(s.handlers))
- for m := range s.handlers {
- methods = append(methods, m)
- }
- sort.Strings(methods)
- return methods
- }
- // ServeHTTP handles incoming HTTP requests.
- //
- // Example:
- //
- // server := jsonrpc2.NewRPCServer("MyRPCServer", baseapp.Logger)
- // http.Handle("/rpc", server)
- // http.ListenAndServe(":8080", nil)
- //
- // Example usage from a client:
- //
- // // Standard request (expects a response)
- // curl -X POST http://localhost:8080/rpc \
- // -H "Content-Type: application/json" \
- // -d '{"jsonrpc":"2.0","method":"ping","id":1}'
- //
- // // Notification, no response is returned
- // curl -X POST http://localhost:8080/rpc \
- // -H "Content-Type: application/json" \
- // -d '{"jsonrpc":"2.0","method":"ping"}'
- func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- defer r.Body.Close()
- ctx, cancel := context.WithCancel(r.Context()) // per-request context
- defer cancel()
- stop := context.AfterFunc(s.ctx, cancel) // tie request lifetime to server shutdown(Go >= 1.21)
- defer stop()
- remoteAddr := r.Header.Get("X-Forwarded-For")
- if remoteAddr != "" {
- parts := strings.Split(remoteAddr, ",")
- remoteAddr = strings.TrimSpace(parts[0])
- } else {
- remoteAddr = r.RemoteAddr
- }
- if r.Method != http.MethodPost {
- s.logger.Warnf(
- "[%s] invalid http method | remote=%s | method=%s",
- s.name, remoteAddr, r.Method,
- )
- http.Error(w, "Only POST allowed", http.StatusMethodNotAllowed)
- return
- }
- r.Body = http.MaxBytesReader(w, r.Body, s.maxBodySize.Load())
- body, err := io.ReadAll(r.Body)
- if err != nil {
- s.logger.Warnf(
- "[%s] failed to read request body | remote=%s | err=%v",
- s.name, remoteAddr, err,
- )
- http.Error(w, "Failed to read body", http.StatusBadRequest)
- return
- }
- if isBatchRequest(body) {
- s.logger.Warnf(
- "[%s] batch request rejected | remote=%s",
- s.name, remoteAddr,
- )
- resp := BuildError(nil, ErrInvalidRequest, "Batch requests are not supported")
- s.writeResponse(w, resp, remoteAddr)
- return
- }
- req, err := ParseRequest(body)
- if err != nil {
- s.logger.Warnf(
- "[%s] failed to parse RPC request | remote=%s | err=%v",
- s.name, remoteAddr, err,
- )
- resp := BuildError(nil, ErrParse, err.Error())
- s.writeResponse(w, resp, remoteAddr)
- return
- }
- s.logger.Debugf(
- "[%s] RPC request received | remote=%s | payload=%s",
- s.name, remoteAddr, string(body),
- )
- handler, ok := s.getHandler(req.Method)
- if !ok {
- s.logger.Warnf(
- "[%s] method not found | remote=%s | method=%s",
- s.name, remoteAddr, req.Method,
- )
- resp := BuildError(req, ErrMethodNotFound, "Method not found")
- s.writeResponse(w, resp, remoteAddr)
- return
- }
- var resp *Response
- func() {
- defer func() {
- if r := recover(); r != nil {
- stack := string(debug.Stack())
- s.logger.Errorf(
- "[%s] handler panic | remote=%s | method=%s | panic=%v | stack=%s",
- s.name, remoteAddr, req.Method, r, stack,
- )
- resp = BuildError(req, ErrInternal, "Internal server error")
- }
- }()
- resp = handler(ctx, req)
- }() // 闭包处理, 捕获handler中的panic, 转换为JSON-RPC错误
- if req.IsNotification() { // 如果是通知消息, 则不需要响应
- s.logger.Debugf(
- "[%s] RPC notification received | remote=%s | method=%s",
- s.name, remoteAddr, req.Method,
- )
- return
- }
- if resp == nil { // handler没有返回结果, 则构建一个空响应
- s.logger.Warnf(
- "[%s] handler returned nil response | remote=%s | method=%s",
- s.name, remoteAddr, req.Method,
- )
- resp = BuildResponse(req, nil, nil)
- }
- s.writeResponse(w, resp, remoteAddr)
- }
- func (s *RPCServer) handlePing(ctx context.Context, req *Request) *Response {
- return BuildResponse(req, "pong", nil)
- }
- func isBatchRequest(body []byte) bool {
- var raw any
- if err := json.Unmarshal(body, &raw); err != nil {
- return false
- }
- _, ok := raw.([]any)
- return ok
- }
- func (s *RPCServer) getHandler(method string) (MethodHandler, bool) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- h, ok := s.handlers[method]
- return h, ok
- }
- func (s *RPCServer) writeResponse(w http.ResponseWriter, resp *Response, remoteAddr string) {
- if resp == nil {
- w.WriteHeader(http.StatusNoContent)
- return
- }
- data, err := json.Marshal(resp)
- if err != nil {
- s.logger.Errorf(
- "[%s] failed to marshal json-rpc response | remote=%s | err=%v",
- s.name, remoteAddr, err,
- )
- http.Error(w, "Failed to encode response", http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
- if _, err := w.Write(data); err != nil {
- s.logger.Errorf(
- "[%s] failed to write response | remote=%s | err=%v",
- s.name, remoteAddr, err,
- )
- }
- s.logger.Debugf(
- "[%s] RPC response sent | remote=%s | payload=%s",
- s.name, remoteAddr, string(data),
- )
- }
|