diff options
Diffstat (limited to 'vendor/gopkg.in/mcuadros/go-syslog.v2/server.go')
-rw-r--r-- | vendor/gopkg.in/mcuadros/go-syslog.v2/server.go | 378 |
1 files changed, 378 insertions, 0 deletions
diff --git a/vendor/gopkg.in/mcuadros/go-syslog.v2/server.go b/vendor/gopkg.in/mcuadros/go-syslog.v2/server.go new file mode 100644 index 000000000..352597b89 --- /dev/null +++ b/vendor/gopkg.in/mcuadros/go-syslog.v2/server.go @@ -0,0 +1,378 @@ +package syslog + +import ( + "bufio" + "crypto/tls" + "errors" + "net" + "strings" + "sync" + "time" + + "gopkg.in/mcuadros/go-syslog.v2/format" +) + +var ( + RFC3164 = &format.RFC3164{} // RFC3164: http://www.ietf.org/rfc/rfc3164.txt + RFC5424 = &format.RFC5424{} // RFC5424: http://www.ietf.org/rfc/rfc5424.txt + RFC6587 = &format.RFC6587{} // RFC6587: http://www.ietf.org/rfc/rfc6587.txt - octet counting variant + Automatic = &format.Automatic{} // Automatically identify the format +) + +const ( + datagramChannelBufferSize = 10 + datagramReadBufferSize = 64 * 1024 +) + +// A function type which gets the TLS peer name from the connection. Can return +// ok=false to terminate the connection +type TlsPeerNameFunc func(tlsConn *tls.Conn) (tlsPeer string, ok bool) + +type Server struct { + listeners []net.Listener + connections []net.PacketConn + wait sync.WaitGroup + doneTcp chan bool + datagramChannel chan DatagramMessage + format format.Format + handler Handler + lastError error + readTimeoutMilliseconds int64 + tlsPeerNameFunc TlsPeerNameFunc + datagramPool sync.Pool +} + +//NewServer returns a new Server +func NewServer() *Server { + return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{ + New: func() interface{} { + return make([]byte, 65536) + }, + }} +} + +//Sets the syslog format (RFC3164 or RFC5424 or RFC6587) +func (s *Server) SetFormat(f format.Format) { + s.format = f +} + +//Sets the handler, this handler with receive every syslog entry +func (s *Server) SetHandler(handler Handler) { + s.handler = handler +} + +//Sets the connection timeout for TCP connections, in milliseconds +func (s *Server) SetTimeout(millseconds int64) { + s.readTimeoutMilliseconds = millseconds +} + +// Set the function that extracts a TLS peer name from the TLS connection +func (s *Server) SetTlsPeerNameFunc(tlsPeerNameFunc TlsPeerNameFunc) { + s.tlsPeerNameFunc = tlsPeerNameFunc +} + +// Default TLS peer name function - returns the CN of the certificate +func defaultTlsPeerName(tlsConn *tls.Conn) (tlsPeer string, ok bool) { + state := tlsConn.ConnectionState() + if len(state.PeerCertificates) <= 0 { + return "", false + } + cn := state.PeerCertificates[0].Subject.CommonName + return cn, true +} + +//Configure the server for listen on an UDP addr +func (s *Server) ListenUDP(addr string) error { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return err + } + + connection, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return err + } + connection.SetReadBuffer(datagramReadBufferSize) + + s.connections = append(s.connections, connection) + return nil +} + +//Configure the server for listen on an unix socket +func (s *Server) ListenUnixgram(addr string) error { + unixAddr, err := net.ResolveUnixAddr("unixgram", addr) + if err != nil { + return err + } + + connection, err := net.ListenUnixgram("unixgram", unixAddr) + if err != nil { + return err + } + connection.SetReadBuffer(datagramReadBufferSize) + + s.connections = append(s.connections, connection) + return nil +} + +//Configure the server for listen on a TCP addr +func (s *Server) ListenTCP(addr string) error { + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return err + } + + listener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + return err + } + + s.doneTcp = make(chan bool) + s.listeners = append(s.listeners, listener) + return nil +} + +//Configure the server for listen on a TCP addr for TLS +func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error { + listener, err := tls.Listen("tcp", addr, config) + if err != nil { + return err + } + + s.doneTcp = make(chan bool) + s.listeners = append(s.listeners, listener) + return nil +} + +//Starts the server, all the go routines goes to live +func (s *Server) Boot() error { + if s.format == nil { + return errors.New("please set a valid format") + } + + if s.handler == nil { + return errors.New("please set a valid handler") + } + + for _, listener := range s.listeners { + s.goAcceptConnection(listener) + } + + if len(s.connections) > 0 { + s.goParseDatagrams() + } + + for _, connection := range s.connections { + s.goReceiveDatagrams(connection) + } + + return nil +} + +func (s *Server) goAcceptConnection(listener net.Listener) { + s.wait.Add(1) + go func(listener net.Listener) { + loop: + for { + select { + case <-s.doneTcp: + break loop + default: + } + connection, err := listener.Accept() + if err != nil { + continue + } + + s.goScanConnection(connection) + } + + s.wait.Done() + }(listener) +} + +func (s *Server) goScanConnection(connection net.Conn) { + scanner := bufio.NewScanner(connection) + if sf := s.format.GetSplitFunc(); sf != nil { + scanner.Split(sf) + } + + remoteAddr := connection.RemoteAddr() + var client string + if remoteAddr != nil { + client = remoteAddr.String() + } + + tlsPeer := "" + if tlsConn, ok := connection.(*tls.Conn); ok { + // Handshake now so we get the TLS peer information + if err := tlsConn.Handshake(); err != nil { + connection.Close() + return + } + if s.tlsPeerNameFunc != nil { + var ok bool + tlsPeer, ok = s.tlsPeerNameFunc(tlsConn) + if !ok { + connection.Close() + return + } + } + } + + var scanCloser *ScanCloser + scanCloser = &ScanCloser{scanner, connection} + + s.wait.Add(1) + go s.scan(scanCloser, client, tlsPeer) +} + +func (s *Server) scan(scanCloser *ScanCloser, client string, tlsPeer string) { +loop: + for { + select { + case <-s.doneTcp: + break loop + default: + } + if s.readTimeoutMilliseconds > 0 { + scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond)) + } + if scanCloser.Scan() { + s.parser([]byte(scanCloser.Text()), client, tlsPeer) + } else { + break loop + } + } + scanCloser.closer.Close() + + s.wait.Done() +} + +func (s *Server) parser(line []byte, client string, tlsPeer string) { + parser := s.format.GetParser(line) + err := parser.Parse() + if err != nil { + s.lastError = err + } + + logParts := parser.Dump() + logParts["client"] = client + if logParts["hostname"] == "" && (s.format == RFC3164 || s.format == Automatic) { + if i := strings.Index(client, ":"); i > 1 { + logParts["hostname"] = client[:i] + } else { + logParts["hostname"] = client + } + } + logParts["tls_peer"] = tlsPeer + + s.handler.Handle(logParts, int64(len(line)), err) +} + +//Returns the last error +func (s *Server) GetLastError() error { + return s.lastError +} + +//Kill the server +func (s *Server) Kill() error { + for _, connection := range s.connections { + err := connection.Close() + if err != nil { + return err + } + } + + for _, listener := range s.listeners { + err := listener.Close() + if err != nil { + return err + } + } + // Only need to close channel once to broadcast to all waiting + if s.doneTcp != nil { + close(s.doneTcp) + } + if s.datagramChannel != nil { + close(s.datagramChannel) + } + return nil +} + +//Waits until the server stops +func (s *Server) Wait() { + s.wait.Wait() +} + +type TimeoutCloser interface { + Close() error + SetReadDeadline(t time.Time) error +} + +type ScanCloser struct { + *bufio.Scanner + closer TimeoutCloser +} + +type DatagramMessage struct { + message []byte + client string +} + +func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) { + s.wait.Add(1) + go func() { + defer s.wait.Done() + for { + buf := s.datagramPool.Get().([]byte) + n, addr, err := packetconn.ReadFrom(buf) + if err == nil { + // Ignore trailing control characters and NULs + for ; (n > 0) && (buf[n-1] < 32); n-- { + } + if n > 0 { + var address string + if addr != nil { + address = addr.String() + } + s.datagramChannel <- DatagramMessage{buf[:n], address} + } + } else { + // there has been an error. Either the server has been killed + // or may be getting a transitory error due to (e.g.) the + // interface being shutdown in which case sleep() to avoid busy wait. + opError, ok := err.(*net.OpError) + if (ok) && !opError.Temporary() && !opError.Timeout() { + return + } + time.Sleep(10 * time.Millisecond) + } + } + }() +} + +func (s *Server) goParseDatagrams() { + s.datagramChannel = make(chan DatagramMessage, datagramChannelBufferSize) + + s.wait.Add(1) + go func() { + defer s.wait.Done() + for { + select { + case msg, ok := (<-s.datagramChannel): + if !ok { + return + } + if sf := s.format.GetSplitFunc(); sf != nil { + if _, token, err := sf(msg.message, true); err == nil { + s.parser(token, msg.client, "") + } + } else { + s.parser(msg.message, msg.client, "") + } + s.datagramPool.Put(msg.message[:cap(msg.message)]) + } + } + }() +} |