Add Pause/Resume #18
224
event.go
224
event.go
|
@ -21,6 +21,15 @@ type Event struct {
|
|||
stacktrace string
|
||||
}
|
||||
|
||||
type logAction int
|
||||
|
||||
const (
|
||||
logClose logAction = iota
|
||||
logFlush
|
||||
logPause
|
||||
logResume
|
||||
)
|
||||
|
||||
// EventLogger represents the behaviours of a logger
|
||||
type EventLogger interface {
|
||||
LogEvent(event *Event) error
|
||||
|
@ -31,14 +40,19 @@ type EventLogger interface {
|
|||
GetName() string
|
||||
}
|
||||
|
||||
// Pausable represents a pausable logger
|
||||
type Pausable interface {
|
||||
Pause()
|
||||
Resume()
|
||||
}
|
||||
|
||||
// ChannelledLog represents a cached channel to a LoggerProvider
|
||||
type ChannelledLog struct {
|
||||
name string
|
||||
provider string
|
||||
queue chan *Event
|
||||
loggerProvider LoggerProvider
|
||||
flush chan bool
|
||||
close chan bool
|
||||
actions chan logAction
|
||||
closed chan bool
|
||||
}
|
||||
|
||||
|
@ -46,10 +60,9 @@ type ChannelledLog struct {
|
|||
func NewChannelledLog(name, provider, config string, bufferLength int64) (*ChannelledLog, error) {
|
||||
if log, ok := providers[provider]; ok {
|
||||
l := &ChannelledLog{
|
||||
queue: make(chan *Event, bufferLength),
|
||||
flush: make(chan bool),
|
||||
close: make(chan bool),
|
||||
closed: make(chan bool),
|
||||
queue: make(chan *Event, bufferLength),
|
||||
actions: make(chan logAction),
|
||||
closed: make(chan bool),
|
||||
}
|
||||
l.loggerProvider = log()
|
||||
if err := l.loggerProvider.Init(config); err != nil {
|
||||
|
@ -65,23 +78,65 @@ func NewChannelledLog(name, provider, config string, bufferLength int64) (*Chann
|
|||
|
||||
// Start processing the ChannelledLog
|
||||
func (l *ChannelledLog) Start() {
|
||||
logLoop := func() logAction {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-l.queue:
|
||||
if !ok {
|
||||
l.closeLogger()
|
||||
return logClose
|
||||
}
|
||||
l.loggerProvider.LogEvent(event)
|
||||
case action, ok := <-l.actions:
|
||||
if !ok {
|
||||
l.closeLogger()
|
||||
return logClose
|
||||
}
|
||||
switch action {
|
||||
case logFlush:
|
||||
l.loggerProvider.Flush()
|
||||
case logClose:
|
||||
l.closeLogger()
|
||||
return logClose
|
||||
case logPause:
|
||||
if pausable := l.loggerProvider.(Pausable); pausable != nil {
|
||||
pausable.Pause()
|
||||
}
|
||||
return logPause
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pausedLoop := func() logAction {
|
||||
for {
|
||||
action, ok := <-l.actions
|
||||
if !ok {
|
||||
l.closeLogger()
|
||||
return logClose
|
||||
}
|
||||
switch action {
|
||||
case logResume:
|
||||
if pausable := l.loggerProvider.(Pausable); pausable != nil {
|
||||
pausable.Resume()
|
||||
}
|
||||
return logResume
|
||||
case logClose:
|
||||
l.closeLogger()
|
||||
return logClose
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
currentLoop := logLoop
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-l.queue:
|
||||
if !ok {
|
||||
l.closeLogger()
|
||||
return
|
||||
}
|
||||
l.loggerProvider.LogEvent(event)
|
||||
case _, ok := <-l.flush:
|
||||
if !ok {
|
||||
l.closeLogger()
|
||||
return
|
||||
}
|
||||
l.loggerProvider.Flush()
|
||||
case <-l.close:
|
||||
l.closeLogger()
|
||||
switch currentLoop() {
|
||||
case logClose:
|
||||
return
|
||||
case logPause:
|
||||
currentLoop = pausedLoop
|
||||
case logResume:
|
||||
currentLoop = logLoop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -108,13 +163,23 @@ func (l *ChannelledLog) closeLogger() {
|
|||
|
||||
// Close this ChannelledLog
|
||||
func (l *ChannelledLog) Close() {
|
||||
l.close <- true
|
||||
l.actions <- logClose
|
||||
<-l.closed
|
||||
}
|
||||
|
||||
// Flush this ChannelledLog
|
||||
func (l *ChannelledLog) Flush() {
|
||||
l.flush <- true
|
||||
l.actions <- logFlush
|
||||
}
|
||||
|
||||
// Pause this ChannelledLog
|
||||
func (l *ChannelledLog) Pause() {
|
||||
l.actions <- logPause
|
||||
}
|
||||
|
||||
// Resume this ChannelledLog
|
||||
func (l *ChannelledLog) Resume() {
|
||||
l.actions <- logResume
|
||||
}
|
||||
|
||||
// GetLevel gets the level of this ChannelledLog
|
||||
|
@ -139,8 +204,7 @@ type MultiChannelledLog struct {
|
|||
queue chan *Event
|
||||
mutex sync.Mutex
|
||||
loggers map[string]EventLogger
|
||||
flush chan bool
|
||||
close chan bool
|
||||
actions chan logAction
|
||||
started bool
|
||||
level Level
|
||||
stacktraceLevel Level
|
||||
|
@ -152,12 +216,11 @@ func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog
|
|||
m := &MultiChannelledLog{
|
||||
name: name,
|
||||
queue: make(chan *Event, bufferLength),
|
||||
flush: make(chan bool),
|
||||
actions: make(chan logAction),
|
||||
bufferLength: bufferLength,
|
||||
loggers: make(map[string]EventLogger),
|
||||
level: NONE,
|
||||
stacktraceLevel: NONE,
|
||||
close: make(chan bool),
|
||||
closed: make(chan bool),
|
||||
}
|
||||
return m
|
||||
|
@ -238,34 +301,85 @@ func (m *MultiChannelledLog) Start() {
|
|||
}
|
||||
m.started = true
|
||||
m.mutex.Unlock()
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-m.queue:
|
||||
if !ok {
|
||||
m.closeLoggers()
|
||||
return
|
||||
}
|
||||
m.mutex.Lock()
|
||||
for _, logger := range m.loggers {
|
||||
err := logger.LogEvent(event)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
|
||||
logLoop := func() logAction {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-m.queue:
|
||||
if !ok {
|
||||
m.closeLoggers()
|
||||
return logClose
|
||||
}
|
||||
m.mutex.Lock()
|
||||
for _, logger := range m.loggers {
|
||||
err := logger.LogEvent(event)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
case action, ok := <-m.actions:
|
||||
if !ok {
|
||||
m.closeLoggers()
|
||||
return logClose
|
||||
}
|
||||
switch action {
|
||||
case logFlush:
|
||||
m.mutex.Lock()
|
||||
for _, logger := range m.loggers {
|
||||
logger.Flush()
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
case logClose:
|
||||
m.closeLoggers()
|
||||
return logClose
|
||||
case logPause:
|
||||
m.mutex.Lock()
|
||||
for _, logger := range m.loggers {
|
||||
if pausable := logger.(Pausable); pausable != nil {
|
||||
pausable.Pause()
|
||||
}
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
return logPause
|
||||
}
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
case _, ok := <-m.flush:
|
||||
}
|
||||
}
|
||||
|
||||
pausedLoop := func() logAction {
|
||||
for {
|
||||
action, ok := <-m.actions
|
||||
if !ok {
|
||||
m.closeLoggers()
|
||||
return
|
||||
return logClose
|
||||
}
|
||||
m.mutex.Lock()
|
||||
for _, logger := range m.loggers {
|
||||
logger.Flush()
|
||||
switch action {
|
||||
case logClose:
|
||||
m.closeLoggers()
|
||||
return logClose
|
||||
case logResume:
|
||||
m.mutex.Lock()
|
||||
for _, logger := range m.loggers {
|
||||
if pausable := logger.(Pausable); pausable != nil {
|
||||
pausable.Resume()
|
||||
}
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
return logResume
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
case <-m.close:
|
||||
m.closeLoggers()
|
||||
}
|
||||
}
|
||||
|
||||
currentLoop := logLoop
|
||||
for {
|
||||
switch currentLoop() {
|
||||
case logClose:
|
||||
return
|
||||
case logPause:
|
||||
currentLoop = pausedLoop
|
||||
case logResume:
|
||||
currentLoop = logLoop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -286,13 +400,23 @@ func (m *MultiChannelledLog) LogEvent(event *Event) error {
|
|||
|
||||
// Close this MultiChannelledLog
|
||||
func (m *MultiChannelledLog) Close() {
|
||||
m.close <- true
|
||||
m.actions <- logClose
|
||||
<-m.closed
|
||||
}
|
||||
|
||||
// Flush this ChannelledLog
|
||||
func (m *MultiChannelledLog) Flush() {
|
||||
m.flush <- true
|
||||
m.actions <- logFlush
|
||||
}
|
||||
|
||||
// Pause this ChannelledLog
|
||||
func (m *MultiChannelledLog) Pause() {
|
||||
m.actions <- logPause
|
||||
}
|
||||
|
||||
// Resume this ChannelledLog
|
||||
func (m *MultiChannelledLog) Resume() {
|
||||
m.actions <- logResume
|
||||
}
|
||||
|
||||
// GetLevel gets the level of this MultiChannelledLog
|
||||
|
|
35
file.go
35
file.go
|
@ -44,9 +44,10 @@ type FileLogger struct {
|
|||
|
||||
// MuxWriter an *os.File writer with locker.
|
||||
type MuxWriter struct {
|
||||
mu sync.Mutex
|
||||
fd *os.File
|
||||
owner *FileLogger
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
fd *os.File
|
||||
owner *FileLogger
|
||||
}
|
||||
|
||||
// Write writes to os.File.
|
||||
|
@ -59,14 +60,19 @@ func (mw *MuxWriter) Write(b []byte) (int, error) {
|
|||
|
||||
// Close the internal writer
|
||||
func (mw *MuxWriter) Close() error {
|
||||
if mw.closed {
|
||||
return nil
|
||||
}
|
||||
mw.closed = true
|
||||
return mw.fd.Close()
|
||||
}
|
||||
|
||||
// SetFd sets os.File in writer.
|
||||
// SetFd sets os.File in writer
|
||||
func (mw *MuxWriter) SetFd(fd *os.File) {
|
||||
if mw.fd != nil {
|
||||
mw.fd.Close()
|
||||
}
|
||||
mw.closed = false
|
||||
mw.fd = fd
|
||||
}
|
||||
|
||||
|
@ -110,7 +116,7 @@ func (log *FileLogger) Init(config string) error {
|
|||
return log.StartLogger()
|
||||
}
|
||||
|
||||
// StartLogger start file logger. create log file and set to locker-inside file writer.
|
||||
// StartLogger start file logger. create log file and set to locker-inside file writer
|
||||
func (log *FileLogger) StartLogger() error {
|
||||
fd, err := log.createLogFile()
|
||||
if err != nil {
|
||||
|
@ -123,6 +129,9 @@ func (log *FileLogger) StartLogger() error {
|
|||
func (log *FileLogger) docheck(size int) {
|
||||
log.startLock.Lock()
|
||||
defer log.startLock.Unlock()
|
||||
if log.mw.closed {
|
||||
_ = log.StartLogger()
|
||||
}
|
||||
if log.Rotate && ((log.Maxsize > 0 && log.maxsizeCursize >= log.Maxsize) ||
|
||||
(log.Daily && time.Now().Day() != log.dailyOpenDate)) {
|
||||
if err := log.DoRotate(); err != nil {
|
||||
|
@ -249,6 +258,22 @@ func (log *FileLogger) Flush() {
|
|||
_ = log.mw.fd.Sync()
|
||||
}
|
||||
|
||||
// Pause pauses the logger - closing the file
|
||||
func (log *FileLogger) Pause() {
|
||||
log.mw.mu.Lock()
|
||||
defer log.mw.mu.Unlock()
|
||||
_ = log.mw.fd.Sync()
|
||||
_ = log.mw.Close()
|
||||
}
|
||||
|
||||
// Resume resumes the logger - reopening the logging file
|
||||
func (log *FileLogger) Resume() {
|
||||
// re-start logger
|
||||
log.mw.mu.Lock()
|
||||
defer log.mw.mu.Unlock()
|
||||
_ = log.StartLogger()
|
||||
}
|
||||
|
||||
// GetName returns the default name for this implementation
|
||||
func (log *FileLogger) GetName() string {
|
||||
return "file"
|
||||
|
|
16
log.go
16
log.go
|
@ -200,6 +200,22 @@ func IsFatal() bool {
|
|||
return GetLevel() <= FATAL
|
||||
}
|
||||
|
||||
// Pause pauses all the loggers
|
||||
func Pause() {
|
||||
NamedLoggers.Range(func(name string, logger *Logger) bool {
|
||||
logger.Pause()
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// Resume resumes logging
|
||||
func Resume() {
|
||||
NamedLoggers.Range(func(name string, logger *Logger) bool {
|
||||
logger.Resume()
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// Close closes all the loggers
|
||||
func Close() {
|
||||
NamedLoggers.Range(func(name string, logger *Logger) bool {
|
||||
|
|
Reference in New Issue
Block a user