# lognestmonster Copyright (c) 2019 Joshua 'joshuas3' Stockin
# .
# This file is part of lognestmonster.
# lognestmonster is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# lognestmonster is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with lognestmonster. If not, see .
import struct
import os
STATEMENT_START = 0
STATEMENT_END = 1
EVENT_START = 2
EVENT_END = 3
VERBOSITY_LEVELS = {
0: "INIT",
1: "DEBUG",
2: "VERBOSE",
3: "VERYVERBOSE",
4: "WARNING",
5: "ERROR"
}
def ulonglong(bytestr):
return struct.unpack("@Q", bytestr)[0]
def uchar(charv):
return struct.unpack("@B", charv)[0]
def ushort(shortv):
return struct.unpack("@H", shortv)[0]
class EventProto:
parent = None
pushed = [False]
def __init__(self):
self.parent = None
self.pushed = [False]
class Reader:
fd = None
top_level = []
current_event = None
event_count = 0
statement_count = 0
total_events = 0
total_statements = 0
seekable = True
file_size = 0
position = 0
version = 0
timestamp = 0
bad_bytes = 0
filter_time_start = -1
filter_time_end = -1
filter_verbosity = -1
filter_tag = -1
update_callbacks = []
def __init__(self, fd, seekable=True):
self.fd = fd
self.top_level = []
self.current_event = None
self.event_count = 0
self.statement_count = 0
self.total_events = 0
self.total_statements = 0
self.seekable = seekable
self.file_size = 0
self.position = 0
self.version = 0
self.timestamp = 0
self.bad_bytes = 0
self.filters = False
self.filter_time_start = -1
self.filter_time_end = -1
self.filter_verbosity = -1
self.filter_tag = -1
self.update_callbacks = []
def update(self):
for callback in self.update_callbacks:
callback()
def onupdate(self, callback):
self.update_callbacks.append(callback)
def size(self):
self.fd.seek(0, os.SEEK_END) # go to end of file and get position
newsize = self.fd.tell()
self.fd.seek(self.position) # return to previous position
is_diff = self.file_size is not newsize
self.file_size = newsize
return is_diff
def pos(self):
p = self.fd.tell()
self.position = p
return p
def seek(self, position):
self.position = position
self.fd.seek(self.position)
def read(self, byte_count):
return self.fd.read(byte_count)
def fetch_item(self, position):
previouspos = self.pos()
self.seek(position)
timestamp = ulonglong(self.read(8))
verbosity = uchar(self.read(1))
tag_size = uchar(self.read(1))
tag = self.read(tag_size).decode("utf-8")
message_size = ushort(self.read(2))
message = self.read(message_size).decode("utf-8")
self.seek(previouspos)
return (timestamp, verbosity, tag, message)
def parse_block(self, in_byte):
seekable = self.seekable
if in_byte == STATEMENT_START: # the byte indicates a statement's start, begin interpreting
if seekable:
this_position = self.pos() # identify and save the seeker position of this statement
try:
block = self.read(10)
timestamp = ulonglong(block[:8]) # bytes 1-8
verbosity = block[8] # byte 9
tag = self.read(block[9]) # byte 10
append = True
if self.filters:
if self.filter_time_start is not -1 and append:
append = timestamp > self.filter_time_start
if self.filter_time_end is not -1 and append:
append = timestamp < self.filter_time_end
if self.filter_verbosity is not -1 and append:
append = verbosity in self.filter_verbosity
if self.filter_tag is not -1 and append:
tag = tag.decode("utf-8")
append = tag == self.filter_tag
message_size = ushort(self.read(2))
if seekable:
self.read(message_size)
while self.read(1)[0] is not STATEMENT_END and self.pos() < self.file_size:
self.bad_bytes += 1
else:
message = self.read(message_size).decode("utf-8")
while self.read(1)[0] is not STATEMENT_END:
self.bad_bytes += 1
if append == True:
self.statement_count += 1
if self.current_event is not None:
if seekable:
self.current_event.pushed.append(this_position)
else:
self.current_event.pushed.append((timestamp, verbosity, tag, message))
else:
if seekable:
self.top_level.append(this_position)
else:
self.top_level.append((timestamp, verbosity, tag, message))
self.total_statements += 1
except:
return -1
elif in_byte == EVENT_START: # the byte indicates an event's start, create an event
new_event = EventProto()
if self.current_event is not None: # we're already inside an event, set the new event's parent to match
new_event.parent = self.current_event
self.current_event = new_event
elif in_byte == EVENT_END: # end of event
if self.current_event is not None:
if len(self.current_event.pushed) > 1:
self.event_count += 1
if self.current_event.parent is not None:
self.current_event.parent.pushed.append(self.current_event)
self.current_event = self.current_event.parent
else:
self.top_level.append(self.current_event)
self.current_event = None
else: # event is empty
if self.current_event.parent is not None:
self.current_event = self.current_event.parent
else:
self.current_event = None
self.total_events += 1
else:
self.bad_bytes += 1 # event doesn't exist, bad byte
return -1
else: # unknown byte
self.bad_bytes += 1
return -1
def scan(self): # scan for events and statements from self.position to the end of file
if not self.seekable: raise Exception("attempting to scan non-seekable stream")
if self.pos() == 0: # if it's the start of the file, grab version and timestamp
self.version = self.read(1)[0]
self.timestamp = ulonglong(self.read(8))
while self.pos() < self.file_size: # while the seeker is before EOF
in_byte = self.read(1)[0] # read 1 byte
self.parse_block(in_byte) # parse block based on byte read
self.update() # trigger callbacks