Add Pause/Resume #18

Open
zeripath wants to merge 2 commits from zeripath/log:add-pausing into master
3 changed files with 220 additions and 55 deletions

224
event.go
View File

@ -21,6 +21,15 @@ type Event struct {
stacktrace string
}
type logAction int
const (
logClose logAction = iota
logFlush
logPause
logResume
)
// EventLogger represents the behaviours of a logger
type EventLogger interface {
LogEvent(event *Event) error
@ -31,14 +40,19 @@ type EventLogger interface {
GetName() string
}
// Pausable represents a pausable logger
type Pausable interface {
Pause()
Resume()
}
// ChannelledLog represents a cached channel to a LoggerProvider
type ChannelledLog struct {
name string
provider string
queue chan *Event
loggerProvider LoggerProvider
flush chan bool
close chan bool
actions chan logAction
closed chan bool
}
@ -46,10 +60,9 @@ type ChannelledLog struct {
func NewChannelledLog(name, provider, config string, bufferLength int64) (*ChannelledLog, error) {
if log, ok := providers[provider]; ok {
l := &ChannelledLog{
queue: make(chan *Event, bufferLength),
flush: make(chan bool),
close: make(chan bool),
closed: make(chan bool),
queue: make(chan *Event, bufferLength),
actions: make(chan logAction),
closed: make(chan bool),
}
l.loggerProvider = log()
if err := l.loggerProvider.Init(config); err != nil {
@ -65,23 +78,65 @@ func NewChannelledLog(name, provider, config string, bufferLength int64) (*Chann
// Start processing the ChannelledLog
func (l *ChannelledLog) Start() {
logLoop := func() logAction {
for {
select {
case event, ok := <-l.queue:
if !ok {
l.closeLogger()
return logClose
}
l.loggerProvider.LogEvent(event)
case action, ok := <-l.actions:
if !ok {
l.closeLogger()
return logClose
}
switch action {
case logFlush:
l.loggerProvider.Flush()
case logClose:
l.closeLogger()
return logClose
case logPause:
if pausable := l.loggerProvider.(Pausable); pausable != nil {
pausable.Pause()
}
return logPause
}
}
}
}
pausedLoop := func() logAction {
for {
action, ok := <-l.actions
if !ok {
l.closeLogger()
return logClose
}
switch action {
case logResume:
if pausable := l.loggerProvider.(Pausable); pausable != nil {
pausable.Resume()
}
return logResume
case logClose:
l.closeLogger()
return logClose
}
}
}
currentLoop := logLoop
for {
select {
case event, ok := <-l.queue:
if !ok {
l.closeLogger()
return
}
l.loggerProvider.LogEvent(event)
case _, ok := <-l.flush:
if !ok {
l.closeLogger()
return
}
l.loggerProvider.Flush()
case <-l.close:
l.closeLogger()
switch currentLoop() {
case logClose:
return
case logPause:
currentLoop = pausedLoop
case logResume:
currentLoop = logLoop
}
}
}
@ -108,13 +163,23 @@ func (l *ChannelledLog) closeLogger() {
// Close this ChannelledLog
func (l *ChannelledLog) Close() {
l.close <- true
l.actions <- logClose
<-l.closed
}
// Flush this ChannelledLog
func (l *ChannelledLog) Flush() {
l.flush <- true
l.actions <- logFlush
}
// Pause this ChannelledLog
func (l *ChannelledLog) Pause() {
l.actions <- logPause
}
// Resume this ChannelledLog
func (l *ChannelledLog) Resume() {
l.actions <- logResume
}
// GetLevel gets the level of this ChannelledLog
@ -139,8 +204,7 @@ type MultiChannelledLog struct {
queue chan *Event
mutex sync.Mutex
loggers map[string]EventLogger
flush chan bool
close chan bool
actions chan logAction
started bool
level Level
stacktraceLevel Level
@ -152,12 +216,11 @@ func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog
m := &MultiChannelledLog{
name: name,
queue: make(chan *Event, bufferLength),
flush: make(chan bool),
actions: make(chan logAction),
bufferLength: bufferLength,
loggers: make(map[string]EventLogger),
level: NONE,
stacktraceLevel: NONE,
close: make(chan bool),
closed: make(chan bool),
}
return m
@ -238,34 +301,85 @@ func (m *MultiChannelledLog) Start() {
}
m.started = true
m.mutex.Unlock()
for {
select {
case event, ok := <-m.queue:
if !ok {
m.closeLoggers()
return
}
m.mutex.Lock()
for _, logger := range m.loggers {
err := logger.LogEvent(event)
if err != nil {
fmt.Println(err)
logLoop := func() logAction {
for {
select {
case event, ok := <-m.queue:
if !ok {
m.closeLoggers()
return logClose
}
m.mutex.Lock()
for _, logger := range m.loggers {
err := logger.LogEvent(event)
if err != nil {
fmt.Println(err)
}
}
m.mutex.Unlock()
case action, ok := <-m.actions:
if !ok {
m.closeLoggers()
return logClose
}
switch action {
case logFlush:
m.mutex.Lock()
for _, logger := range m.loggers {
logger.Flush()
}
m.mutex.Unlock()
case logClose:
m.closeLoggers()
return logClose
case logPause:
m.mutex.Lock()
for _, logger := range m.loggers {
if pausable := logger.(Pausable); pausable != nil {
pausable.Pause()
}
}
m.mutex.Unlock()
return logPause
}
}
m.mutex.Unlock()
case _, ok := <-m.flush:
}
}
pausedLoop := func() logAction {
for {
action, ok := <-m.actions
if !ok {
m.closeLoggers()
return
return logClose
}
m.mutex.Lock()
for _, logger := range m.loggers {
logger.Flush()
switch action {
case logClose:
m.closeLoggers()
return logClose
case logResume:
m.mutex.Lock()
for _, logger := range m.loggers {
if pausable := logger.(Pausable); pausable != nil {
pausable.Resume()
}
}
m.mutex.Unlock()
return logResume
}
m.mutex.Unlock()
case <-m.close:
m.closeLoggers()
}
}
currentLoop := logLoop
for {
switch currentLoop() {
case logClose:
return
case logPause:
currentLoop = pausedLoop
case logResume:
currentLoop = logLoop
}
}
}
@ -286,13 +400,23 @@ func (m *MultiChannelledLog) LogEvent(event *Event) error {
// Close this MultiChannelledLog
func (m *MultiChannelledLog) Close() {
m.close <- true
m.actions <- logClose
<-m.closed
}
// Flush this ChannelledLog
func (m *MultiChannelledLog) Flush() {
m.flush <- true
m.actions <- logFlush
}
// Pause this ChannelledLog
func (m *MultiChannelledLog) Pause() {
m.actions <- logPause
}
// Resume this ChannelledLog
func (m *MultiChannelledLog) Resume() {
m.actions <- logResume
}
// GetLevel gets the level of this MultiChannelledLog

35
file.go
View File

@ -44,9 +44,10 @@ type FileLogger struct {
// MuxWriter an *os.File writer with locker.
type MuxWriter struct {
mu sync.Mutex
fd *os.File
owner *FileLogger
mu sync.Mutex
closed bool
fd *os.File
owner *FileLogger
}
// Write writes to os.File.
@ -59,14 +60,19 @@ func (mw *MuxWriter) Write(b []byte) (int, error) {
// Close the internal writer
func (mw *MuxWriter) Close() error {
if mw.closed {
return nil
}
mw.closed = true
return mw.fd.Close()
}
// SetFd sets os.File in writer.
// SetFd sets os.File in writer
func (mw *MuxWriter) SetFd(fd *os.File) {
if mw.fd != nil {
mw.fd.Close()
}
mw.closed = false
mw.fd = fd
}
@ -110,7 +116,7 @@ func (log *FileLogger) Init(config string) error {
return log.StartLogger()
}
// StartLogger start file logger. create log file and set to locker-inside file writer.
// StartLogger start file logger. create log file and set to locker-inside file writer
func (log *FileLogger) StartLogger() error {
fd, err := log.createLogFile()
if err != nil {
@ -123,6 +129,9 @@ func (log *FileLogger) StartLogger() error {
func (log *FileLogger) docheck(size int) {
log.startLock.Lock()
defer log.startLock.Unlock()
if log.mw.closed {
_ = log.StartLogger()
}
if log.Rotate && ((log.Maxsize > 0 && log.maxsizeCursize >= log.Maxsize) ||
(log.Daily && time.Now().Day() != log.dailyOpenDate)) {
if err := log.DoRotate(); err != nil {
@ -249,6 +258,22 @@ func (log *FileLogger) Flush() {
_ = log.mw.fd.Sync()
}
// Pause pauses the logger - closing the file
func (log *FileLogger) Pause() {
log.mw.mu.Lock()
defer log.mw.mu.Unlock()
_ = log.mw.fd.Sync()
_ = log.mw.Close()
}
// Resume resumes the logger - reopening the logging file
func (log *FileLogger) Resume() {
// re-start logger
log.mw.mu.Lock()
defer log.mw.mu.Unlock()
_ = log.StartLogger()
}
// GetName returns the default name for this implementation
func (log *FileLogger) GetName() string {
return "file"

16
log.go
View File

@ -200,6 +200,22 @@ func IsFatal() bool {
return GetLevel() <= FATAL
}
// Pause pauses all the loggers
func Pause() {
NamedLoggers.Range(func(name string, logger *Logger) bool {
logger.Pause()
return true
})
}
// Resume resumes logging
func Resume() {
NamedLoggers.Range(func(name string, logger *Logger) bool {
logger.Resume()
return true
})
}
// Close closes all the loggers
func Close() {
NamedLoggers.Range(func(name string, logger *Logger) bool {