summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/binarylog/sink.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/binarylog/sink.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/sink.go170
1 files changed, 0 insertions, 170 deletions
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
deleted file mode 100644
index 9ea598b14..000000000
--- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package binarylog
-
-import (
- "bufio"
- "encoding/binary"
- "io"
- "sync"
- "time"
-
- binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
- "google.golang.org/protobuf/proto"
-)
-
-var (
- // DefaultSink is the sink where the logs will be written to. It's exported
- // for the binarylog package to update.
- DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
-)
-
-// Sink writes log entry into the binary log sink.
-//
-// sink is a copy of the exported binarylog.Sink, to avoid circular dependency.
-type Sink interface {
- // Write will be called to write the log entry into the sink.
- //
- // It should be thread-safe so it can be called in parallel.
- Write(*binlogpb.GrpcLogEntry) error
- // Close will be called when the Sink is replaced by a new Sink.
- Close() error
-}
-
-type noopSink struct{}
-
-func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
-func (ns *noopSink) Close() error { return nil }
-
-// newWriterSink creates a binary log sink with the given writer.
-//
-// Write() marshals the proto message and writes it to the given writer. Each
-// message is prefixed with a 4 byte big endian unsigned integer as the length.
-//
-// No buffer is done, Close() doesn't try to close the writer.
-func newWriterSink(w io.Writer) Sink {
- return &writerSink{out: w}
-}
-
-type writerSink struct {
- out io.Writer
-}
-
-func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
- b, err := proto.Marshal(e)
- if err != nil {
- grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
- return err
- }
- hdr := make([]byte, 4)
- binary.BigEndian.PutUint32(hdr, uint32(len(b)))
- if _, err := ws.out.Write(hdr); err != nil {
- return err
- }
- if _, err := ws.out.Write(b); err != nil {
- return err
- }
- return nil
-}
-
-func (ws *writerSink) Close() error { return nil }
-
-type bufferedSink struct {
- mu sync.Mutex
- closer io.Closer
- out Sink // out is built on buf.
- buf *bufio.Writer // buf is kept for flush.
- flusherStarted bool
-
- writeTicker *time.Ticker
- done chan struct{}
-}
-
-func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
- fs.mu.Lock()
- defer fs.mu.Unlock()
- if !fs.flusherStarted {
- // Start the write loop when Write is called.
- fs.startFlushGoroutine()
- fs.flusherStarted = true
- }
- if err := fs.out.Write(e); err != nil {
- return err
- }
- return nil
-}
-
-const (
- bufFlushDuration = 60 * time.Second
-)
-
-func (fs *bufferedSink) startFlushGoroutine() {
- fs.writeTicker = time.NewTicker(bufFlushDuration)
- go func() {
- for {
- select {
- case <-fs.done:
- return
- case <-fs.writeTicker.C:
- }
- fs.mu.Lock()
- if err := fs.buf.Flush(); err != nil {
- grpclogLogger.Warningf("failed to flush to Sink: %v", err)
- }
- fs.mu.Unlock()
- }
- }()
-}
-
-func (fs *bufferedSink) Close() error {
- fs.mu.Lock()
- defer fs.mu.Unlock()
- if fs.writeTicker != nil {
- fs.writeTicker.Stop()
- }
- close(fs.done)
- if err := fs.buf.Flush(); err != nil {
- grpclogLogger.Warningf("failed to flush to Sink: %v", err)
- }
- if err := fs.closer.Close(); err != nil {
- grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
- }
- if err := fs.out.Close(); err != nil {
- grpclogLogger.Warningf("failed to close the Sink: %v", err)
- }
- return nil
-}
-
-// NewBufferedSink creates a binary log sink with the given WriteCloser.
-//
-// Write() marshals the proto message and writes it to the given writer. Each
-// message is prefixed with a 4 byte big endian unsigned integer as the length.
-//
-// Content is kept in a buffer, and is flushed every 60 seconds.
-//
-// Close closes the WriteCloser.
-func NewBufferedSink(o io.WriteCloser) Sink {
- bufW := bufio.NewWriter(o)
- return &bufferedSink{
- closer: o,
- out: newWriterSink(bufW),
- buf: bufW,
- done: make(chan struct{}),
- }
-}