summaryrefslogtreecommitdiff
path: root/vendor/github.com/ugorji/go/codec/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/ugorji/go/codec/rpc.go')
-rw-r--r--vendor/github.com/ugorji/go/codec/rpc.go86
1 files changed, 40 insertions, 46 deletions
diff --git a/vendor/github.com/ugorji/go/codec/rpc.go b/vendor/github.com/ugorji/go/codec/rpc.go
index 0da8ad577..bdc535b6d 100644
--- a/vendor/github.com/ugorji/go/codec/rpc.go
+++ b/vendor/github.com/ugorji/go/codec/rpc.go
@@ -4,10 +4,11 @@
package codec
import (
- "bufio"
"errors"
"io"
+ "net"
"net/rpc"
+ "sync/atomic"
)
var (
@@ -28,57 +29,44 @@ type RPCOptions struct {
// RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
//
// Set RPCNoBuffer=true to turn buffering off.
+ //
// Buffering can still be done if buffered connections are passed in, or
// buffering is configured on the handle.
+ //
+ // Deprecated: Buffering should be configured at the Handle or by using a buffer Reader.
+ // Setting this has no effect anymore (after v1.2.12 - authored 2025-05-06)
RPCNoBuffer bool
}
// rpcCodec defines the struct members and common methods.
type rpcCodec struct {
- c io.Closer
- r io.Reader
- w io.Writer
- f ioFlusher
-
+ c io.Closer
+ r io.Reader
+ w io.Writer
+ f ioFlusher
+ nc net.Conn
dec *Decoder
enc *Encoder
h Handle
- cls atomicClsErr
+ cls atomic.Pointer[clsErr]
}
-func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
- return newRPCCodec2(conn, conn, conn, h)
-}
-
-func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
- bh := h.getBasicHandle()
- // if the writer can flush, ensure we leverage it, else
- // we may hang waiting on read if write isn't flushed.
- // var f ioFlusher
- f, ok := w.(ioFlusher)
- if !bh.RPCNoBuffer {
- if bh.WriterBufferSize <= 0 {
- if !ok { // a flusher means there's already a buffer
- bw := bufio.NewWriter(w)
- f, w = bw, bw
- }
- }
- if bh.ReaderBufferSize <= 0 {
- if _, ok = w.(ioBuffered); !ok {
- r = bufio.NewReader(r)
- }
- }
- }
- return rpcCodec{
- c: c,
- w: w,
- r: r,
- f: f,
+func newRPCCodec(conn io.ReadWriteCloser, h Handle) *rpcCodec {
+ nc, _ := conn.(net.Conn)
+ f, _ := conn.(ioFlusher)
+ rc := &rpcCodec{
h: h,
- enc: NewEncoder(w, h),
- dec: NewDecoder(r, h),
+ c: conn,
+ w: conn,
+ r: conn,
+ f: f,
+ nc: nc,
+ enc: NewEncoder(conn, h),
+ dec: NewDecoder(conn, h),
}
+ rc.cls.Store(new(clsErr))
+ return rc
}
func (c *rpcCodec) write(obj ...interface{}) (err error) {
@@ -116,10 +104,16 @@ func (c *rpcCodec) write(obj ...interface{}) (err error) {
func (c *rpcCodec) read(obj interface{}) (err error) {
err = c.ready()
if err == nil {
- //If nil is passed in, we should read and discard
+ // Setting ReadDeadline should not be necessary,
+ // especially since it only works for net.Conn (not generic ioReadCloser).
+ // if c.nc != nil {
+ // c.nc.SetReadDeadline(time.Now().Add(1 * time.Second))
+ // }
+
+ // Note: If nil is passed in, we should read and discard
if obj == nil {
// return c.dec.Decode(&obj)
- err = c.dec.swallowErr()
+ err = panicToErr(c.dec, func() { c.dec.swallow() })
} else {
err = c.dec.Decode(obj)
}
@@ -129,11 +123,11 @@ func (c *rpcCodec) read(obj interface{}) (err error) {
func (c *rpcCodec) Close() (err error) {
if c.c != nil {
- cls := c.cls.load()
+ cls := c.cls.Load()
if !cls.closed {
- cls.err = c.c.Close()
- cls.closed = true
- c.cls.store(cls)
+ // writing to same pointer could lead to a data race (always make new one)
+ cls = &clsErr{closed: true, err: c.c.Close()}
+ c.cls.Store(cls)
}
err = cls.err
}
@@ -144,8 +138,8 @@ func (c *rpcCodec) ready() (err error) {
if c.c == nil {
err = errRpcNoConn
} else {
- cls := c.cls.load()
- if cls.closed {
+ cls := c.cls.Load()
+ if cls != nil && cls.closed {
if err = cls.err; err == nil {
err = errRpcIsClosed
}
@@ -161,7 +155,7 @@ func (c *rpcCodec) ReadResponseBody(body interface{}) error {
// -------------------------------------
type goRpcCodec struct {
- rpcCodec
+ *rpcCodec
}
func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {