server.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. // Package jsonrpc2 implements a minimal JSON-RPC 2.0 server over HTTP.
  2. //
  3. // This server supports only single JSON-RPC 2.0 requests.
  4. // Batch requests (arrays of requests) are explicitly unsupported and
  5. // will be rejected with an Invalid Request (-32600) error.
  6. //
  7. // Author: NiuJiuRu
  8. // Email: niujiuru@qq.com
  9. // See: https://www.jsonrpc.org/specification
  10. package jsonrpc2
  11. import (
  12. "context"
  13. "encoding/json"
  14. "errors"
  15. "fmt"
  16. "io"
  17. "maps"
  18. "net/http"
  19. "runtime/debug"
  20. "sort"
  21. "strings"
  22. "sync"
  23. "sync/atomic"
  24. "github.com/sirupsen/logrus"
  25. )
  26. const (
  27. defaultMaxBodySize = 1 << 20 // 1MB
  28. )
// MethodHandler defines the execution logic for a JSON-RPC method.
//
// ctx is the per-request context created by ServeHTTP; it is canceled when
// the HTTP request's own context ends or when the server is stopped. input is
// the parsed request. The returned Response is written back to the caller
// unless the request is a notification; a nil return is replaced by an empty
// response in ServeHTTP.
type MethodHandler func(ctx context.Context, input *Request) *Response
// MethodMap maps JSON-RPC method names to handlers.
// It is both the argument type of RegisterMethods and the type of the
// server's internal handler table.
type MethodMap map[string]MethodHandler
// RPCServer implements a JSON-RPC 2.0 HTTP server.
//
// Request processing rules:
//   - Only individual (non-batch) requests are supported.
//   - Batch requests are explicitly disallowed and rejected.
//   - Notifications (requests without an id) are supported and do not produce a response.
//
// Handler execution model:
//   - Each handler runs with a context that is canceled when the server stops.
//   - No request-level timeout is enforced; handlers may run indefinitely.
//   - Handlers are expected to respect context cancellation.
//
// Built-in behavior:
//   - A built-in "ping" method is provided for health checks.
//   - The server is thread-safe and supports concurrent request handling.
//   - Maximum request body size can be configured to prevent abuse.
//   - Panics in handlers are recovered and converted into internal error responses.
//   - Incoming requests and outgoing responses are logged for debugging.
type RPCServer struct {
	name        string             // server name, used as the prefix of every log line
	ctx         context.Context    // server lifetime context; canceled by Stop
	cancel      context.CancelFunc // cancels ctx
	mu          sync.RWMutex       // guards concurrent access to handlers
	handlers    MethodMap          // registered methods, keyed by method name
	maxBodySize atomic.Int64       // maximum allowed request body size, in bytes
	logger      *logrus.Logger     // receives all log output of the server
}
  60. // NewRPCServer creates and initializes a JSON-RPC 2.0 server.
  61. func NewRPCServer(name string, logger *logrus.Logger) (*RPCServer, error) {
  62. if logger == nil {
  63. return nil, errors.New("logger cannot be nil")
  64. }
  65. ctx, cancel := context.WithCancel(context.Background())
  66. s := &RPCServer{
  67. name: name,
  68. ctx: ctx,
  69. cancel: cancel,
  70. handlers: make(MethodMap),
  71. logger: logger,
  72. }
  73. s.maxBodySize.Store(defaultMaxBodySize)
  74. s.handlers["ping"] = s.handlePing // 注册内置方法, 例如: ping -> pong
  75. return s, nil
  76. }
