main
cypherpunk 12 months ago
parent f4c452e05b
commit c5118a72af

@@ -0,0 +1,53 @@
Metadata-Version: 2.1
Name: aiologger
Version: 0.7.0
Summary: Asynchronous logging for python and asyncio
Home-page: https://github.com/b2wdigital/aiologger
Author: Diogo Magalhães Martins
Author-email: magalhaesmartins@icloud.com
License: MIT
Project-URL: Documentation, https://aiologger.readthedocs.io/en/latest/
Project-URL: Code, https://github.com/b2wdigital/aiologger
Project-URL: Issue tracker, https://github.com/b2wdigital/aiologger/issues
Description: # aiologger
[![PYPI](https://img.shields.io/pypi/v/aiologger.svg)](http://pypi.python.org/pypi/aiologger)
[![PYPI Python Versions](https://img.shields.io/pypi/pyversions/aiologger.svg)](http://pypi.python.org/pypi/aiologger)
[![Build Status](https://github.com/async-worker/aiologger/actions/workflows/main.yaml/badge.svg?branch=main)](https://github.com/async-worker/aiologger/actions/workflows/main.yaml)
[![Maintainability](https://api.codeclimate.com/v1/badges/122a3696a749ed04a972/maintainability)](https://codeclimate.com/github/async-worker/aiologger/maintainability)
[![Test Coverage](https://api.codeclimate.com/v1/badges/122a3696a749ed04a972/test_coverage)](https://codeclimate.com/github/async-worker/aiologger/test_coverage)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)
# About the Project
The built-in Python logger performs blocking I/O. This means that using the
built-in `logging` module will degrade your asynchronous application's
performance. `aiologger` aims to be the standard asynchronous, non-blocking
logging library for Python and asyncio.
# Documentation
The project documentation can be found here: https://async-worker.github.io/aiologger/
Keywords: logging json log output
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Information Technology
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: MIT License
Classifier: Natural Language :: English
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Operating System :: Unix
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: System :: Logging
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: aiofiles

@@ -1,3 +1,20 @@
# aiologger
# aiologger+r000t's patch
Asynchronous, non-blocking logging for Python and asyncio.
This project is forked from the upstream PyPI project (which treats GitHub as its upstream). It carries one patch from r000t.
The upstream package is at http://pypi.python.org/pypi/aiologger
# About the Project
The built-in Python logger performs blocking I/O. This means that using the
built-in `logging` module will degrade your asynchronous application's
performance. `aiologger` aims to be the standard asynchronous, non-blocking
logging library for Python and asyncio.
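A minimal usage sketch, based on the upstream project's documented API (this assumes `Logger.with_default_handlers` behaves as in upstream 0.7.0):

```python
import asyncio

from aiologger import Logger


async def main():
    # Create a Logger with default stream handlers attached
    logger = Logger.with_default_handlers(name="my-logger")

    # Log calls are awaitable; awaiting ensures the record has been
    # handed off to the handlers
    await logger.info("Hello from an asyncio application")

    # Flush and close all handlers before the loop shuts down
    await logger.shutdown()


asyncio.run(main())
```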
# Documentation
Good luck.
# Credits
The original author is Diogo Magalhães Martins (magalhaesmartins@icloud.com).
There is one commit from @r000t@ligma.pro.

@@ -0,0 +1,53 @@
Metadata-Version: 2.1
Name: aiologger
Version: 0.7.0
Summary: Asynchronous logging for python and asyncio
Home-page: https://github.com/b2wdigital/aiologger
Author: Diogo Magalhães Martins
Author-email: magalhaesmartins@icloud.com
License: MIT
Project-URL: Documentation, https://aiologger.readthedocs.io/en/latest/
Project-URL: Code, https://github.com/b2wdigital/aiologger
Project-URL: Issue tracker, https://github.com/b2wdigital/aiologger/issues
Description: # aiologger
[![PYPI](https://img.shields.io/pypi/v/aiologger.svg)](http://pypi.python.org/pypi/aiologger)
[![PYPI Python Versions](https://img.shields.io/pypi/pyversions/aiologger.svg)](http://pypi.python.org/pypi/aiologger)
[![Build Status](https://github.com/async-worker/aiologger/actions/workflows/main.yaml/badge.svg?branch=main)](https://github.com/async-worker/aiologger/actions/workflows/main.yaml)
[![Maintainability](https://api.codeclimate.com/v1/badges/122a3696a749ed04a972/maintainability)](https://codeclimate.com/github/async-worker/aiologger/maintainability)
[![Test Coverage](https://api.codeclimate.com/v1/badges/122a3696a749ed04a972/test_coverage)](https://codeclimate.com/github/async-worker/aiologger/test_coverage)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)
# About the Project
The built-in Python logger performs blocking I/O. This means that using the
built-in `logging` module will degrade your asynchronous application's
performance. `aiologger` aims to be the standard asynchronous, non-blocking
logging library for Python and asyncio.
# Documentation
The project documentation can be found here: https://async-worker.github.io/aiologger/
Keywords: logging json log output
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Information Technology
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: MIT License
Classifier: Natural Language :: English
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Operating System :: Unix
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: System :: Logging
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: aiofiles

@@ -0,0 +1,25 @@
README.md
setup.cfg
setup.py
aiologger/__init__.py
aiologger/filters.py
aiologger/levels.py
aiologger/logger.py
aiologger/protocols.py
aiologger/records.py
aiologger/settings.py
aiologger/utils.py
aiologger.egg-info/PKG-INFO
aiologger.egg-info/SOURCES.txt
aiologger.egg-info/dependency_links.txt
aiologger.egg-info/requires.txt
aiologger.egg-info/top_level.txt
aiologger/formatters/__init__.py
aiologger/formatters/base.py
aiologger/formatters/json.py
aiologger/handlers/__init__.py
aiologger/handlers/base.py
aiologger/handlers/files.py
aiologger/handlers/streams.py
aiologger/loggers/__init__.py
aiologger/loggers/json.py

@@ -0,0 +1,3 @@
[aiofiles]
aiofiles==0.4.0

@@ -0,0 +1 @@
from .logger import Logger

@@ -0,0 +1,98 @@
# The following code and documentation was inspired, and in some cases
# copied and modified, from the work of Vinay Sajip and contributors
# on cpython's logging package
from abc import ABC
from typing import List, Callable, Union
from aiologger.levels import LogLevel
from aiologger.records import LogRecord
class Filter:
"""
Filter instances are used to perform arbitrary filtering of LogRecords.
Loggers and Handlers can optionally use Filter instances to filter
records as desired. The base filter class only allows events which are
below a certain point in the logger hierarchy. For example, a filter
initialized with "A.B" will allow events logged by loggers "A.B",
"A.B.C", "A.B.C.D", "A.B.D" etc. but not "A.BB", "B.A.B" etc. If
initialized with the empty string, all events are passed.
"""
def __init__(self, name: str = "") -> None:
"""
Initialize a filter.
Initialize with the name of the logger which, together with its
children, will have its events allowed through the filter. If no
name is specified, allow every event.
"""
self.name = name
self.name_length = len(name)
def filter(self, record: LogRecord) -> bool:
"""
Determine if the specified record is to be logged.
"""
if self.name_length == 0:
return True
elif self.name == record.name:
return True
elif not record.name.startswith(self.name):
return False
return record.name[self.name_length] == "."
def __call__(self, record: LogRecord) -> bool:
return self.filter(record)
_FilterCallable = Callable[[LogRecord], bool]
class Filterer(ABC):
"""
A base class for loggers and handlers which allows them to share
common code.
"""
def __init__(self):
"""
Initialize the list of filters to be an empty list.
"""
self.filters: List[Union[Filter, _FilterCallable]] = []
def add_filter(self, filter: Filter):
"""
Add the specified filter to this handler.
"""
if filter not in self.filters:
self.filters.append(filter)
def remove_filter(self, filter: Filter):
"""
Remove the specified filter from this handler.
"""
if filter in self.filters:
self.filters.remove(filter)
def filter(self, record: LogRecord) -> bool:
"""
Determine if a record is loggable by consulting all the filters.
The default is to allow the record to be logged; any filter can veto
this, in which case the record is dropped. Returns False if the record
is to be dropped, else True.
"""
for filter in self.filters:
result = filter(record)
if not result:
return False
return True
class StdoutFilter(Filter):
_levels = (LogLevel.DEBUG, LogLevel.INFO)
def filter(self, record: LogRecord) -> bool:
return record.levelno in self._levels
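# Illustrative sketch (not part of the original module): name-based
# filtering lets records from a logger and its children through, but
# blocks siblings. LogRecord construction is assumed to mirror the
# stdlib logging.LogRecord signature.
if __name__ == "__main__":
    db_filter = Filter("app.db")

    def _record(name: str) -> LogRecord:
        return LogRecord(
            name=name,
            level=LogLevel.INFO,
            pathname=__file__,
            lineno=0,
            msg="example",
            args=None,
            exc_info=None,
        )

    assert db_filter.filter(_record("app.db")) is True  # exact match
    assert db_filter.filter(_record("app.db.queries")) is True  # child
    assert db_filter.filter(_record("app.dbx")) is False  # not a child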

@@ -0,0 +1,238 @@
import enum
import io
import time
import traceback
from string import Template
from typing import Union, List, Optional
from types import TracebackType
from aiologger.records import LogRecord, ExtendedLogRecord, ExceptionInfo
class FormatStyles(str, enum.Enum):
PERCENT = "%"
STRING_TEMPLATE = "$"
STRING_FORMAT = "{"
class PercentStyle:
default_format = "%(message)s"
asctime_format = "%(asctime)s"
asctime_search = "%(asctime)"
def __init__(self, fmt: str = None) -> None:
self._fmt = fmt or self.default_format
self.uses_time = self._fmt.find(self.asctime_search) >= 0
def format(self, record: LogRecord) -> str:
return self._fmt % record.__dict__
class StrFormatStyle(PercentStyle):
default_format = "{message}"
asctime_format = "{asctime}"
asctime_search = "{asctime"
def format(self, record: LogRecord) -> str:
return self._fmt.format(**record.__dict__)
class StringTemplateStyle(PercentStyle):
default_format = "${message}"
asctime_format = "${asctime}"
asctime_search = "${asctime}"
def __init__(self, fmt: str = None) -> None:
self._fmt = fmt or self.default_format
self._template = Template(self._fmt)
self.uses_time = (
self._fmt.find("$asctime") >= 0
or self._fmt.find(self.asctime_format) >= 0
)
def format(self, record: LogRecord) -> str:
return self._template.substitute(**record.__dict__)
BASIC_FORMAT = "%(levelname)s:%(name)s:%(message)s"
_STYLES = {
"%": (PercentStyle, BASIC_FORMAT),
"{": (StrFormatStyle, "{levelname}:{name}:{message}"),
"$": (StringTemplateStyle, "${levelname}:${name}:${message}"),
}
class Formatter:
"""
Formatter instances are used to convert an ExtendedLogRecord to text.
Formatters need to know how an ExtendedLogRecord is constructed. They are
responsible for converting an ExtendedLogRecord to (usually) a string which can
be interpreted by either a human or an external system. The base Formatter
allows a formatting string to be specified. If none is supplied, the
default value of "%(message)s" is used.
The Formatter can be initialized with a format string which makes use of
knowledge of the ExtendedLogRecord attributes - e.g. the default value mentioned
above makes use of the fact that the user's message and arguments are pre-
formatted into an ExtendedLogRecord's message attribute. Currently, the useful
attributes of an ExtendedLogRecord are described by:
%(name)s Name of the logger (logging channel)
%(levelno)s Numeric logging level for the message (DEBUG, INFO,
WARNING, ERROR, CRITICAL)
%(levelname)s Text logging level for the message ("DEBUG", "INFO",
"WARNING", "ERROR", "CRITICAL")
%(pathname)s Full pathname of the source file where the logging
call was issued (if available)
%(filename)s Filename portion of pathname
%(module)s Module (name portion of filename)
%(lineno)d Source line number where the logging call was issued
(if available)
%(funcName)s Function name
%(created)f Time when the ExtendedLogRecord was created (time.time()
return value)
%(asctime)s Textual time when the ExtendedLogRecord was created
%(msecs)d Millisecond portion of the creation time
%(relativeCreated)d Time in milliseconds when the ExtendedLogRecord was created,
relative to the time the logging module was loaded
(typically at application startup time)
%(thread)d Thread ID (if available)
%(threadName)s Thread name (if available)
%(process)d Process ID (if available)
%(message)s The result of record.get_message(), computed just as
the record is emitted
"""
default_time_format = "%Y-%m-%d %H:%M:%S"
default_msec_format = "%s,%03d"
terminator = "\n"
def __init__(
self,
fmt: str = None,
datefmt: str = None,
style: Union[str, FormatStyles] = "%",
) -> None:
"""
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a
default as described above. Allow for specialized date formatting with
the optional datefmt argument. If datefmt is omitted, you get an
ISO8601-like (or RFC 3339-like) format.
Use a style parameter of '%', '{' or '$' to specify that you want to
use one of %-formatting, :meth:`str.format` (``{}``) formatting or
:class:`string.Template` formatting in your format string.
.. versionchanged:: 3.2
Added the ``style`` parameter.
"""
if style not in _STYLES:
valid_styles = ",".join(_STYLES.keys())
raise ValueError(f"Style must be one of: {valid_styles}")
self._style = _STYLES[style][0](fmt)
self._fmt = self._style._fmt
self.datefmt = datefmt
self.converter = time.localtime
def format_time(self, record: LogRecord, datefmt: str = None) -> str:
"""
Return the creation time of the specified ExtendedLogRecord as formatted text.
This method should be called from format() by a formatter which
wants to make use of a formatted time. This method can be overridden
in formatters to provide for any specific requirement, but the
basic behaviour is as follows: if datefmt (a string) is specified,
it is used with time.strftime() to format the creation time of the
record. Otherwise, an ISO8601-like (or RFC 3339-like) format is used.
The resulting string is returned. This function uses a user-configurable
function to convert the creation time to a tuple. By default,
time.localtime() is used; to change this for a particular formatter
instance, set the 'converter' attribute to a function with the same
signature as time.localtime() or time.gmtime(). To change it for all
formatters, for example if you want all logging times to be shown in GMT,
set the 'converter' attribute in the Formatter class.
"""
ct = self.converter(record.created)
if datefmt:
return time.strftime(datefmt, ct)
else:
t = time.strftime(self.default_time_format, ct)
return self.default_msec_format % (t, record.msecs)
def format_exception(self, exception_info: ExceptionInfo) -> str:
"""
Format and return the specified exception information as a string.
This default implementation just uses
traceback.print_exception()
"""
string_io = io.StringIO()
tb = exception_info[2]
traceback.print_exception(
exception_info[0], exception_info[1], tb, None, string_io
)
s = string_io.getvalue()
string_io.close()
if s[-1:] == self.terminator:
s = s[:-1]
return s
def format_message(self, record: LogRecord) -> str:
return self._style.format(record)
def format_stack(self, stack_info):
"""
This method is provided as an extension point for specialized
formatting of stack information.
The input data is a string as returned from a call to
:func:`traceback.print_stack`, but with the last trailing newline
removed.
The base implementation just returns the value passed in.
"""
return stack_info
@staticmethod
def format_traceback(tb: TracebackType) -> List[str]:
formatted_tb = "".join(traceback.format_tb(tb))
return formatted_tb.strip().split("\n")
def format(self, record: LogRecord) -> str:
"""
Format the specified record as text.
The record's attribute dictionary is used as the operand to a
string formatting operation which yields the returned string.
Before formatting the dictionary, a couple of preparatory steps
are carried out. The message attribute of the record is computed
using LogRecord.get_message(). If the formatting string uses the
time (as determined by the style's `uses_time` attribute), format_time()
is called to format the event time. If there is exception information,
it is formatted using format_exception() and appended to the message.
"""
record.message = record.get_message()
if self._style.uses_time:
record.asctime = self.format_time(record, self.datefmt)
s = self.format_message(record)
if record.exc_info:
# Cache the traceback text to avoid converting it multiple times
# (it's constant anyway)
if not record.exc_text:
record.exc_text = self.format_exception(record.exc_info)
if record.exc_text:
if s[-1:] != self.terminator:
s = s + self.terminator
s = s + record.exc_text
if record.stack_info:
if s[-1:] != self.terminator:
s = s + self.terminator
s = s + self.format_stack(record.stack_info)
return s
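# Illustrative sketch (not part of the original module): the three
# supported format styles render a record identically. LogRecord
# construction is assumed to mirror the stdlib signature.
if __name__ == "__main__":
    percent = Formatter(fmt="%(levelname)s %(name)s: %(message)s", style="%")
    braces = Formatter(fmt="{levelname} {name}: {message}", style="{")
    template = Formatter(fmt="${levelname} ${name}: ${message}", style="$")
    record = LogRecord(
        name="app",
        level=20,  # INFO
        pathname=__file__,
        lineno=0,
        msg="hello %s",
        args=("world",),
        exc_info=None,
    )
    # All three produce "INFO app: hello world"
    assert (
        percent.format(record)
        == braces.format(record)
        == template.format(record)
    )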

@@ -0,0 +1,184 @@
import json
import traceback
from datetime import datetime
from inspect import istraceback
from typing import Callable, Iterable, Union, Dict, Optional, List
from datetime import timezone
from aiologger.formatters.base import Formatter
from aiologger.levels import LEVEL_TO_NAME
from aiologger.records import LogRecord, ExtendedLogRecord
from aiologger.utils import CallableWrapper
LOGGED_AT_FIELDNAME = "logged_at"
LINE_NUMBER_FIELDNAME = "line_number"
FUNCTION_NAME_FIELDNAME = "function"
LOG_LEVEL_FIELDNAME = "level"
MSG_FIELDNAME = "msg"
FILE_PATH_FIELDNAME = "file_path"
class JsonFormatter(Formatter):
def __init__(
self,
serializer: Callable[..., str] = json.dumps,
default_msg_fieldname: str = None,
) -> None:
super(JsonFormatter, self).__init__()
self.serializer = serializer
self.default_msg_fieldname = default_msg_fieldname or MSG_FIELDNAME
def _default_handler(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif istraceback(obj):
tb = "".join(traceback.format_tb(obj))
return tb.strip().split("\n")
elif isinstance(obj, Exception):
return "Exception: %s" % repr(obj)
elif type(obj) is type:
return str(obj)
elif isinstance(obj, CallableWrapper):
return obj()
return str(obj)
def format(self, record: LogRecord) -> str:
"""
Formats a record and serializes it as a JSON str. If the record message
isn't already a dict, a new dict is created with `default_msg_fieldname`
as the key and the record msg as the value.
If the serialized result is of type bytes (e.g. when orjson is used), it is decoded to a utf-8 str.
"""
msg: Union[str, dict] = record.msg
if not isinstance(msg, dict):
msg = {self.default_msg_fieldname: msg}
if record.exc_info:
msg["exc_info"] = record.exc_info
if record.exc_text:
msg["exc_text"] = record.exc_text
return self._serializer_ensure_str(msg=msg)
@classmethod
def format_error_msg(cls, record: LogRecord, exception: Exception) -> Dict:
traceback_info: Optional[List[str]]
if exception.__traceback__:
traceback_info = cls.format_traceback(exception.__traceback__)
else:
traceback_info = None
return {
"record": {
LINE_NUMBER_FIELDNAME: record.lineno,
LOG_LEVEL_FIELDNAME: record.levelname,
FILE_PATH_FIELDNAME: record.filename,
FUNCTION_NAME_FIELDNAME: record.funcName,
MSG_FIELDNAME: str(record.msg),
},
LOGGED_AT_FIELDNAME: datetime.utcnow().isoformat(),
"logger_exception": {
"type": str(type(exception)),
"exc": str(exception),
"traceback": traceback_info,
},
}
def _serializer_ensure_str(
self,
msg: dict,
record: Optional[Union[LogRecord, ExtendedLogRecord]] = None,
) -> str:
"""
This ensures that the formatter returns a str object even when the
serializer returns a bytes object.
"""
if hasattr(record, "serializer_kwargs"):
result: Union[str, bytes] = self.serializer(
msg,
default=self._default_handler,
**record.serializer_kwargs, # type: ignore
)
else:
result: Union[str, bytes] = self.serializer( # type: ignore
msg, default=self._default_handler
)
if isinstance(result, str):
return result
elif isinstance(result, bytes):
return result.decode()
else:
raise TypeError(
    f"Serialized object must be of str or bytes type, "
    f"got {result!r} of type {type(result)}"
)
class ExtendedJsonFormatter(JsonFormatter):
level_to_name_mapping = LEVEL_TO_NAME
default_fields = frozenset(
[
LOG_LEVEL_FIELDNAME,
LOGGED_AT_FIELDNAME,
LINE_NUMBER_FIELDNAME,
FUNCTION_NAME_FIELDNAME,
FILE_PATH_FIELDNAME,
]
)
def __init__(
self,
serializer: Callable[..., str] = json.dumps,
default_msg_fieldname: str = None,
exclude_fields: Iterable[str] = None,
tz: timezone = None,
) -> None:
super(ExtendedJsonFormatter, self).__init__(
serializer=serializer, default_msg_fieldname=default_msg_fieldname
)
self.tz = tz
if exclude_fields is None:
self.log_fields = self.default_fields
else:
self.log_fields = self.default_fields - set(exclude_fields)
def formatter_fields_for_record(self, record: LogRecord):
"""
:type record: aiologger.records.ExtendedLogRecord
"""
datetime_serialized = (
datetime.now(timezone.utc).astimezone(self.tz).isoformat()
)
default_fields = (
(LOGGED_AT_FIELDNAME, datetime_serialized),
(LINE_NUMBER_FIELDNAME, record.lineno),
(FUNCTION_NAME_FIELDNAME, record.funcName),
(LOG_LEVEL_FIELDNAME, self.level_to_name_mapping[record.levelno]),
(FILE_PATH_FIELDNAME, record.pathname),
)
for field, value in default_fields:
if field in self.log_fields:
yield field, value
def format(self, record: ExtendedLogRecord) -> str: # type: ignore
"""
:type record: aiologger.records.ExtendedLogRecord
"""
msg = dict(self.formatter_fields_for_record(record))
if record.flatten and isinstance(record.msg, dict):
msg.update(record.msg)
else:
msg[MSG_FIELDNAME] = record.msg
if record.extra:
msg.update(record.extra)
if record.exc_info:
msg["exc_info"] = record.exc_info
if record.exc_text:
msg["exc_text"] = record.exc_text
return self._serializer_ensure_str(msg=msg, record=record)
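# Illustrative sketch (not part of the original module): non-dict
# messages are wrapped under the default "msg" field. LogRecord
# construction is assumed to mirror the stdlib signature.
if __name__ == "__main__":
    formatter = JsonFormatter()
    record = LogRecord(
        name="app",
        level=20,  # INFO
        pathname=__file__,
        lineno=0,
        msg="something happened",
        args=None,
        exc_info=None,
    )
    print(formatter.format(record))  # -> {"msg": "something happened"}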

@@ -0,0 +1,120 @@
import abc
import json
import sys
from typing import Union
from aiologger import settings
from aiologger.filters import Filterer
from aiologger.formatters.base import Formatter
from aiologger.formatters.json import JsonFormatter
from aiologger.levels import LogLevel, check_level, get_level_name
from aiologger.records import LogRecord
class Handler(Filterer):
"""
Handler instances dispatch logging events to specific destinations.
The base handler class. Acts as a placeholder which defines the Handler
interface. Handlers can optionally use Formatter instances to format
records as desired. By default, no formatter is specified; in this case,
the 'raw' message as determined by record.message is logged.
"""
def __init__(
self, level: LogLevel = LogLevel.NOTSET, formatter: Formatter = None
) -> None:
"""
Initializes the instance - basically setting the formatter to None
and the filter list to empty.
"""
Filterer.__init__(self)
self._level = check_level(level)
self.formatter = Formatter() if formatter is None else formatter
@property
@abc.abstractmethod
def initialized(self):
raise NotImplementedError()
@property
def level(self):
return self._level
@level.setter
def level(self, value: Union[str, int, LogLevel]):
"""
Set the logging level of this handler.
"""
self._level = check_level(value)
@abc.abstractmethod
async def emit(self, record: LogRecord) -> None:
"""
Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so
raises a NotImplementedError.
"""
raise NotImplementedError(
"emit must be implemented by Handler subclasses"
)
async def handle(self, record: LogRecord) -> bool:
"""
Conditionally emit the specified logging record.
Emission depends on filters which may have been added to the handler.
Returns whether the filter passed the record for emission.
"""
rv = self.filter(record)
if rv:
await self.emit(record)
return rv
async def flush(self) -> None:
"""
Ensure all logging output has been flushed.
This version does nothing and is intended to be implemented by
subclasses.
"""
pass
@abc.abstractmethod
async def close(self) -> None:
"""
Tidy up any resources used by the handler.
This version is intended to be implemented by subclasses, which
should ensure that any overriding close() methods release the
resources the handler holds.
"""
raise NotImplementedError(
"close must be implemented by Handler subclasses"
)
async def handle_error(
self, record: LogRecord, exception: Exception
) -> None:
"""
Handle errors which occur during an emit() call.
This method should be called from handlers when an exception is
encountered during an emit() call. This is what is mostly wanted
for a logging system - most users will not care about errors in
the logging system, they are more interested in application errors.
You could, however, replace this with a custom handler if you wish.
The record which was being processed is passed in to this method.
"""
if not settings.HANDLE_ERROR_FALLBACK_ENABLED:
return
msg = JsonFormatter.format_error_msg(record, exception)
json.dump(msg, sys.stderr)
sys.stderr.write("\n")
def __repr__(self):
level = get_level_name(self.level)
return f"<${self.__class__.__name__} (${level})>"

@@ -0,0 +1,485 @@
# The following code and documentation was inspired, and in some cases
# copied and modified, from the work of Vinay Sajip and contributors
# on cpython's logging package
import abc
import asyncio
import datetime
import enum
import os
import re
import time
from typing import Callable, List, Optional
import aiofiles
from aiofiles.threadpool import AsyncTextIOWrapper
from aiologger.formatters.base import Formatter
from aiologger.handlers.base import Handler
from aiologger.records import LogRecord
from aiologger.utils import classproperty, get_running_loop
class AsyncFileHandler(Handler):
terminator = "\n"
def __init__(
self,
filename: str,
mode: str = "a",
encoding: str = None,
formatter: Formatter = None,
) -> None:
super().__init__(formatter=formatter)
filename = os.fspath(filename)
self.absolute_file_path = os.path.abspath(filename)
self.mode = mode
self.encoding = encoding
self.stream: Optional[AsyncTextIOWrapper] = None
self._initialization_lock = None
@property
def initialized(self):
return self.stream is not None
async def _init_writer(self):
"""
Open the current base file with the (original) mode and encoding.
"""
if not self._initialization_lock:
self._initialization_lock = asyncio.Lock()
async with self._initialization_lock:
if not self.initialized:
self.stream = await aiofiles.open(
file=self.absolute_file_path,
mode=self.mode,
encoding=self.encoding,
)
async def flush(self):
    if not self.initialized:
        return
    await self.stream.flush()
async def close(self):
if not self.initialized:
return
await self.stream.flush()
await self.stream.close()
self.stream = None
self._initialization_lock = None
async def emit(self, record: LogRecord):
if not self.initialized:
await self._init_writer()
try:
msg = self.formatter.format(record)
# Write order is not guaranteed. String concatenation required
await self.stream.write(msg + self.terminator)
await self.stream.flush()
except Exception as exc:
await self.handle_error(record, exc)
Namer = Callable[[str], str]
Rotator = Callable[[str, str], None]
class BaseAsyncRotatingFileHandler(AsyncFileHandler, metaclass=abc.ABCMeta):
def __init__(
self,
filename: str,
mode: str = "a",
encoding: str = None,
namer: Namer = None,
rotator: Rotator = None,
formatter: Formatter = None,
) -> None:
super().__init__(filename, mode, encoding, formatter)
self.mode = mode
self.encoding = encoding
self.namer = namer
self.rotator = rotator
self._rollover_lock: Optional[asyncio.Lock] = None
def should_rollover(self, record: LogRecord) -> bool:
raise NotImplementedError
async def do_rollover(self):
raise NotImplementedError
async def emit(self, record: LogRecord): # type: ignore
"""
Emit a record.
Output the record to the file, catering for rollover as described
in `do_rollover`.
"""
try:
if self.should_rollover(record):
if not self._rollover_lock:
self._rollover_lock = asyncio.Lock()
async with self._rollover_lock:
if self.should_rollover(record):
await self.do_rollover()
await super().emit(record)
except Exception as exc:
await self.handle_error(record, exc)
def rotation_filename(self, default_name: str) -> str:
"""
Modify the filename of a log file when rotating.
This is provided so that a custom filename can be provided.
:param default_name: The default name for the log file.
"""
if self.namer is None:
return default_name
return self.namer(default_name)
async def rotate(self, source: str, dest: str):
"""
When rotating, rotate the current log.
The default implementation calls the 'rotator' attribute of the
handler, if it's callable, passing the source and dest arguments to
it. If the attribute isn't callable (the default is None), the source
is simply renamed to the destination.
:param source: The source filename. This is normally the base
filename, e.g. 'test.log'
:param dest: The destination filename. This is normally
what the source is rotated to, e.g. 'test.log.1'.
"""
if self.rotator is None:
# logging issue 18940: A file may not have been created if delay is True.
loop = get_running_loop()
if await loop.run_in_executor(None, lambda: os.path.exists(source)):
await loop.run_in_executor( # type: ignore
None, lambda: os.rename(source, dest)
)
else:
self.rotator(source, dest)
class RolloverInterval(str, enum.Enum):
SECONDS = "S"
MINUTES = "M"
HOURS = "H"
DAYS = "D"
MONDAYS = "W0"
TUESDAYS = "W1"
WEDNESDAYS = "W2"
THURSDAYS = "W3"
FRIDAYS = "W4"
SATURDAYS = "W5"
SUNDAYS = "W6"
MIDNIGHT = "MIDNIGHT"
@classproperty
def WEEK_DAYS(cls):
return (
cls.MONDAYS,
cls.TUESDAYS,
cls.WEDNESDAYS,
cls.THURSDAYS,
cls.FRIDAYS,
cls.SATURDAYS,
cls.SUNDAYS,
)
ONE_MINUTE_IN_SECONDS = 60
ONE_HOUR_IN_SECONDS = 60 * 60
ONE_DAY_IN_SECONDS = ONE_HOUR_IN_SECONDS * 24
ONE_WEEK_IN_SECONDS = 7 * ONE_DAY_IN_SECONDS
class AsyncTimedRotatingFileHandler(BaseAsyncRotatingFileHandler):
"""
Handler for logging to a file, rotating the log file at certain timed
intervals.
If `backup_count` is > 0, when rollover is done, no more than `backup_count`
files are kept - the oldest ones are deleted.
| when | at_time behavior |
|------------|--------------------------------------------------------|
| SECONDS | at_time will be ignored |
| MINUTES | -- // -- |
| HOURS | -- // -- |
| DAYS | at_time will be IGNORED. See also MIDNIGHT |
| MONDAYS | rotation happens every WEEK on MONDAY at ${at_time} |
| TUESDAYS | rotation happens every WEEK on TUESDAY at ${at_time} |
| WEDNESDAYS | rotation happens every WEEK on WEDNESDAY at ${at_time} |
| THURSDAYS | rotation happens every WEEK on THURSDAY at ${at_time} |
| FRIDAYS | rotation happens every WEEK on FRIDAY at ${at_time} |
| SATURDAYS | rotation happens every WEEK on SATURDAY at ${at_time} |
| SUNDAYS | rotation happens every WEEK on SUNDAY at ${at_time} |
| MIDNIGHT | rotation happens every DAY at ${at_time} |
"""
def __init__(
self,
filename: str,
when: RolloverInterval = RolloverInterval.HOURS,
interval: int = 1,
backup_count: int = 0,
encoding: str = None,
utc: bool = False,
at_time: datetime.time = None,
formatter: Formatter = None,
) -> None:
super().__init__(
filename=filename, mode="a", encoding=encoding, formatter=formatter
)
self.when = when.upper()
self.backup_count = backup_count
self.utc = utc
self.at_time = at_time
# Calculate the real rollover interval, which is just the number of
# seconds between rollovers. Also set the filename suffix used when
# a rollover occurs. Current 'when' events supported:
# S - Seconds
# M - Minutes
# H - Hours
# D - Days
# midnight - roll over at midnight
# W{0-6} - roll over on a certain day; 0 - Monday
#
# Case of the 'when' specifier is not important; lower or upper case
# will work.
if self.when == RolloverInterval.SECONDS:
self.interval = 1 # one second
self.suffix = "%Y-%m-%d_%H-%M-%S"
ext_match = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"
elif self.when == RolloverInterval.MINUTES:
self.interval = ONE_MINUTE_IN_SECONDS # one minute
self.suffix = "%Y-%m-%d_%H-%M"
ext_match = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}(\.\w+)?$"
elif self.when == RolloverInterval.HOURS:
self.interval = ONE_HOUR_IN_SECONDS # one hour
self.suffix = "%Y-%m-%d_%H"
ext_match = r"^\d{4}-\d{2}-\d{2}_\d{2}(\.\w+)?$"
elif (
self.when == RolloverInterval.DAYS
or self.when == RolloverInterval.MIDNIGHT
):
self.interval = ONE_DAY_IN_SECONDS # one day
self.suffix = "%Y-%m-%d"
ext_match = r"^\d{4}-\d{2}-\d{2}(\.\w+)?$"
elif self.when.startswith("W"):
if self.when not in RolloverInterval.WEEK_DAYS:
raise ValueError(
f"Invalid day specified for weekly rollover: {self.when}"
)
self.interval = ONE_DAY_IN_SECONDS * 7 # one week
self.day_of_week = int(self.when[1])
self.suffix = "%Y-%m-%d"
ext_match = r"^\d{4}-\d{2}-\d{2}(\.\w+)?$"
else:
raise ValueError(f"Invalid RolloverInterval specified: {self.when}")
self.ext_match = re.compile(ext_match, re.ASCII)
self.interval = self.interval * interval # multiply by units requested
# The following line was added because the filename passed in could be a
# path object (see Issue #27493), but self.absolute_file_path will be a string
filename = self.absolute_file_path
if os.path.exists(filename): # todo: IO. Remove or postpone
t = int(os.stat(filename).st_mtime)
else:
t = int(time.time())
self.rollover_at = self.compute_rollover(t)
def compute_rollover(self, current_time: int) -> int:
"""
Work out the rollover time based on the specified time.
If we are rolling over at midnight or weekly, then the interval is
already known. What we need to figure out is WHEN the next interval begins.
In other words, if you are rolling over at midnight, then your base
interval is 1 day, but you want to start that one day clock at midnight,
not now. So, we have to fudge the `rollover_at` value in order to trigger
the first rollover at the right time. After that, the regular interval
will take care of the rest. Note that this code doesn't care about
leap seconds. :)
"""
result = current_time + self.interval
if (
self.when == RolloverInterval.MIDNIGHT
or self.when in RolloverInterval.WEEK_DAYS
):
if self.utc:
t = time.gmtime(current_time)
else:
t = time.localtime(current_time)
current_hour = t[3]
current_minute = t[4]
current_second = t[5]
current_day = t[6]
# r is the number of seconds left between now and the next rotation
if self.at_time is None:
rotate_ts = ONE_DAY_IN_SECONDS
else:
rotate_ts = (
self.at_time.hour * 60 + self.at_time.minute
) * 60 + self.at_time.second
r = rotate_ts - (
(current_hour * 60 + current_minute) * 60 + current_second
)
if r < 0:
# Rotate time is before the current time (for example when
# self.rotateAt is 13:45 and it now 14:15), rotation is
# tomorrow.
r += ONE_DAY_IN_SECONDS
current_day = (current_day + 1) % 7
result = current_time + r
# If we are rolling over on a certain day, add in the number of days until
# the next rollover, but offset by 1 since we just calculated the time
# until the next day starts. There are three cases:
# Case 1) The day to rollover is today; in this case, do nothing
# Case 2) The day to rollover is further in the interval (i.e., today is
# day 2 (Wednesday) and rollover is on day 6 (Sunday). Days to
# next rollover is simply 6 - 2 - 1, or 3.
# Case 3) The day to rollover is behind us in the interval (i.e., today
# is day 5 (Saturday) and rollover is on day 3 (Thursday).
# Days to rollover is 6 - 5 + 3, or 4. In this case, it's the
# number of days left in the current week (1) plus the number
# of days in the next week until the rollover day (3).
# The calculations described in 2) and 3) above need to have a day added.
# This is because the above time calculation takes us to midnight on this
# day, i.e. the start of the next day.
if self.when in RolloverInterval.WEEK_DAYS:
day = current_day # 0 is Monday
if day != self.day_of_week:
if day < self.day_of_week:
days_to_wait = self.day_of_week - day
else:
days_to_wait = 6 - day + self.day_of_week + 1
new_rollover_at = result + (
days_to_wait * ONE_DAY_IN_SECONDS
)
if not self.utc:
dst_now = t[-1]
dst_at_rollover = time.localtime(new_rollover_at)[-1]
if dst_now != dst_at_rollover:
if not dst_now:
# DST kicks in before next rollover, so we need to deduct an hour
new_rollover_at -= ONE_HOUR_IN_SECONDS
else:
# DST bows out before next rollover, so we need to add an hour
new_rollover_at += ONE_HOUR_IN_SECONDS
result = new_rollover_at
return result
def should_rollover(self, record: LogRecord) -> bool:
"""
Determine if rollover should occur.
record is not used, as we are just comparing times, but it is needed so
the method signatures are the same
"""
t = int(time.time())
if t >= self.rollover_at:
return True
return False
async def get_files_to_delete(self) -> List[str]:
"""
Determine the files to delete when rolling over.
"""
dir_name, base_name = os.path.split(self.absolute_file_path)
loop = get_running_loop()
file_names = await loop.run_in_executor(
None, lambda: os.listdir(dir_name)
)
result = []
prefix = base_name + "."
plen = len(prefix)
for file_name in file_names:
if file_name[:plen] == prefix:
suffix = file_name[plen:]
if self.ext_match.match(suffix):
result.append(os.path.join(dir_name, file_name))
if len(result) < self.backup_count:
return []
else:
result.sort(reverse=True) # os.listdir order is not defined
return result[: len(result) - self.backup_count]
async def _delete_files(self, file_paths: List[str]):
loop = get_running_loop()
for file_path in file_paths:
await loop.run_in_executor( # type: ignore
None, lambda: os.unlink(file_path)
)
async def do_rollover(self):
"""
Do a rollover; in this case, a date/time stamp is appended to the filename
when the rollover happens. However, you want the file to be named for the
start of the interval, not the current time. If there is a backup count,
then we have to get a list of matching filenames, sort them and remove
the ones with the oldest suffixes.
"""
if self.stream:
await self.stream.close()
self.stream = None
# get the time that this sequence started at and make it a TimeTuple
current_time = int(time.time())
dst_now = time.localtime(current_time)[-1]
t = self.rollover_at - self.interval
if self.utc:
time_tuple = time.gmtime(t)
else:
time_tuple = time.localtime(t)
dst_then = time_tuple[-1]
if dst_now != dst_then:
if dst_now:
addend = ONE_HOUR_IN_SECONDS
else:
addend = -ONE_HOUR_IN_SECONDS
time_tuple = time.localtime(t + addend)
destination_file_path = self.rotation_filename(
self.absolute_file_path
+ "."
+ time.strftime(self.suffix, time_tuple)
)
loop = get_running_loop()
if await loop.run_in_executor(
None, lambda: os.path.exists(destination_file_path)
):
await loop.run_in_executor(
None, lambda: os.unlink(destination_file_path)
)
await self.rotate(self.absolute_file_path, destination_file_path)
if self.backup_count > 0:
files_to_delete = await self.get_files_to_delete()
if files_to_delete:
await self._delete_files(files_to_delete)
await self._init_writer()
new_rollover_at = self.compute_rollover(current_time)
while new_rollover_at <= current_time:
new_rollover_at = new_rollover_at + self.interval
# If DST changes and midnight or weekly rollover, adjust for this.
if (
self.when == RolloverInterval.MIDNIGHT
or self.when in RolloverInterval.WEEK_DAYS
) and not self.utc:
dst_at_rollover = time.localtime(new_rollover_at)[-1]
if dst_now != dst_at_rollover:
if not dst_now:
# DST kicks in before next rollover, so we need to deduct an hour
addend = -ONE_HOUR_IN_SECONDS
else:
# DST bows out before next rollover, so we need to add an hour
addend = ONE_HOUR_IN_SECONDS
new_rollover_at += addend
self.rollover_at = new_rollover_at
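# Illustrative sketch (not part of the original module): rotate every
# day at 02:30 local time and keep the last seven backups. The log
# path below is hypothetical.
def _example_handler() -> AsyncTimedRotatingFileHandler:
    return AsyncTimedRotatingFileHandler(
        filename="/tmp/app.log",
        when=RolloverInterval.MIDNIGHT,
        backup_count=7,
        at_time=datetime.time(hour=2, minute=30),
    )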

@@ -0,0 +1,97 @@
import asyncio
import sys
from asyncio import AbstractEventLoop, StreamWriter
from typing import Union, Optional
from aiologger.utils import get_running_loop
from aiologger.filters import Filter
from aiologger.formatters.base import Formatter
from aiologger.handlers.base import Handler
from aiologger.levels import LogLevel
from aiologger.protocols import AiologgerProtocol
from aiologger.records import LogRecord
class AsyncStreamHandler(Handler):
terminator = "\n"
def __init__(
self,
stream=None,
level: Union[str, int, LogLevel] = LogLevel.NOTSET,
formatter: Formatter = None,
filter: Filter = None,
) -> None:
super().__init__()
if stream is None:
stream = sys.stderr
self.stream = stream
self.level = level
if formatter is None:
formatter = Formatter()
self.formatter: Formatter = formatter
if filter:
self.add_filter(filter)
self.protocol_class = AiologgerProtocol
self._initialization_lock = asyncio.Lock()
self.writer: Optional[StreamWriter] = None
@property
def initialized(self):
return self.writer is not None
async def _init_writer(self) -> StreamWriter:
async with self._initialization_lock:
if self.writer is not None:
return self.writer
loop = get_running_loop()
transport, protocol = await loop.connect_write_pipe(
self.protocol_class, self.stream
)
self.writer = StreamWriter( # type: ignore # https://github.com/python/typeshed/pull/2719
transport=transport, protocol=protocol, reader=None, loop=loop
)
return self.writer
async def handle(self, record: LogRecord) -> bool:
"""
Conditionally emit the specified logging record.
Emission depends on filters which may have been added to the handler.
"""
rv = self.filter(record)
if rv:
await self.emit(record)
return rv
async def flush(self):
    if self.writer is not None:
        await self.writer.drain()
async def emit(self, record: LogRecord):
"""
Actually log the specified logging record to the stream.
"""
if self.writer is None:
self.writer = await self._init_writer()
try:
msg = self.formatter.format(record) + self.terminator
self.writer.write(msg.encode())
await self.writer.drain()
except Exception as exc:
await self.handle_error(record, exc)
async def close(self):
"""
Tidy up any resources used by the handler.
Subclasses should ensure that this gets called from overridden
close() methods.
"""
if self.writer is None:
return
await self.flush()
self.writer.close()
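# Illustrative sketch (not part of the original module): writing one
# record straight to stderr. Note that `handle` only consults the
# handler's filters; level checks are performed by the Logger. LogRecord
# construction is assumed to mirror the stdlib signature.
async def _example() -> None:
    handler = AsyncStreamHandler(
        stream=sys.stderr,
        formatter=Formatter("%(levelname)s: %(message)s"),
    )
    record = LogRecord(
        name="app",
        level=LogLevel.WARNING,
        pathname=__file__,
        lineno=0,
        msg="disk almost full",
        args=None,
        exc_info=None,
    )
    await handler.handle(record)
    await handler.close()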

@@ -0,0 +1,47 @@
import enum
from typing import Union
class LogLevel(enum.IntEnum):
CRITICAL = 50
FATAL = CRITICAL
ERROR = 40
WARNING = 30
WARN = WARNING
INFO = 20
DEBUG = 10
NOTSET = 0
NAME_TO_LEVEL = {level: LogLevel[level].value for level in LogLevel.__members__}
LEVEL_TO_NAME = {level.value: level.name for level in LogLevel}
def get_level_name(level: Union[int, LogLevel]) -> str:
"""
Return the textual representation of logging level 'level'.
If the level is one of the predefined levels (CRITICAL, ERROR, WARNING,
INFO, DEBUG) then you get the corresponding string.
If a numeric value corresponding to one of the defined levels is passed
in, the corresponding string representation is returned.
"""
try:
return LEVEL_TO_NAME[level]
except KeyError as e:
raise ValueError(f"Unkown level name: {level}") from e
def check_level(level: Union[str, int, LogLevel]) -> int:
if isinstance(level, int):
if level not in LEVEL_TO_NAME:
raise ValueError(f"Unknown level: {level}")
return level
elif isinstance(level, str):
try:
return NAME_TO_LEVEL[level]
except KeyError:
raise ValueError(f"Unknown level: {level}")
else:
raise TypeError(f"Level is not a Union[str, int, LogLevel]: {level}")
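# Illustrative sketch (not part of the original module): string names
# and numeric values normalize to the same integer level.
if __name__ == "__main__":
    assert check_level("INFO") == check_level(20) == LogLevel.INFO
    assert get_level_name(LogLevel.ERROR) == "ERROR"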

@@ -0,0 +1,338 @@
import asyncio
import io
import sys
import traceback
from asyncio import AbstractEventLoop, Task
from typing import Iterable, Optional, Callable, Awaitable, List, NamedTuple
from aiologger.filters import StdoutFilter, Filterer
from aiologger.formatters.base import Formatter
from aiologger.handlers.base import Handler
from aiologger.handlers.streams import AsyncStreamHandler
from aiologger.levels import LogLevel, check_level
from aiologger.records import LogRecord
from aiologger.utils import get_current_frame, create_task
_HandlerFactory = Callable[[], Awaitable[Iterable[Handler]]]
class _Caller(NamedTuple):
filename: str
line_number: int
function_name: str
stack: Optional[str]
def o_o():
"""
Ordinarily we would use __file__ for this, but frozen modules don't always
have __file__ set, for some reason (