diff options
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/exporters/jaeger/reconnecting_udp_client.go')
-rw-r--r-- | vendor/go.opentelemetry.io/otel/exporters/jaeger/reconnecting_udp_client.go | 204 |
1 files changed, 0 insertions, 204 deletions
diff --git a/vendor/go.opentelemetry.io/otel/exporters/jaeger/reconnecting_udp_client.go b/vendor/go.opentelemetry.io/otel/exporters/jaeger/reconnecting_udp_client.go deleted file mode 100644 index 88055c8a3..000000000 --- a/vendor/go.opentelemetry.io/otel/exporters/jaeger/reconnecting_udp_client.go +++ /dev/null @@ -1,204 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger" - -import ( - "fmt" - "net" - "sync" - "sync/atomic" - "time" - - "github.com/go-logr/logr" -) - -// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is -// different than the current conn then the new address is dialed and the conn is swapped. -type reconnectingUDPConn struct { - // `sync/atomic` expects the first word in an allocated struct to be 64-bit - // aligned on both ARM and x86-32. See https://goo.gl/zW7dgq for more details. - bufferBytes int64 - hostPort string - resolveFunc resolveFunc - dialFunc dialFunc - logger logr.Logger - - connMtx sync.RWMutex - conn *net.UDPConn - destAddr *net.UDPAddr - closeChan chan struct{} -} - -type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error) -type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) - -// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is -// different than the current conn then the new address is dialed and the conn is swapped. -func newReconnectingUDPConn(hostPort string, bufferBytes int, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger logr.Logger) (*reconnectingUDPConn, error) { - conn := &reconnectingUDPConn{ - hostPort: hostPort, - resolveFunc: resolveFunc, - dialFunc: dialFunc, - logger: logger, - closeChan: make(chan struct{}), - bufferBytes: int64(bufferBytes), - } - - if err := conn.attemptResolveAndDial(); err != nil { - conn.logf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout) - } - - go conn.reconnectLoop(resolveTimeout) - - return conn, nil -} - -func (c *reconnectingUDPConn) logf(format string, args ...interface{}) { - if c.logger != emptyLogger { - c.logger.Info(format, args...) - } -} - -func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) { - ticker := time.NewTicker(resolveTimeout) - defer ticker.Stop() - - for { - select { - case <-c.closeChan: - return - case <-ticker.C: - if err := c.attemptResolveAndDial(); err != nil { - c.logf("%s", err.Error()) - } - } - } -} - -func (c *reconnectingUDPConn) attemptResolveAndDial() error { - newAddr, err := c.resolveFunc("udp", c.hostPort) - if err != nil { - return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err) - } - - c.connMtx.RLock() - curAddr := c.destAddr - c.connMtx.RUnlock() - - // dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn - if curAddr != nil && newAddr.String() == curAddr.String() { - return nil - } - - if err := c.attemptDialNewAddr(newAddr); err != nil { - return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err) - } - - return nil -} - -func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error { - connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr) - if err != nil { - return err - } - - if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 { - if err = connUDP.SetWriteBuffer(bufferBytes); err != nil { - return err - } - } - - c.connMtx.Lock() - c.destAddr = newAddr - // store prev to close later - prevConn := c.conn - c.conn = connUDP - c.connMtx.Unlock() - - if prevConn != nil { - return prevConn.Close() - } - - return nil -} - -// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning. -func (c *reconnectingUDPConn) Write(b []byte) (int, error) { - var bytesWritten int - var err error - - c.connMtx.RLock() - conn := c.conn - c.connMtx.RUnlock() - - if conn == nil { - // if connection is not initialized indicate this with err in order to hook into retry logic - err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved") - } else { - bytesWritten, err = conn.Write(b) - } - - if err == nil { - return bytesWritten, nil - } - - // attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again - if reconnErr := c.attemptResolveAndDial(); reconnErr == nil { - c.connMtx.RLock() - conn := c.conn - c.connMtx.RUnlock() - - return conn.Write(b) - } - - // return original error if reconn fails - return bytesWritten, err -} - -// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation. -func (c *reconnectingUDPConn) Close() error { - close(c.closeChan) - - // acquire rw lock before closing conn to ensure calls to Write drain - c.connMtx.Lock() - defer c.connMtx.Unlock() - - if c.conn != nil { - return c.conn.Close() - } - - return nil -} - -// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held -// and SetWriteBuffer is called store bufferBytes to be set for new conns. -func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error { - var err error - - c.connMtx.RLock() - conn := c.conn - c.connMtx.RUnlock() - - if conn != nil { - err = c.conn.SetWriteBuffer(bytes) - } - - if err == nil { - atomic.StoreInt64(&c.bufferBytes, int64(bytes)) - } - - return err -} |