diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/binarylog')
5 files changed, 1044 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go new file mode 100644 index 000000000..809d73cca --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go @@ -0,0 +1,189 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package binarylog implementation binary logging as defined in +// https://github.com/grpc/proposal/blob/master/A16-binary-logging.md. +package binarylog + +import ( +	"fmt" +	"os" + +	"google.golang.org/grpc/grpclog" +	"google.golang.org/grpc/internal/grpcutil" +) + +// Logger is the global binary logger. It can be used to get binary logger for +// each method. +type Logger interface { +	GetMethodLogger(methodName string) MethodLogger +} + +// binLogger is the global binary logger for the binary. One of this should be +// built at init time from the configuration (environment variable or flags). +// +// It is used to get a MethodLogger for each individual method. +var binLogger Logger + +var grpclogLogger = grpclog.Component("binarylog") + +// SetLogger sets the binary logger. +// +// Only call this at init time. +func SetLogger(l Logger) { +	binLogger = l +} + +// GetLogger gets the binary logger. +// +// Only call this at init time. +func GetLogger() Logger { +	return binLogger +} + +// GetMethodLogger returns the MethodLogger for the given methodName. +// +// methodName should be in the format of "/service/method". +// +// Each MethodLogger returned by this method is a new instance. This is to +// generate sequence id within the call. +func GetMethodLogger(methodName string) MethodLogger { +	if binLogger == nil { +		return nil +	} +	return binLogger.GetMethodLogger(methodName) +} + +func init() { +	const envStr = "GRPC_BINARY_LOG_FILTER" +	configStr := os.Getenv(envStr) +	binLogger = NewLoggerFromConfigString(configStr) +} + +// MethodLoggerConfig contains the setting for logging behavior of a method +// logger. Currently, it contains the max length of header and message. +type MethodLoggerConfig struct { +	// Max length of header and message. +	Header, Message uint64 +} + +// LoggerConfig contains the config for loggers to create method loggers. +type LoggerConfig struct { +	All      *MethodLoggerConfig +	Services map[string]*MethodLoggerConfig +	Methods  map[string]*MethodLoggerConfig + +	Blacklist map[string]struct{} +} + +type logger struct { +	config LoggerConfig +} + +// NewLoggerFromConfig builds a logger with the given LoggerConfig. +func NewLoggerFromConfig(config LoggerConfig) Logger { +	return &logger{config: config} +} + +// newEmptyLogger creates an empty logger. The map fields need to be filled in +// using the set* functions. +func newEmptyLogger() *logger { +	return &logger{} +} + +// Set method logger for "*". +func (l *logger) setDefaultMethodLogger(ml *MethodLoggerConfig) error { +	if l.config.All != nil { +		return fmt.Errorf("conflicting global rules found") +	} +	l.config.All = ml +	return nil +} + +// Set method logger for "service/*". +// +// New MethodLogger with same service overrides the old one. +func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error { +	if _, ok := l.config.Services[service]; ok { +		return fmt.Errorf("conflicting service rules for service %v found", service) +	} +	if l.config.Services == nil { +		l.config.Services = make(map[string]*MethodLoggerConfig) +	} +	l.config.Services[service] = ml +	return nil +} + +// Set method logger for "service/method". +// +// New MethodLogger with same method overrides the old one. +func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error { +	if _, ok := l.config.Blacklist[method]; ok { +		return fmt.Errorf("conflicting blacklist rules for method %v found", method) +	} +	if _, ok := l.config.Methods[method]; ok { +		return fmt.Errorf("conflicting method rules for method %v found", method) +	} +	if l.config.Methods == nil { +		l.config.Methods = make(map[string]*MethodLoggerConfig) +	} +	l.config.Methods[method] = ml +	return nil +} + +// Set blacklist method for "-service/method". +func (l *logger) setBlacklist(method string) error { +	if _, ok := l.config.Blacklist[method]; ok { +		return fmt.Errorf("conflicting blacklist rules for method %v found", method) +	} +	if _, ok := l.config.Methods[method]; ok { +		return fmt.Errorf("conflicting method rules for method %v found", method) +	} +	if l.config.Blacklist == nil { +		l.config.Blacklist = make(map[string]struct{}) +	} +	l.config.Blacklist[method] = struct{}{} +	return nil +} + +// getMethodLogger returns the MethodLogger for the given methodName. +// +// methodName should be in the format of "/service/method". +// +// Each MethodLogger returned by this method is a new instance. This is to +// generate sequence id within the call. +func (l *logger) GetMethodLogger(methodName string) MethodLogger { +	s, m, err := grpcutil.ParseMethod(methodName) +	if err != nil { +		grpclogLogger.Infof("binarylogging: failed to parse %q: %v", methodName, err) +		return nil +	} +	if ml, ok := l.config.Methods[s+"/"+m]; ok { +		return NewTruncatingMethodLogger(ml.Header, ml.Message) +	} +	if _, ok := l.config.Blacklist[s+"/"+m]; ok { +		return nil +	} +	if ml, ok := l.config.Services[s]; ok { +		return NewTruncatingMethodLogger(ml.Header, ml.Message) +	} +	if l.config.All == nil { +		return nil +	} +	return NewTruncatingMethodLogger(l.config.All.Header, l.config.All.Message) +} diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog_testutil.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog_testutil.go new file mode 100644 index 000000000..1ee00a39a --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog_testutil.go @@ -0,0 +1,42 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// This file contains exported variables/functions that are exported for testing +// only. +// +// An ideal way for this would be to put those in a *_test.go but in binarylog +// package. But this doesn't work with staticcheck with go module. Error was: +// "MdToMetadataProto not declared by package binarylog". This could be caused +// by the way staticcheck looks for files for a certain package, which doesn't +// support *_test.go files. +// +// Move those to binary_test.go when staticcheck is fixed. + +package binarylog + +var ( +	// AllLogger is a logger that logs all headers/messages for all RPCs. It's +	// for testing only. +	AllLogger = NewLoggerFromConfigString("*") +	// MdToMetadataProto converts metadata to a binary logging proto message. +	// It's for testing only. +	MdToMetadataProto = mdToMetadataProto +	// AddrToProto converts an address to a binary logging proto message. It's +	// for testing only. +	AddrToProto = addrToProto +) diff --git a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go new file mode 100644 index 000000000..f9e80e27a --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go @@ -0,0 +1,208 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package binarylog + +import ( +	"errors" +	"fmt" +	"regexp" +	"strconv" +	"strings" +) + +// NewLoggerFromConfigString reads the string and build a logger. It can be used +// to build a new logger and assign it to binarylog.Logger. +// +// Example filter config strings: +//   - "" Nothing will be logged +//   - "*" All headers and messages will be fully logged. +//   - "*{h}" Only headers will be logged. +//   - "*{m:256}" Only the first 256 bytes of each message will be logged. +//   - "Foo/*" Logs every method in service Foo +//   - "Foo/*,-Foo/Bar" Logs every method in service Foo except method /Foo/Bar +//   - "Foo/*,Foo/Bar{m:256}" Logs the first 256 bytes of each message in method +//     /Foo/Bar, logs all headers and messages in every other method in service +//     Foo. +// +// If two configs exist for one certain method or service, the one specified +// later overrides the previous config. +func NewLoggerFromConfigString(s string) Logger { +	if s == "" { +		return nil +	} +	l := newEmptyLogger() +	methods := strings.Split(s, ",") +	for _, method := range methods { +		if err := l.fillMethodLoggerWithConfigString(method); err != nil { +			grpclogLogger.Warningf("failed to parse binary log config: %v", err) +			return nil +		} +	} +	return l +} + +// fillMethodLoggerWithConfigString parses config, creates TruncatingMethodLogger and adds +// it to the right map in the logger. +func (l *logger) fillMethodLoggerWithConfigString(config string) error { +	// "" is invalid. +	if config == "" { +		return errors.New("empty string is not a valid method binary logging config") +	} + +	// "-service/method", blacklist, no * or {} allowed. +	if config[0] == '-' { +		s, m, suffix, err := parseMethodConfigAndSuffix(config[1:]) +		if err != nil { +			return fmt.Errorf("invalid config: %q, %v", config, err) +		} +		if m == "*" { +			return fmt.Errorf("invalid config: %q, %v", config, "* not allowed in blacklist config") +		} +		if suffix != "" { +			return fmt.Errorf("invalid config: %q, %v", config, "header/message limit not allowed in blacklist config") +		} +		if err := l.setBlacklist(s + "/" + m); err != nil { +			return fmt.Errorf("invalid config: %v", err) +		} +		return nil +	} + +	// "*{h:256;m:256}" +	if config[0] == '*' { +		hdr, msg, err := parseHeaderMessageLengthConfig(config[1:]) +		if err != nil { +			return fmt.Errorf("invalid config: %q, %v", config, err) +		} +		if err := l.setDefaultMethodLogger(&MethodLoggerConfig{Header: hdr, Message: msg}); err != nil { +			return fmt.Errorf("invalid config: %v", err) +		} +		return nil +	} + +	s, m, suffix, err := parseMethodConfigAndSuffix(config) +	if err != nil { +		return fmt.Errorf("invalid config: %q, %v", config, err) +	} +	hdr, msg, err := parseHeaderMessageLengthConfig(suffix) +	if err != nil { +		return fmt.Errorf("invalid header/message length config: %q, %v", suffix, err) +	} +	if m == "*" { +		if err := l.setServiceMethodLogger(s, &MethodLoggerConfig{Header: hdr, Message: msg}); err != nil { +			return fmt.Errorf("invalid config: %v", err) +		} +	} else { +		if err := l.setMethodMethodLogger(s+"/"+m, &MethodLoggerConfig{Header: hdr, Message: msg}); err != nil { +			return fmt.Errorf("invalid config: %v", err) +		} +	} +	return nil +} + +const ( +	// TODO: this const is only used by env_config now. But could be useful for +	// other config. Move to binarylog.go if necessary. +	maxUInt = ^uint64(0) + +	// For "p.s/m" plus any suffix. Suffix will be parsed again. See test for +	// expected output. +	longMethodConfigRegexpStr = `^([\w./]+)/((?:\w+)|[*])(.+)?$` + +	// For suffix from above, "{h:123,m:123}". See test for expected output. +	optionalLengthRegexpStr      = `(?::(\d+))?` // Optional ":123". +	headerConfigRegexpStr        = `^{h` + optionalLengthRegexpStr + `}$` +	messageConfigRegexpStr       = `^{m` + optionalLengthRegexpStr + `}$` +	headerMessageConfigRegexpStr = `^{h` + optionalLengthRegexpStr + `;m` + optionalLengthRegexpStr + `}$` +) + +var ( +	longMethodConfigRegexp    = regexp.MustCompile(longMethodConfigRegexpStr) +	headerConfigRegexp        = regexp.MustCompile(headerConfigRegexpStr) +	messageConfigRegexp       = regexp.MustCompile(messageConfigRegexpStr) +	headerMessageConfigRegexp = regexp.MustCompile(headerMessageConfigRegexpStr) +) + +// Turn "service/method{h;m}" into "service", "method", "{h;m}". +func parseMethodConfigAndSuffix(c string) (service, method, suffix string, _ error) { +	// Regexp result: +	// +	// in:  "p.s/m{h:123,m:123}", +	// out: []string{"p.s/m{h:123,m:123}", "p.s", "m", "{h:123,m:123}"}, +	match := longMethodConfigRegexp.FindStringSubmatch(c) +	if match == nil { +		return "", "", "", fmt.Errorf("%q contains invalid substring", c) +	} +	service = match[1] +	method = match[2] +	suffix = match[3] +	return +} + +// Turn "{h:123;m:345}" into 123, 345. +// +// Return maxUInt if length is unspecified. +func parseHeaderMessageLengthConfig(c string) (hdrLenStr, msgLenStr uint64, err error) { +	if c == "" { +		return maxUInt, maxUInt, nil +	} +	// Header config only. +	if match := headerConfigRegexp.FindStringSubmatch(c); match != nil { +		if s := match[1]; s != "" { +			hdrLenStr, err = strconv.ParseUint(s, 10, 64) +			if err != nil { +				return 0, 0, fmt.Errorf("failed to convert %q to uint", s) +			} +			return hdrLenStr, 0, nil +		} +		return maxUInt, 0, nil +	} + +	// Message config only. +	if match := messageConfigRegexp.FindStringSubmatch(c); match != nil { +		if s := match[1]; s != "" { +			msgLenStr, err = strconv.ParseUint(s, 10, 64) +			if err != nil { +				return 0, 0, fmt.Errorf("failed to convert %q to uint", s) +			} +			return 0, msgLenStr, nil +		} +		return 0, maxUInt, nil +	} + +	// Header and message config both. +	if match := headerMessageConfigRegexp.FindStringSubmatch(c); match != nil { +		// Both hdr and msg are specified, but one or two of them might be empty. +		hdrLenStr = maxUInt +		msgLenStr = maxUInt +		if s := match[1]; s != "" { +			hdrLenStr, err = strconv.ParseUint(s, 10, 64) +			if err != nil { +				return 0, 0, fmt.Errorf("failed to convert %q to uint", s) +			} +		} +		if s := match[2]; s != "" { +			msgLenStr, err = strconv.ParseUint(s, 10, 64) +			if err != nil { +				return 0, 0, fmt.Errorf("failed to convert %q to uint", s) +			} +		} +		return hdrLenStr, msgLenStr, nil +	} +	return 0, 0, fmt.Errorf("%q contains invalid substring", c) +} diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go new file mode 100644 index 000000000..d71e44177 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go @@ -0,0 +1,435 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package binarylog + +import ( +	"net" +	"strings" +	"sync/atomic" +	"time" + +	"github.com/golang/protobuf/proto" +	"github.com/golang/protobuf/ptypes" +	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" +	"google.golang.org/grpc/metadata" +	"google.golang.org/grpc/status" +) + +type callIDGenerator struct { +	id uint64 +} + +func (g *callIDGenerator) next() uint64 { +	id := atomic.AddUint64(&g.id, 1) +	return id +} + +// reset is for testing only, and doesn't need to be thread safe. +func (g *callIDGenerator) reset() { +	g.id = 0 +} + +var idGen callIDGenerator + +// MethodLogger is the sub-logger for each method. +type MethodLogger interface { +	Log(LogEntryConfig) +} + +// TruncatingMethodLogger is a method logger that truncates headers and messages +// based on configured fields. +type TruncatingMethodLogger struct { +	headerMaxLen, messageMaxLen uint64 + +	callID          uint64 +	idWithinCallGen *callIDGenerator + +	sink Sink // TODO(blog): make this plugable. +} + +// NewTruncatingMethodLogger returns a new truncating method logger. +func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger { +	return &TruncatingMethodLogger{ +		headerMaxLen:  h, +		messageMaxLen: m, + +		callID:          idGen.next(), +		idWithinCallGen: &callIDGenerator{}, + +		sink: DefaultSink, // TODO(blog): make it plugable. +	} +} + +// Build is an internal only method for building the proto message out of the +// input event. It's made public to enable other library to reuse as much logic +// in TruncatingMethodLogger as possible. +func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry { +	m := c.toProto() +	timestamp, _ := ptypes.TimestampProto(time.Now()) +	m.Timestamp = timestamp +	m.CallId = ml.callID +	m.SequenceIdWithinCall = ml.idWithinCallGen.next() + +	switch pay := m.Payload.(type) { +	case *binlogpb.GrpcLogEntry_ClientHeader: +		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata()) +	case *binlogpb.GrpcLogEntry_ServerHeader: +		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata()) +	case *binlogpb.GrpcLogEntry_Message: +		m.PayloadTruncated = ml.truncateMessage(pay.Message) +	} +	return m +} + +// Log creates a proto binary log entry, and logs it to the sink. +func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) { +	ml.sink.Write(ml.Build(c)) +} + +func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) { +	if ml.headerMaxLen == maxUInt { +		return false +	} +	var ( +		bytesLimit = ml.headerMaxLen +		index      int +	) +	// At the end of the loop, index will be the first entry where the total +	// size is greater than the limit: +	// +	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr. +	for ; index < len(mdPb.Entry); index++ { +		entry := mdPb.Entry[index] +		if entry.Key == "grpc-trace-bin" { +			// "grpc-trace-bin" is a special key. It's kept in the log entry, +			// but not counted towards the size limit. +			continue +		} +		currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue())) +		if currentEntryLen > bytesLimit { +			break +		} +		bytesLimit -= currentEntryLen +	} +	truncated = index < len(mdPb.Entry) +	mdPb.Entry = mdPb.Entry[:index] +	return truncated +} + +func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) { +	if ml.messageMaxLen == maxUInt { +		return false +	} +	if ml.messageMaxLen >= uint64(len(msgPb.Data)) { +		return false +	} +	msgPb.Data = msgPb.Data[:ml.messageMaxLen] +	return true +} + +// LogEntryConfig represents the configuration for binary log entry. +type LogEntryConfig interface { +	toProto() *binlogpb.GrpcLogEntry +} + +// ClientHeader configs the binary log entry to be a ClientHeader entry. +type ClientHeader struct { +	OnClientSide bool +	Header       metadata.MD +	MethodName   string +	Authority    string +	Timeout      time.Duration +	// PeerAddr is required only when it's on server side. +	PeerAddr net.Addr +} + +func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry { +	// This function doesn't need to set all the fields (e.g. seq ID). The Log +	// function will set the fields when necessary. +	clientHeader := &binlogpb.ClientHeader{ +		Metadata:   mdToMetadataProto(c.Header), +		MethodName: c.MethodName, +		Authority:  c.Authority, +	} +	if c.Timeout > 0 { +		clientHeader.Timeout = ptypes.DurationProto(c.Timeout) +	} +	ret := &binlogpb.GrpcLogEntry{ +		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, +		Payload: &binlogpb.GrpcLogEntry_ClientHeader{ +			ClientHeader: clientHeader, +		}, +	} +	if c.OnClientSide { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT +	} else { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER +	} +	if c.PeerAddr != nil { +		ret.Peer = addrToProto(c.PeerAddr) +	} +	return ret +} + +// ServerHeader configs the binary log entry to be a ServerHeader entry. +type ServerHeader struct { +	OnClientSide bool +	Header       metadata.MD +	// PeerAddr is required only when it's on client side. +	PeerAddr net.Addr +} + +func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry { +	ret := &binlogpb.GrpcLogEntry{ +		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, +		Payload: &binlogpb.GrpcLogEntry_ServerHeader{ +			ServerHeader: &binlogpb.ServerHeader{ +				Metadata: mdToMetadataProto(c.Header), +			}, +		}, +	} +	if c.OnClientSide { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT +	} else { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER +	} +	if c.PeerAddr != nil { +		ret.Peer = addrToProto(c.PeerAddr) +	} +	return ret +} + +// ClientMessage configs the binary log entry to be a ClientMessage entry. +type ClientMessage struct { +	OnClientSide bool +	// Message can be a proto.Message or []byte. Other messages formats are not +	// supported. +	Message interface{} +} + +func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry { +	var ( +		data []byte +		err  error +	) +	if m, ok := c.Message.(proto.Message); ok { +		data, err = proto.Marshal(m) +		if err != nil { +			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err) +		} +	} else if b, ok := c.Message.([]byte); ok { +		data = b +	} else { +		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte") +	} +	ret := &binlogpb.GrpcLogEntry{ +		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, +		Payload: &binlogpb.GrpcLogEntry_Message{ +			Message: &binlogpb.Message{ +				Length: uint32(len(data)), +				Data:   data, +			}, +		}, +	} +	if c.OnClientSide { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT +	} else { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER +	} +	return ret +} + +// ServerMessage configs the binary log entry to be a ServerMessage entry. +type ServerMessage struct { +	OnClientSide bool +	// Message can be a proto.Message or []byte. Other messages formats are not +	// supported. +	Message interface{} +} + +func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry { +	var ( +		data []byte +		err  error +	) +	if m, ok := c.Message.(proto.Message); ok { +		data, err = proto.Marshal(m) +		if err != nil { +			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err) +		} +	} else if b, ok := c.Message.([]byte); ok { +		data = b +	} else { +		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte") +	} +	ret := &binlogpb.GrpcLogEntry{ +		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, +		Payload: &binlogpb.GrpcLogEntry_Message{ +			Message: &binlogpb.Message{ +				Length: uint32(len(data)), +				Data:   data, +			}, +		}, +	} +	if c.OnClientSide { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT +	} else { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER +	} +	return ret +} + +// ClientHalfClose configs the binary log entry to be a ClientHalfClose entry. +type ClientHalfClose struct { +	OnClientSide bool +} + +func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry { +	ret := &binlogpb.GrpcLogEntry{ +		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, +		Payload: nil, // No payload here. +	} +	if c.OnClientSide { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT +	} else { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER +	} +	return ret +} + +// ServerTrailer configs the binary log entry to be a ServerTrailer entry. +type ServerTrailer struct { +	OnClientSide bool +	Trailer      metadata.MD +	// Err is the status error. +	Err error +	// PeerAddr is required only when it's on client side and the RPC is trailer +	// only. +	PeerAddr net.Addr +} + +func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry { +	st, ok := status.FromError(c.Err) +	if !ok { +		grpclogLogger.Info("binarylogging: error in trailer is not a status error") +	} +	var ( +		detailsBytes []byte +		err          error +	) +	stProto := st.Proto() +	if stProto != nil && len(stProto.Details) != 0 { +		detailsBytes, err = proto.Marshal(stProto) +		if err != nil { +			grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err) +		} +	} +	ret := &binlogpb.GrpcLogEntry{ +		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, +		Payload: &binlogpb.GrpcLogEntry_Trailer{ +			Trailer: &binlogpb.Trailer{ +				Metadata:      mdToMetadataProto(c.Trailer), +				StatusCode:    uint32(st.Code()), +				StatusMessage: st.Message(), +				StatusDetails: detailsBytes, +			}, +		}, +	} +	if c.OnClientSide { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT +	} else { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER +	} +	if c.PeerAddr != nil { +		ret.Peer = addrToProto(c.PeerAddr) +	} +	return ret +} + +// Cancel configs the binary log entry to be a Cancel entry. +type Cancel struct { +	OnClientSide bool +} + +func (c *Cancel) toProto() *binlogpb.GrpcLogEntry { +	ret := &binlogpb.GrpcLogEntry{ +		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL, +		Payload: nil, +	} +	if c.OnClientSide { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT +	} else { +		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER +	} +	return ret +} + +// metadataKeyOmit returns whether the metadata entry with this key should be +// omitted. +func metadataKeyOmit(key string) bool { +	switch key { +	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te": +		return true +	case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users. +		return false +	} +	return strings.HasPrefix(key, "grpc-") +} + +func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata { +	ret := &binlogpb.Metadata{} +	for k, vv := range md { +		if metadataKeyOmit(k) { +			continue +		} +		for _, v := range vv { +			ret.Entry = append(ret.Entry, +				&binlogpb.MetadataEntry{ +					Key:   k, +					Value: []byte(v), +				}, +			) +		} +	} +	return ret +} + +func addrToProto(addr net.Addr) *binlogpb.Address { +	ret := &binlogpb.Address{} +	switch a := addr.(type) { +	case *net.TCPAddr: +		if a.IP.To4() != nil { +			ret.Type = binlogpb.Address_TYPE_IPV4 +		} else if a.IP.To16() != nil { +			ret.Type = binlogpb.Address_TYPE_IPV6 +		} else { +			ret.Type = binlogpb.Address_TYPE_UNKNOWN +			// Do not set address and port fields. +			break +		} +		ret.Address = a.IP.String() +		ret.IpPort = uint32(a.Port) +	case *net.UnixAddr: +		ret.Type = binlogpb.Address_TYPE_UNIX +		ret.Address = a.String() +	default: +		ret.Type = binlogpb.Address_TYPE_UNKNOWN +	} +	return ret +} diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go new file mode 100644 index 000000000..264de387c --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go @@ -0,0 +1,170 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package binarylog + +import ( +	"bufio" +	"encoding/binary" +	"io" +	"sync" +	"time" + +	"github.com/golang/protobuf/proto" +	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" +) + +var ( +	// DefaultSink is the sink where the logs will be written to. It's exported +	// for the binarylog package to update. +	DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp). +) + +// Sink writes log entry into the binary log sink. +// +// sink is a copy of the exported binarylog.Sink, to avoid circular dependency. +type Sink interface { +	// Write will be called to write the log entry into the sink. +	// +	// It should be thread-safe so it can be called in parallel. +	Write(*binlogpb.GrpcLogEntry) error +	// Close will be called when the Sink is replaced by a new Sink. +	Close() error +} + +type noopSink struct{} + +func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil } +func (ns *noopSink) Close() error                       { return nil } + +// newWriterSink creates a binary log sink with the given writer. +// +// Write() marshals the proto message and writes it to the given writer. Each +// message is prefixed with a 4 byte big endian unsigned integer as the length. +// +// No buffer is done, Close() doesn't try to close the writer. +func newWriterSink(w io.Writer) Sink { +	return &writerSink{out: w} +} + +type writerSink struct { +	out io.Writer +} + +func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error { +	b, err := proto.Marshal(e) +	if err != nil { +		grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err) +		return err +	} +	hdr := make([]byte, 4) +	binary.BigEndian.PutUint32(hdr, uint32(len(b))) +	if _, err := ws.out.Write(hdr); err != nil { +		return err +	} +	if _, err := ws.out.Write(b); err != nil { +		return err +	} +	return nil +} + +func (ws *writerSink) Close() error { return nil } + +type bufferedSink struct { +	mu             sync.Mutex +	closer         io.Closer +	out            Sink          // out is built on buf. +	buf            *bufio.Writer // buf is kept for flush. +	flusherStarted bool + +	writeTicker *time.Ticker +	done        chan struct{} +} + +func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error { +	fs.mu.Lock() +	defer fs.mu.Unlock() +	if !fs.flusherStarted { +		// Start the write loop when Write is called. +		fs.startFlushGoroutine() +		fs.flusherStarted = true +	} +	if err := fs.out.Write(e); err != nil { +		return err +	} +	return nil +} + +const ( +	bufFlushDuration = 60 * time.Second +) + +func (fs *bufferedSink) startFlushGoroutine() { +	fs.writeTicker = time.NewTicker(bufFlushDuration) +	go func() { +		for { +			select { +			case <-fs.done: +				return +			case <-fs.writeTicker.C: +			} +			fs.mu.Lock() +			if err := fs.buf.Flush(); err != nil { +				grpclogLogger.Warningf("failed to flush to Sink: %v", err) +			} +			fs.mu.Unlock() +		} +	}() +} + +func (fs *bufferedSink) Close() error { +	fs.mu.Lock() +	defer fs.mu.Unlock() +	if fs.writeTicker != nil { +		fs.writeTicker.Stop() +	} +	close(fs.done) +	if err := fs.buf.Flush(); err != nil { +		grpclogLogger.Warningf("failed to flush to Sink: %v", err) +	} +	if err := fs.closer.Close(); err != nil { +		grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err) +	} +	if err := fs.out.Close(); err != nil { +		grpclogLogger.Warningf("failed to close the Sink: %v", err) +	} +	return nil +} + +// NewBufferedSink creates a binary log sink with the given WriteCloser. +// +// Write() marshals the proto message and writes it to the given writer. Each +// message is prefixed with a 4 byte big endian unsigned integer as the length. +// +// Content is kept in a buffer, and is flushed every 60 seconds. +// +// Close closes the WriteCloser. +func NewBufferedSink(o io.WriteCloser) Sink { +	bufW := bufio.NewWriter(o) +	return &bufferedSink{ +		closer: o, +		out:    newWriterSink(bufW), +		buf:    bufW, +		done:   make(chan struct{}), +	} +}  | 