// Stop shuts the server down by canceling its lifetime context.
//
// Every per-request context created by ServeHTTP is tied to that context, so
// handlers that are still running observe the cancellation through their ctx.
// Stop itself does not wait for those handlers to return.
// It is safe to call multiple times.
func (s *RPCServer) Stop() {
	s.cancel()
}
  82. // SetMaxBodySize sets the maximum allowed request body size.
  83. func (s *RPCServer) SetMaxBodySize(size int64) {
  84. if size > 0 {
  85. s.maxBodySize.Store(size)
  86. }
  87. }
  88. // RegisterMethods registers multiple JSON-RPC methods atomically.
  89. // Either all methods are registered successfully, or none are registered if validation fails.
  90. func (s *RPCServer) RegisterMethods(methods MethodMap) error {
  91. if len(methods) == 0 {
  92. return errors.New("methods map cannot be empty")
  93. }
  94. s.mu.Lock()
  95. defer s.mu.Unlock()
  96. tmp := make(MethodMap, len(s.handlers)+len(methods))
  97. maps.Copy(tmp, s.handlers)
  98. for name, handler := range methods {
  99. if name == "" {
  100. return errors.New("method name cannot be empty, all methods must have a valid name")
  101. }
  102. if handler == nil {
  103. return fmt.Errorf("method %q has nil handler", name)
  104. }
  105. if _, exists := tmp[name]; exists {
  106. return fmt.Errorf("method %q already registered", name)
  107. }
  108. tmp[name] = handler
  109. }
  110. s.handlers = tmp
  111. return nil
  112. }
  113. // ListMethods returns all registered JSON-RPC method names.
  114. // The returned list is sorted alphabetically for deterministic output.
  115. func (s *RPCServer) ListMethods() []string {
  116. s.mu.RLock()
  117. defer s.mu.RUnlock()
  118. methods := make([]string, 0, len(s.handlers))
  119. for m := range s.handlers {
  120. methods = append(methods, m)
  121. }
  122. sort.Strings(methods)
  123. return methods
  124. }
  125. // ServeHTTP handles incoming HTTP requests.
  126. //
  127. // Example:
  128. //
  129. // server := jsonrpc2.NewRPCServer("MyRPCServer", baseapp.Logger)
  130. // http.Handle("/rpc", server)
  131. // http.ListenAndServe(":8080", nil)
  132. //
  133. // Example usage from a client:
  134. //
  135. // // Standard request (expects a response)
  136. // curl -X POST http://localhost:8080/rpc \
  137. // -H "Content-Type: application/json" \
  138. // -d '{"jsonrpc":"2.0","method":"ping","id":1}'
  139. //
  140. // // Notification, no response is returned
  141. // curl -X POST http://localhost:8080/rpc \
  142. // -H "Content-Type: application/json" \
  143. // -d '{"jsonrpc":"2.0","method":"ping"}'
  144. func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  145. defer r.Body.Close()
  146. ctx, cancel := context.WithCancel(r.Context()) // per-request context
  147. defer cancel()
  148. stop := context.AfterFunc(s.ctx, cancel) // tie request lifetime to server shutdown(Go >= 1.21)
  149. defer stop()
  150. remoteAddr := r.Header.Get("X-Forwarded-For")
  151. if remoteAddr != "" {
  152. parts := strings.Split(remoteAddr, ",")
  153. remoteAddr = strings.TrimSpace(parts[0])
  154. } else {
  155. remoteAddr = r.RemoteAddr
  156. }
  157. if r.Method != http.MethodPost {
  158. s.logger.Warnf(
  159. "[%s] invalid http method | remote=%s | method=%s",
  160. s.name, remoteAddr, r.Method,
  161. )
  162. http.Error(w, "Only POST allowed", http.StatusMethodNotAllowed)
  163. return
  164. }
  165. r.Body = http.MaxBytesReader(w, r.Body, s.maxBodySize.Load())
  166. body, err := io.ReadAll(r.Body)
  167. if err != nil {
  168. s.logger.Warnf(
  169. "[%s] failed to read request body | remote=%s | err=%v",
  170. s.name, remoteAddr, err,
  171. )
  172. http.Error(w, "Failed to read body", http.StatusBadRequest)
  173. return
  174. }
  175. if isBatchRequest(body) {
  176. s.logger.Warnf(
  177. "[%s] batch request rejected | remote=%s",
  178. s.name, remoteAddr,
  179. )
  180. resp := BuildError(nil, ErrInvalidRequest, "Batch requests are not supported")
  181. s.writeResponse(w, resp, remoteAddr)
  182. return
  183. }
  184. req, err := ParseRequest(body)
  185. if err != nil {
  186. s.logger.Warnf(
  187. "[%s] failed to parse RPC request | remote=%s | err=%v",
  188. s.name, remoteAddr, err,
  189. )
  190. resp := BuildError(nil, ErrParse, err.Error())
  191. s.writeResponse(w, resp, remoteAddr)
  192. return
  193. }
  194. s.logger.Debugf(
  195. "[%s] RPC request received | remote=%s | payload=%s",
  196. s.name, remoteAddr, string(body),
  197. )
  198. handler, ok := s.getHandler(req.Method)
  199. if !ok {
  200. s.logger.Warnf(
  201. "[%s] method not found | remote=%s | method=%s",
  202. s.name, remoteAddr, req.Method,
  203. )
  204. resp := BuildError(req, ErrMethodNotFound, "Method not found")
  205. s.writeResponse(w, resp, remoteAddr)
  206. return
  207. }
  208. var resp *Response
  209. func() {
  210. defer func() {
  211. if r := recover(); r != nil {
  212. stack := string(debug.Stack())
  213. s.logger.Errorf(
  214. "[%s] handler panic | remote=%s | method=%s | panic=%v | stack=%s",
  215. s.name, remoteAddr, req.Method, r, stack,
  216. )
  217. resp = BuildError(req, ErrInternal, "Internal server error")
  218. }
  219. }()
  220. resp = handler(ctx, req)
  221. }() // 闭包处理, 捕获handler中的panic, 转换为JSON-RPC错误
  222. if req.IsNotification() { // 如果是通知消息, 则不需要响应
  223. s.logger.Debugf(
  224. "[%s] RPC notification received | remote=%s | method=%s",
  225. s.name, remoteAddr, req.Method,
  226. )
  227. return
  228. }
  229. if resp == nil { // handler没有返回结果, 则构建一个空响应
  230. s.logger.Warnf(
  231. "[%s] handler returned nil response | remote=%s | method=%s",
  232. s.name, remoteAddr, req.Method,
  233. )
  234. resp = BuildResponse(req, nil, nil)
  235. }
  236. s.writeResponse(w, resp, remoteAddr)
  237. }
