diff options
Diffstat (limited to 'pkg/k9p')
-rw-r--r-- | pkg/k9p/logger/logger.go | 195 | ||||
-rw-r--r-- | pkg/k9p/session.go | 205 |
2 files changed, 400 insertions, 0 deletions
diff --git a/pkg/k9p/logger/logger.go b/pkg/k9p/logger/logger.go new file mode 100644 index 0000000..6b2e437 --- /dev/null +++ b/pkg/k9p/logger/logger.go @@ -0,0 +1,195 @@ +package logger + +import ( + "context" + "time" + + p9p "github.com/docker/go-p9p" + "github.com/rs/zerolog" +) + +type Logger struct { + logger zerolog.Logger + session p9p.Session +} + +func New(logger zerolog.Logger, session p9p.Session) *Logger { + return &Logger{ + logger: logger, + session: session, + } +} + +func (l *Logger) Auth(ctx context.Context, afid p9p.Fid, uname string, aname string) (qid p9p.Qid, err error) { + defer func(t1 time.Time) { + l.logger.Debug(). + Str("request", "auth"). + Uint32("afid", uint32(afid)). + Str("uname", uname). + Str("aname", aname). + TimeDiff("duration", time.Now(), t1). + Dict("ret", zerolog.Dict(). + Str("qid", qid.String()). + Err(err)). + Msg("") + }(time.Now()) + + return l.session.Auth(ctx, afid, uname, aname) +} + +func (l *Logger) Attach(ctx context.Context, fid p9p.Fid, afid p9p.Fid, uname string, aname string) (qid p9p.Qid, err error) { + defer func(t1 time.Time) { + l.logger.Debug(). + Str("request", "attach"). + Uint32("fid", uint32(fid)). + Uint32("afid", uint32(afid)). + Str("uname", uname). + Str("aname", aname). + TimeDiff("duration", time.Now(), t1). + Dict("ret", zerolog.Dict(). + Str("qid", qid.String()). + Err(err)). + Msg("") + }(time.Now()) + return l.session.Attach(ctx, fid, afid, uname, aname) +} + +func (l *Logger) Clunk(ctx context.Context, fid p9p.Fid) (err error) { + defer func(t1 time.Time) { + l.logger.Debug(). + Str("request", "clunk"). + Uint32("fid", uint32(fid)). + TimeDiff("duration", time.Now(), t1). + Dict("ret", zerolog.Dict(). + Err(err)). + Msg("") + }(time.Now()) + return l.session.Clunk(ctx, fid) +} + +func (l *Logger) Remove(ctx context.Context, fid p9p.Fid) (err error) { + defer func(t1 time.Time) { + l.logger.Debug(). + Str("request", "remove"). + Uint32("fid", uint32(fid)). + TimeDiff("duration", time.Now(), t1). + Dict("ret", zerolog.Dict(). + Err(err)). + Msg("") + }(time.Now()) + return l.session.Remove(ctx, fid) +} + +func (l *Logger) Walk(ctx context.Context, fid p9p.Fid, newfid p9p.Fid, names ...string) (qids []p9p.Qid, err error) { + defer func(t1 time.Time) { + arr := zerolog.Arr() + for _, qid := range qids { + arr = arr.Str(qid.String()) + } + + l.logger.Debug(). + Str("request", "walk"). + Uint32("fid", uint32(fid)). + Uint32("newfid", uint32(newfid)). + Strs("names", names). + TimeDiff("duration", time.Now(), t1). + Dict("ret", zerolog.Dict(). + Array("qids", arr). + Err(err)). + Msg("") + }(time.Now()) + return l.session.Walk(ctx, fid, newfid, names...) +} + +// Read follows the semantics of io.ReaderAt.ReadAtt method except it takes +// a contxt and Fid. +func (l *Logger) Read(ctx context.Context, fid p9p.Fid, p []byte, offset int64) (n int, err error) { + defer func(t1 time.Time) { + l.logger.Debug(). + Str("request", "read"). + Uint32("fid", uint32(fid)). + Int64("offset", offset). + TimeDiff("duration", time.Now(), t1). + Dict("ret", zerolog.Dict(). + Int("n", n). + Err(err)). + Msg("") + }(time.Now()) + return l.session.Read(ctx, fid, p, offset) +} + +// Write follows the semantics of io.WriterAt.WriteAt except takes a context and an Fid. +// +// If n == len(p), no error is returned. +// If n < len(p), io.ErrShortWrite will be returned. +func (l *Logger) Write(ctx context.Context, fid p9p.Fid, p []byte, offset int64) (n int, err error) { + defer func(t1 time.Time) { + l.logger.Debug(). + Str("request", "write"). + Uint32("fid", uint32(fid)). + Int64("offset", offset). + TimeDiff("duration", time.Now(), t1). + Dict("ret", zerolog.Dict(). + Int("n", n). + Err(err)). + Msg("") + }(time.Now()) + return l.session.Write(ctx, fid, p, offset) +} + +func (l *Logger) Open(ctx context.Context, fid p9p.Fid, mode p9p.Flag) (qid p9p.Qid, iounit uint32, err error) { + defer func(t1 time.Time) { + l.logger.Debug(). + Str("request", "open"). + Uint32("fid", uint32(fid)). + Uint8("mode", uint8(mode)). + TimeDiff("duration", time.Now(), t1). + Dict("ret", zerolog.Dict(). + Str("qid", qid.String()). + Uint32("iounit", iounit). + Err(err)). + Msg("") + }(time.Now()) + return l.session.Open(ctx, fid, mode) +} + +func (l *Logger) Create(ctx context.Context, parent p9p.Fid, name string, perm uint32, mode p9p.Flag) (p9p.Qid, uint32, error) { + l.logger.Debug(). + Str("request", "create"). + Msg("") + return l.session.Create(ctx, parent, name, perm, mode) +} + +func (l *Logger) Stat(ctx context.Context, fid p9p.Fid) (dir p9p.Dir, err error) { + defer func(t1 time.Time) { + l.logger.Debug(). + Str("request", "stat"). + Uint32("fid", uint32(fid)). + Dict("ret", zerolog.Dict(). + Dict("dir", zerolog.Dict(). + Str("name", dir.Name). + Str("qid", dir.Qid.String()). + Uint64("length", dir.Length). + Time("modTime", dir.ModTime)). + Err(err)). + Msg("") + }(time.Now()) + return l.session.Stat(ctx, fid) +} + +func (l *Logger) WStat(ctx context.Context, fid p9p.Fid, dir p9p.Dir) error { + l.logger.Debug(). + Str("request", "stat"). + Msg("") + return l.session.WStat(ctx, fid, dir) +} + +// Version returns the supported version and msize of the session. This +// can be affected by negotiating or the level of support provided by the +// session implementation. +func (l *Logger) Version() (msize int, version string) { + l.logger.Debug(). + Str("request", "stat"). + Msg("") + return l.session.Version() +} diff --git a/pkg/k9p/session.go b/pkg/k9p/session.go new file mode 100644 index 0000000..471a6bc --- /dev/null +++ b/pkg/k9p/session.go @@ -0,0 +1,205 @@ +package k9p + +import ( + "context" + "runtime" + "sync" + + "github.com/docker/go-p9p" + "go.terinstock.com/k9p/pkg/resources" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" +) + +type Session struct { + sync.Mutex + aname string + uname string + + client kubernetes.Interface + sharedInformer informers.SharedInformerFactory + refs map[p9p.Fid]resources.Ref +} + +func New(ctx context.Context, client kubernetes.Interface) *Session { + sharedInformer := informers.NewSharedInformerFactory(client, 0) + + sharedInformer.Core().V1().Namespaces().Informer() + sharedInformer.Apps().V1().Deployments().Informer() + + go sharedInformer.Start(ctx.Done()) + runtime.Gosched() + + return &Session{ + client: client, + sharedInformer: sharedInformer, + refs: make(map[p9p.Fid]resources.Ref), + } +} + +func (k *Session) getRef(fid p9p.Fid) (resources.Ref, error) { + k.Lock() + defer k.Unlock() + + if fid == p9p.NOFID { + return nil, p9p.ErrUnknownfid + } + + ref, found := k.refs[fid] + if !found { + return nil, p9p.ErrUnknownfid + } + + return ref, nil +} + +func (k *Session) newRef(fid p9p.Fid, resource resources.Ref) (resources.Ref, error) { + k.Lock() + defer k.Unlock() + + if fid == p9p.NOFID { + return nil, p9p.ErrUnknownfid + } + + _, ok := k.refs[fid] + if ok { + return nil, p9p.ErrDupfid + } + + ref := resource + k.refs[fid] = ref + return ref, nil +} + +func (k *Session) Auth(ctx context.Context, afid p9p.Fid, uname string, aname string) (p9p.Qid, error) { + return p9p.Qid{}, nil +} + +func (k *Session) Attach(ctx context.Context, fid p9p.Fid, afid p9p.Fid, uname string, aname string) (p9p.Qid, error) { + if uname == "" { + return p9p.Qid{}, p9p.MessageRerror{Ename: "no user"} + } + + if aname == "" { + aname = "/" + } + + k.uname = uname + k.aname = aname + + ref, err := k.newRef(fid, resources.NewDirRef("/", k, map[string]resources.Ref{ + "namespaces": resources.NewNamespacesRef(k.client, k), + "cluster": resources.NewDirRef("cluster", k, map[string]resources.Ref{}), + })) + if err != nil { + return p9p.Qid{}, err + } + + return ref.Info().Qid, nil +} + +func (k *Session) Clunk(ctx context.Context, fid p9p.Fid) error { + _, err := k.getRef(fid) + if err != nil { + return err + } + + k.Lock() + defer k.Unlock() + delete(k.refs, fid) + + return nil +} + +func (k *Session) Remove(ctx context.Context, fid p9p.Fid) error { + return p9p.ErrUnknownMsg +} + +func (k *Session) Walk(ctx context.Context, fid p9p.Fid, newfid p9p.Fid, names ...string) ([]p9p.Qid, error) { + var qids []p9p.Qid + + ref, err := k.getRef(fid) + if err != nil { + return qids, err + } + + current := ref + for _, name := range names { + newResource, err := current.Get(name) + if err != nil { + break + } + + qids = append(qids, newResource.Info().Qid) + current = newResource + } + + if len(qids) != len(names) { + return qids, nil + } + + _, err = k.newRef(newfid, current) + if err != nil { + return qids, err + } + + return qids, nil +} + +func (k *Session) Read(ctx context.Context, fid p9p.Fid, p []byte, offset int64) (n int, err error) { + ref, err := k.getRef(fid) + if err != nil { + return 0, err + } + + return ref.Read(ctx, p, offset) +} + +func (k *Session) Write(ctx context.Context, fid p9p.Fid, p []byte, offset int64) (n int, err error) { + return 0, p9p.ErrUnknownMsg +} + +func (k *Session) Open(ctx context.Context, fid p9p.Fid, mode p9p.Flag) (p9p.Qid, uint32, error) { + ref, err := k.getRef(fid) + if err != nil { + return p9p.Qid{}, 0, err + } + + return ref.Info().Qid, 0, nil +} + +func (k *Session) Create(ctx context.Context, parent p9p.Fid, name string, perm uint32, mode p9p.Flag) (p9p.Qid, uint32, error) { + return p9p.Qid{}, 0, p9p.ErrUnknownMsg +} + +func (k *Session) Stat(ctx context.Context, fid p9p.Fid) (p9p.Dir, error) { + ref, err := k.getRef(fid) + if err != nil { + return p9p.Dir{}, err + } + + return ref.Info(), nil +} + +func (k *Session) WStat(ctx context.Context, fid p9p.Fid, dir p9p.Dir) error { + return p9p.ErrUnknownMsg +} + +// Version returns the supported version and msize of the session. This +// can be affected by negotiating or the level of support provided by the +// session implementation. +func (k *Session) Version() (msize int, version string) { + return p9p.DefaultMSize, p9p.DefaultVersion +} + +func (k *Session) WaitForCacheSync(stopCh <-chan struct{}) { + k.sharedInformer.WaitForCacheSync(stopCh) +} + +func (k *Session) GetAuth() (uname, aname string) { + return k.uname, k.aname +} + +func (k *Session) Informer() informers.SharedInformerFactory { + return k.sharedInformer +} |