Diffstat (limited to 'vendor/google.golang.org')
-rw-r--r--  vendor/google.golang.org/genproto/googleapis/api/LICENSE (renamed from vendor/google.golang.org/genproto/LICENSE)    0
-rw-r--r--  vendor/google.golang.org/genproto/googleapis/rpc/LICENSE    202
-rw-r--r--  vendor/google.golang.org/genproto/protobuf/field_mask/field_mask.go    23
-rw-r--r--  vendor/google.golang.org/grpc/attributes/attributes.go    41
-rw-r--r--  vendor/google.golang.org/grpc/balancer/balancer.go    2
-rw-r--r--  vendor/google.golang.org/grpc/balancer_conn_wrappers.go    486
-rw-r--r--  vendor/google.golang.org/grpc/call.go    5
-rw-r--r--  vendor/google.golang.org/grpc/clientconn.go    694
-rw-r--r--  vendor/google.golang.org/grpc/dialoptions.go    44
-rw-r--r--  vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go    308
-rw-r--r--  vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go    223
-rw-r--r--  vendor/google.golang.org/grpc/idle.go    287
-rw-r--r--  vendor/google.golang.org/grpc/internal/binarylog/binarylog.go    3
-rw-r--r--  vendor/google.golang.org/grpc/internal/binarylog/method_logger.go    9
-rw-r--r--  vendor/google.golang.org/grpc/internal/buffer/unbounded.go    26
-rw-r--r--  vendor/google.golang.org/grpc/internal/envconfig/envconfig.go    7
-rw-r--r--  vendor/google.golang.org/grpc/internal/envconfig/observability.go    6
-rw-r--r--  vendor/google.golang.org/grpc/internal/envconfig/xds.go    19
-rw-r--r--  vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go    21
-rw-r--r--  vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go    64
-rw-r--r--  vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go    136
-rw-r--r--  vendor/google.golang.org/grpc/internal/internal.go    24
-rw-r--r--  vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go    74
-rw-r--r--  vendor/google.golang.org/grpc/internal/serviceconfig/duration.go    130
-rw-r--r--  vendor/google.golang.org/grpc/internal/transport/handler_server.go    2
-rw-r--r--  vendor/google.golang.org/grpc/internal/transport/http2_client.go    2
-rw-r--r--  vendor/google.golang.org/grpc/internal/transport/http2_server.go    12
-rw-r--r--  vendor/google.golang.org/grpc/internal/transport/transport.go    2
-rw-r--r--  vendor/google.golang.org/grpc/picker_wrapper.go    38
-rw-r--r--  vendor/google.golang.org/grpc/pickfirst.go    52
-rw-r--r--  vendor/google.golang.org/grpc/resolver/resolver.go    24
-rw-r--r--  vendor/google.golang.org/grpc/resolver_conn_wrapper.go    229
-rw-r--r--  vendor/google.golang.org/grpc/rpc_util.go    27
-rw-r--r--  vendor/google.golang.org/grpc/server.go    33
-rw-r--r--  vendor/google.golang.org/grpc/service_config.go    75
-rw-r--r--  vendor/google.golang.org/grpc/shared_buffer_pool.go    154
-rw-r--r--  vendor/google.golang.org/grpc/status/status.go    29
-rw-r--r--  vendor/google.golang.org/grpc/stream.go    33
-rw-r--r--  vendor/google.golang.org/grpc/version.go    2
-rw-r--r--  vendor/google.golang.org/protobuf/types/known/structpb/struct.pb.go    810
40 files changed, 3640 insertions(+), 718 deletions(-)
diff --git a/vendor/google.golang.org/genproto/LICENSE b/vendor/google.golang.org/genproto/googleapis/api/LICENSE
index d64569567..d64569567 100644
--- a/vendor/google.golang.org/genproto/LICENSE
+++ b/vendor/google.golang.org/genproto/googleapis/api/LICENSE
diff --git a/vendor/google.golang.org/genproto/googleapis/rpc/LICENSE b/vendor/google.golang.org/genproto/googleapis/rpc/LICENSE
new file mode 100644
index 000000000..d64569567
--- /dev/null
+++ b/vendor/google.golang.org/genproto/googleapis/rpc/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/vendor/google.golang.org/genproto/protobuf/field_mask/field_mask.go b/vendor/google.golang.org/genproto/protobuf/field_mask/field_mask.go
deleted file mode 100644
index d10ad6653..000000000
--- a/vendor/google.golang.org/genproto/protobuf/field_mask/field_mask.go
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright 2020 Google LLC
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package field_mask aliases all exported identifiers in
-// package "google.golang.org/protobuf/types/known/fieldmaskpb".
-package field_mask
-
-import "google.golang.org/protobuf/types/known/fieldmaskpb"
-
-type FieldMask = fieldmaskpb.FieldMask
-
-var File_google_protobuf_field_mask_proto = fieldmaskpb.File_google_protobuf_field_mask_proto
diff --git a/vendor/google.golang.org/grpc/attributes/attributes.go b/vendor/google.golang.org/grpc/attributes/attributes.go
index 02f5dc531..49712aca3 100644
--- a/vendor/google.golang.org/grpc/attributes/attributes.go
+++ b/vendor/google.golang.org/grpc/attributes/attributes.go
@@ -25,6 +25,11 @@
// later release.
package attributes
+import (
+ "fmt"
+ "strings"
+)
+
// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys. Values should not be modified after they are added to an
@@ -99,3 +104,39 @@ func (a *Attributes) Equal(o *Attributes) bool {
}
return true
}
+
+// String prints the attribute map. If any key or value throughout the map
+// implements fmt.Stringer, it calls that method and appends.
+func (a *Attributes) String() string {
+ var sb strings.Builder
+ sb.WriteString("{")
+ first := true
+ for k, v := range a.m {
+ if !first {
+ sb.WriteString(", ")
+ }
+ sb.WriteString(fmt.Sprintf("%q: %q ", str(k), str(v)))
+ first = false
+ }
+ sb.WriteString("}")
+ return sb.String()
+}
+
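+// str returns x's fmt.Stringer output if it implements that interface, the
+// string itself if x is a string, and the value's address otherwise.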
+func str(x interface{}) string {
+ if v, ok := x.(fmt.Stringer); ok {
+ return v.String()
+ } else if v, ok := x.(string); ok {
+ return v
+ }
+ return fmt.Sprintf("<%p>", x)
+}
+
+// MarshalJSON helps implement the json.Marshaler interface, thereby rendering
+// the Attributes correctly when printing (via pretty.JSON) structs containing
+// Attributes as fields.
+//
+// It is impossible to unmarshal attributes from a JSON representation and this
+// method is meant only for debugging purposes.
+func (a *Attributes) MarshalJSON() ([]byte, error) {
+ return []byte(a.String()), nil
+}
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index 09d61dd1b..8f00523c0 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -286,7 +286,7 @@ type PickResult struct {
//
// LB policies with child policies are responsible for propagating metadata
// injected by their children to the ClientConn, as part of Pick().
- Metatada metadata.MD
+ Metadata metadata.MD
}
// TransientFailureError returns e. It exists for backward compatibility and
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 0359956d3..04b9ad411 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -25,14 +25,20 @@ import (
"sync"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
- "google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
- "google.golang.org/grpc/status"
+)
+
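+// ccbMode tracks the operating mode of the ccBalancerWrapper: active, idle,
+// closed, or in the process of exiting idle.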
+type ccbMode int
+
+const (
+ ccbModeActive = iota
+ ccbModeIdle
+ ccbModeClosed
+ ccbModeExitingIdle
)
// ccBalancerWrapper sits between the ClientConn and the Balancer.
@@ -49,192 +55,101 @@ import (
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
- cc *ClientConn
-
- // Since these fields are accessed only from handleXxx() methods which are
- // synchronized by the watcher goroutine, we do not need a mutex to protect
- // these fields.
+ // The following fields are initialized when the wrapper is created and are
+ // read-only afterwards, and therefore can be accessed without a mutex.
+ cc *ClientConn
+ opts balancer.BuildOptions
+
+ // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
+ // mutually exclusive manner as they are scheduled in the serializer. Fields
+ // accessed *only* in these serializer callbacks can therefore be accessed
+ // without a mutex.
balancer *gracefulswitch.Balancer
curBalancerName string
- updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher().
- resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here.
- closed *grpcsync.Event // Indicates if close has been called.
- done *grpcsync.Event // Indicates if close has completed its work.
+ // mu guards access to the below fields. Access to the serializer and its
+ // cancel function needs to be mutex protected because they are overwritten
+ // when the wrapper exits idle mode.
+ mu sync.Mutex
+ serializer *grpcsync.CallbackSerializer // To serialize all outgoing calls.
+ serializerCancel context.CancelFunc // To close the serializer at close/enterIdle time.
+ mode ccbMode // Tracks the current mode of the wrapper.
}
// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
// is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
+ ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
- cc: cc,
- updateCh: buffer.NewUnbounded(),
- resultCh: buffer.NewUnbounded(),
- closed: grpcsync.NewEvent(),
- done: grpcsync.NewEvent(),
+ cc: cc,
+ opts: bopts,
+ serializer: grpcsync.NewCallbackSerializer(ctx),
+ serializerCancel: cancel,
}
- go ccb.watcher()
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}
-// The following xxxUpdate structs wrap the arguments received as part of the
-// corresponding update. The watcher goroutine uses the 'type' of the update to
-// invoke the appropriate handler routine to handle the update.
-
-type ccStateUpdate struct {
- ccs *balancer.ClientConnState
-}
-
-type scStateUpdate struct {
- sc balancer.SubConn
- state connectivity.State
- err error
-}
-
-type exitIdleUpdate struct{}
-
-type resolverErrorUpdate struct {
- err error
-}
-
-type switchToUpdate struct {
- name string
-}
-
-type subConnUpdate struct {
- acbw *acBalancerWrapper
-}
-
-// watcher is a long-running goroutine which reads updates from a channel and
-// invokes corresponding methods on the underlying balancer. It ensures that
-// these methods are invoked in a synchronous fashion. It also ensures that
-// these methods are invoked in the order in which the updates were received.
-func (ccb *ccBalancerWrapper) watcher() {
- for {
- select {
- case u := <-ccb.updateCh.Get():
- ccb.updateCh.Load()
- if ccb.closed.HasFired() {
- break
- }
- switch update := u.(type) {
- case *ccStateUpdate:
- ccb.handleClientConnStateChange(update.ccs)
- case *scStateUpdate:
- ccb.handleSubConnStateChange(update)
- case *exitIdleUpdate:
- ccb.handleExitIdle()
- case *resolverErrorUpdate:
- ccb.handleResolverError(update.err)
- case *switchToUpdate:
- ccb.handleSwitchTo(update.name)
- case *subConnUpdate:
- ccb.handleRemoveSubConn(update.acbw)
- default:
- logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update)
- }
- case <-ccb.closed.Done():
- }
-
- if ccb.closed.HasFired() {
- ccb.handleClose()
- return
- }
- }
-}
-
// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.
-//
-// Unlike other methods invoked by grpc to push updates to the underlying
-// balancer, this method cannot simply push the update onto the update channel
-// and return. It needs to return the error returned by the underlying balancer
-// back to grpc which propagates that to the resolver.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
- ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
-
- var res interface{}
- select {
- case res = <-ccb.resultCh.Get():
- ccb.resultCh.Load()
- case <-ccb.closed.Done():
- // Return early if the balancer wrapper is closed while we are waiting for
- // the underlying balancer to process a ClientConnState update.
- return nil
- }
- // If the returned error is nil, attempting to type assert to error leads to
- // panic. So, this needs to handled separately.
- if res == nil {
- return nil
- }
- return res.(error)
-}
-
-// handleClientConnStateChange handles a ClientConnState update from the update
-// channel and invokes the appropriate method on the underlying balancer.
-//
-// If the addresses specified in the update contain addresses of type "grpclb"
-// and the selected LB policy is not "grpclb", these addresses will be filtered
-// out and ccs will be modified with the updated address list.
-func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
- if ccb.curBalancerName != grpclbName {
- // Filter any grpclb addresses since we don't have the grpclb balancer.
- var addrs []resolver.Address
- for _, addr := range ccs.ResolverState.Addresses {
- if addr.Type == resolver.GRPCLB {
- continue
+ ccb.mu.Lock()
+ errCh := make(chan error, 1)
+ // Here and everywhere else where Schedule() is called, it is done with the
+ // lock held. But the lock guards only the scheduling part. The actual
+ // callback is called asynchronously without the lock being held.
+ ok := ccb.serializer.Schedule(func(_ context.Context) {
+ // If the addresses specified in the update contain addresses of type
+ // "grpclb" and the selected LB policy is not "grpclb", these addresses
+ // will be filtered out and ccs will be modified with the updated
+ // address list.
+ if ccb.curBalancerName != grpclbName {
+ var addrs []resolver.Address
+ for _, addr := range ccs.ResolverState.Addresses {
+ if addr.Type == resolver.GRPCLB {
+ continue
+ }
+ addrs = append(addrs, addr)
}
- addrs = append(addrs, addr)
+ ccs.ResolverState.Addresses = addrs
}
- ccs.ResolverState.Addresses = addrs
+ errCh <- ccb.balancer.UpdateClientConnState(*ccs)
+ })
+ if !ok {
+ // If we are unable to schedule a function with the serializer, it
+ // indicates that it has been closed. A serializer is only closed when
+ // the wrapper is closed or is in idle.
+ ccb.mu.Unlock()
+ return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
}
- ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
+ ccb.mu.Unlock()
+
+ // We get here only if the above call to Schedule succeeds, in which case it
+ // is guaranteed that the scheduled function will run. Therefore it is safe
+ // to block on this channel.
+ err := <-errCh
+ if logger.V(2) && err != nil {
+ logger.Infof("error from balancer.UpdateClientConnState: %v", err)
+ }
+ return err
}
// updateSubConnState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
- // When updating addresses for a SubConn, if the address in use is not in
- // the new addresses, the old ac will be tearDown() and a new ac will be
- // created. tearDown() generates a state change with Shutdown state, we
- // don't want the balancer to receive this state change. So before
- // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
- // this function will be called with (nil, Shutdown). We don't need to call
- // balancer method in this case.
- if sc == nil {
- return
- }
- ccb.updateCh.Put(&scStateUpdate{
- sc: sc,
- state: s,
- err: err,
+ ccb.mu.Lock()
+ ccb.serializer.Schedule(func(_ context.Context) {
+ ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
})
-}
-
-// handleSubConnStateChange handles a SubConnState update from the update
-// channel and invokes the appropriate method on the underlying balancer.
-func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
- ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
-}
-
-func (ccb *ccBalancerWrapper) exitIdle() {
- ccb.updateCh.Put(&exitIdleUpdate{})
-}
-
-func (ccb *ccBalancerWrapper) handleExitIdle() {
- if ccb.cc.GetState() != connectivity.Idle {
- return
- }
- ccb.balancer.ExitIdle()
+ ccb.mu.Unlock()
}
func (ccb *ccBalancerWrapper) resolverError(err error) {
- ccb.updateCh.Put(&resolverErrorUpdate{err: err})
-}
-
-func (ccb *ccBalancerWrapper) handleResolverError(err error) {
- ccb.balancer.ResolverError(err)
+ ccb.mu.Lock()
+ ccb.serializer.Schedule(func(_ context.Context) {
+ ccb.balancer.ResolverError(err)
+ })
+ ccb.mu.Unlock()
}
// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
@@ -248,24 +163,27 @@ func (ccb *ccBalancerWrapper) handleResolverError(err error) {
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
- ccb.updateCh.Put(&switchToUpdate{name: name})
+ ccb.mu.Lock()
+ ccb.serializer.Schedule(func(_ context.Context) {
+ // TODO: Other languages use case-sensitive balancer registries. We should
+ // switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
+ if strings.EqualFold(ccb.curBalancerName, name) {
+ return
+ }
+ ccb.buildLoadBalancingPolicy(name)
+ })
+ ccb.mu.Unlock()
}
-// handleSwitchTo handles a balancer switch update from the update channel. It
-// calls the SwitchTo() method on the gracefulswitch.Balancer with a
-// balancer.Builder corresponding to name. If no balancer.Builder is registered
-// for the given name, it uses the default LB policy which is "pick_first".
-func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
- // TODO: Other languages use case-insensitive balancer registries. We should
- // switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
- if strings.EqualFold(ccb.curBalancerName, name) {
- return
- }
-
- // TODO: Ensure that name is a registered LB policy when we get here.
- // We currently only validate the `loadBalancingConfig` field. We need to do
- // the same for the `loadBalancingPolicy` field and reject the service config
- // if the specified policy is not registered.
+// buildLoadBalancingPolicy performs the following:
+// - retrieve a balancer builder for the given name. Use the default LB
+// policy, pick_first, if no LB policy with name is found in the registry.
+// - instruct the gracefulswitch balancer to switch to the above builder. This
+// will actually build the new balancer.
+// - update the `curBalancerName` field
+//
+// Must be called from a serializer callback.
+func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
@@ -281,26 +199,114 @@ func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
ccb.curBalancerName = builder.Name()
}
-// handleRemoveSucConn handles a request from the underlying balancer to remove
-// a subConn.
-//
-// See comments in RemoveSubConn() for more details.
-func (ccb *ccBalancerWrapper) handleRemoveSubConn(acbw *acBalancerWrapper) {
- ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
+func (ccb *ccBalancerWrapper) close() {
+ channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
+ ccb.closeBalancer(ccbModeClosed)
}
-func (ccb *ccBalancerWrapper) close() {
- ccb.closed.Fire()
- <-ccb.done.Done()
+// enterIdleMode is invoked by grpc when the channel enters idle mode upon
+// expiry of idle_timeout. This call blocks until the balancer is closed.
+func (ccb *ccBalancerWrapper) enterIdleMode() {
+ channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
+ ccb.closeBalancer(ccbModeIdle)
+}
+
+// closeBalancer is invoked when the channel is being closed or when it enters
+// idle mode upon expiry of idle_timeout.
+func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
+ ccb.mu.Lock()
+ if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
+ ccb.mu.Unlock()
+ return
+ }
+
+ ccb.mode = m
+ done := ccb.serializer.Done
+ b := ccb.balancer
+ ok := ccb.serializer.Schedule(func(_ context.Context) {
+ // Close the serializer to ensure that no more calls from gRPC are sent
+ // to the balancer.
+ ccb.serializerCancel()
+ // Empty the current balancer name because we don't have a balancer
+ // anymore and also so that we act on the next call to switchTo by
+ // creating a new balancer specified by the new resolver.
+ ccb.curBalancerName = ""
+ })
+ if !ok {
+ ccb.mu.Unlock()
+ return
+ }
+ ccb.mu.Unlock()
+
+ // Give enqueued callbacks a chance to finish.
+ <-done
+ // Spawn a goroutine to close the balancer (since it may block trying to
+ // cleanup all allocated resources) and return early.
+ go b.Close()
}
-func (ccb *ccBalancerWrapper) handleClose() {
- ccb.balancer.Close()
- ccb.done.Fire()
+// exitIdleMode is invoked by grpc when the channel exits idle mode either
+// because of an RPC or because of an invocation of the Connect() API. This
+// recreates the balancer that was closed previously when entering idle mode.
+//
+// If the channel is not in idle mode, we know for a fact that we are here as a
+// result of the user calling the Connect() method on the ClientConn. In this
+// case, we can simply forward the call to the underlying balancer, instructing
+// it to reconnect to the backends.
+func (ccb *ccBalancerWrapper) exitIdleMode() {
+ ccb.mu.Lock()
+ if ccb.mode == ccbModeClosed {
+ // Request to exit idle is a no-op when wrapper is already closed.
+ ccb.mu.Unlock()
+ return
+ }
+
+ if ccb.mode == ccbModeIdle {
+ // Recreate the serializer which was closed when we entered idle.
+ ctx, cancel := context.WithCancel(context.Background())
+ ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
+ ccb.serializerCancel = cancel
+ }
+
+ // The ClientConn guarantees mutual exclusion between close() and
+ // exitIdleMode(), and since we just created a new serializer, we can be
+ // sure that the below function will be scheduled.
+ done := make(chan struct{})
+ ccb.serializer.Schedule(func(_ context.Context) {
+ defer close(done)
+
+ ccb.mu.Lock()
+ defer ccb.mu.Unlock()
+
+ if ccb.mode != ccbModeIdle {
+ ccb.balancer.ExitIdle()
+ return
+ }
+
+ // Gracefulswitch balancer does not support a switchTo operation after
+ // being closed. Hence we need to create a new one here.
+ ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
+ ccb.mode = ccbModeActive
+ channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
+
+ })
+ ccb.mu.Unlock()
+
+ <-done
+}
+
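+// isIdleOrClosed reports whether the wrapper is currently in idle or closed
+// mode, in which case calls from the underlying balancer are ignored.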
+func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
+ ccb.mu.Lock()
+ defer ccb.mu.Unlock()
+ return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
- if len(addrs) <= 0 {
+ if ccb.isIdleOrClosed() {
+ return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
+ }
+
+ if len(addrs) == 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
@@ -309,31 +315,35 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}
acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
- acbw.ac.mu.Lock()
ac.acbw = acbw
- acbw.ac.mu.Unlock()
return acbw, nil
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
- // Before we switched the ccBalancerWrapper to use gracefulswitch.Balancer, it
- // was required to handle the RemoveSubConn() method asynchronously by pushing
- // the update onto the update channel. This was done to avoid a deadlock as
- // switchBalancer() was holding cc.mu when calling Close() on the old
- // balancer, which would in turn call RemoveSubConn().
- //
- // With the use of gracefulswitch.Balancer in ccBalancerWrapper, handling this
- // asynchronously is probably not required anymore since the switchTo() method
- // handles the balancer switch by pushing the update onto the channel.
- // TODO(easwars): Handle this inline.
+ if ccb.isIdleOrClosed() {
+ // It is safe to ignore this call when the balancer is closed or in idle
+ // because the ClientConn takes care of closing the connections.
+ //
+ // Not returning early from here when the balancer is closed or in idle
+ // leads to a deadlock though, because of the following sequence of
+ // calls when holding cc.mu:
+ // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
+ // ccb.RemoveAddrConn --> cc.removeAddrConn
+ return
+ }
+
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
- ccb.updateCh.Put(&subConnUpdate{acbw: acbw})
+ ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
+ if ccb.isIdleOrClosed() {
+ return
+ }
+
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
@@ -342,6 +352,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
+ if ccb.isIdleOrClosed() {
+ return
+ }
+
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
@@ -352,6 +366,10 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
}
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
+ if ccb.isIdleOrClosed() {
+ return
+ }
+
ccb.cc.resolveNow(o)
}
@@ -362,71 +380,31 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
+ ac *addrConn // read-only
+
mu sync.Mutex
- ac *addrConn
producers map[balancer.ProducerBuilder]*refCountedProducer
}
-func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
- acbw.mu.Lock()
- defer acbw.mu.Unlock()
- if len(addrs) <= 0 {
- acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
- return
- }
- if !acbw.ac.tryUpdateAddrs(addrs) {
- cc := acbw.ac.cc
- opts := acbw.ac.scopts
- acbw.ac.mu.Lock()
- // Set old ac.acbw to nil so the Shutdown state update will be ignored
- // by balancer.
- //
- // TODO(bar) the state transition could be wrong when tearDown() old ac
- // and creating new ac, fix the transition.
- acbw.ac.acbw = nil
- acbw.ac.mu.Unlock()
- acState := acbw.ac.getState()
- acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
-
- if acState == connectivity.Shutdown {
- return
- }
+func (acbw *acBalancerWrapper) String() string {
+ return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int())
+}
- newAC, err := cc.newAddrConn(addrs, opts)
- if err != nil {
- channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
- return
- }
- acbw.ac = newAC
- newAC.mu.Lock()
- newAC.acbw = acbw
- newAC.mu.Unlock()
- if acState != connectivity.Idle {
- go newAC.connect()
- }
- }
+func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
+ acbw.ac.updateAddrs(addrs)
}
func (acbw *acBalancerWrapper) Connect() {
- acbw.mu.Lock()
- defer acbw.mu.Unlock()
go acbw.ac.connect()
}
-func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
- acbw.mu.Lock()
- defer acbw.mu.Unlock()
- return acbw.ac
-}
-
-var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected")
-
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
-// ready, returns errSubConnNotReady.
+// ready, blocks until it is or ctx expires. Returns an error when the context
+// expires or the addrConn is shut down.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
- transport := acbw.ac.getReadyTransport()
- if transport == nil {
- return nil, errSubConnNotReady
+ transport, err := acbw.ac.getTransport(ctx)
+ if err != nil {
+ return nil, err
}
return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index 9e20e4d38..e6a1dc5d7 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -27,6 +27,11 @@ import (
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
+ if err := cc.idlenessMgr.onCallBegin(); err != nil {
+ return err
+ }
+ defer cc.idlenessMgr.onCallEnd()
+
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 3a7614242..bfd7555a8 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -24,7 +24,6 @@ import (
"fmt"
"math"
"net/url"
- "reflect"
"strings"
"sync"
"sync/atomic"
@@ -38,6 +37,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
+ "google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
@@ -69,6 +69,9 @@ var (
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
+ // errConnIdling indicates that the connection is being closed as the channel
+ // is moving to an idle mode due to inactivity.
+ errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
// service config.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
@@ -134,17 +137,29 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
- target: target,
- csMgr: &connectivityStateManager{},
- conns: make(map[*addrConn]struct{}),
- dopts: defaultDialOptions(),
- blockingpicker: newPickerWrapper(),
- czData: new(channelzData),
- firstResolveEvent: grpcsync.NewEvent(),
- }
+ target: target,
+ csMgr: &connectivityStateManager{},
+ conns: make(map[*addrConn]struct{}),
+ dopts: defaultDialOptions(),
+ czData: new(channelzData),
+ }
+
+ // We start the channel off in idle mode, but kick it out of idle at the end
+ // of this method, instead of waiting for the first RPC. Other gRPC
+ // implementations do wait for the first RPC to kick the channel out of
+ // idle. But doing so would be a major behavior change for our users who are
+ // used to seeing the channel active after Dial.
+ //
+ // Taking this approach of kicking it out of idle at the end of this method
+ // allows us to share the code between channel creation and exiting idle
+ // mode. This will also make it easy for us to switch to starting the
+ // channel off in idle, if at all we ever get to do that.
+ cc.idlenessState = ccIdlenessStateIdle
+
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
+ cc.exitIdleCond = sync.NewCond(&cc.mu)
disableGlobalOpts := false
for _, opt := range opts {
@@ -173,40 +188,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
- pid := cc.dopts.channelzParentID
- cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, pid, target)
- ted := &channelz.TraceEventDesc{
- Desc: "Channel created",
- Severity: channelz.CtInfo,
- }
- if cc.dopts.channelzParentID != nil {
- ted.Parent = &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()),
- Severity: channelz.CtInfo,
- }
- }
- channelz.AddTraceEvent(logger, cc.channelzID, 1, ted)
- cc.csMgr.channelzID = cc.channelzID
+ // Register ClientConn with channelz.
+ cc.channelzRegistration(target)
- if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
- return nil, errNoTransportSecurity
- }
- if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
- return nil, errTransportCredsAndBundle
- }
- if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
- return nil, errNoTransportCredsInBundle
- }
- transportCreds := cc.dopts.copts.TransportCredentials
- if transportCreds == nil {
- transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
- }
- if transportCreds.Info().SecurityProtocol == "insecure" {
- for _, cd := range cc.dopts.copts.PerRPCCredentials {
- if cd.RequireTransportSecurity() {
- return nil, errTransportCredentialsMissing
- }
- }
+ if err := cc.validateTransportCredentials(); err != nil {
+ return nil, err
}
if cc.dopts.defaultServiceConfigRawJSON != nil {
@@ -249,15 +235,12 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
// Determine the resolver to use.
- resolverBuilder, err := cc.parseTargetAndFindResolver()
- if err != nil {
+ if err := cc.parseTargetAndFindResolver(); err != nil {
return nil, err
}
- cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint(), cc.target, cc.dopts)
- if err != nil {
+ if err = cc.determineAuthority(); err != nil {
return nil, err
}
- channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
if cc.dopts.scChan != nil {
// Blocking wait for the initial service config.
@@ -275,57 +258,224 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
go cc.scWatcher()
}
+ // This creates the name resolver, load balancer, blocking picker etc.
+ if err := cc.exitIdleMode(); err != nil {
+ return nil, err
+ }
+
+ // Configure idleness support with configured idle timeout or default idle
+ // timeout duration. Idleness can be explicitly disabled by the user, by
+ // setting the dial option to 0.
+ cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout)
+
+ // Return early for non-blocking dials.
+ if !cc.dopts.block {
+ return cc, nil
+ }
+
+ // A blocking dial blocks until the clientConn is ready.
+ for {
+ s := cc.GetState()
+ if s == connectivity.Idle {
+ cc.Connect()
+ }
+ if s == connectivity.Ready {
+ return cc, nil
+ } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
+ if err = cc.connectionError(); err != nil {
+ terr, ok := err.(interface {
+ Temporary() bool
+ })
+ if ok && !terr.Temporary() {
+ return nil, err
+ }
+ }
+ }
+ if !cc.WaitForStateChange(ctx, s) {
+ // ctx got timeout or canceled.
+ if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
+ return nil, err
+ }
+ return nil, ctx.Err()
+ }
+ }
+}
+
+// addTraceEvent is a helper method to add a trace event on the channel. If the
+// channel is a nested one, the same event is also added on the parent channel.
+func (cc *ClientConn) addTraceEvent(msg string) {
+ ted := &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Channel %s", msg),
+ Severity: channelz.CtInfo,
+ }
+ if cc.dopts.channelzParentID != nil {
+ ted.Parent = &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelzID.Int(), msg),
+ Severity: channelz.CtInfo,
+ }
+ }
+ channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
+}
+
+// exitIdleMode moves the channel out of idle mode by recreating the name
+// resolver and load balancer.
+func (cc *ClientConn) exitIdleMode() error {
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return errConnClosing
+ }
+ if cc.idlenessState != ccIdlenessStateIdle {
+ cc.mu.Unlock()
+ logger.Info("ClientConn asked to exit idle mode when not in idle mode")
+ return nil
+ }
+
+ defer func() {
+ // When Close() and exitIdleMode() race against each other, one of the
+ // following two can happen:
+ // - Close() wins the race and runs first. exitIdleMode() runs after, and
+ // sees that the ClientConn is already closed and hence returns early.
+ // - exitIdleMode() wins the race and runs first and recreates the balancer
+ // and releases the lock before recreating the resolver. If Close() runs
+ // in this window, it will wait for exitIdleMode to complete.
+ //
+ // We achieve this synchronization using the below condition variable.
+ cc.mu.Lock()
+ cc.idlenessState = ccIdlenessStateActive
+ cc.exitIdleCond.Signal()
+ cc.mu.Unlock()
+ }()
+
+ cc.idlenessState = ccIdlenessStateExitingIdle
+ exitedIdle := false
+ if cc.blockingpicker == nil {
+ cc.blockingpicker = newPickerWrapper()
+ } else {
+ cc.blockingpicker.exitIdleMode()
+ exitedIdle = true
+ }
+
var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
- cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
- DialCreds: credsClone,
- CredsBundle: cc.dopts.copts.CredsBundle,
- Dialer: cc.dopts.copts.Dialer,
- Authority: cc.authority,
- CustomUserAgent: cc.dopts.copts.UserAgent,
- ChannelzParentID: cc.channelzID,
- Target: cc.parsedTarget,
- })
+ if cc.balancerWrapper == nil {
+ cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
+ DialCreds: credsClone,
+ CredsBundle: cc.dopts.copts.CredsBundle,
+ Dialer: cc.dopts.copts.Dialer,
+ Authority: cc.authority,
+ CustomUserAgent: cc.dopts.copts.UserAgent,
+ ChannelzParentID: cc.channelzID,
+ Target: cc.parsedTarget,
+ })
+ } else {
+ cc.balancerWrapper.exitIdleMode()
+ }
+ cc.firstResolveEvent = grpcsync.NewEvent()
+ cc.mu.Unlock()
- // Build the resolver.
- rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
- if err != nil {
- return nil, fmt.Errorf("failed to build resolver: %v", err)
+ // This needs to be called without cc.mu because this builds a new resolver
+ // which might update state or report error inline which needs to be handled
+ // by cc.updateResolverState() which also grabs cc.mu.
+ if err := cc.initResolverWrapper(credsClone); err != nil {
+ return err
+ }
+
+ if exitedIdle {
+ cc.addTraceEvent("exiting idle mode")
}
+ return nil
+}
+
+// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
+// name resolver, load balancer and any subchannels.
+func (cc *ClientConn) enterIdleMode() error {
cc.mu.Lock()
- cc.resolverWrapper = rWrapper
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return ErrClientConnClosing
+ }
+ if cc.idlenessState != ccIdlenessStateActive {
+ logger.Error("ClientConn asked to enter idle mode when not active")
+ return nil
+ }
+
+ // cc.conns == nil is a proxy for the ClientConn being closed. So, instead
+ // of setting it to nil here, we recreate the map. This also means that we
+ // don't have to do this when exiting idle mode.
+ conns := cc.conns
+ cc.conns = make(map[*addrConn]struct{})
+
+ // TODO: Currently, we close the resolver wrapper upon entering idle mode
+ // and create a new one upon exiting idle mode. This means that the
+ // `cc.resolverWrapper` field would be overwritten every time we exit idle
+ // mode. While this means that we need to hold `cc.mu` when accessing
+ // `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
+ // try to do the same for the balancer and picker wrappers too.
+ cc.resolverWrapper.close()
+ cc.blockingpicker.enterIdleMode()
+ cc.balancerWrapper.enterIdleMode()
+ cc.csMgr.updateState(connectivity.Idle)
+ cc.idlenessState = ccIdlenessStateIdle
cc.mu.Unlock()
- // A blocking dial blocks until the clientConn is ready.
- if cc.dopts.block {
- for {
- cc.Connect()
- s := cc.GetState()
- if s == connectivity.Ready {
- break
- } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
- if err = cc.connectionError(); err != nil {
- terr, ok := err.(interface {
- Temporary() bool
- })
- if ok && !terr.Temporary() {
- return nil, err
- }
- }
- }
- if !cc.WaitForStateChange(ctx, s) {
- // ctx got timeout or canceled.
- if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
- return nil, err
- }
- return nil, ctx.Err()
+ go func() {
+ cc.addTraceEvent("entering idle mode")
+ for ac := range conns {
+ ac.tearDown(errConnIdling)
+ }
+ }()
+ return nil
+}
+
+// validateTransportCredentials performs a series of checks on the configured
+// transport credentials. It returns a non-nil error if any of these conditions
+// are met:
+// - no transport creds and no creds bundle is configured
+// - both transport creds and creds bundle are configured
+// - creds bundle is configured, but it lacks a transport credentials
+// - insecure transport creds configured alongside call creds that require
+// transport level security
+//
+// If none of the above conditions are met, the configured credentials are
+// deemed valid and a nil error is returned.
+func (cc *ClientConn) validateTransportCredentials() error {
+ if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
+ return errNoTransportSecurity
+ }
+ if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
+ return errTransportCredsAndBundle
+ }
+ if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
+ return errNoTransportCredsInBundle
+ }
+ transportCreds := cc.dopts.copts.TransportCredentials
+ if transportCreds == nil {
+ transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
+ }
+ if transportCreds.Info().SecurityProtocol == "insecure" {
+ for _, cd := range cc.dopts.copts.PerRPCCredentials {
+ if cd.RequireTransportSecurity() {
+ return errTransportCredentialsMissing
}
}
}
+ return nil
+}
- return cc, nil
+// channelzRegistration registers the newly created ClientConn with channelz and
+// stores the returned identifier in `cc.channelzID` and `cc.csMgr.channelzID`.
+// A channelz trace event is emitted for ClientConn creation. If the newly
+// created ClientConn is a nested one, i.e a valid parent ClientConn ID is
+// specified via a dial option, the trace event is also added to the parent.
+//
+// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
+func (cc *ClientConn) channelzRegistration(target string) {
+ cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
+ cc.addTraceEvent("created")
+ cc.csMgr.channelzID = cc.channelzID
}
// chainUnaryClientInterceptors chains all unary client interceptors into one.
@@ -471,7 +621,9 @@ type ClientConn struct {
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
channelzID *channelz.Identifier // Channelz identifier for the channel.
+ resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
+ idlenessMgr idlenessManager
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
@@ -492,11 +644,31 @@ type ClientConn struct {
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
+ idlenessState ccIdlenessState // Tracks idleness state of the channel.
+ exitIdleCond *sync.Cond // Signalled when channel exits idle.
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
}
+// ccIdlenessState tracks the idleness state of the channel.
+//
+// Channels start off in `active` and move to `idle` after a period of
+// inactivity. When moving back to `active` upon an incoming RPC, they
+// transition through `exiting_idle`. This state is useful for synchronization
+// with Close().
+//
+// This state tracking is mostly for self-protection. The idlenessManager is
+// expected to keep track of the state as well, and is expected not to call into
+// the ClientConn unnecessarily.
+type ccIdlenessState int8
+
+const (
+ ccIdlenessStateActive ccIdlenessState = iota
+ ccIdlenessStateIdle
+ ccIdlenessStateExitingIdle
+)
+
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
//
@@ -536,7 +708,10 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
- cc.balancerWrapper.exitIdle()
+ cc.exitIdleMode()
+ // If the ClientConn was not in idle mode, we need to call ExitIdle on the
+ // LB policy so that connections can be created.
+ cc.balancerWrapper.exitIdleMode()
}
func (cc *ClientConn) scWatcher() {
@@ -693,6 +868,20 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
cc.balancerWrapper.updateSubConnState(sc, s, err)
}
+// Makes a copy of the input addresses slice and clears out the balancer
+// attributes field. Addresses are passed during subconn creation and address
+// update operations. In both cases, we will clear the balancer attributes by
+// calling this function, and therefore we will be able to use the Equal method
+// provided by the resolver.Address type for comparison.
+func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Address {
+ out := make([]resolver.Address, len(in))
+ for i := range in {
+ out[i] = in[i]
+ out[i].BalancerAttributes = nil
+ }
+ return out
+}
+
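A minimal in-package sketch (hypothetical addresses and attribute values, not part of this patch) of why the balancer attributes are stripped: two addresses for the same endpoint that differ only in their BalancerAttributes compare equal once passed through this helper, which is what lets the surrounding code rely on resolver.Address.Equal.

	a := resolver.Address{Addr: "10.0.0.1:443", BalancerAttributes: attributes.New("weight", "10")}
	b := resolver.Address{Addr: "10.0.0.1:443", BalancerAttributes: attributes.New("weight", "20")}
	// a.Equal(b) is false because the balancer attributes differ; after the
	// copy they are cleared, so the copies compare equal.
	ca := copyAddressesWithoutBalancerAttributes([]resolver.Address{a})
	cb := copyAddressesWithoutBalancerAttributes([]resolver.Address{b})
	fmt.Println(ca[0].Equal(cb[0])) // true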
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
@@ -700,11 +889,12 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
- addrs: addrs,
+ addrs: copyAddressesWithoutBalancerAttributes(addrs),
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
+ stateChan: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@@ -798,9 +988,6 @@ func (ac *addrConn) connect() error {
ac.mu.Unlock()
return nil
}
- // Update connectivity state within the lock to prevent subsequent or
- // concurrent calls from resetting the transport more than once.
- ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
ac.resetTransport()
@@ -819,58 +1006,63 @@ func equalAddresses(a, b []resolver.Address) bool {
return true
}
-// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
-//
-// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
-// addresses will be picked up by retry in the next iteration after backoff.
-//
-// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
-//
-// If the addresses is the same as the old list, it does nothing and returns
-// true.
-//
-// If ac is Connecting, it returns false. The caller should tear down the ac and
-// create a new one. Note that the backoff will be reset when this happens.
-//
-// If ac is Ready, it checks whether current connected address of ac is in the
-// new addrs list.
-// - If true, it updates ac.addrs and returns true. The ac will keep using
-// the existing connection.
-// - If false, it does nothing and returns false.
-func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
+// updateAddrs updates ac.addrs with the new addresses list and handles active
+// connections or connection attempts.
+func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
ac.mu.Lock()
- defer ac.mu.Unlock()
- channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+ channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs))
+
+ addrs = copyAddressesWithoutBalancerAttributes(addrs)
+ if equalAddresses(ac.addrs, addrs) {
+ ac.mu.Unlock()
+ return
+ }
+
+ ac.addrs = addrs
+
if ac.state == connectivity.Shutdown ||
ac.state == connectivity.TransientFailure ||
ac.state == connectivity.Idle {
- ac.addrs = addrs
- return true
+ // We were not connecting, so do nothing but update the addresses.
+ ac.mu.Unlock()
+ return
}
- if equalAddresses(ac.addrs, addrs) {
- return true
+ if ac.state == connectivity.Ready {
+ // Try to find the connected address.
+ for _, a := range addrs {
+ a.ServerName = ac.cc.getServerName(a)
+ if a.Equal(ac.curAddr) {
+ // We are connected to a valid address, so do nothing but
+ // update the addresses.
+ ac.mu.Unlock()
+ return
+ }
+ }
}
- if ac.state == connectivity.Connecting {
- return false
- }
+ // We are either connected to the wrong address or currently connecting.
+ // Stop the current iteration and restart.
- // ac.state is Ready, try to find the connected address.
- var curAddrFound bool
- for _, a := range addrs {
- a.ServerName = ac.cc.getServerName(a)
- if reflect.DeepEqual(ac.curAddr, a) {
- curAddrFound = true
- break
- }
+ ac.cancel()
+ ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
+
+ // We have to defer here because GracefulClose => Close => onClose, which
+ // requires locking ac.mu.
+ if ac.transport != nil {
+ defer ac.transport.GracefulClose()
+ ac.transport = nil
}
- channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
- if curAddrFound {
- ac.addrs = addrs
+
+ if len(addrs) == 0 {
+ ac.updateConnectivityState(connectivity.Idle, nil)
}
- return curAddrFound
+ ac.mu.Unlock()
+
+ // Since we were connecting/connected, we should start a new connection
+ // attempt.
+ go ac.resetTransport()
}
// getServerName determines the serverName to be used in the connection
@@ -1023,39 +1215,40 @@ func (cc *ClientConn) Close() error {
cc.mu.Unlock()
return ErrClientConnClosing
}
+
+ for cc.idlenessState == ccIdlenessStateExitingIdle {
+ cc.exitIdleCond.Wait()
+ }
+
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
+ pWrapper := cc.blockingpicker
rWrapper := cc.resolverWrapper
- cc.resolverWrapper = nil
bWrapper := cc.balancerWrapper
+ idlenessMgr := cc.idlenessMgr
cc.mu.Unlock()
// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
- cc.blockingpicker.close()
+ if pWrapper != nil {
+ pWrapper.close()
+ }
if bWrapper != nil {
bWrapper.close()
}
if rWrapper != nil {
rWrapper.close()
}
+ if idlenessMgr != nil {
+ idlenessMgr.close()
+ }
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
- ted := &channelz.TraceEventDesc{
- Desc: "Channel deleted",
- Severity: channelz.CtInfo,
- }
- if cc.dopts.channelzParentID != nil {
- ted.Parent = &channelz.TraceEventDesc{
- Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()),
- Severity: channelz.CtInfo,
- }
- }
- channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
+ cc.addTraceEvent("deleted")
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from being
// deleted right away.
@@ -1085,7 +1278,8 @@ type addrConn struct {
addrs []resolver.Address // All addresses that the resolver resolved to.
// Use updateConnectivityState for updating addrConn's connectivity state.
- state connectivity.State
+ state connectivity.State
+ stateChan chan struct{} // closed and recreated on every state change.
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
@@ -1099,6 +1293,9 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
if ac.state == s {
return
}
+ // When changing states, reset the state change channel.
+ close(ac.stateChan)
+ ac.stateChan = make(chan struct{})
ac.state = s
if lastErr == nil {
channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
@@ -1124,7 +1321,8 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
func (ac *addrConn) resetTransport() {
ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
+ acCtx := ac.ctx
+ if acCtx.Err() != nil {
ac.mu.Unlock()
return
}
@@ -1152,15 +1350,14 @@ func (ac *addrConn) resetTransport() {
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
- if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
+ if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
ac.cc.resolveNow(resolver.ResolveNowOptions{})
// After exhausting all addresses, the addrConn enters
// TRANSIENT_FAILURE.
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
+ if acCtx.Err() != nil {
return
}
+ ac.mu.Lock()
ac.updateConnectivityState(connectivity.TransientFailure, err)
// Backoff.
@@ -1175,13 +1372,13 @@ func (ac *addrConn) resetTransport() {
ac.mu.Unlock()
case <-b:
timer.Stop()
- case <-ac.ctx.Done():
+ case <-acCtx.Done():
timer.Stop()
return
}
ac.mu.Lock()
- if ac.state != connectivity.Shutdown {
+ if acCtx.Err() == nil {
ac.updateConnectivityState(connectivity.Idle, err)
}
ac.mu.Unlock()
@@ -1196,14 +1393,13 @@ func (ac *addrConn) resetTransport() {
// tryAllAddrs tries to create a connection to the addresses, and stops at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
-func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
+func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
var firstConnErr error
for _, addr := range addrs {
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
+ if ctx.Err() != nil {
return errConnClosing
}
+ ac.mu.Lock()
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
@@ -1217,7 +1413,7 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
- err := ac.createTransport(addr, copts, connectDeadline)
+ err := ac.createTransport(ctx, addr, copts, connectDeadline)
if err == nil {
return nil
}
@@ -1234,19 +1430,20 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
-func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
+func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
addr.ServerName = ac.cc.getServerName(addr)
- hctx, hcancel := context.WithCancel(ac.ctx)
+ hctx, hcancel := context.WithCancel(ctx)
onClose := func(r transport.GoAwayReason) {
ac.mu.Lock()
defer ac.mu.Unlock()
// adjust params based on GoAwayReason
ac.adjustParams(r)
- if ac.state == connectivity.Shutdown {
- // Already shut down. tearDown() already cleared the transport and
- // canceled hctx via ac.ctx, and we expected this connection to be
- // closed, so do nothing here.
+ if ctx.Err() != nil {
+ // Already shut down or connection attempt canceled. tearDown() or
+ // updateAddrs() already cleared the transport and canceled hctx
+ // via ac.ctx, and we expected this connection to be closed, so do
+ // nothing here.
return
}
hcancel()
@@ -1265,7 +1462,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
ac.updateConnectivityState(connectivity.Idle, nil)
}
- connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+ connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)
defer cancel()
copts.ChannelzParentID = ac.channelzID
@@ -1282,7 +1479,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
ac.mu.Lock()
defer ac.mu.Unlock()
- if ac.state == connectivity.Shutdown {
+ if ctx.Err() != nil {
// This can happen if the subConn was removed while in `Connecting`
// state. tearDown() would have set the state to `Shutdown`, but
// would not have closed the transport since ac.transport would not
@@ -1294,6 +1491,9 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
// The error we pass to Close() is immaterial since there are no open
// streams at this point, so no trailers with error details will be sent
// out. We just need to pass a non-nil error.
+ //
+ // This can also happen when updateAddrs is called during a connection
+ // attempt.
go newTr.Close(transport.ErrConnClosing)
return nil
}
@@ -1401,6 +1601,29 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport {
return nil
}
+// getTransport waits until the addrConn is ready and returns the transport.
+// If the context expires first, returns an appropriate status. If the
+// addrConn is stopped first, returns an Unavailable status error.
+func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) {
+ for ctx.Err() == nil {
+ ac.mu.Lock()
+ t, state, sc := ac.transport, ac.state, ac.stateChan
+ ac.mu.Unlock()
+ if state == connectivity.Ready {
+ return t, nil
+ }
+ if state == connectivity.Shutdown {
+ return nil, status.Errorf(codes.Unavailable, "SubConn shutting down")
+ }
+
+ select {
+ case <-ctx.Done():
+ case <-sc:
+ }
+ }
+ return nil, status.FromContextError(ctx.Err()).Err()
+}
+
// tearDown starts to tear down the addrConn.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
@@ -1552,7 +1775,14 @@ func (cc *ClientConn) connectionError() error {
return cc.lastConnectionError
}
-func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
+// parseTargetAndFindResolver parses the user's dial target and stores the
+// parsed target in `cc.parsedTarget`.
+//
+// The resolver to use is determined based on the scheme in the parsed target
+// and is stored in `cc.resolverBuilder`.
+//
+// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
+func (cc *ClientConn) parseTargetAndFindResolver() error {
channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
var rb resolver.Builder
@@ -1564,7 +1794,8 @@ func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb != nil {
cc.parsedTarget = parsedTarget
- return rb, nil
+ cc.resolverBuilder = rb
+ return nil
}
}
@@ -1579,38 +1810,98 @@ func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
parsedTarget, err = parseTarget(canonicalTarget)
if err != nil {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
- return nil, err
+ return err
}
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb == nil {
- return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
+ return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
}
cc.parsedTarget = parsedTarget
- return rb, nil
+ cc.resolverBuilder = rb
+ return nil
}
// parseTarget uses RFC 3986 semantics to parse the given target into a
-// resolver.Target struct containing scheme, authority and url. Query
-// params are stripped from the endpoint.
+// resolver.Target struct containing url. Query params are stripped from the
+// endpoint.
func parseTarget(target string) (resolver.Target, error) {
u, err := url.Parse(target)
if err != nil {
return resolver.Target{}, err
}
- return resolver.Target{
- Scheme: u.Scheme,
- Authority: u.Host,
- URL: *u,
- }, nil
+ return resolver.Target{URL: *u}, nil
+}
+
+func encodeAuthority(authority string) string {
+ const upperhex = "0123456789ABCDEF"
+
+ // shouldEscape reports whether a character must be percent-escaped.
+ // The characters allowed in an authority are listed in:
+ // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2
+ shouldEscape := func(c byte) bool {
+ // Alphanum are always allowed.
+ if 'a' <= c && c <= 'z' || 'A' <= c && c <= 'Z' || '0' <= c && c <= '9' {
+ return false
+ }
+ switch c {
+ case '-', '_', '.', '~': // Unreserved characters
+ return false
+ case '!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=': // Subdelim characters
+ return false
+ case ':', '[', ']', '@': // Authority related delimiters
+ return false
+ }
+ // Everything else must be escaped.
+ return true
+ }
+
+ hexCount := 0
+ for i := 0; i < len(authority); i++ {
+ c := authority[i]
+ if shouldEscape(c) {
+ hexCount++
+ }
+ }
+
+ if hexCount == 0 {
+ return authority
+ }
+
+ required := len(authority) + 2*hexCount
+ t := make([]byte, required)
+
+ j := 0
+ // This logic is a barebones version of escape in the go net/url library.
+ for i := 0; i < len(authority); i++ {
+ switch c := authority[i]; {
+ case shouldEscape(c):
+ t[j] = '%'
+ t[j+1] = upperhex[c>>4]
+ t[j+2] = upperhex[c&15]
+ j += 3
+ default:
+ t[j] = authority[i]
+ j++
+ }
+ }
+ return string(t)
}
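A hedged illustration of the escaping performed above (hypothetical inputs, not part of this patch): characters outside the RFC 3986 authority set are percent-encoded with uppercase hex, which later allows path-like endpoints to be used as a channel authority.

	fmt.Println(encodeAuthority("svc.example.com:443")) // "svc.example.com:443" (nothing to escape)
	fmt.Println(encodeAuthority("a/b/c"))               // "a%2Fb%2Fc" ('/' is not valid in an authority)
	fmt.Println(encodeAuthority("user name"))           // "user%20name" (space is escaped)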
// Determine channel authority. The order of precedence is as follows:
// - user specified authority override using `WithAuthority` dial option
// - creds' notion of server name for the authentication handshake
// - endpoint from dial target of the form "scheme://[authority]/endpoint"
-func determineAuthority(endpoint, target string, dopts dialOptions) (string, error) {
+//
+// Stores the determined authority in `cc.authority`.
+//
+// Returns a non-nil error if the authority returned by the transport
+// credentials does not match the authority configured through the dial option.
+//
+// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
+func (cc *ClientConn) determineAuthority() error {
+ dopts := cc.dopts
// Historically, we had two options for users to specify the serverName or
// authority for a channel. One was through the transport credentials
// (either in its constructor, or through the OverrideServerName() method).
@@ -1627,25 +1918,62 @@ func determineAuthority(endpoint, target string, dopts dialOptions) (string, err
}
authorityFromDialOption := dopts.authority
if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
- return "", fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
+ return fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
}
+ endpoint := cc.parsedTarget.Endpoint()
+ target := cc.target
switch {
case authorityFromDialOption != "":
- return authorityFromDialOption, nil
+ cc.authority = authorityFromDialOption
case authorityFromCreds != "":
- return authorityFromCreds, nil
+ cc.authority = authorityFromCreds
case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"):
// TODO: remove when the unix resolver implements optional interface to
// return channel authority.
- return "localhost", nil
+ cc.authority = "localhost"
case strings.HasPrefix(endpoint, ":"):
- return "localhost" + endpoint, nil
+ cc.authority = "localhost" + endpoint
default:
// TODO: Define an optional interface on the resolver builder to return
// the channel authority given the user's dial target. For resolvers
// which don't implement this interface, we will use the endpoint from
// "scheme://authority/endpoint" as the default authority.
- return endpoint, nil
+ // Escape the endpoint to handle use cases where the endpoint
+ // might not be a valid authority by default.
+ // For example an endpoint which has multiple paths like
+ // 'a/b/c', which is not a valid authority by default.
+ cc.authority = encodeAuthority(endpoint)
}
+ channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
+ return nil
+}
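The precedence above can be summarized with a few hedged examples (hypothetical dial targets; outcomes follow the rules described in the comments rather than being verified here):

	// grpc.Dial("dns:///svc.example.com:443", ...)                                 -> authority "svc.example.com:443"
	// grpc.Dial("dns:///svc.example.com:443", ..., grpc.WithAuthority("override")) -> authority "override"
	// grpc.Dial("unix:///tmp/grpc.sock", ...)                                      -> authority "localhost"
	// grpc.Dial("passthrough:///a/b/c", ...)                                       -> authority "a%2Fb%2Fc" (escaped endpoint)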
+
+// initResolverWrapper creates a ccResolverWrapper, which builds the name
+// resolver. This method grabs the lock to assign the newly built resolver
+// wrapper to the cc.resolverWrapper field.
+func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error {
+ rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{
+ target: cc.parsedTarget,
+ builder: cc.resolverBuilder,
+ bOpts: resolver.BuildOptions{
+ DisableServiceConfig: cc.dopts.disableServiceConfig,
+ DialCreds: creds,
+ CredsBundle: cc.dopts.copts.CredsBundle,
+ Dialer: cc.dopts.copts.Dialer,
+ },
+ channelzID: cc.channelzID,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to build resolver: %v", err)
+ }
+ // Resolver implementations may report state update or error inline when
+ // built (or right after), and this is handled in cc.updateResolverState.
+ // Also, an error from the resolver might lead to a re-resolution request
+ // from the balancer, which is handled in resolveNow() where
+ // `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here.
+ cc.mu.Lock()
+ cc.resolverWrapper = rw
+ cc.mu.Unlock()
+ return nil
}
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index cdc8263bd..23ea95237 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -77,6 +77,8 @@ type dialOptions struct {
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
resolvers []resolver.Builder
+ idleTimeout time.Duration
+ recvBufferPool SharedBufferPool
}
// DialOption configures how we set up the connection.
@@ -627,6 +629,7 @@ func defaultDialOptions() dialOptions {
ReadBufferSize: defaultReadBufSize,
UseProxy: true,
},
+ recvBufferPool: nopBufferPool{},
}
}
@@ -655,3 +658,44 @@ func WithResolvers(rs ...resolver.Builder) DialOption {
o.resolvers = append(o.resolvers, rs...)
})
}
+
+// WithIdleTimeout returns a DialOption that configures an idle timeout for the
+// channel. If the channel is idle for the configured timeout, i.e there are no
+// ongoing RPCs and no new RPCs are initiated, the channel will enter idle mode
+// and as a result the name resolver and load balancer will be shut down. The
+// channel will exit idle mode when the Connect() method is called or when an
+// RPC is initiated.
+//
+// By default this feature is disabled, which can also be explicitly configured
+// by passing zero to this function.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func WithIdleTimeout(d time.Duration) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.idleTimeout = d
+ })
+}
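A hedged usage sketch (hypothetical target and timeout; assumes the usual grpc, credentials/insecure, log and time imports): enabling a 30-minute idle timeout when dialing.

	conn, err := grpc.Dial(
		"dns:///svc.example.com:443",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithIdleTimeout(30*time.Minute),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()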
+
+// WithRecvBufferPool returns a DialOption that configures the ClientConn
+// to use the provided shared buffer pool for parsing incoming messages. Depending
+// on the application's workload, this could result in reduced memory allocation.
+//
+// If you are unsure about how to implement a memory pool but want to utilize one,
+// begin with grpc.NewSharedBufferPool.
+//
+// Note: The shared buffer pool feature will not be active if any of the following
+// options are used: WithStatsHandler, EnableTracing, or binary logging. In such
+// cases, the shared buffer pool will be ignored.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.recvBufferPool = bufferPool
+ })
+}
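A hedged sketch of opting into the shared receive buffer pool (hypothetical target; uses the pool returned by grpc.NewSharedBufferPool as suggested above):

	conn, err := grpc.Dial(
		"dns:///svc.example.com:443",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithRecvBufferPool(grpc.NewSharedBufferPool()),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()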
diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go
new file mode 100644
index 000000000..142d35f75
--- /dev/null
+++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go
@@ -0,0 +1,308 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// The canonical version of this proto can be found at
+// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.30.0
+// protoc v4.22.0
+// source: grpc/health/v1/health.proto
+
+package grpc_health_v1
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type HealthCheckResponse_ServingStatus int32
+
+const (
+ HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0
+ HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1
+ HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2
+ HealthCheckResponse_SERVICE_UNKNOWN HealthCheckResponse_ServingStatus = 3 // Used only by the Watch method.
+)
+
+// Enum value maps for HealthCheckResponse_ServingStatus.
+var (
+ HealthCheckResponse_ServingStatus_name = map[int32]string{
+ 0: "UNKNOWN",
+ 1: "SERVING",
+ 2: "NOT_SERVING",
+ 3: "SERVICE_UNKNOWN",
+ }
+ HealthCheckResponse_ServingStatus_value = map[string]int32{
+ "UNKNOWN": 0,
+ "SERVING": 1,
+ "NOT_SERVING": 2,
+ "SERVICE_UNKNOWN": 3,
+ }
+)
+
+func (x HealthCheckResponse_ServingStatus) Enum() *HealthCheckResponse_ServingStatus {
+ p := new(HealthCheckResponse_ServingStatus)
+ *p = x
+ return p
+}
+
+func (x HealthCheckResponse_ServingStatus) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (HealthCheckResponse_ServingStatus) Descriptor() protoreflect.EnumDescriptor {
+ return file_grpc_health_v1_health_proto_enumTypes[0].Descriptor()
+}
+
+func (HealthCheckResponse_ServingStatus) Type() protoreflect.EnumType {
+ return &file_grpc_health_v1_health_proto_enumTypes[0]
+}
+
+func (x HealthCheckResponse_ServingStatus) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use HealthCheckResponse_ServingStatus.Descriptor instead.
+func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) {
+ return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{1, 0}
+}
+
+type HealthCheckRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
+}
+
+func (x *HealthCheckRequest) Reset() {
+ *x = HealthCheckRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_grpc_health_v1_health_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HealthCheckRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HealthCheckRequest) ProtoMessage() {}
+
+func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_grpc_health_v1_health_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead.
+func (*HealthCheckRequest) Descriptor() ([]byte, []int) {
+ return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *HealthCheckRequest) GetService() string {
+ if x != nil {
+ return x.Service
+ }
+ return ""
+}
+
+type HealthCheckResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,proto3,enum=grpc.health.v1.HealthCheckResponse_ServingStatus" json:"status,omitempty"`
+}
+
+func (x *HealthCheckResponse) Reset() {
+ *x = HealthCheckResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_grpc_health_v1_health_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HealthCheckResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HealthCheckResponse) ProtoMessage() {}
+
+func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_grpc_health_v1_health_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead.
+func (*HealthCheckResponse) Descriptor() ([]byte, []int) {
+ return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus {
+ if x != nil {
+ return x.Status
+ }
+ return HealthCheckResponse_UNKNOWN
+}
+
+var File_grpc_health_v1_health_proto protoreflect.FileDescriptor
+
+var file_grpc_health_v1_health_proto_rawDesc = []byte{
+ 0x0a, 0x1b, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x76, 0x31,
+ 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x67,
+ 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x22, 0x2e, 0x0a,
+ 0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75,
+ 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0xb1, 0x01,
+ 0x0a, 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73,
+ 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x49, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61,
+ 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65,
+ 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69,
+ 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73,
+ 0x22, 0x4f, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75,
+ 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b,
+ 0x0a, 0x07, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e,
+ 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f,
+ 0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10,
+ 0x03, 0x32, 0xae, 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x50, 0x0a, 0x05,
+ 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61,
+ 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65,
+ 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63,
+ 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74,
+ 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52,
+ 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68,
+ 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43,
+ 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72,
+ 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61,
+ 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
+ 0x30, 0x01, 0x42, 0x61, 0x0a, 0x11, 0x69, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65,
+ 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x42, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x50,
+ 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x67,
+ 0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68,
+ 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x6c, 0x74,
+ 0x68, 0x5f, 0x76, 0x31, 0xaa, 0x02, 0x0e, 0x47, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x61, 0x6c,
+ 0x74, 0x68, 0x2e, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_grpc_health_v1_health_proto_rawDescOnce sync.Once
+ file_grpc_health_v1_health_proto_rawDescData = file_grpc_health_v1_health_proto_rawDesc
+)
+
+func file_grpc_health_v1_health_proto_rawDescGZIP() []byte {
+ file_grpc_health_v1_health_proto_rawDescOnce.Do(func() {
+ file_grpc_health_v1_health_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_health_v1_health_proto_rawDescData)
+ })
+ return file_grpc_health_v1_health_proto_rawDescData
+}
+
+var file_grpc_health_v1_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_grpc_health_v1_health_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_grpc_health_v1_health_proto_goTypes = []interface{}{
+ (HealthCheckResponse_ServingStatus)(0), // 0: grpc.health.v1.HealthCheckResponse.ServingStatus
+ (*HealthCheckRequest)(nil), // 1: grpc.health.v1.HealthCheckRequest
+ (*HealthCheckResponse)(nil), // 2: grpc.health.v1.HealthCheckResponse
+}
+var file_grpc_health_v1_health_proto_depIdxs = []int32{
+ 0, // 0: grpc.health.v1.HealthCheckResponse.status:type_name -> grpc.health.v1.HealthCheckResponse.ServingStatus
+ 1, // 1: grpc.health.v1.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest
+ 1, // 2: grpc.health.v1.Health.Watch:input_type -> grpc.health.v1.HealthCheckRequest
+ 2, // 3: grpc.health.v1.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse
+ 2, // 4: grpc.health.v1.Health.Watch:output_type -> grpc.health.v1.HealthCheckResponse
+ 3, // [3:5] is the sub-list for method output_type
+ 1, // [1:3] is the sub-list for method input_type
+ 1, // [1:1] is the sub-list for extension type_name
+ 1, // [1:1] is the sub-list for extension extendee
+ 0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_grpc_health_v1_health_proto_init() }
+func file_grpc_health_v1_health_proto_init() {
+ if File_grpc_health_v1_health_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_grpc_health_v1_health_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*HealthCheckRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_grpc_health_v1_health_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*HealthCheckResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_grpc_health_v1_health_proto_rawDesc,
+ NumEnums: 1,
+ NumMessages: 2,
+ NumExtensions: 0,
+ NumServices: 1,
+ },
+ GoTypes: file_grpc_health_v1_health_proto_goTypes,
+ DependencyIndexes: file_grpc_health_v1_health_proto_depIdxs,
+ EnumInfos: file_grpc_health_v1_health_proto_enumTypes,
+ MessageInfos: file_grpc_health_v1_health_proto_msgTypes,
+ }.Build()
+ File_grpc_health_v1_health_proto = out.File
+ file_grpc_health_v1_health_proto_rawDesc = nil
+ file_grpc_health_v1_health_proto_goTypes = nil
+ file_grpc_health_v1_health_proto_depIdxs = nil
+}
diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go
new file mode 100644
index 000000000..a01a1b4d5
--- /dev/null
+++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go
@@ -0,0 +1,223 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// The canonical version of this proto can be found at
+// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto
+
+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-grpc v1.3.0
+// - protoc v4.22.0
+// source: grpc/health/v1/health.proto
+
+package grpc_health_v1
+
+import (
+ context "context"
+ grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+// Requires gRPC-Go v1.32.0 or later.
+const _ = grpc.SupportPackageIsVersion7
+
+const (
+ Health_Check_FullMethodName = "/grpc.health.v1.Health/Check"
+ Health_Watch_FullMethodName = "/grpc.health.v1.Health/Watch"
+)
+
+// HealthClient is the client API for Health service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+type HealthClient interface {
+ // If the requested service is unknown, the call will fail with status
+ // NOT_FOUND.
+ Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
+ // Performs a watch for the serving status of the requested service.
+ // The server will immediately send back a message indicating the current
+ // serving status. It will then subsequently send a new message whenever
+ // the service's serving status changes.
+ //
+ // If the requested service is unknown when the call is received, the
+ // server will send a message setting the serving status to
+ // SERVICE_UNKNOWN but will *not* terminate the call. If at some
+ // future point, the serving status of the service becomes known, the
+ // server will send a new message with the service's serving status.
+ //
+ // If the call terminates with status UNIMPLEMENTED, then clients
+ // should assume this method is not supported and should not retry the
+ // call. If the call terminates with any other status (including OK),
+ // clients should retry the call with appropriate exponential backoff.
+ Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error)
+}
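A hedged client-side sketch (assumes an already-dialed *grpc.ClientConn named conn and the usual context, log and time imports; the service name is hypothetical): issuing a unary health check.

	client := grpc_health_v1.NewHealthClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{Service: "my.package.MyService"})
	if err != nil {
		log.Fatalf("health check failed: %v", err)
	}
	log.Printf("serving status: %v", resp.GetStatus())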
+
+type healthClient struct {
+ cc grpc.ClientConnInterface
+}
+
+func NewHealthClient(cc grpc.ClientConnInterface) HealthClient {
+ return &healthClient{cc}
+}
+
+func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
+ out := new(HealthCheckResponse)
+ err := c.cc.Invoke(ctx, Health_Check_FullMethodName, in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) {
+ stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], Health_Watch_FullMethodName, opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &healthWatchClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type Health_WatchClient interface {
+ Recv() (*HealthCheckResponse, error)
+ grpc.ClientStream
+}
+
+type healthWatchClient struct {
+ grpc.ClientStream
+}
+
+func (x *healthWatchClient) Recv() (*HealthCheckResponse, error) {
+ m := new(HealthCheckResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+// HealthServer is the server API for Health service.
+// All implementations should embed UnimplementedHealthServer
+// for forward compatibility
+type HealthServer interface {
+ // If the requested service is unknown, the call will fail with status
+ // NOT_FOUND.
+ Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
+ // Performs a watch for the serving status of the requested service.
+ // The server will immediately send back a message indicating the current
+ // serving status. It will then subsequently send a new message whenever
+ // the service's serving status changes.
+ //
+ // If the requested service is unknown when the call is received, the
+ // server will send a message setting the serving status to
+ // SERVICE_UNKNOWN but will *not* terminate the call. If at some
+ // future point, the serving status of the service becomes known, the
+ // server will send a new message with the service's serving status.
+ //
+ // If the call terminates with status UNIMPLEMENTED, then clients
+ // should assume this method is not supported and should not retry the
+ // call. If the call terminates with any other status (including OK),
+ // clients should retry the call with appropriate exponential backoff.
+ Watch(*HealthCheckRequest, Health_WatchServer) error
+}
+
+// UnimplementedHealthServer should be embedded to have forward compatible implementations.
+type UnimplementedHealthServer struct {
+}
+
+func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method Check not implemented")
+}
+func (UnimplementedHealthServer) Watch(*HealthCheckRequest, Health_WatchServer) error {
+ return status.Errorf(codes.Unimplemented, "method Watch not implemented")
+}
+
+// UnsafeHealthServer may be embedded to opt out of forward compatibility for this service.
+// Use of this interface is not recommended, as added methods to HealthServer will
+// result in compilation errors.
+type UnsafeHealthServer interface {
+ mustEmbedUnimplementedHealthServer()
+}
+
+func RegisterHealthServer(s grpc.ServiceRegistrar, srv HealthServer) {
+ s.RegisterService(&Health_ServiceDesc, srv)
+}
+
+func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(HealthCheckRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(HealthServer).Check(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: Health_Check_FullMethodName,
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+func _Health_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
+ m := new(HealthCheckRequest)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(HealthServer).Watch(m, &healthWatchServer{stream})
+}
+
+type Health_WatchServer interface {
+ Send(*HealthCheckResponse) error
+ grpc.ServerStream
+}
+
+type healthWatchServer struct {
+ grpc.ServerStream
+}
+
+func (x *healthWatchServer) Send(m *HealthCheckResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+// Health_ServiceDesc is the grpc.ServiceDesc for Health service.
+// It's only intended for direct use with grpc.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var Health_ServiceDesc = grpc.ServiceDesc{
+ ServiceName: "grpc.health.v1.Health",
+ HandlerType: (*HealthServer)(nil),
+ Methods: []grpc.MethodDesc{
+ {
+ MethodName: "Check",
+ Handler: _Health_Check_Handler,
+ },
+ },
+ Streams: []grpc.StreamDesc{
+ {
+ StreamName: "Watch",
+ Handler: _Health_Watch_Handler,
+ ServerStreams: true,
+ },
+ },
+ Metadata: "grpc/health/v1/health.proto",
+}
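A hedged server-side sketch (not part of the generated code; the always-SERVING behaviour is purely illustrative): a minimal HealthServer that embeds UnimplementedHealthServer for forward compatibility and is registered on a *grpc.Server.

	type healthSvc struct {
		grpc_health_v1.UnimplementedHealthServer
	}

	func (s *healthSvc) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
		// A real implementation would consult per-service state instead of
		// unconditionally reporting SERVING.
		return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
	}

	// Registration, assuming srv is a *grpc.Server:
	//   grpc_health_v1.RegisterHealthServer(srv, &healthSvc{})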
diff --git a/vendor/google.golang.org/grpc/idle.go b/vendor/google.golang.org/grpc/idle.go
new file mode 100644
index 000000000..dc3dc72f6
--- /dev/null
+++ b/vendor/google.golang.org/grpc/idle.go
@@ -0,0 +1,287 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// For overriding in unit tests.
+var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
+ return time.AfterFunc(d, f)
+}
+
+// idlenessEnforcer is the functionality provided by grpc.ClientConn to enter
+// and exit from idle mode.
+type idlenessEnforcer interface {
+ exitIdleMode() error
+ enterIdleMode() error
+}
+
+// idlenessManager defines the functionality required to track RPC activity on a
+// channel.
+type idlenessManager interface {
+ onCallBegin() error
+ onCallEnd()
+ close()
+}
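A hedged sketch of how the ClientConn is expected to drive this interface around each RPC (simplified; the real call sites live elsewhere in the grpc package, and cc.idlenessMgr is assumed to have been built with newIdlenessManager):

	if err := cc.idlenessMgr.onCallBegin(); err != nil {
		return err // the channel could not exit idle mode
	}
	defer cc.idlenessMgr.onCallEnd()
	// ... issue the RPC on the now-active channel ...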
+
+type noopIdlenessManager struct{}
+
+func (noopIdlenessManager) onCallBegin() error { return nil }
+func (noopIdlenessManager) onCallEnd() {}
+func (noopIdlenessManager) close() {}
+
+// idlenessManagerImpl implements the idlenessManager interface. It uses atomic
+// operations to synchronize access to shared state and a mutex to guarantee
+// mutual exclusion in a critical section.
+type idlenessManagerImpl struct {
+ // State accessed atomically.
+ lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
+ activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
+ activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
+ closed int32 // Boolean; True when the manager is closed.
+
+ // Can be accessed without atomics or mutex since these are set at creation
+ // time and read-only after that.
+ enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn.
+ timeout int64 // Idle timeout duration nanos stored as an int64.
+
+ // idleMu is used to guarantee mutual exclusion in two scenarios:
+ // - Opposing intentions:
+ // - a: Idle timeout has fired and handleIdleTimeout() is trying to put
+ // the channel in idle mode because the channel has been inactive.
+ // - b: At the same time an RPC is made on the channel, and onCallBegin()
+ // is trying to prevent the channel from going idle.
+ // - Competing intentions:
+ // - The channel is in idle mode and there are multiple RPCs starting at
+ // the same time, all trying to move the channel out of idle. Only one
+ // of them should succeed in doing so, while the other RPCs should
+ // piggyback on the first one and be successfully handled.
+ idleMu sync.RWMutex
+ actuallyIdle bool
+ timer *time.Timer
+}
+
+// newIdlenessManager creates a new idleness manager implementation for the
+// given idle timeout.
+func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager {
+ if idleTimeout == 0 {
+ return noopIdlenessManager{}
+ }
+
+ i := &idlenessManagerImpl{
+ enforcer: enforcer,
+ timeout: int64(idleTimeout),
+ }
+ i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout)
+ return i
+}
+
+// resetIdleTimer resets the idle timer to the given duration. This method
+// should only be called from the timer callback.
+func (i *idlenessManagerImpl) resetIdleTimer(d time.Duration) {
+ i.idleMu.Lock()
+ defer i.idleMu.Unlock()
+
+ if i.timer == nil {
+ // Only close sets timer to nil. We are done.
+ return
+ }
+
+ // It is safe to ignore the return value from Reset() because this method is
+ // only ever called from the timer callback, which means the timer has
+ // already fired.
+ i.timer.Reset(d)
+}
+
+// handleIdleTimeout is the timer callback that is invoked upon expiry of the
+// configured idle timeout. The channel is considered inactive if there are no
+// ongoing calls and no RPC activity since the last time the timer fired.
+func (i *idlenessManagerImpl) handleIdleTimeout() {
+ if i.isClosed() {
+ return
+ }
+
+ if atomic.LoadInt32(&i.activeCallsCount) > 0 {
+ i.resetIdleTimer(time.Duration(i.timeout))
+ return
+ }
+
+ // There has been activity on the channel since we last got here. Reset the
+ // timer and return.
+ if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 {
+ // Set the timer to fire after a duration of idle timeout, calculated
+ // from the time the most recent RPC completed.
+ atomic.StoreInt32(&i.activeSinceLastTimerCheck, 0)
+ i.resetIdleTimer(time.Duration(atomic.LoadInt64(&i.lastCallEndTime) + i.timeout - time.Now().UnixNano()))
+ return
+ }
+
+ // This CAS operation is extremely likely to succeed given that there has
+ // been no activity since the last time we were here. Setting the
+ // activeCallsCount to -math.MaxInt32 indicates to onCallBegin() that the
+ // channel is either in idle mode or is trying to get there.
+ if !atomic.CompareAndSwapInt32(&i.activeCallsCount, 0, -math.MaxInt32) {
+ // This CAS operation can fail if an RPC started after we checked for
+ // activity at the top of this method, or one was ongoing from before
+ // the last time we were here. In both cases, reset the timer and return.
+ i.resetIdleTimer(time.Duration(i.timeout))
+ return
+ }
+
+ // Now that we've set the active calls count to -math.MaxInt32, it's time to
+ // actually move to idle mode.
+ if i.tryEnterIdleMode() {
+ // Successfully entered idle mode. No timer needed until we exit idle.
+ return
+ }
+
+ // Failed to enter idle mode due to a concurrent RPC that kept the channel
+ // active, or because of an error from the channel. Undo the attempt to
+ // enter idle, and reset the timer to try again later.
+ atomic.AddInt32(&i.activeCallsCount, math.MaxInt32)
+ i.resetIdleTimer(time.Duration(i.timeout))
+}
+
+// tryEnterIdleMode instructs the channel to enter idle mode. But before
+// that, it performs a last minute check to ensure that no new RPC has come in,
+// making the channel active.
+//
+// Return value indicates whether or not the channel moved to idle mode.
+//
+// Holds idleMu which ensures mutual exclusion with exitIdleMode.
+func (i *idlenessManagerImpl) tryEnterIdleMode() bool {
+ i.idleMu.Lock()
+ defer i.idleMu.Unlock()
+
+ if atomic.LoadInt32(&i.activeCallsCount) != -math.MaxInt32 {
+ // We raced and lost to a new RPC. Very rare, but stop entering idle.
+ return false
+ }
+ if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 {
+ // A very short RPC could have come in (and also finished) after we
+ // checked for calls count and activity in handleIdleTimeout(), but
+ // before the CAS operation. So, we need to check for activity again.
+ return false
+ }
+
+ // No new RPCs have come in since we last set the active calls count to
+ // -math.MaxInt32 in the timer callback. And since we have the lock, it is
+ // safe to enter idle mode now.
+ if err := i.enforcer.enterIdleMode(); err != nil {
+ logger.Errorf("Failed to enter idle mode: %v", err)
+ return false
+ }
+
+ // Successfully entered idle mode.
+ i.actuallyIdle = true
+ return true
+}
+
+// onCallBegin is invoked at the start of every RPC.
+func (i *idlenessManagerImpl) onCallBegin() error {
+ if i.isClosed() {
+ return nil
+ }
+
+ if atomic.AddInt32(&i.activeCallsCount, 1) > 0 {
+ // Channel is not idle now. Set the activity bit and allow the call.
+ atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1)
+ return nil
+ }
+
+ // Channel is either in idle mode or is in the process of moving to idle
+ // mode. Attempt to exit idle mode to allow this RPC.
+ if err := i.exitIdleMode(); err != nil {
+ // Undo the increment to calls count, and return an error causing the
+ // RPC to fail.
+ atomic.AddInt32(&i.activeCallsCount, -1)
+ return err
+ }
+
+ atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1)
+ return nil
+}
+
+// exitIdleMode instructs the channel to exit idle mode.
+//
+// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
+func (i *idlenessManagerImpl) exitIdleMode() error {
+ i.idleMu.Lock()
+ defer i.idleMu.Unlock()
+
+ if !i.actuallyIdle {
+ // This can happen in two scenarios:
+ // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
+ // tryEnterIdleMode(). But before the latter could grab the lock, an RPC
+ // came in and onCallBegin() noticed that the calls count is negative.
+ // - Channel is in idle mode, and multiple new RPCs come in at the same
+ // time, all of them notice a negative calls count in onCallBegin and get
+ // here. The first one to get the lock gets the channel to exit idle.
+ //
+ // Either way, nothing to do here.
+ return nil
+ }
+
+ if err := i.enforcer.exitIdleMode(); err != nil {
+ return fmt.Errorf("channel failed to exit idle mode: %v", err)
+ }
+
+ // Undo the idle entry process. This also respects any new RPC attempts.
+ atomic.AddInt32(&i.activeCallsCount, math.MaxInt32)
+ i.actuallyIdle = false
+
+ // Start a new timer to fire after the configured idle timeout.
+ i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout)
+ return nil
+}
+
+// onCallEnd is invoked at the end of every RPC.
+func (i *idlenessManagerImpl) onCallEnd() {
+ if i.isClosed() {
+ return
+ }
+
+ // Record the time at which the most recent call finished.
+ atomic.StoreInt64(&i.lastCallEndTime, time.Now().UnixNano())
+
+ // Decrement the active calls count. This count can temporarily go negative
+ // when the timer callback is in the process of moving the channel to idle
+ // mode, but one or more RPCs come in and complete before the timer callback
+ // can get done with the process of moving to idle mode.
+ atomic.AddInt32(&i.activeCallsCount, -1)
+}
+
+func (i *idlenessManagerImpl) isClosed() bool {
+ return atomic.LoadInt32(&i.closed) == 1
+}
+
+func (i *idlenessManagerImpl) close() {
+ atomic.StoreInt32(&i.closed, 1)
+
+ i.idleMu.Lock()
+ i.timer.Stop()
+ i.timer = nil
+ i.idleMu.Unlock()
+}
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index af03a40d9..755fdebc1 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -32,6 +32,9 @@ var grpclogLogger = grpclog.Component("binarylog")
// Logger specifies MethodLoggers for method names with a Log call that
// takes a context.
+//
+// This is used in the 1.0 release of gcp/observability, and thus must not be
+// deleted or changed.
type Logger interface {
GetMethodLogger(methodName string) MethodLogger
}
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
index 56fcf008d..6c3f63221 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
@@ -49,6 +49,9 @@ func (g *callIDGenerator) reset() {
var idGen callIDGenerator
// MethodLogger is the sub-logger for each method.
+//
+// This is used in the 1.0 release of gcp/observability, and thus must not be
+// deleted or changed.
type MethodLogger interface {
Log(context.Context, LogEntryConfig)
}
@@ -65,6 +68,9 @@ type TruncatingMethodLogger struct {
}
// NewTruncatingMethodLogger returns a new truncating method logger.
+//
+// This is used in the 1.0 release of gcp/observability, and thus must not be
+// deleted or changed.
func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
return &TruncatingMethodLogger{
headerMaxLen: h,
@@ -145,6 +151,9 @@ func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (trun
}
// LogEntryConfig represents the configuration for binary log entry.
+//
+// This is used in the 1.0 release of gcp/observability, and thus must not be
+// deleted or changed.
type LogEntryConfig interface {
toProto() *binlogpb.GrpcLogEntry
}
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
index 9f6a0c120..81c2f5fd7 100644
--- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
+++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
@@ -35,6 +35,7 @@ import "sync"
// internal/transport/transport.go for an example of this.
type Unbounded struct {
c chan interface{}
+ closed bool
mu sync.Mutex
backlog []interface{}
}
@@ -47,16 +48,18 @@ func NewUnbounded() *Unbounded {
// Put adds t to the unbounded buffer.
func (b *Unbounded) Put(t interface{}) {
b.mu.Lock()
+ defer b.mu.Unlock()
+ if b.closed {
+ return
+ }
if len(b.backlog) == 0 {
select {
case b.c <- t:
- b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, t)
- b.mu.Unlock()
}
// Load sends the earliest buffered data, if any, onto the read channel
@@ -64,6 +67,10 @@ func (b *Unbounded) Put(t interface{}) {
// value from the read channel.
func (b *Unbounded) Load() {
b.mu.Lock()
+ defer b.mu.Unlock()
+ if b.closed {
+ return
+ }
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
@@ -72,7 +79,6 @@ func (b *Unbounded) Load() {
default:
}
}
- b.mu.Unlock()
}
// Get returns a read channel on which values added to the buffer, via Put(),
@@ -80,6 +86,20 @@ func (b *Unbounded) Load() {
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
+//
+// If the unbounded buffer is closed, the read channel returned by this method
+// is closed.
func (b *Unbounded) Get() <-chan interface{} {
return b.c
}
+
+// Close closes the unbounded buffer.
+func (b *Unbounded) Close() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ if b.closed {
+ return
+ }
+ b.closed = true
+ close(b.c)
+}
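The hunk above gives Unbounded a Close() method and makes Put/Load no-ops afterwards. Below is a minimal consumer sketch of that API; it is illustrative only (buffer is an internal package, so it compiles only from within the grpc module) and the printed values are placeholders.

package main

import (
    "fmt"

    "google.golang.org/grpc/internal/buffer"
)

func main() {
    b := buffer.NewUnbounded()
    done := make(chan struct{})

    go func() {
        defer close(done)
        // Read a value, then call Load() so the buffer moves the next
        // backlogged value (if any) onto the channel returned by Get().
        for v := range b.Get() {
            fmt.Println("got:", v)
            b.Load()
        }
    }()

    for i := 0; i < 3; i++ {
        b.Put(i)
    }

    // Close() makes further Put/Load calls no-ops and closes the Get()
    // channel, ending the consumer's range loop. Values still sitting in the
    // backlog at that point are dropped, matching the shutdown semantics in
    // the hunk above.
    b.Close()
    <-done
}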
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index 5ba9d94d4..77c2c0b89 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -36,6 +36,13 @@ var (
// "GRPC_RING_HASH_CAP". This does not override the default bounds
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
+ // PickFirstLBConfig is set if we should support configuration of the
+ // pick_first LB policy, which can be enabled by setting the environment
+ // variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true".
+ PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false)
+ // ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS
+ // handshakes that can be performed.
+ ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)
)
func boolFromEnv(envVar string, def bool) bool {
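The new PickFirstLBConfig and ALTSMaxConcurrentHandshakes knobs above are read through boolFromEnv/uint64FromEnv helpers. The sketch below is only a guess at what a uint64FromEnv-style helper does, based on its (envVar, default, min, max) call sites: fall back to the default when the variable is unset or unparsable, and clamp to the given bounds. The clamping behavior is an assumption, not copied from the real helper.

package main

import (
    "fmt"
    "os"
    "strconv"
)

// uint64FromEnv mirrors the call shape seen above; the behavior is assumed.
func uint64FromEnv(envVar string, def, min, max uint64) uint64 {
    v, err := strconv.ParseUint(os.Getenv(envVar), 10, 64)
    if err != nil {
        return def
    }
    if v < min {
        return min
    }
    if v > max {
        return max
    }
    return v
}

func main() {
    os.Setenv("GRPC_RING_HASH_CAP", "10000000") // above the ~8M upper bound
    fmt.Println(uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024))
}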
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/observability.go b/vendor/google.golang.org/grpc/internal/envconfig/observability.go
index 821dd0a7c..dd314cfb1 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/observability.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/observability.go
@@ -28,9 +28,15 @@ const (
var (
// ObservabilityConfig is the json configuration for the gcp/observability
// package specified directly in the envObservabilityConfig env var.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
ObservabilityConfig = os.Getenv(envObservabilityConfig)
// ObservabilityConfigFile is the json configuration for the
// gcp/observability specified in a file with the location specified in
// envObservabilityConfigFile env var.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
ObservabilityConfigFile = os.Getenv(envObservabilityConfigFile)
)
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go
index 3b17705ba..02b4b6a1c 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go
@@ -61,11 +61,10 @@ var (
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true)
- // XDSAggregateAndDNS indicates whether processing of aggregated cluster
- // and DNS cluster is enabled, which can be enabled by setting the
- // environment variable
- // "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
- // "true".
+ // XDSAggregateAndDNS indicates whether processing of aggregated cluster and
+ // DNS cluster is enabled, which can be disabled by setting the environment
+ // variable "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
+ // to "false".
XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true)
// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
@@ -82,11 +81,15 @@ var (
XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", true)
// XDSRLS indicates whether processing of Cluster Specifier plugins and
- // support for the RLS CLuster Specifier is enabled, which can be enabled by
+ // support for the RLS Cluster Specifier is enabled, which can be disabled by
// setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to
- // "true".
- XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", false)
+ // "false".
+ XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", true)
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")
+ // XDSCustomLBPolicy indicates whether Custom LB Policies are enabled, which
+ // can be disabled by setting the environment variable
+ // "GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG" to "false".
+ XDSCustomLBPolicy = boolFromEnv("GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG", true)
)
diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
index 517ea7064..aa97273e7 100644
--- a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
+++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
@@ -72,3 +72,24 @@ func Uint64() uint64 {
defer mu.Unlock()
return r.Uint64()
}
+
+// Uint32 implements rand.Uint32 on the grpcrand global source.
+func Uint32() uint32 {
+ mu.Lock()
+ defer mu.Unlock()
+ return r.Uint32()
+}
+
+// ExpFloat64 implements rand.ExpFloat64 on the grpcrand global source.
+func ExpFloat64() float64 {
+ mu.Lock()
+ defer mu.Unlock()
+ return r.ExpFloat64()
+}
+
+// Shuffle implements rand.Shuffle on the grpcrand global source.
+var Shuffle = func(n int, f func(int, int)) {
+ mu.Lock()
+ defer mu.Unlock()
+ r.Shuffle(n, f)
+}
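A self-contained analogue of the pattern above: one seeded *rand.Rand shared behind a mutex, with a Shuffle helper like the one this hunk adds. The address strings are made up; pickfirst, further down in this diff, uses the real grpcrand.Shuffle the same way.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// A single seeded *rand.Rand is not safe for concurrent use, so every helper
// takes the mutex first, mirroring the grpcrand pattern in the hunk above.
var (
    mu sync.Mutex
    r  = rand.New(rand.NewSource(time.Now().UnixNano()))
)

// Shuffle implements rand.Shuffle on the shared source.
func Shuffle(n int, swap func(i, j int)) {
    mu.Lock()
    defer mu.Unlock()
    r.Shuffle(n, swap)
}

func main() {
    addrs := []string{"10.0.0.1:443", "10.0.0.2:443", "10.0.0.3:443"}
    Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
    fmt.Println(addrs)
}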
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
index 79993d343..37b8d4117 100644
--- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
+++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
@@ -20,6 +20,7 @@ package grpcsync
import (
"context"
+ "sync"
"google.golang.org/grpc/internal/buffer"
)
@@ -31,15 +32,26 @@ import (
//
// This type is safe for concurrent access.
type CallbackSerializer struct {
+ // Done is closed once the serializer is shut down completely, i.e. all
+ // scheduled callbacks are executed and the serializer has deallocated all
+ // its resources.
+ Done chan struct{}
+
callbacks *buffer.Unbounded
+ closedMu sync.Mutex
+ closed bool
}
// NewCallbackSerializer returns a new CallbackSerializer instance. The provided
// context will be passed to the scheduled callbacks. Users should cancel the
// provided context to shutdown the CallbackSerializer. It is guaranteed that no
-// callbacks will be executed once this context is canceled.
+// callbacks will be added once this context is canceled, and any pending un-run
+// callbacks will be executed before the serializer is shut down.
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
- t := &CallbackSerializer{callbacks: buffer.NewUnbounded()}
+ t := &CallbackSerializer{
+ Done: make(chan struct{}),
+ callbacks: buffer.NewUnbounded(),
+ }
go t.run(ctx)
return t
}
@@ -48,18 +60,60 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
-func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) {
+//
+// The return value indicates whether the callback was successfully added to the
+// list of callbacks to be executed by the serializer. It is not possible to add
+// callbacks once the context passed to NewCallbackSerializer is canceled.
+func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
+ t.closedMu.Lock()
+ defer t.closedMu.Unlock()
+
+ if t.closed {
+ return false
+ }
t.callbacks.Put(f)
+ return true
}
func (t *CallbackSerializer) run(ctx context.Context) {
+ var backlog []func(context.Context)
+
+ defer close(t.Done)
for ctx.Err() == nil {
select {
case <-ctx.Done():
- return
- case callback := <-t.callbacks.Get():
+ // Do nothing here. Next iteration of the for loop will not happen,
+ // since ctx.Err() would be non-nil.
+ case callback, ok := <-t.callbacks.Get():
+ if !ok {
+ return
+ }
t.callbacks.Load()
callback.(func(ctx context.Context))(ctx)
}
}
+
+ // Fetch pending callbacks if any, and execute them before returning from
+ // this method and closing t.Done.
+ t.closedMu.Lock()
+ t.closed = true
+ backlog = t.fetchPendingCallbacks()
+ t.callbacks.Close()
+ t.closedMu.Unlock()
+ for _, b := range backlog {
+ b(ctx)
+ }
+}
+
+func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
+ var backlog []func(context.Context)
+ for {
+ select {
+ case b := <-t.callbacks.Get():
+ backlog = append(backlog, b.(func(context.Context)))
+ t.callbacks.Load()
+ default:
+ return backlog
+ }
+ }
}
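A sketch of the CallbackSerializer lifecycle implied by this hunk: callbacks run one at a time in scheduling order, canceling the context still drains anything already scheduled, Done closes once shutdown is complete, and Schedule then returns false. grpcsync is internal, so this compiles only from inside the grpc module.

package main

import (
    "context"
    "fmt"

    "google.golang.org/grpc/internal/grpcsync"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cs := grpcsync.NewCallbackSerializer(ctx)

    // Callbacks execute serially, in the order they were scheduled.
    for i := 0; i < 3; i++ {
        i := i
        cs.Schedule(func(ctx context.Context) { fmt.Println("callback", i) })
    }

    // Cancel the context to shut the serializer down. Per the hunk above,
    // already-scheduled callbacks still run before Done is closed.
    cancel()
    <-cs.Done

    // After shutdown, Schedule reports that the callback was not accepted.
    fmt.Println("accepted after shutdown:", cs.Schedule(func(context.Context) {}))
}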
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
new file mode 100644
index 000000000..f58b5ffa6
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
@@ -0,0 +1,136 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpcsync
+
+import (
+ "context"
+ "sync"
+)
+
+// Subscriber represents an entity that is subscribed to messages published on
+// a PubSub. It wraps the callback to be invoked by the PubSub when a new
+// message is published.
+type Subscriber interface {
+ // OnMessage is invoked when a new message is published. Implementations
+ // must not block in this method.
+ OnMessage(msg interface{})
+}
+
+// PubSub is a simple one-to-many publish-subscribe system that supports
+// messages of arbitrary type. It guarantees that messages are delivered in
+// the same order in which they were published.
+//
+// Publisher invokes the Publish() method to publish new messages, while
+// subscribers interested in receiving these messages register a callback
+// via the Subscribe() method.
+//
+// Once a PubSub is stopped, no more messages can be published, and
+// it is guaranteed that no more subscriber callbacks will be invoked.
+type PubSub struct {
+ cs *CallbackSerializer
+ cancel context.CancelFunc
+
+ // Access to the below fields are guarded by this mutex.
+ mu sync.Mutex
+ msg interface{}
+ subscribers map[Subscriber]bool
+ stopped bool
+}
+
+// NewPubSub returns a new PubSub instance.
+func NewPubSub() *PubSub {
+ ctx, cancel := context.WithCancel(context.Background())
+ return &PubSub{
+ cs: NewCallbackSerializer(ctx),
+ cancel: cancel,
+ subscribers: map[Subscriber]bool{},
+ }
+}
+
+// Subscribe registers the provided Subscriber to the PubSub.
+//
+// If the PubSub contains a previously published message, the Subscriber's
+// OnMessage() callback will be invoked asynchronously with the existing
+// message to begin with, and subsequently for every newly published message.
+//
+// The caller is responsible for invoking the returned cancel function to
+// unsubscribe itself from the PubSub.
+func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
+ ps.mu.Lock()
+ defer ps.mu.Unlock()
+
+ if ps.stopped {
+ return func() {}
+ }
+
+ ps.subscribers[sub] = true
+
+ if ps.msg != nil {
+ msg := ps.msg
+ ps.cs.Schedule(func(context.Context) {
+ ps.mu.Lock()
+ defer ps.mu.Unlock()
+ if !ps.subscribers[sub] {
+ return
+ }
+ sub.OnMessage(msg)
+ })
+ }
+
+ return func() {
+ ps.mu.Lock()
+ defer ps.mu.Unlock()
+ delete(ps.subscribers, sub)
+ }
+}
+
+// Publish publishes the provided message to the PubSub, and invokes
+// callbacks registered by subscribers asynchronously.
+func (ps *PubSub) Publish(msg interface{}) {
+ ps.mu.Lock()
+ defer ps.mu.Unlock()
+
+ if ps.stopped {
+ return
+ }
+
+ ps.msg = msg
+ for sub := range ps.subscribers {
+ s := sub
+ ps.cs.Schedule(func(context.Context) {
+ ps.mu.Lock()
+ defer ps.mu.Unlock()
+ if !ps.subscribers[s] {
+ return
+ }
+ s.OnMessage(msg)
+ })
+ }
+}
+
+// Stop shuts down the PubSub and releases any resources allocated by it.
+// It is guaranteed that no subscriber callbacks will be invoked once this
+// method returns.
+func (ps *PubSub) Stop() {
+ ps.mu.Lock()
+ defer ps.mu.Unlock()
+ ps.stopped = true
+
+ ps.cancel()
+}
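A short sketch of the new PubSub API: a subscriber added after a Publish first receives the most recently published message, then every later one, until Stop() is called. Again internal, so illustrative only; the subscriber type and the sleep are placeholders.

package main

import (
    "fmt"
    "time"

    "google.golang.org/grpc/internal/grpcsync"
)

// printSubscriber implements grpcsync.Subscriber. OnMessage must not block.
type printSubscriber struct{ name string }

func (s *printSubscriber) OnMessage(msg interface{}) {
    fmt.Printf("%s got %v\n", s.name, msg)
}

func main() {
    ps := grpcsync.NewPubSub()

    ps.Publish("initial state")

    // A late subscriber is first delivered the most recent message, then
    // every subsequent one, per the doc comments above.
    cancel := ps.Subscribe(&printSubscriber{name: "sub"})
    defer cancel()

    ps.Publish("updated state")

    // Callbacks are delivered asynchronously on the serializer; give them a
    // moment before stopping in this toy example.
    time.Sleep(100 * time.Millisecond)
    ps.Stop()
}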
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index 836b6a3b3..42ff39c84 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -60,6 +60,9 @@ var (
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
// CanonicalString returns the canonical string of the code defined here:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
CanonicalString interface{} // func (codes.Code) string
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
@@ -69,20 +72,35 @@ var (
// AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
// ClearGlobalServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
ClearGlobalServerOptions func()
// AddGlobalDialOptions adds an array of DialOption that will be effective
// globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
AddGlobalDialOptions interface{} // func(opt ...DialOption)
// DisableGlobalDialOptions returns a DialOption that prevents the
// ClientConn from applying the global DialOptions (set via
// AddGlobalDialOptions).
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
DisableGlobalDialOptions interface{} // func() grpc.DialOption
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
ClearGlobalDialOptions func()
// JoinDialOptions combines the dial options passed as arguments into a
// single dial option.
@@ -93,9 +111,15 @@ var (
// WithBinaryLogger returns a DialOption that specifies the binary logger
// for a ClientConn.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption
// BinaryLogger returns a ServerOption that can set the binary logger for a
// server.
+ //
+ // This is used in the 1.0 release of gcp/observability, and thus must not be
+ // deleted or changed.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
index 09a667f33..99e1e5b36 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -62,7 +62,8 @@ const (
defaultPort = "443"
defaultDNSSvrPort = "53"
golang = "GO"
- // txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
+ // txtPrefix is the prefix string to be prepended to the host name for txt
+ // record lookup.
txtPrefix = "_grpc_config."
// In DNS, service config is encoded in a TXT record via the mechanism
// described in RFC-1464 using the attribute name grpc_config.
@@ -86,14 +87,14 @@ var (
minDNSResRate = 30 * time.Second
)
-var customAuthorityDialler = func(authority string) func(ctx context.Context, network, address string) (net.Conn, error) {
- return func(ctx context.Context, network, address string) (net.Conn, error) {
+var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) {
+ return func(ctx context.Context, network, _ string) (net.Conn, error) {
var dialer net.Dialer
- return dialer.DialContext(ctx, network, authority)
+ return dialer.DialContext(ctx, network, address)
}
}
-var customAuthorityResolver = func(authority string) (netResolver, error) {
+var newNetResolver = func(authority string) (netResolver, error) {
host, port, err := parseTarget(authority, defaultDNSSvrPort)
if err != nil {
return nil, err
@@ -103,7 +104,7 @@ var customAuthorityResolver = func(authority string) (netResolver, error) {
return &net.Resolver{
PreferGo: true,
- Dial: customAuthorityDialler(authorityWithPort),
+ Dial: addressDialer(authorityWithPort),
}, nil
}
@@ -114,7 +115,8 @@ func NewBuilder() resolver.Builder {
type dnsBuilder struct{}
-// Build creates and starts a DNS resolver that watches the name resolution of the target.
+// Build creates and starts a DNS resolver that watches the name resolution of
+// the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
host, port, err := parseTarget(target.Endpoint(), defaultPort)
if err != nil {
@@ -143,7 +145,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
if target.URL.Host == "" {
d.resolver = defaultResolver
} else {
- d.resolver, err = customAuthorityResolver(target.URL.Host)
+ d.resolver, err = newNetResolver(target.URL.Host)
if err != nil {
return nil, err
}
@@ -180,19 +182,22 @@ type dnsResolver struct {
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
- // rn channel is used by ResolveNow() to force an immediate resolution of the target.
+ // rn channel is used by ResolveNow() to force an immediate resolution of the
+ // target.
rn chan struct{}
- // wg is used to enforce Close() to return after the watcher() goroutine has finished.
- // Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
- // replace the real lookup functions with mocked ones to facilitate testing.
- // If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes
- // will warns lookup (READ the lookup function pointers) inside watcher() goroutine
- // has data race with replaceNetFunc (WRITE the lookup function pointers).
+ // wg is used to ensure that Close() returns only after the watcher()
+ // goroutine has finished. Otherwise, a data race is possible. [Race Example]
+ // in dns_resolver_test we replace the real lookup functions with mocked ones
+ // to facilitate testing. If Close() doesn't wait for the watcher() goroutine
+ // to finish, the race detector sometimes warns that the lookup (READ of the
+ // lookup function pointers) inside the watcher() goroutine races with
+ // replaceNetFunc (WRITE of the lookup function pointers).
wg sync.WaitGroup
disableServiceConfig bool
}
-// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
+// ResolveNow invokes an immediate resolution of the target that this
+// dnsResolver watches.
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case d.rn <- struct{}{}:
@@ -220,8 +225,8 @@ func (d *dnsResolver) watcher() {
var timer *time.Timer
if err == nil {
- // Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least
- // to prevent constantly re-resolving.
+ // Success resolving, wait for the next ResolveNow. However, also wait 30
+ // seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
timer = newTimerDNSResRate(minDNSResRate)
select {
@@ -231,7 +236,8 @@ func (d *dnsResolver) watcher() {
case <-d.rn:
}
} else {
- // Poll on an error found in DNS Resolver or an error received from ClientConn.
+ // Poll on an error found in DNS Resolver or an error received from
+ // ClientConn.
timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
backoffIndex++
}
@@ -278,7 +284,8 @@ func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {
}
func handleDNSError(err error, lookupType string) error {
- if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
+ dnsErr, ok := err.(*net.DNSError)
+ if ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
// Timeouts and temporary errors should be communicated to gRPC to
// attempt another DNS query (with backoff). Other errors should be
// suppressed (they may represent the absence of a TXT record).
@@ -307,10 +314,12 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
res += s
}
- // TXT record must have "grpc_config=" attribute in order to be used as service config.
+ // TXT record must have "grpc_config=" attribute in order to be used as
+ // service config.
if !strings.HasPrefix(res, txtAttribute) {
logger.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute)
- // This is not an error; it is the equivalent of not having a service config.
+ // This is not an error; it is the equivalent of not having a service
+ // config.
return nil
}
sc := canaryingSC(strings.TrimPrefix(res, txtAttribute))
@@ -352,9 +361,10 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {
return &state, nil
}
-// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
-// If addr is an IPv4 address, return the addr and ok = true.
-// If addr is an IPv6 address, return the addr enclosed in square brackets and ok = true.
+// formatIP returns ok = false if addr is not a valid textual representation of
+// an IP address. If addr is an IPv4 address, return the addr and ok = true.
+// If addr is an IPv6 address, return the addr enclosed in square brackets and
+// ok = true.
func formatIP(addr string) (addrIP string, ok bool) {
ip := net.ParseIP(addr)
if ip == nil {
@@ -366,10 +376,10 @@ func formatIP(addr string) (addrIP string, ok bool) {
return "[" + addr + "]", true
}
-// parseTarget takes the user input target string and default port, returns formatted host and port info.
-// If target doesn't specify a port, set the port to be the defaultPort.
-// If target is in IPv6 format and host-name is enclosed in square brackets, brackets
-// are stripped when setting the host.
+// parseTarget takes the user input target string and default port, returns
+// formatted host and port info. If target doesn't specify a port, set the port
+// to be the defaultPort. If target is in IPv6 format and host-name is enclosed
+// in square brackets, brackets are stripped when setting the host.
// examples:
// target: "www.google.com" defaultPort: "443" returns host: "www.google.com", port: "443"
// target: "ipv4-host:80" defaultPort: "443" returns host: "ipv4-host", port: "80"
@@ -385,12 +395,14 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {
}
if host, port, err = net.SplitHostPort(target); err == nil {
if port == "" {
- // If the port field is empty (target ends with colon), e.g. "[::1]:", this is an error.
+ // If the port field is empty (target ends with colon), e.g. "[::1]:",
+ // this is an error.
return "", "", errEndsWithColon
}
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {
- // Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed.
+ // Keep consistent with net.Dial(): If the host is empty, as in ":80",
+ // the local system is assumed.
host = "localhost"
}
return host, port, nil
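The rename from customAuthorityDialler to addressDialer above keeps the same trick: the Dial callback ignores the address the Go resolver picks and always dials the configured authority. A standalone sketch of that pattern follows, with a made-up DNS server address.

package main

import (
    "context"
    "fmt"
    "net"
)

// newNetResolver mirrors the hunk above: the Dial callback discards the
// address chosen by the Go resolver and dials the supplied authority instead.
func newNetResolver(authority string) *net.Resolver {
    return &net.Resolver{
        PreferGo: true,
        Dial: func(ctx context.Context, network, _ string) (net.Conn, error) {
            var d net.Dialer
            return d.DialContext(ctx, network, authority)
        },
    }
}

func main() {
    r := newNetResolver("1.2.3.4:53") // hypothetical DNS server
    addrs, err := r.LookupHost(context.Background(), "grpc.io")
    fmt.Println(addrs, err)
}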
diff --git a/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go b/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go
new file mode 100644
index 000000000..11d82afcc
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go
@@ -0,0 +1,130 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package serviceconfig
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// Duration defines JSON marshal and unmarshal methods to conform to the
+// protobuf JSON spec defined [here].
+//
+// [here]: https://protobuf.dev/reference/protobuf/google.protobuf/#duration
+type Duration time.Duration
+
+func (d Duration) String() string {
+ return fmt.Sprint(time.Duration(d))
+}
+
+// MarshalJSON converts from d to a JSON string output.
+func (d Duration) MarshalJSON() ([]byte, error) {
+ ns := time.Duration(d).Nanoseconds()
+ sec := ns / int64(time.Second)
+ ns = ns % int64(time.Second)
+
+ var sign string
+ if sec < 0 || ns < 0 {
+ sign, sec, ns = "-", -1*sec, -1*ns
+ }
+
+ // Generated output always contains 0, 3, 6, or 9 fractional digits,
+ // depending on required precision.
+ str := fmt.Sprintf("%s%d.%09d", sign, sec, ns)
+ str = strings.TrimSuffix(str, "000")
+ str = strings.TrimSuffix(str, "000")
+ str = strings.TrimSuffix(str, ".000")
+ return []byte(fmt.Sprintf("\"%ss\"", str)), nil
+}
+
+// UnmarshalJSON unmarshals b as a duration JSON string into d.
+func (d *Duration) UnmarshalJSON(b []byte) error {
+ var s string
+ if err := json.Unmarshal(b, &s); err != nil {
+ return err
+ }
+ if !strings.HasSuffix(s, "s") {
+ return fmt.Errorf("malformed duration %q: missing seconds unit", s)
+ }
+ neg := false
+ if s[0] == '-' {
+ neg = true
+ s = s[1:]
+ }
+ ss := strings.SplitN(s[:len(s)-1], ".", 3)
+ if len(ss) > 2 {
+ return fmt.Errorf("malformed duration %q: too many decimals", s)
+ }
+ // hasDigits is set if either the whole or fractional part of the number is
+ // present, since both are optional but one is required.
+ hasDigits := false
+ var sec, ns int64
+ if len(ss[0]) > 0 {
+ var err error
+ if sec, err = strconv.ParseInt(ss[0], 10, 64); err != nil {
+ return fmt.Errorf("malformed duration %q: %v", s, err)
+ }
+ // Maximum seconds value per the durationpb spec.
+ const maxProtoSeconds = 315_576_000_000
+ if sec > maxProtoSeconds {
+ return fmt.Errorf("out of range: %q", s)
+ }
+ hasDigits = true
+ }
+ if len(ss) == 2 && len(ss[1]) > 0 {
+ if len(ss[1]) > 9 {
+ return fmt.Errorf("malformed duration %q: too many digits after decimal", s)
+ }
+ var err error
+ if ns, err = strconv.ParseInt(ss[1], 10, 64); err != nil {
+ return fmt.Errorf("malformed duration %q: %v", s, err)
+ }
+ for i := 9; i > len(ss[1]); i-- {
+ ns *= 10
+ }
+ hasDigits = true
+ }
+ if !hasDigits {
+ return fmt.Errorf("malformed duration %q: contains no numbers", s)
+ }
+
+ if neg {
+ sec *= -1
+ ns *= -1
+ }
+
+ // Maximum/minimum seconds/nanoseconds representable by Go's time.Duration.
+ const maxSeconds = math.MaxInt64 / int64(time.Second)
+ const maxNanosAtMaxSeconds = math.MaxInt64 % int64(time.Second)
+ const minSeconds = math.MinInt64 / int64(time.Second)
+ const minNanosAtMinSeconds = math.MinInt64 % int64(time.Second)
+
+ if sec > maxSeconds || (sec == maxSeconds && ns >= maxNanosAtMaxSeconds) {
+ *d = Duration(math.MaxInt64)
+ } else if sec < minSeconds || (sec == minSeconds && ns <= minNanosAtMinSeconds) {
+ *d = Duration(math.MinInt64)
+ } else {
+ *d = Duration(sec*int64(time.Second) + ns)
+ }
+ return nil
+}
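A round-trip sketch of the Duration type added by this file, showing the protobuf-JSON wire format it implements: 0, 3, 6, or 9 fractional digits on output, an optional sign and fraction on input. internal/serviceconfig is not importable outside the grpc module, so this is illustrative only.

package main

import (
    "encoding/json"
    "fmt"
    "time"

    "google.golang.org/grpc/internal/serviceconfig"
)

func main() {
    // Marshal: 1.5s is emitted with three fractional digits.
    d := serviceconfig.Duration(1500 * time.Millisecond)
    b, _ := json.Marshal(d)
    fmt.Println(string(b)) // "1.500s"

    // Unmarshal: a protobuf-style duration string, negative values allowed.
    var parsed serviceconfig.Duration
    _ = json.Unmarshal([]byte(`"-2.5s"`), &parsed)
    fmt.Println(time.Duration(parsed)) // -2.5s
}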
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index fbee581b8..98f80e3fa 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -453,7 +453,7 @@ func (ht *serverHandlerTransport) IncrMsgSent() {}
func (ht *serverHandlerTransport) IncrMsgRecv() {}
-func (ht *serverHandlerTransport) Drain() {
+func (ht *serverHandlerTransport) Drain(debugData string) {
panic("Drain() is not implemented")
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 5216998a8..326bf0848 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -1337,7 +1337,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// setGoAwayReason sets the value of t.goAwayReason based
// on the GoAway frame received.
-// It expects a lock on transport's mutext to be held by
+// It expects a lock on transport's mutex to be held by
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayNoReason
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 4b406b8cb..f96064012 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -238,7 +238,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
kp.Timeout = defaultServerKeepaliveTimeout
}
if kp.Time != infinity {
- if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
+ if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
}
}
@@ -1166,12 +1166,12 @@ func (t *http2Server) keepalive() {
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
- t.Drain()
+ t.Drain("max_idle")
return
}
idleTimer.Reset(val)
case <-ageTimer.C:
- t.Drain()
+ t.Drain("max_age")
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-ageTimer.C:
@@ -1318,14 +1318,14 @@ func (t *http2Server) RemoteAddr() net.Addr {
return t.remoteAddr
}
-func (t *http2Server) Drain() {
+func (t *http2Server) Drain(debugData string) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainEvent != nil {
return
}
t.drainEvent = grpcsync.NewEvent()
- t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
+ t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
}
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
@@ -1367,7 +1367,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
// originated before the GoAway reaches the client.
// After getting the ack or timer expiration send out another GoAway this
// time with an ID of the max stream server intends to process.
- if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
+ if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 1b7d7fabc..aa1c89659 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -726,7 +726,7 @@ type ServerTransport interface {
RemoteAddr() net.Addr
// Drain notifies the client this ServerTransport stops accepting new RPCs.
- Drain()
+ Drain(debugData string)
// IncrMsgSent increments the number of message sent through this transport.
IncrMsgSent()
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
index c525dc070..02f975951 100644
--- a/vendor/google.golang.org/grpc/picker_wrapper.go
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -36,6 +36,7 @@ import (
type pickerWrapper struct {
mu sync.Mutex
done bool
+ idle bool
blockingCh chan struct{}
picker balancer.Picker
}
@@ -47,7 +48,11 @@ func newPickerWrapper() *pickerWrapper {
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.mu.Lock()
- if pw.done {
+ if pw.done || pw.idle {
+ // There is a small window where a picker update from the LB policy can
+ // race with the channel going to idle mode. If the picker is idle here,
+ // it is because the channel asked it to do so, and therefore it is safe
+ // to ignore the update from the LB policy.
pw.mu.Unlock()
return
}
@@ -63,10 +68,8 @@ func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
// - wraps the done function in the passed in result to increment the calls
// failed or calls succeeded channelz counter before invoking the actual
// done function.
-func doneChannelzWrapper(acw *acBalancerWrapper, result *balancer.PickResult) {
- acw.mu.Lock()
- ac := acw.ac
- acw.mu.Unlock()
+func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
+ ac := acbw.ac
ac.incrCallsStarted()
done := result.Done
result.Done = func(b balancer.DoneInfo) {
@@ -152,14 +155,14 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
}
- acw, ok := pickResult.SubConn.(*acBalancerWrapper)
+ acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
if !ok {
logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
continue
}
- if t := acw.getAddrConn().getReadyTransport(); t != nil {
+ if t := acbw.ac.getReadyTransport(); t != nil {
if channelz.IsOn() {
- doneChannelzWrapper(acw, &pickResult)
+ doneChannelzWrapper(acbw, &pickResult)
return t, pickResult, nil
}
return t, pickResult, nil
@@ -187,6 +190,25 @@ func (pw *pickerWrapper) close() {
close(pw.blockingCh)
}
+func (pw *pickerWrapper) enterIdleMode() {
+ pw.mu.Lock()
+ defer pw.mu.Unlock()
+ if pw.done {
+ return
+ }
+ pw.idle = true
+}
+
+func (pw *pickerWrapper) exitIdleMode() {
+ pw.mu.Lock()
+ defer pw.mu.Unlock()
+ if pw.done {
+ return
+ }
+ pw.blockingCh = make(chan struct{})
+ pw.idle = false
+}
+
// dropError is a wrapper error that indicates the LB policy wishes to drop the
// RPC and not retry it.
type dropError struct {
diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go
index fc91b4d26..abe266b02 100644
--- a/vendor/google.golang.org/grpc/pickfirst.go
+++ b/vendor/google.golang.org/grpc/pickfirst.go
@@ -19,11 +19,15 @@
package grpc
import (
+ "encoding/json"
"errors"
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/internal/envconfig"
+ "google.golang.org/grpc/internal/grpcrand"
+ "google.golang.org/grpc/serviceconfig"
)
// PickFirstBalancerName is the name of the pick_first balancer.
@@ -43,10 +47,28 @@ func (*pickfirstBuilder) Name() string {
return PickFirstBalancerName
}
+type pfConfig struct {
+ serviceconfig.LoadBalancingConfig `json:"-"`
+
+ // If set to true, instructs the LB policy to shuffle the order of the list
+ // of addresses received from the name resolver before attempting to
+ // connect to them.
+ ShuffleAddressList bool `json:"shuffleAddressList"`
+}
+
+func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+ cfg := &pfConfig{}
+ if err := json.Unmarshal(js, cfg); err != nil {
+ return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
+ }
+ return cfg, nil
+}
+
type pickfirstBalancer struct {
state connectivity.State
cc balancer.ClientConn
subConn balancer.SubConn
+ cfg *pfConfig
}
func (b *pickfirstBalancer) ResolverError(err error) {
@@ -69,7 +91,8 @@ func (b *pickfirstBalancer) ResolverError(err error) {
}
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
- if len(state.ResolverState.Addresses) == 0 {
+ addrs := state.ResolverState.Addresses
+ if len(addrs) == 0 {
// The resolver reported an empty address list. Treat it like an error by
// calling b.ResolverError.
if b.subConn != nil {
@@ -82,12 +105,23 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return balancer.ErrBadResolverState
}
+ if state.BalancerConfig != nil {
+ cfg, ok := state.BalancerConfig.(*pfConfig)
+ if !ok {
+ return fmt.Errorf("pickfirstBalancer: received nil or illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
+ }
+ b.cfg = cfg
+ }
+
+ if envconfig.PickFirstLBConfig && b.cfg != nil && b.cfg.ShuffleAddressList {
+ grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
+ }
if b.subConn != nil {
- b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
+ b.cc.UpdateAddresses(b.subConn, addrs)
return nil
}
- subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
+ subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
if err != nil {
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
@@ -119,7 +153,6 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b
}
return
}
- b.state = state.ConnectivityState
if state.ConnectivityState == connectivity.Shutdown {
b.subConn = nil
return
@@ -132,11 +165,21 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b
Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
})
case connectivity.Connecting:
+ if b.state == connectivity.TransientFailure {
+ // We stay in TransientFailure until we are Ready. See A62.
+ return
+ }
b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.Idle:
+ if b.state == connectivity.TransientFailure {
+ // We stay in TransientFailure until we are Ready. Also kick the
+ // subConn out of Idle into Connecting. See A62.
+ b.subConn.Connect()
+ return
+ }
b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &idlePicker{subConn: subConn},
@@ -147,6 +190,7 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b
Picker: &picker{err: state.ConnectionError},
})
}
+ b.state = state.ConnectivityState
}
func (b *pickfirstBalancer) Close() {
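A client-side sketch of how the new shuffleAddressList knob could be exercised, assuming GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG is set to "true" (see the envconfig hunk earlier in this diff). The target and the insecure credentials are placeholders; the service config JSON simply mirrors the pfConfig struct above.

package main

import (
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    // Service config selecting pick_first and asking it to shuffle the
    // resolver-provided address list (the new shuffleAddressList field).
    const sc = `{"loadBalancingConfig": [{"pick_first": {"shuffleAddressList": true}}]}`

    conn, err := grpc.Dial("dns:///example.com:443", // placeholder target
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(sc),
    )
    if err != nil {
        log.Fatalf("grpc.Dial: %v", err)
    }
    defer conn.Close()
}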
diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go
index 6215e5ef2..d8db6f5d3 100644
--- a/vendor/google.golang.org/grpc/resolver/resolver.go
+++ b/vendor/google.golang.org/grpc/resolver/resolver.go
@@ -22,13 +22,13 @@ package resolver
import (
"context"
+ "fmt"
"net"
"net/url"
"strings"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
- "google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)
@@ -124,7 +124,7 @@ type Address struct {
Attributes *attributes.Attributes
// BalancerAttributes contains arbitrary data about this address intended
- // for consumption by the LB policy. These attribes do not affect SubConn
+ // for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes
@@ -142,6 +142,10 @@ type Address struct {
// Equal returns whether a and o are identical. Metadata is compared directly,
// not with any recursive introspection.
+//
+// This method compares all fields of the address. When used to tell apart
+// addresses during subchannel creation or connection establishment, it might be
+// more appropriate for the caller to implement custom equality logic.
func (a Address) Equal(o Address) bool {
return a.Addr == o.Addr && a.ServerName == o.ServerName &&
a.Attributes.Equal(o.Attributes) &&
@@ -151,7 +155,17 @@ func (a Address) Equal(o Address) bool {
// String returns JSON formatted string representation of the address.
func (a Address) String() string {
- return pretty.ToJSON(a)
+ var sb strings.Builder
+ sb.WriteString(fmt.Sprintf("{Addr: %q, ", a.Addr))
+ sb.WriteString(fmt.Sprintf("ServerName: %q, ", a.ServerName))
+ if a.Attributes != nil {
+ sb.WriteString(fmt.Sprintf("Attributes: %v, ", a.Attributes.String()))
+ }
+ if a.BalancerAttributes != nil {
+ sb.WriteString(fmt.Sprintf("BalancerAttributes: %v", a.BalancerAttributes.String()))
+ }
+ sb.WriteString("}")
+ return sb.String()
}
// BuildOptions includes additional information for the builder to create
@@ -254,10 +268,6 @@ type ClientConn interface {
// - "unknown_scheme://authority/endpoint"
// Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "unknown_scheme://authority/endpoint"}
type Target struct {
- // Deprecated: use URL.Scheme instead.
- Scheme string
- // Deprecated: use URL.Host instead.
- Authority string
// URL contains the parsed dial target with an optional default scheme added
// to it if the original dial target contained no scheme or contained an
// unregistered scheme. Any query params specified in the original dial
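With the deprecated Scheme and Authority fields removed from Target in this hunk, resolver implementations read both values from the parsed URL instead. A minimal migration sketch:

package main

import (
    "fmt"
    "net/url"

    "google.golang.org/grpc/resolver"
)

// schemeAndAuthority shows the replacement for the removed Target.Scheme and
// Target.Authority fields: read them from the parsed URL.
func schemeAndAuthority(t resolver.Target) (scheme, authority string) {
    return t.URL.Scheme, t.URL.Host
}

func main() {
    u, _ := url.Parse("dns://8.8.8.8/example.com:443")
    t := resolver.Target{URL: *u}
    fmt.Println(schemeAndAuthority(t)) // dns 8.8.8.8
}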
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index 05a9d4e0b..b408b3688 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -19,11 +19,11 @@
package grpc
import (
+ "context"
"strings"
"sync"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
@@ -31,129 +31,192 @@ import (
"google.golang.org/grpc/serviceconfig"
)
+// resolverStateUpdater wraps the single method used by ccResolverWrapper to
+// report a state update from the actual resolver implementation.
+type resolverStateUpdater interface {
+ updateResolverState(s resolver.State, err error) error
+}
+
// ccResolverWrapper is a wrapper on top of cc for resolvers.
// It implements resolver.ClientConn interface.
type ccResolverWrapper struct {
- cc *ClientConn
- resolverMu sync.Mutex
- resolver resolver.Resolver
- done *grpcsync.Event
- curState resolver.State
+ // The following fields are initialized when the wrapper is created and are
+ // read-only afterwards, and therefore can be accessed without a mutex.
+ cc resolverStateUpdater
+ channelzID *channelz.Identifier
+ ignoreServiceConfig bool
+ opts ccResolverWrapperOpts
+ serializer *grpcsync.CallbackSerializer // To serialize all incoming calls.
+ serializerCancel context.CancelFunc // To close the serializer, accessed only from close().
+
+ // All incoming (resolver --> gRPC) calls are guaranteed to execute in a
+ // mutually exclusive manner as they are scheduled on the serializer.
+ // Fields accessed *only* in these serializer callbacks can therefore be
+ // accessed without a mutex.
+ curState resolver.State
+
+ // mu guards access to the below fields.
+ mu sync.Mutex
+ closed bool
+ resolver resolver.Resolver // Accessed only from outgoing calls.
+}
- incomingMu sync.Mutex // Synchronizes all the incoming calls.
+// ccResolverWrapperOpts wraps the arguments to be passed when creating a new
+// ccResolverWrapper.
+type ccResolverWrapperOpts struct {
+ target resolver.Target // User specified dial target to resolve.
+ builder resolver.Builder // Resolver builder to use.
+ bOpts resolver.BuildOptions // Resolver build options to use.
+ channelzID *channelz.Identifier // Channelz identifier for the channel.
}
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
-func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
+func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) {
+ ctx, cancel := context.WithCancel(context.Background())
ccr := &ccResolverWrapper{
- cc: cc,
- done: grpcsync.NewEvent(),
- }
-
- var credsClone credentials.TransportCredentials
- if creds := cc.dopts.copts.TransportCredentials; creds != nil {
- credsClone = creds.Clone()
- }
- rbo := resolver.BuildOptions{
- DisableServiceConfig: cc.dopts.disableServiceConfig,
- DialCreds: credsClone,
- CredsBundle: cc.dopts.copts.CredsBundle,
- Dialer: cc.dopts.copts.Dialer,
- }
-
- var err error
- // We need to hold the lock here while we assign to the ccr.resolver field
- // to guard against a data race caused by the following code path,
- // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
- // accessing ccr.resolver which is being assigned here.
- ccr.resolverMu.Lock()
- defer ccr.resolverMu.Unlock()
- ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
+ cc: cc,
+ channelzID: opts.channelzID,
+ ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
+ opts: opts,
+ serializer: grpcsync.NewCallbackSerializer(ctx),
+ serializerCancel: cancel,
+ }
+
+ // Cannot hold the lock at build time because the resolver can send an
+ // update or error inline and these incoming calls grab the lock to schedule
+ // a callback in the serializer.
+ r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)
if err != nil {
+ cancel()
return nil, err
}
+
+ // Any error reported by the resolver at build time that leads to a
+ // re-resolution request from the balancer is dropped by grpc until we
+ // return from this function. So, we don't have to handle pending resolveNow
+ // requests here.
+ ccr.mu.Lock()
+ ccr.resolver = r
+ ccr.mu.Unlock()
+
return ccr, nil
}
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
- ccr.resolverMu.Lock()
- if !ccr.done.HasFired() {
- ccr.resolver.ResolveNow(o)
+ ccr.mu.Lock()
+ defer ccr.mu.Unlock()
+
+ // ccr.resolver field is set only after the call to Build() returns. But in
+ // the process of building, the resolver may send an error update which when
+ // propagated to the balancer may result in a re-resolution request.
+ if ccr.closed || ccr.resolver == nil {
+ return
}
- ccr.resolverMu.Unlock()
+ ccr.resolver.ResolveNow(o)
}
func (ccr *ccResolverWrapper) close() {
- ccr.resolverMu.Lock()
- ccr.resolver.Close()
- ccr.done.Fire()
- ccr.resolverMu.Unlock()
+ ccr.mu.Lock()
+ if ccr.closed {
+ ccr.mu.Unlock()
+ return
+ }
+
+ channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
+
+ // Close the serializer to ensure that no more calls from the resolver are
+ // handled, before actually closing the resolver.
+ ccr.serializerCancel()
+ ccr.closed = true
+ r := ccr.resolver
+ ccr.mu.Unlock()
+
+ // Give enqueued callbacks a chance to finish.
+ <-ccr.serializer.Done
+
+ // Spawn a goroutine to close the resolver (since it may block trying to
+ // cleanup all allocated resources) and return early.
+ go r.Close()
+}
+
+// serializerScheduleLocked is a convenience method to schedule a function to be
+// run on the serializer while holding ccr.mu.
+func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) {
+ ccr.mu.Lock()
+ ccr.serializer.Schedule(f)
+ ccr.mu.Unlock()
}
+// UpdateState is called by resolver implementations to report new state to gRPC
+// which includes addresses and service config.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
- ccr.incomingMu.Lock()
- defer ccr.incomingMu.Unlock()
- if ccr.done.HasFired() {
+ errCh := make(chan error, 1)
+ ok := ccr.serializer.Schedule(func(context.Context) {
+ ccr.addChannelzTraceEvent(s)
+ ccr.curState = s
+ if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
+ errCh <- balancer.ErrBadResolverState
+ return
+ }
+ errCh <- nil
+ })
+ if !ok {
+ // The only time when Schedule() fails to add the callback to the
+ // serializer is when the serializer is closed, and this happens only
+ // when the resolver wrapper is closed.
return nil
}
- ccr.addChannelzTraceEvent(s)
- ccr.curState = s
- if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
- return balancer.ErrBadResolverState
- }
- return nil
+ return <-errCh
}
+// ReportError is called by resolver implementations to report errors
+// encountered during name resolution to gRPC.
func (ccr *ccResolverWrapper) ReportError(err error) {
- ccr.incomingMu.Lock()
- defer ccr.incomingMu.Unlock()
- if ccr.done.HasFired() {
- return
- }
- channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
- ccr.cc.updateResolverState(resolver.State{}, err)
+ ccr.serializerScheduleLocked(func(_ context.Context) {
+ channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
+ ccr.cc.updateResolverState(resolver.State{}, err)
+ })
}
-// NewAddress is called by the resolver implementation to send addresses to gRPC.
+// NewAddress is called by the resolver implementation to send addresses to
+// gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
- ccr.incomingMu.Lock()
- defer ccr.incomingMu.Unlock()
- if ccr.done.HasFired() {
- return
- }
- ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
- ccr.curState.Addresses = addrs
- ccr.cc.updateResolverState(ccr.curState, nil)
+ ccr.serializerScheduleLocked(func(_ context.Context) {
+ ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
+ ccr.curState.Addresses = addrs
+ ccr.cc.updateResolverState(ccr.curState, nil)
+ })
}
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
- ccr.incomingMu.Lock()
- defer ccr.incomingMu.Unlock()
- if ccr.done.HasFired() {
- return
- }
- channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %s", sc)
- if ccr.cc.dopts.disableServiceConfig {
- channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
- return
- }
- scpr := parseServiceConfig(sc)
- if scpr.Err != nil {
- channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
- return
- }
- ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
- ccr.curState.ServiceConfig = scpr
- ccr.cc.updateResolverState(ccr.curState, nil)
+ ccr.serializerScheduleLocked(func(_ context.Context) {
+ channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc)
+ if ccr.ignoreServiceConfig {
+ channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config")
+ return
+ }
+ scpr := parseServiceConfig(sc)
+ if scpr.Err != nil {
+ channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
+ return
+ }
+ ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
+ ccr.curState.ServiceConfig = scpr
+ ccr.cc.updateResolverState(ccr.curState, nil)
+ })
}
+// ParseServiceConfig is called by resolver implementations to parse a JSON
+// representation of the service config.
func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
return parseServiceConfig(scJSON)
}
+// addChannelzTraceEvent adds a channelz trace event containing the new
+// state received from resolver implementations.
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
var updates []string
var oldSC, newSC *ServiceConfig
@@ -172,5 +235,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
updates = append(updates, "resolver returned new addresses")
}
- channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
+ channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
}
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 2030736a3..a844d28f4 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -577,6 +577,9 @@ type parser struct {
// The header of a gRPC message. Find more detail at
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte
+
+ // recvBufferPool is the pool of shared receive buffers.
+ recvBufferPool SharedBufferPool
}
// recvMsg reads a complete gRPC message from the stream.
@@ -610,9 +613,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
- // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
- // of making it for each message:
- msg = make([]byte, int(length))
+ msg = p.recvBufferPool.Get(int(length))
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
@@ -726,12 +727,12 @@ type payloadInfo struct {
}
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
- pf, d, err := p.recvMsg(maxReceiveMessageSize)
+ pf, buf, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return nil, err
}
if payInfo != nil {
- payInfo.compressedLength = len(d)
+ payInfo.compressedLength = len(buf)
}
if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
@@ -743,10 +744,10 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
if dc != nil {
- d, err = dc.Do(bytes.NewReader(d))
- size = len(d)
+ buf, err = dc.Do(bytes.NewReader(buf))
+ size = len(buf)
} else {
- d, size, err = decompress(compressor, d, maxReceiveMessageSize)
+ buf, size, err = decompress(compressor, buf, maxReceiveMessageSize)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)
@@ -757,7 +758,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
}
- return d, nil
+ return buf, nil
}
// Using compressor, decompress d, returning data and size.
@@ -792,15 +793,17 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
- d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
+ buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
if err != nil {
return err
}
- if err := c.Unmarshal(d, m); err != nil {
+ if err := c.Unmarshal(buf, m); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
}
if payInfo != nil {
- payInfo.uncompressedBytes = d
+ payInfo.uncompressedBytes = buf
+ } else {
+ p.recvBufferPool.Put(&buf)
}
return nil
}
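The rpc_util.go hunks above switch the parser from allocating a fresh slice per message to drawing receive buffers from a SharedBufferPool, and recv now returns the buffer to the pool when no stats handler or binary logger needs to retain it (the payInfo == nil branch). A minimal sketch of the Get/Put contract the parser relies on, using only the exported API added by this patch; the standalone program is illustrative and not part of the vendored code:

package main

import "google.golang.org/grpc"

func main() {
	pool := grpc.NewSharedBufferPool() // bucketed pool defined in shared_buffer_pool.go below

	// Get returns a slice of exactly the requested length; its contents may be
	// uninitialized, so it must be overwritten before being read.
	buf := pool.Get(1024)

	// ... read the wire payload into buf and decode it ...

	// Put hands the backing array back so a later Get of a similar size can reuse it.
	pool.Put(&buf)
}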
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 76d152a69..e076ec714 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -174,6 +174,7 @@ type serverOptions struct {
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
+ recvBufferPool SharedBufferPool
}
var defaultServerOptions = serverOptions{
@@ -182,6 +183,7 @@ var defaultServerOptions = serverOptions{
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
+ recvBufferPool: nopBufferPool{},
}
var globalServerOptions []ServerOption
@@ -552,6 +554,27 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
})
}
+// RecvBufferPool returns a ServerOption that configures the server
+// to use the provided shared buffer pool for parsing incoming messages. Depending
+// on the application's workload, this could result in reduced memory allocation.
+//
+// If you are unsure about how to implement a memory pool but want to utilize one,
+// begin with grpc.NewSharedBufferPool.
+//
+// Note: The shared buffer pool feature will not be active if any of the following
+// options are used: StatsHandler, EnableTracing, or binary logging. In such
+// cases, the shared buffer pool will be ignored.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func RecvBufferPool(bufferPool SharedBufferPool) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.recvBufferPool = bufferPool
+ })
+}
+
// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
@@ -895,7 +918,7 @@ func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
- st.Drain()
+ st.Drain("")
}
s.mu.Unlock()
}
@@ -1046,7 +1069,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
if s.drain {
// Transport added after we drained our existing conns: drain it
// immediately.
- st.Drain()
+ st.Drain("")
}
if s.conns[addr] == nil {
@@ -1296,7 +1319,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{}
}
- d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
+ d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
if err != nil {
if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
@@ -1506,7 +1529,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ctx: ctx,
t: t,
s: stream,
- p: &parser{r: stream},
+ p: &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
codec: s.getCodec(stream.ContentSubtype()),
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
@@ -1856,7 +1879,7 @@ func (s *Server) GracefulStop() {
if !s.drain {
for _, conns := range s.conns {
for st := range conns {
- st.Drain()
+ st.Drain("graceful_stop")
}
}
s.drain = true
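The RecvBufferPool option added above is wired in like any other ServerOption. A minimal sketch of a server opting in; the listen address and the elided service registration are placeholders, and per the option's own doc the pool is experimental and is ignored when StatsHandler, EnableTracing, or binary logging is in use:

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	// Parse incoming messages into buffers drawn from a shared, bucketed pool.
	s := grpc.NewServer(grpc.RecvBufferPool(grpc.NewSharedBufferPool()))
	// ... register services on s here ...
	log.Fatal(s.Serve(lis))
}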
diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go
index f22acace4..0df11fc09 100644
--- a/vendor/google.golang.org/grpc/service_config.go
+++ b/vendor/google.golang.org/grpc/service_config.go
@@ -23,8 +23,6 @@ import (
"errors"
"fmt"
"reflect"
- "strconv"
- "strings"
"time"
"google.golang.org/grpc/codes"
@@ -106,8 +104,8 @@ type healthCheckConfig struct {
type jsonRetryPolicy struct {
MaxAttempts int
- InitialBackoff string
- MaxBackoff string
+ InitialBackoff internalserviceconfig.Duration
+ MaxBackoff internalserviceconfig.Duration
BackoffMultiplier float64
RetryableStatusCodes []codes.Code
}
@@ -129,50 +127,6 @@ type retryThrottlingPolicy struct {
TokenRatio float64
}
-func parseDuration(s *string) (*time.Duration, error) {
- if s == nil {
- return nil, nil
- }
- if !strings.HasSuffix(*s, "s") {
- return nil, fmt.Errorf("malformed duration %q", *s)
- }
- ss := strings.SplitN((*s)[:len(*s)-1], ".", 3)
- if len(ss) > 2 {
- return nil, fmt.Errorf("malformed duration %q", *s)
- }
- // hasDigits is set if either the whole or fractional part of the number is
- // present, since both are optional but one is required.
- hasDigits := false
- var d time.Duration
- if len(ss[0]) > 0 {
- i, err := strconv.ParseInt(ss[0], 10, 32)
- if err != nil {
- return nil, fmt.Errorf("malformed duration %q: %v", *s, err)
- }
- d = time.Duration(i) * time.Second
- hasDigits = true
- }
- if len(ss) == 2 && len(ss[1]) > 0 {
- if len(ss[1]) > 9 {
- return nil, fmt.Errorf("malformed duration %q", *s)
- }
- f, err := strconv.ParseInt(ss[1], 10, 64)
- if err != nil {
- return nil, fmt.Errorf("malformed duration %q: %v", *s, err)
- }
- for i := 9; i > len(ss[1]); i-- {
- f *= 10
- }
- d += time.Duration(f)
- hasDigits = true
- }
- if !hasDigits {
- return nil, fmt.Errorf("malformed duration %q", *s)
- }
-
- return &d, nil
-}
-
type jsonName struct {
Service string
Method string
@@ -201,7 +155,7 @@ func (j jsonName) generatePath() (string, error) {
type jsonMC struct {
Name *[]jsonName
WaitForReady *bool
- Timeout *string
+ Timeout *internalserviceconfig.Duration
MaxRequestMessageBytes *int64
MaxResponseMessageBytes *int64
RetryPolicy *jsonRetryPolicy
@@ -252,15 +206,10 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
if m.Name == nil {
continue
}
- d, err := parseDuration(m.Timeout)
- if err != nil {
- logger.Warningf("grpc: unmarshaling service config %s: %v", js, err)
- return &serviceconfig.ParseResult{Err: err}
- }
mc := MethodConfig{
WaitForReady: m.WaitForReady,
- Timeout: d,
+ Timeout: (*time.Duration)(m.Timeout),
}
if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
logger.Warningf("grpc: unmarshaling service config %s: %v", js, err)
@@ -312,18 +261,10 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPol
if jrp == nil {
return nil, nil
}
- ib, err := parseDuration(&jrp.InitialBackoff)
- if err != nil {
- return nil, err
- }
- mb, err := parseDuration(&jrp.MaxBackoff)
- if err != nil {
- return nil, err
- }
if jrp.MaxAttempts <= 1 ||
- *ib <= 0 ||
- *mb <= 0 ||
+ jrp.InitialBackoff <= 0 ||
+ jrp.MaxBackoff <= 0 ||
jrp.BackoffMultiplier <= 0 ||
len(jrp.RetryableStatusCodes) == 0 {
logger.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
@@ -332,8 +273,8 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPol
rp := &internalserviceconfig.RetryPolicy{
MaxAttempts: jrp.MaxAttempts,
- InitialBackoff: *ib,
- MaxBackoff: *mb,
+ InitialBackoff: time.Duration(jrp.InitialBackoff),
+ MaxBackoff: time.Duration(jrp.MaxBackoff),
BackoffMultiplier: jrp.BackoffMultiplier,
RetryableStatusCodes: make(map[codes.Code]bool),
}
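With the hand-rolled parseDuration removed, the timeout and backoff fields above now unmarshal through internalserviceconfig.Duration, which accepts the same decimal-seconds strings ("0.5s") the old parser did. A sketch of a client supplying a service config that exercises those fields; the JSON shape follows the gRPC service-config convention, and the service name, target address, and credentials are placeholders rather than part of this patch:

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const serviceConfig = `{
  "methodConfig": [{
    "name": [{"service": "echo.Echo"}],
    "waitForReady": true,
    "timeout": "0.5s",
    "retryPolicy": {
      "maxAttempts": 3,
      "initialBackoff": "0.1s",
      "maxBackoff": "1s",
      "backoffMultiplier": 2.0,
      "retryableStatusCodes": ["UNAVAILABLE"]
    }
  }]
}`

func main() {
	conn, err := grpc.Dial("dns:///example.test:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(serviceConfig))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
}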
diff --git a/vendor/google.golang.org/grpc/shared_buffer_pool.go b/vendor/google.golang.org/grpc/shared_buffer_pool.go
new file mode 100644
index 000000000..c3a5a9ac1
--- /dev/null
+++ b/vendor/google.golang.org/grpc/shared_buffer_pool.go
@@ -0,0 +1,154 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import "sync"
+
+// SharedBufferPool is a pool of buffers that can be shared, resulting in
+// decreased memory allocation. Currently, in gRPC-go, it is only utilized
+// for parsing incoming messages.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+type SharedBufferPool interface {
+	// Get returns a buffer with the specified length from the pool.
+	//
+	// The returned byte slice may not be zero-initialized.
+ Get(length int) []byte
+
+ // Put returns a buffer to the pool.
+ Put(*[]byte)
+}
+
+// NewSharedBufferPool creates a simple SharedBufferPool with buckets
+// of different sizes to optimize memory usage. This prevents the pool from
+// wasting large amounts of memory, even when handling messages of varying sizes.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func NewSharedBufferPool() SharedBufferPool {
+ return &simpleSharedBufferPool{
+ pools: [poolArraySize]simpleSharedBufferChildPool{
+ newBytesPool(level0PoolMaxSize),
+ newBytesPool(level1PoolMaxSize),
+ newBytesPool(level2PoolMaxSize),
+ newBytesPool(level3PoolMaxSize),
+ newBytesPool(level4PoolMaxSize),
+ newBytesPool(0),
+ },
+ }
+}
+
+// simpleSharedBufferPool is a simple implementation of SharedBufferPool.
+type simpleSharedBufferPool struct {
+ pools [poolArraySize]simpleSharedBufferChildPool
+}
+
+func (p *simpleSharedBufferPool) Get(size int) []byte {
+ return p.pools[p.poolIdx(size)].Get(size)
+}
+
+func (p *simpleSharedBufferPool) Put(bs *[]byte) {
+ p.pools[p.poolIdx(cap(*bs))].Put(bs)
+}
+
+func (p *simpleSharedBufferPool) poolIdx(size int) int {
+ switch {
+ case size <= level0PoolMaxSize:
+ return level0PoolIdx
+ case size <= level1PoolMaxSize:
+ return level1PoolIdx
+ case size <= level2PoolMaxSize:
+ return level2PoolIdx
+ case size <= level3PoolMaxSize:
+ return level3PoolIdx
+ case size <= level4PoolMaxSize:
+ return level4PoolIdx
+ default:
+ return levelMaxPoolIdx
+ }
+}
+
+const (
+ level0PoolMaxSize = 16 // 16 B
+ level1PoolMaxSize = level0PoolMaxSize * 16 // 256 B
+ level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB
+ level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB
+ level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB
+)
+
+const (
+ level0PoolIdx = iota
+ level1PoolIdx
+ level2PoolIdx
+ level3PoolIdx
+ level4PoolIdx
+ levelMaxPoolIdx
+ poolArraySize
+)
+
+type simpleSharedBufferChildPool interface {
+ Get(size int) []byte
+ Put(interface{})
+}
+
+type bufferPool struct {
+ sync.Pool
+
+ defaultSize int
+}
+
+func (p *bufferPool) Get(size int) []byte {
+ bs := p.Pool.Get().(*[]byte)
+
+ if cap(*bs) < size {
+ p.Pool.Put(bs)
+
+ return make([]byte, size)
+ }
+
+ return (*bs)[:size]
+}
+
+func newBytesPool(size int) simpleSharedBufferChildPool {
+ return &bufferPool{
+ Pool: sync.Pool{
+ New: func() interface{} {
+ bs := make([]byte, size)
+ return &bs
+ },
+ },
+ defaultSize: size,
+ }
+}
+
+// nopBufferPool is a buffer pool that just allocates a new buffer for each call, without pooling.
+type nopBufferPool struct {
+}
+
+func (nopBufferPool) Get(length int) []byte {
+ return make([]byte, length)
+}
+
+func (nopBufferPool) Put(*[]byte) {
+}
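NewSharedBufferPool above buckets buffers at 16 B, 256 B, 4 KB, 64 KB, and 1 MB, with a final catch-all bucket, so one oversized message does not pin a large buffer for every later small request. Any two-method implementation can be supplied instead; a hypothetical single-bucket pool, shown only to illustrate the interface (the vendored bucketed pool is what gRPC actually ships):

package main

import (
	"sync"

	"google.golang.org/grpc"
)

// singleBucketPool is a hypothetical SharedBufferPool backed by one sync.Pool.
type singleBucketPool struct {
	p sync.Pool
}

func (s *singleBucketPool) Get(length int) []byte {
	bs, ok := s.p.Get().(*[]byte)
	if !ok || cap(*bs) < length {
		// Nothing reusable of the right size; allocate fresh.
		return make([]byte, length)
	}
	return (*bs)[:length]
}

func (s *singleBucketPool) Put(buf *[]byte) {
	s.p.Put(buf)
}

// Compile-time check that the type satisfies grpc.SharedBufferPool.
var _ grpc.SharedBufferPool = (*singleBucketPool)(nil)

func main() {
	_ = grpc.NewServer(grpc.RecvBufferPool(&singleBucketPool{}))
}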
diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go
index 53910fb7c..bcf2e4d81 100644
--- a/vendor/google.golang.org/grpc/status/status.go
+++ b/vendor/google.golang.org/grpc/status/status.go
@@ -77,11 +77,18 @@ func FromProto(s *spb.Status) *Status {
// FromError returns a Status representation of err.
//
// - If err was produced by this package or implements the method `GRPCStatus()
-// *Status`, or if err wraps a type satisfying this, the appropriate Status is
-// returned. For wrapped errors, the message returned contains the entire
-// err.Error() text and not just the wrapped status.
+// *Status` and `GRPCStatus()` does not return nil, or if err wraps a type
+// satisfying this, the Status from `GRPCStatus()` is returned. For wrapped
+// errors, the message returned contains the entire err.Error() text and not
+// just the wrapped status. In that case, ok is true.
//
-// - If err is nil, a Status is returned with codes.OK and no message.
+// - If err is nil, a Status is returned with codes.OK and no message, and ok
+// is true.
+//
+// - If err implements the method `GRPCStatus() *Status` and `GRPCStatus()`
+// returns nil (which maps to codes.OK), or if err wraps a type
+// satisfying this, a Status is returned with codes.Unknown and err's
+// Error() message, and ok is false.
//
// - Otherwise, err is an error not compatible with this package. In this
// case, a Status is returned with codes.Unknown and err's Error() message,
@@ -92,10 +99,24 @@ func FromError(err error) (s *Status, ok bool) {
}
type grpcstatus interface{ GRPCStatus() *Status }
if gs, ok := err.(grpcstatus); ok {
+ if gs.GRPCStatus() == nil {
+ // Error has status nil, which maps to codes.OK. There
+ // is no sensible behavior for this, so we turn it into
+ // an error with codes.Unknown and discard the existing
+ // status.
+ return New(codes.Unknown, err.Error()), false
+ }
return gs.GRPCStatus(), true
}
var gs grpcstatus
if errors.As(err, &gs) {
+ if gs.GRPCStatus() == nil {
+ // Error wraps an error that has status nil, which maps
+ // to codes.OK. There is no sensible behavior for this,
+ // so we turn it into an error with codes.Unknown and
+ // discard the existing status.
+ return New(codes.Unknown, err.Error()), false
+ }
p := gs.GRPCStatus().Proto()
p.Message = err.Error()
return status.FromProto(p), true
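The two new branches in FromError above cover errors whose GRPCStatus() returns a nil *Status: instead of being treated as codes.OK they come back as codes.Unknown with ok == false, for both the direct and the wrapped case. A small sketch exercising both paths; nilStatusError is a hypothetical type, not part of the patch:

package main

import (
	"fmt"

	"google.golang.org/grpc/status"
)

// nilStatusError implements GRPCStatus() but returns a nil status.
type nilStatusError struct{}

func (nilStatusError) Error() string              { return "status-less failure" }
func (nilStatusError) GRPCStatus() *status.Status { return nil }

func main() {
	// Direct case: the nil status is rewritten to codes.Unknown, ok is false.
	s, ok := status.FromError(nilStatusError{})
	fmt.Println(s.Code(), ok) // Unknown false

	// Wrapped case: errors.As finds the same type and yields the same result.
	s, ok = status.FromError(fmt.Errorf("rpc failed: %w", nilStatusError{}))
	fmt.Println(s.Code(), ok) // Unknown false
}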
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index d1226a412..de32a7597 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -123,6 +123,9 @@ type ClientStream interface {
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
+ //
+ // It is not safe to modify the message after calling SendMsg. Tracing
+ // libraries and stats handlers may use the message lazily.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
@@ -152,6 +155,11 @@ type ClientStream interface {
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
+ if err := cc.idlenessMgr.onCallBegin(); err != nil {
+ return nil, err
+ }
+ defer cc.idlenessMgr.onCallEnd()
+
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
@@ -469,7 +477,7 @@ func (a *csAttempt) newStream() error {
// It is safe to overwrite the csAttempt's context here, since all state
// maintained in it are local to the attempt. When the attempt has to be
// retried, a new instance of csAttempt will be created.
- if a.pickResult.Metatada != nil {
+ if a.pickResult.Metadata != nil {
 		// We currently do not have a function in the metadata package which
// merges given metadata with existing metadata in a context. Existing
// function `AppendToOutgoingContext()` takes a variadic argument of key
@@ -479,7 +487,7 @@ func (a *csAttempt) newStream() error {
// in a form passable to AppendToOutgoingContext(), or create a version
// of AppendToOutgoingContext() that accepts a metadata.MD.
md, _ := metadata.FromOutgoingContext(a.ctx)
- md = metadata.Join(md, a.pickResult.Metatada)
+ md = metadata.Join(md, a.pickResult.Metadata)
a.ctx = metadata.NewOutgoingContext(a.ctx, md)
}
@@ -499,7 +507,7 @@ func (a *csAttempt) newStream() error {
return toRPCErr(nse.Err)
}
a.s = s
- a.p = &parser{r: s}
+ a.p = &parser{r: s, recvBufferPool: a.cs.cc.dopts.recvBufferPool}
return nil
}
@@ -1262,17 +1270,22 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
return nil, err
}
as.s = s
- as.p = &parser{r: s}
+ as.p = &parser{r: s, recvBufferPool: ac.dopts.recvBufferPool}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
- // Listen on cc and stream contexts to cleanup when the user closes the
- // ClientConn or cancels the stream context. In all other cases, an error
- // should already be injected into the recv buffer by the transport, which
- // the client will eventually receive, and then we will cancel the stream's
- // context in clientStream.finish.
+ // Listen on stream context to cleanup when the stream context is
+ // canceled. Also listen for the addrConn's context in case the
+ // addrConn is closed or reconnects to a different address. In all
+ // other cases, an error should already be injected into the recv
+ // buffer by the transport, which the client will eventually receive,
+ // and then we will cancel the stream's context in
+ // addrConnStream.finish.
go func() {
+ ac.mu.Lock()
+ acCtx := ac.ctx
+ ac.mu.Unlock()
select {
- case <-ac.ctx.Done():
+ case <-acCtx.Done():
as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
case <-ctx.Done():
as.finish(toRPCErr(ctx.Err()))
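Two behavioral notes from the stream.go hunks above: NewStream now brackets every call with the idleness manager's onCallBegin/onCallEnd, and SendMsg's contract now says the message must not be modified after it is sent, since tracing libraries and stats handlers may read it lazily. A sketch of the send-side consequence; grpc.ClientStream is the real interface, while echoRequest stands in for an application's generated message type:

package main

import "google.golang.org/grpc"

// echoRequest is a placeholder for a generated protobuf request type.
type echoRequest struct {
	Payload []byte
}

// sendAll allocates a fresh message for every SendMsg instead of mutating one
// message in a loop, honoring the "do not modify after SendMsg" rule above.
func sendAll(stream grpc.ClientStream, payloads [][]byte) error {
	for _, p := range payloads {
		msg := &echoRequest{Payload: p}
		if err := stream.SendMsg(msg); err != nil {
			return err
		}
	}
	return stream.CloseSend()
}

func main() {}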
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 853ce0e30..353cfd528 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.55.0"
+const Version = "1.57.0"
diff --git a/vendor/google.golang.org/protobuf/types/known/structpb/struct.pb.go b/vendor/google.golang.org/protobuf/types/known/structpb/struct.pb.go
new file mode 100644
index 000000000..d2bac8b88
--- /dev/null
+++ b/vendor/google.golang.org/protobuf/types/known/structpb/struct.pb.go
@@ -0,0 +1,810 @@
+// Protocol Buffers - Google's data interchange format
+// Copyright 2008 Google Inc. All rights reserved.
+// https://developers.google.com/protocol-buffers/
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: google/protobuf/struct.proto
+
+// Package structpb contains generated types for google/protobuf/struct.proto.
+//
+// The messages (i.e., Value, Struct, and ListValue) defined in struct.proto are
+// used to represent arbitrary JSON. The Value message represents a JSON value,
+// the Struct message represents a JSON object, and the ListValue message
+// represents a JSON array. See https://json.org for more information.
+//
+// The Value, Struct, and ListValue types have generated MarshalJSON and
+// UnmarshalJSON methods such that they serialize JSON equivalent to what the
+// messages themselves represent. Use of these types with the
+// "google.golang.org/protobuf/encoding/protojson" package
+// ensures that they will be serialized as their JSON equivalent.
+//
+// # Conversion to and from a Go interface
+//
+// The standard Go "encoding/json" package has functionality to serialize
+// arbitrary types to a large degree. The Value.AsInterface, Struct.AsMap, and
+// ListValue.AsSlice methods can convert the protobuf message representation into
+// a form represented by interface{}, map[string]interface{}, and []interface{}.
+// This form can be used with other packages that operate on such data structures
+// and also directly with the standard json package.
+//
+// In order to convert the interface{}, map[string]interface{}, and []interface{}
+// forms back as Value, Struct, and ListValue messages, use the NewStruct,
+// NewList, and NewValue constructor functions.
+//
+// # Example usage
+//
+// Consider the following example JSON object:
+//
+// {
+// "firstName": "John",
+// "lastName": "Smith",
+// "isAlive": true,
+// "age": 27,
+// "address": {
+// "streetAddress": "21 2nd Street",
+// "city": "New York",
+// "state": "NY",
+// "postalCode": "10021-3100"
+// },
+// "phoneNumbers": [
+// {
+// "type": "home",
+// "number": "212 555-1234"
+// },
+// {
+// "type": "office",
+// "number": "646 555-4567"
+// }
+// ],
+// "children": [],
+// "spouse": null
+// }
+//
+// To construct a Value message representing the above JSON object:
+//
+// m, err := structpb.NewValue(map[string]interface{}{
+// "firstName": "John",
+// "lastName": "Smith",
+// "isAlive": true,
+// "age": 27,
+// "address": map[string]interface{}{
+// "streetAddress": "21 2nd Street",
+// "city": "New York",
+// "state": "NY",
+// "postalCode": "10021-3100",
+// },
+// "phoneNumbers": []interface{}{
+// map[string]interface{}{
+// "type": "home",
+// "number": "212 555-1234",
+// },
+// map[string]interface{}{
+// "type": "office",
+// "number": "646 555-4567",
+// },
+// },
+// "children": []interface{}{},
+// "spouse": nil,
+// })
+// if err != nil {
+// ... // handle error
+// }
+// ... // make use of m as a *structpb.Value
+package structpb
+
+import (
+ base64 "encoding/base64"
+ protojson "google.golang.org/protobuf/encoding/protojson"
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ math "math"
+ reflect "reflect"
+ sync "sync"
+ utf8 "unicode/utf8"
+)
+
+// `NullValue` is a singleton enumeration to represent the null value for the
+// `Value` type union.
+//
+// The JSON representation for `NullValue` is JSON `null`.
+type NullValue int32
+
+const (
+ // Null value.
+ NullValue_NULL_VALUE NullValue = 0
+)
+
+// Enum value maps for NullValue.
+var (
+ NullValue_name = map[int32]string{
+ 0: "NULL_VALUE",
+ }
+ NullValue_value = map[string]int32{
+ "NULL_VALUE": 0,
+ }
+)
+
+func (x NullValue) Enum() *NullValue {
+ p := new(NullValue)
+ *p = x
+ return p
+}
+
+func (x NullValue) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (NullValue) Descriptor() protoreflect.EnumDescriptor {
+ return file_google_protobuf_struct_proto_enumTypes[0].Descriptor()
+}
+
+func (NullValue) Type() protoreflect.EnumType {
+ return &file_google_protobuf_struct_proto_enumTypes[0]
+}
+
+func (x NullValue) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use NullValue.Descriptor instead.
+func (NullValue) EnumDescriptor() ([]byte, []int) {
+ return file_google_protobuf_struct_proto_rawDescGZIP(), []int{0}
+}
+
+// `Struct` represents a structured data value, consisting of fields
+// which map to dynamically typed values. In some languages, `Struct`
+// might be supported by a native representation. For example, in
+// scripting languages like JS a struct is represented as an
+// object. The details of that representation are described together
+// with the proto support for the language.
+//
+// The JSON representation for `Struct` is JSON object.
+type Struct struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Unordered map of dynamically typed values.
+ Fields map[string]*Value `protobuf:"bytes,1,rep,name=fields,proto3" json:"fields,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
+}
+
+// NewStruct constructs a Struct from a general-purpose Go map.
+// The map keys must be valid UTF-8.
+// The map values are converted using NewValue.
+func NewStruct(v map[string]interface{}) (*Struct, error) {
+ x := &Struct{Fields: make(map[string]*Value, len(v))}
+ for k, v := range v {
+ if !utf8.ValidString(k) {
+ return nil, protoimpl.X.NewError("invalid UTF-8 in string: %q", k)
+ }
+ var err error
+ x.Fields[k], err = NewValue(v)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return x, nil
+}
+
+// AsMap converts x to a general-purpose Go map.
+// The map values are converted by calling Value.AsInterface.
+func (x *Struct) AsMap() map[string]interface{} {
+ f := x.GetFields()
+ vs := make(map[string]interface{}, len(f))
+ for k, v := range f {
+ vs[k] = v.AsInterface()
+ }
+ return vs
+}
+
+func (x *Struct) MarshalJSON() ([]byte, error) {
+ return protojson.Marshal(x)
+}
+
+func (x *Struct) UnmarshalJSON(b []byte) error {
+ return protojson.Unmarshal(b, x)
+}
+
+func (x *Struct) Reset() {
+ *x = Struct{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_google_protobuf_struct_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Struct) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Struct) ProtoMessage() {}
+
+func (x *Struct) ProtoReflect() protoreflect.Message {
+ mi := &file_google_protobuf_struct_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Struct.ProtoReflect.Descriptor instead.
+func (*Struct) Descriptor() ([]byte, []int) {
+ return file_google_protobuf_struct_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Struct) GetFields() map[string]*Value {
+ if x != nil {
+ return x.Fields
+ }
+ return nil
+}
+
+// `Value` represents a dynamically typed value which can be either
+// null, a number, a string, a boolean, a recursive struct value, or a
+// list of values. A producer of value is expected to set one of these
+// variants. Absence of any variant indicates an error.
+//
+// The JSON representation for `Value` is JSON value.
+type Value struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The kind of value.
+ //
+ // Types that are assignable to Kind:
+ //
+ // *Value_NullValue
+ // *Value_NumberValue
+ // *Value_StringValue
+ // *Value_BoolValue
+ // *Value_StructValue
+ // *Value_ListValue
+ Kind isValue_Kind `protobuf_oneof:"kind"`
+}
+
+// NewValue constructs a Value from a general-purpose Go interface.
+//
+// ╔════════════════════════╤════════════════════════════════════════════╗
+// ║ Go type │ Conversion ║
+// ╠════════════════════════╪════════════════════════════════════════════╣
+// ║ nil │ stored as NullValue ║
+// ║ bool │ stored as BoolValue ║
+// ║ int, int32, int64 │ stored as NumberValue ║
+// ║ uint, uint32, uint64 │ stored as NumberValue ║
+// ║ float32, float64 │ stored as NumberValue ║
+// ║ string │ stored as StringValue; must be valid UTF-8 ║
+// ║ []byte │ stored as StringValue; base64-encoded ║
+// ║ map[string]interface{} │ stored as StructValue ║
+// ║ []interface{} │ stored as ListValue ║
+// ╚════════════════════════╧════════════════════════════════════════════╝
+//
+// When converting an int64 or uint64 to a NumberValue, numeric precision loss
+// is possible since they are stored as a float64.
+func NewValue(v interface{}) (*Value, error) {
+ switch v := v.(type) {
+ case nil:
+ return NewNullValue(), nil
+ case bool:
+ return NewBoolValue(v), nil
+ case int:
+ return NewNumberValue(float64(v)), nil
+ case int32:
+ return NewNumberValue(float64(v)), nil
+ case int64:
+ return NewNumberValue(float64(v)), nil
+ case uint:
+ return NewNumberValue(float64(v)), nil
+ case uint32:
+ return NewNumberValue(float64(v)), nil
+ case uint64:
+ return NewNumberValue(float64(v)), nil
+ case float32:
+ return NewNumberValue(float64(v)), nil
+ case float64:
+ return NewNumberValue(float64(v)), nil
+ case string:
+ if !utf8.ValidString(v) {
+ return nil, protoimpl.X.NewError("invalid UTF-8 in string: %q", v)
+ }
+ return NewStringValue(v), nil
+ case []byte:
+ s := base64.StdEncoding.EncodeToString(v)
+ return NewStringValue(s), nil
+ case map[string]interface{}:
+ v2, err := NewStruct(v)
+ if err != nil {
+ return nil, err
+ }
+ return NewStructValue(v2), nil
+ case []interface{}:
+ v2, err := NewList(v)
+ if err != nil {
+ return nil, err
+ }
+ return NewListValue(v2), nil
+ default:
+ return nil, protoimpl.X.NewError("invalid type: %T", v)
+ }
+}
+
+// NewNullValue constructs a new null Value.
+func NewNullValue() *Value {
+ return &Value{Kind: &Value_NullValue{NullValue: NullValue_NULL_VALUE}}
+}
+
+// NewBoolValue constructs a new boolean Value.
+func NewBoolValue(v bool) *Value {
+ return &Value{Kind: &Value_BoolValue{BoolValue: v}}
+}
+
+// NewNumberValue constructs a new number Value.
+func NewNumberValue(v float64) *Value {
+ return &Value{Kind: &Value_NumberValue{NumberValue: v}}
+}
+
+// NewStringValue constructs a new string Value.
+func NewStringValue(v string) *Value {
+ return &Value{Kind: &Value_StringValue{StringValue: v}}
+}
+
+// NewStructValue constructs a new struct Value.
+func NewStructValue(v *Struct) *Value {
+ return &Value{Kind: &Value_StructValue{StructValue: v}}
+}
+
+// NewListValue constructs a new list Value.
+func NewListValue(v *ListValue) *Value {
+ return &Value{Kind: &Value_ListValue{ListValue: v}}
+}
+
+// AsInterface converts x to a general-purpose Go interface.
+//
+// Calling Value.MarshalJSON and "encoding/json".Marshal on this output produce
+// semantically equivalent JSON (assuming no errors occur).
+//
+// Floating-point values (i.e., "NaN", "Infinity", and "-Infinity") are
+// converted as strings to remain compatible with MarshalJSON.
+func (x *Value) AsInterface() interface{} {
+ switch v := x.GetKind().(type) {
+ case *Value_NumberValue:
+ if v != nil {
+ switch {
+ case math.IsNaN(v.NumberValue):
+ return "NaN"
+ case math.IsInf(v.NumberValue, +1):
+ return "Infinity"
+ case math.IsInf(v.NumberValue, -1):
+ return "-Infinity"
+ default:
+ return v.NumberValue
+ }
+ }
+ case *Value_StringValue:
+ if v != nil {
+ return v.StringValue
+ }
+ case *Value_BoolValue:
+ if v != nil {
+ return v.BoolValue
+ }
+ case *Value_StructValue:
+ if v != nil {
+ return v.StructValue.AsMap()
+ }
+ case *Value_ListValue:
+ if v != nil {
+ return v.ListValue.AsSlice()
+ }
+ }
+ return nil
+}
+
+func (x *Value) MarshalJSON() ([]byte, error) {
+ return protojson.Marshal(x)
+}
+
+func (x *Value) UnmarshalJSON(b []byte) error {
+ return protojson.Unmarshal(b, x)
+}
+
+func (x *Value) Reset() {
+ *x = Value{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_google_protobuf_struct_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Value) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Value) ProtoMessage() {}
+
+func (x *Value) ProtoReflect() protoreflect.Message {
+ mi := &file_google_protobuf_struct_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Value.ProtoReflect.Descriptor instead.
+func (*Value) Descriptor() ([]byte, []int) {
+ return file_google_protobuf_struct_proto_rawDescGZIP(), []int{1}
+}
+
+func (m *Value) GetKind() isValue_Kind {
+ if m != nil {
+ return m.Kind
+ }
+ return nil
+}
+
+func (x *Value) GetNullValue() NullValue {
+ if x, ok := x.GetKind().(*Value_NullValue); ok {
+ return x.NullValue
+ }
+ return NullValue_NULL_VALUE
+}
+
+func (x *Value) GetNumberValue() float64 {
+ if x, ok := x.GetKind().(*Value_NumberValue); ok {
+ return x.NumberValue
+ }
+ return 0
+}
+
+func (x *Value) GetStringValue() string {
+ if x, ok := x.GetKind().(*Value_StringValue); ok {
+ return x.StringValue
+ }
+ return ""
+}
+
+func (x *Value) GetBoolValue() bool {
+ if x, ok := x.GetKind().(*Value_BoolValue); ok {
+ return x.BoolValue
+ }
+ return false
+}
+
+func (x *Value) GetStructValue() *Struct {
+ if x, ok := x.GetKind().(*Value_StructValue); ok {
+ return x.StructValue
+ }
+ return nil
+}
+
+func (x *Value) GetListValue() *ListValue {
+ if x, ok := x.GetKind().(*Value_ListValue); ok {
+ return x.ListValue
+ }
+ return nil
+}
+
+type isValue_Kind interface {
+ isValue_Kind()
+}
+
+type Value_NullValue struct {
+ // Represents a null value.
+ NullValue NullValue `protobuf:"varint,1,opt,name=null_value,json=nullValue,proto3,enum=google.protobuf.NullValue,oneof"`
+}
+
+type Value_NumberValue struct {
+ // Represents a double value.
+ NumberValue float64 `protobuf:"fixed64,2,opt,name=number_value,json=numberValue,proto3,oneof"`
+}
+
+type Value_StringValue struct {
+ // Represents a string value.
+ StringValue string `protobuf:"bytes,3,opt,name=string_value,json=stringValue,proto3,oneof"`
+}
+
+type Value_BoolValue struct {
+ // Represents a boolean value.
+ BoolValue bool `protobuf:"varint,4,opt,name=bool_value,json=boolValue,proto3,oneof"`
+}
+
+type Value_StructValue struct {
+ // Represents a structured value.
+ StructValue *Struct `protobuf:"bytes,5,opt,name=struct_value,json=structValue,proto3,oneof"`
+}
+
+type Value_ListValue struct {
+ // Represents a repeated `Value`.
+ ListValue *ListValue `protobuf:"bytes,6,opt,name=list_value,json=listValue,proto3,oneof"`
+}
+
+func (*Value_NullValue) isValue_Kind() {}
+
+func (*Value_NumberValue) isValue_Kind() {}
+
+func (*Value_StringValue) isValue_Kind() {}
+
+func (*Value_BoolValue) isValue_Kind() {}
+
+func (*Value_StructValue) isValue_Kind() {}
+
+func (*Value_ListValue) isValue_Kind() {}
+
+// `ListValue` is a wrapper around a repeated field of values.
+//
+// The JSON representation for `ListValue` is JSON array.
+type ListValue struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Repeated field of dynamically typed values.
+ Values []*Value `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"`
+}
+
+// NewList constructs a ListValue from a general-purpose Go slice.
+// The slice elements are converted using NewValue.
+func NewList(v []interface{}) (*ListValue, error) {
+ x := &ListValue{Values: make([]*Value, len(v))}
+ for i, v := range v {
+ var err error
+ x.Values[i], err = NewValue(v)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return x, nil
+}
+
+// AsSlice converts x to a general-purpose Go slice.
+// The slice elements are converted by calling Value.AsInterface.
+func (x *ListValue) AsSlice() []interface{} {
+ vals := x.GetValues()
+ vs := make([]interface{}, len(vals))
+ for i, v := range vals {
+ vs[i] = v.AsInterface()
+ }
+ return vs
+}
+
+func (x *ListValue) MarshalJSON() ([]byte, error) {
+ return protojson.Marshal(x)
+}
+
+func (x *ListValue) UnmarshalJSON(b []byte) error {
+ return protojson.Unmarshal(b, x)
+}
+
+func (x *ListValue) Reset() {
+ *x = ListValue{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_google_protobuf_struct_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ListValue) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ListValue) ProtoMessage() {}
+
+func (x *ListValue) ProtoReflect() protoreflect.Message {
+ mi := &file_google_protobuf_struct_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ListValue.ProtoReflect.Descriptor instead.
+func (*ListValue) Descriptor() ([]byte, []int) {
+ return file_google_protobuf_struct_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *ListValue) GetValues() []*Value {
+ if x != nil {
+ return x.Values
+ }
+ return nil
+}
+
+var File_google_protobuf_struct_proto protoreflect.FileDescriptor
+
+var file_google_protobuf_struct_proto_rawDesc = []byte{
+ 0x0a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
+ 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f,
+ 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x22,
+ 0x98, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x12, 0x3b, 0x0a, 0x06, 0x66, 0x69,
+ 0x65, 0x6c, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x67, 0x6f, 0x6f,
+ 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72,
+ 0x75, 0x63, 0x74, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52,
+ 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x1a, 0x51, 0x0a, 0x0b, 0x46, 0x69, 0x65, 0x6c, 0x64,
+ 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
+ 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2c, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
+ 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
+ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52,
+ 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xb2, 0x02, 0x0a, 0x05, 0x56,
+ 0x61, 0x6c, 0x75, 0x65, 0x12, 0x3b, 0x0a, 0x0a, 0x6e, 0x75, 0x6c, 0x6c, 0x5f, 0x76, 0x61, 0x6c,
+ 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
+ 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4e, 0x75, 0x6c, 0x6c, 0x56,
+ 0x61, 0x6c, 0x75, 0x65, 0x48, 0x00, 0x52, 0x09, 0x6e, 0x75, 0x6c, 0x6c, 0x56, 0x61, 0x6c, 0x75,
+ 0x65, 0x12, 0x23, 0x0a, 0x0c, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x5f, 0x76, 0x61, 0x6c, 0x75,
+ 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x48, 0x00, 0x52, 0x0b, 0x6e, 0x75, 0x6d, 0x62, 0x65,
+ 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x23, 0x0a, 0x0c, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67,
+ 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0b,
+ 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0a, 0x62,
+ 0x6f, 0x6f, 0x6c, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x48,
+ 0x00, 0x52, 0x09, 0x62, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x3c, 0x0a, 0x0c,
+ 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x48, 0x00, 0x52, 0x0b, 0x73,
+ 0x74, 0x72, 0x75, 0x63, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x3b, 0x0a, 0x0a, 0x6c, 0x69,
+ 0x73, 0x74, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a,
+ 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
+ 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x48, 0x00, 0x52, 0x09, 0x6c, 0x69,
+ 0x73, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x06, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22,
+ 0x3b, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x2e, 0x0a, 0x06,
+ 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67,
+ 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x56,
+ 0x61, 0x6c, 0x75, 0x65, 0x52, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2a, 0x1b, 0x0a, 0x09,
+ 0x4e, 0x75, 0x6c, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x0a, 0x4e, 0x55, 0x4c,
+ 0x4c, 0x5f, 0x56, 0x41, 0x4c, 0x55, 0x45, 0x10, 0x00, 0x42, 0x7f, 0x0a, 0x13, 0x63, 0x6f, 0x6d,
+ 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
+ 0x42, 0x0b, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a,
+ 0x2f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x67, 0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2e, 0x6f,
+ 0x72, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x79, 0x70, 0x65,
+ 0x73, 0x2f, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x70, 0x62,
+ 0xf8, 0x01, 0x01, 0xa2, 0x02, 0x03, 0x47, 0x50, 0x42, 0xaa, 0x02, 0x1e, 0x47, 0x6f, 0x6f, 0x67,
+ 0x6c, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x57, 0x65, 0x6c, 0x6c,
+ 0x4b, 0x6e, 0x6f, 0x77, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x33,
+}
+
+var (
+ file_google_protobuf_struct_proto_rawDescOnce sync.Once
+ file_google_protobuf_struct_proto_rawDescData = file_google_protobuf_struct_proto_rawDesc
+)
+
+func file_google_protobuf_struct_proto_rawDescGZIP() []byte {
+ file_google_protobuf_struct_proto_rawDescOnce.Do(func() {
+ file_google_protobuf_struct_proto_rawDescData = protoimpl.X.CompressGZIP(file_google_protobuf_struct_proto_rawDescData)
+ })
+ return file_google_protobuf_struct_proto_rawDescData
+}
+
+var file_google_protobuf_struct_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_google_protobuf_struct_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
+var file_google_protobuf_struct_proto_goTypes = []interface{}{
+ (NullValue)(0), // 0: google.protobuf.NullValue
+ (*Struct)(nil), // 1: google.protobuf.Struct
+ (*Value)(nil), // 2: google.protobuf.Value
+ (*ListValue)(nil), // 3: google.protobuf.ListValue
+ nil, // 4: google.protobuf.Struct.FieldsEntry
+}
+var file_google_protobuf_struct_proto_depIdxs = []int32{
+ 4, // 0: google.protobuf.Struct.fields:type_name -> google.protobuf.Struct.FieldsEntry
+ 0, // 1: google.protobuf.Value.null_value:type_name -> google.protobuf.NullValue
+ 1, // 2: google.protobuf.Value.struct_value:type_name -> google.protobuf.Struct
+ 3, // 3: google.protobuf.Value.list_value:type_name -> google.protobuf.ListValue
+ 2, // 4: google.protobuf.ListValue.values:type_name -> google.protobuf.Value
+ 2, // 5: google.protobuf.Struct.FieldsEntry.value:type_name -> google.protobuf.Value
+ 6, // [6:6] is the sub-list for method output_type
+ 6, // [6:6] is the sub-list for method input_type
+ 6, // [6:6] is the sub-list for extension type_name
+ 6, // [6:6] is the sub-list for extension extendee
+ 0, // [0:6] is the sub-list for field type_name
+}
+
+func init() { file_google_protobuf_struct_proto_init() }
+func file_google_protobuf_struct_proto_init() {
+ if File_google_protobuf_struct_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_google_protobuf_struct_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Struct); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_google_protobuf_struct_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Value); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_google_protobuf_struct_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ListValue); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ file_google_protobuf_struct_proto_msgTypes[1].OneofWrappers = []interface{}{
+ (*Value_NullValue)(nil),
+ (*Value_NumberValue)(nil),
+ (*Value_StringValue)(nil),
+ (*Value_BoolValue)(nil),
+ (*Value_StructValue)(nil),
+ (*Value_ListValue)(nil),
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_google_protobuf_struct_proto_rawDesc,
+ NumEnums: 1,
+ NumMessages: 4,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_google_protobuf_struct_proto_goTypes,
+ DependencyIndexes: file_google_protobuf_struct_proto_depIdxs,
+ EnumInfos: file_google_protobuf_struct_proto_enumTypes,
+ MessageInfos: file_google_protobuf_struct_proto_msgTypes,
+ }.Build()
+ File_google_protobuf_struct_proto = out.File
+ file_google_protobuf_struct_proto_rawDesc = nil
+ file_google_protobuf_struct_proto_goTypes = nil
+ file_google_protobuf_struct_proto_depIdxs = nil
+}
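Beyond the NewValue example in the package comment, the NewStruct, AsMap, and MarshalJSON helpers defined in this file round-trip plain Go data; a short sketch with arbitrary sample values:

package main

import (
	"fmt"

	"google.golang.org/protobuf/types/known/structpb"
)

func main() {
	st, err := structpb.NewStruct(map[string]interface{}{
		"name":    "John Smith",
		"isAlive": true,
		"age":     27,
		"tags":    []interface{}{"a", "b"},
	})
	if err != nil {
		panic(err)
	}

	// MarshalJSON delegates to protojson, so the bytes are the JSON object the
	// Struct message represents.
	b, err := st.MarshalJSON()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))

	// AsMap converts back through Value.AsInterface; numbers come back as float64.
	fmt.Println(st.AsMap()["age"])
}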