// handlePing is the built-in "ping" method registered by NewRPCServer.
// It ignores ctx and answers every request with the result "pong".
func (s *RPCServer) handlePing(ctx context.Context, req *Request) *Response {
	return BuildResponse(req, "pong", nil)
}
  241. func isBatchRequest(body []byte) bool {
  242. var raw any
  243. if err := json.Unmarshal(body, &raw); err != nil {
  244. return false
  245. }
  246. _, ok := raw.([]any)
  247. return ok
  248. }
  249. func (s *RPCServer) getHandler(method string) (MethodHandler, bool) {
  250. s.mu.RLock()
  251. defer s.mu.RUnlock()
  252. h, ok := s.handlers[method]
  253. return h, ok
  254. }
  255. func (s *RPCServer) writeResponse(w http.ResponseWriter, resp *Response, remoteAddr string) {
  256. if resp == nil {
  257. w.WriteHeader(http.StatusNoContent)
  258. return
  259. }
  260. data, err := json.Marshal(resp)
  261. if err != nil {
  262. s.logger.Errorf(
  263. "[%s] failed to marshal json-rpc response | remote=%s | err=%v",
  264. s.name, remoteAddr, err,
  265. )
  266. http.Error(w, "Failed to encode response", http.StatusInternalServerError)
  267. return
  268. }
  269. w.Header().Set("Content-Type", "application/json")
  270. w.WriteHeader(http.StatusOK)
  271. if _, err := w.Write(data); err != nil {
  272. s.logger.Errorf(
  273. "[%s] failed to write response | remote=%s | err=%v",
  274. s.name, remoteAddr, err,
  275. )
  276. }
  277. s.logger.Debugf(
  278. "[%s] RPC response sent | remote=%s | payload=%s",
  279. s.name, remoteAddr, string(data),
  280. )
  281. }