MarkTechPost AI Model Updates

A Hands-On Loguru Guide: Building Robust, Structured, Concurrent, and Production-Ready Python Logging Pipelines

May 31, 2026, 08:58

Key Takeaways

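In this tutorial, we implement a practical use case with Loguru, a powerful, flexible, and production-ready logging library for Python. We begin with a clean, idempotent logging configuration that can be rerun safely without creating duplicate handlers or cluttered output. We then work step by step through structured logging, contextual logging, custom log levels, global patching, callable formatters, and in-memory sinks. We also cover real-world logging needs such as rich exception traces, JSON log files, custom rotation, compression, retention, asynchronous logging, threaded execution, multiprocessing-safe logging, and interception of the standard logging module. Because everything lives in a Colab-ready workflow, it is easy to test, inspect, and understand how Loguru supports debugging and monitoring.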

AI-Compiled Site Digest

In this tutorial, we implement a practical use case with Loguru, a powerful, flexible, and production-ready logging library for Python. We start by building a clean, idempotent logging setup that can be safely rerun without duplicating handlers or producing messy output. From there, we move step by step through structured logging, contextual logging, custom log levels, global patching, callable formatters, and in-memory sinks. We also handle real-world logging needs such as rich exception traces, JSON log files, custom rotation, compression, retention, async logging, threaded execution, multiprocessing-safe logging, and standard logging module interception. By keeping everything in a Colab-ready workflow, we make it easy to test, inspect, and understand how Loguru can support debugging, monitoring, and observability in serious Python applications.

    !pip install -q loguru nest_asyncio

    import os, sys, time, json, glob, gzip, shutil, asyncio, logging, itertools, multiprocessing
    from collections import deque
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from loguru import logger

    try:
        import nest_asyncio; nest_asyncio.apply()
    except Exception as e:
        print("nest_asyncio not applied:", e)

    WORKDIR = "/content/loguru_demo" if os.path.isdir("/content") else "/tmp/loguru_demo"
    os.makedirs(WORKDIR, exist_ok=True); os.chdir(WORKDIR)
    for f in glob.glob("*"):
        try: os.remove(f)
        except OSError: pass
    print(f"Working directory: {WORKDIR}\n")

    RESULTS = []
    def check(name, condition, detail=""):
        ok = bool(condition); RESULTS.append((name, ok))
        print(f" [{'PASS' if ok else 'FAIL'}] {name}" + (f" — {detail}" if detail else ""))

    def banner(t):
        print(f"\n{'='*64}\n {t}\n{'='*64}")

    _seq = itertools.count(1)
    def global_patcher(record):
        record["extra"].setdefault("env", "colab")
        record["extra"]["seq"] = next(_seq)

    _NOISE = {"env", "seq", "app"}
    def console_formatter(record):
        fmt = ("<green>{time:HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | "
               "<cyan>{name}:{function}:{line}</cyan> - <level>{message}</level>")
        if any(k not in _NOISE for k in record["extra"]):
            fmt += " | <yellow>{extra}</yellow>"
        return fmt + "\n{exception}"

We install Loguru and supporting dependencies, import all required libraries, and prepare a clean working directory for the tutorial. We also create a small verification helper to test each feature as the tutorial runs. We then define a global patcher and console formatter so that every log record carries useful metadata and appears in a readable format.

    class MemorySink:
        def __init__(self, capacity=2000):
            self.buffer = deque(maxlen=capacity)
        def write(self, message):
            self.buffer.append(message.record)
        def flush(self):
            pass
        def has_level(self, name):
            return any(r["level"].name == name for r in self.buffer)
        def find(self, pred):
            return [r for r in self.buffer if pred(r)]

    MAX_BYTES = 1500
    def size_rotation(message, file):
        return file.tell() + len(message) > MAX_BYTES

    def gzip_compression(filepath):
        with open(filepath, "rb") as fi, gzip.open(filepath + ".gz", "wb") as fo:
            shutil.copyfileobj(fi, fo)
        os.remove(filepath)

    def keep_latest_retention(files):
        for old in sorted(files, key=os.path.getmtime, reverse=True)[3:]:
            try: os.remove(old)
            except OSError: pass

    class InterceptHandler(logging.Handler):
        def emit(self, record):
            try:
                level = logger.level(record.levelname).name
            except ValueError:
                level = record.levelno
            frame, depth = logging.currentframe(), 2
            while frame and frame.f_code.co_filename == logging.__file__:
                frame, depth = frame.f_back, depth + 1
            (logger.opt(depth=depth, exception=record.exc_info)
                   .bind(stdlib_logger=record.name)
                   .log(level, record.getMessage()))

    def mp_worker(n):
        logger.bind(child=os.getpid()).info("hello from child item {}", n)
        return os.getpid()

We create reusable logging components that make the tutorial more practical and production-like.
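Because Loguru accepts plain callables for rotation, compression, and retention, functions like the ones above can be exercised on their own before they are attached to a sink. The following is a minimal sketch that uses only the standard library. The `demo` helper, the `keep` parameter, the temporary file names, and the fake modification times are illustrative assumptions and not part of the original tutorial.

```python
import gzip
import os
import shutil
import tempfile

MAX_BYTES = 1500  # same threshold the tutorial uses


def size_rotation(message, file):
    # Rotate when appending this message would push the file past MAX_BYTES.
    return file.tell() + len(message) > MAX_BYTES


def gzip_compression(filepath):
    # Write a .gz copy next to the file, then delete the uncompressed original.
    with open(filepath, "rb") as src, gzip.open(filepath + ".gz", "wb") as dst:
        shutil.copyfileobj(src, dst)
    os.remove(filepath)


def keep_latest_retention(files, keep=3):
    # Keep the `keep` most recently modified files and delete the rest.
    # `keep` is a hypothetical parameter; the tutorial hard-codes 3.
    for old in sorted(files, key=os.path.getmtime, reverse=True)[keep:]:
        os.remove(old)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "events.log")
        with open(path, "wb") as fh:
            fh.write(b"x" * 1490)
            small = size_rotation("short", fh)    # 1490 + 5 stays under the limit
            large = size_rotation("y" * 20, fh)   # 1490 + 20 exceeds the limit

        gzip_compression(path)
        with gzip.open(path + ".gz", "rt") as fh:
            restored = len(fh.read())
        original_gone = not os.path.exists(path)

        # Five fake archives with increasing modification times.
        archives = []
        for i in range(5):
            p = os.path.join(tmp, f"old_{i}.log.gz")
            with open(p, "w") as fh:
                fh.write("data")
            os.utime(p, (1000 + i, 1000 + i))
            archives.append(p)
        keep_latest_retention(archives)
        survivors = sorted(os.path.basename(p) for p in archives if os.path.exists(p))

    return small, large, restored, original_gone, survivors


if __name__ == "__main__":
    print(demo())
```

Checking the callables in isolation like this makes it easier to tell a bug in the rotation rule from a misconfigured sink.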
We define an in-memory sink, custom file rotation, compression, and retention functions to control how logs are stored. We also build a standard logging interceptor and a multiprocessing worker to connect Loguru to external libraries and child processes.

    banner("1) logger.configure(): handlers + custom level + extra + patcher")
    mem = MemorySink()
    logger.configure(
        handlers=[
            {"sink": sys.stderr, "format": console_formatter, "level": "DEBUG",
             "colorize": True, "backtrace": True, "diagnose": True},
            {"sink": mem, "level": "DEBUG", "format": "{message}"},
            {"sink": "structured.jsonl", "serialize": True, "level": "DEBUG", "enqueue": True},
            {"sink": "errors.log", "level": "ERROR", "enqueue": True,
             "backtrace": True, "diagnose": False,
             "format": "{time:YYYY-MM-DD HH:mm:ss} | {level} | "
                       "{name}:{function}:{line} | {message}"},
        ],
        levels=[{"name": "NOTICE", "no": 22, "color": "<blue><bold>", "icon": ""}],
        extra={"app": "loguru-advanced"},
        patcher=global_patcher,
    )
    logger.debug("debug"); logger.info("info"); logger.success("SUCCESS level ships built-in")
    logger.warning("warning"); logger.log("NOTICE", "custom level between INFO and SUCCESS")

    banner("2) bind() / contextualize() / patch()")
    logger.bind(user_id=42, request_id="abc-123").info("bound context")
    with logger.contextualize(task="batch-job", run=7):
        logger.info("inside contextualized block")
    logger.patch(lambda r: r["extra"].update(epoch=round(time.time()))).info("per-call patched record")

    banner("3) @logger.catch + context-manager form")
    def inner(d): return d["a"] / d["b"]
    def outer(d): return inner(d)
    @logger.catch(reraise=False)
    def compute(d): return outer(d)
    compute({"a": 1, "b": 0})
    with logger.catch(message="handled inside a with-block"):
        raise ValueError("boom in block")

    banner("4) opt(lazy=True), inline colors, record access")
    logger.opt(lazy=True).debug("lazy sum = {}", lambda: sum(i*i for i in range(1_000_000)))
    logger.opt(colors=True).info("inline <red>colors</red> <green>work</green>")
    logger.opt(record=True).info("emitted from source line {record[line]}")

We configure Loguru with multiple handlers, including console output, memory capture, JSON logging, and error logging. We then demonstrate structured logging with bound context, contextual blocks, patched records, and a custom log level. We also explore exception handling and useful opt() features such as lazy evaluation, inline colors, and record access.

    banner("5) custom rotation/compression/retention (forces real rotation)")
    ev_id = logger.add("events_{time:HHmmss_SSS}.log", rotation=size_rotation,
                       compression=gzip_compression, retention=keep_latest_retention,
                       enqueue=True, level="DEBUG",
                       format="{time:HH:mm:ss.SSS} | {level: <8} | {message}")
    for i in range(80):
        logger.bind(idx=i).debug("rotating event line number {}", i)
    logger.complete(); logger.remove(ev_id)
    print(f" archives created: {sorted(glob.glob('events_*.gz'))}")

    banner("6a) ThreadPoolExecutor with per-thread contextualize()")
    thread_caps = []
    tid = logger.add(thread_caps.append, level="DEBUG", format="{message}",
                     filter=lambda r: "worker_id" in r["extra"])
    def worker(n):
        with logger.contextualize(worker_id=n):
            logger.info("thread work item {}", n)
        return n * n
    with ThreadPoolExecutor(max_workers=8) as ex:
        sq = list(ex.map(worker, range(8)))
    logger.complete(); logger.remove(tid)
    worker_ids = {m.record["extra"]["worker_id"] for m in thread_caps}

    banner("6b) async coroutine sink + await logger.complete()")
    async def run_async_demo():
        sunk = []
        async def async_sink(message):
            await asyncio.sleep(0); sunk.append(message.record["message"])
        sid = logger.add(async_sink, level="DEBUG", catch=True)
        async def task(n):
            with logger.contextualize(coro=n):
                logger.info("async task {} start", n)
                await asyncio.sleep(0.01)
                logger.success("async task {} done", n)
        await asyncio.gather(*(task(i) for i in range(5)))
        await logger.complete()
        logger.remove(sid)
        return sunk

    try:
        async_msgs = asyncio.run(run_async_demo())
    except RuntimeError:
        async_msgs = asyncio.get_event_l
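The `structured.jsonl` sink configured in step 1 uses `serialize=True`, which makes Loguru write one JSON object per line with the formatted `text` and a nested `record`. As a rough sketch of how such a file might be consumed downstream, the snippet below filters records by numeric level using only the standard library. The `make_line` and `load_records` helpers are hypothetical, and the sample lines are hand-written and trimmed to three record fields; real serialized records carry more keys (time, file, process, thread, and so on).

```python
import json


def make_line(level, no, message, **extra):
    # Build a trimmed stand-in for one serialized Loguru line (illustrative only).
    record = {"level": {"name": level, "no": no}, "message": message, "extra": extra}
    return json.dumps({"text": message + "\n", "record": record})


def load_records(lines, min_no=0):
    # Parse JSON lines and keep records whose numeric level is at least min_no.
    kept = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        rec = json.loads(line)["record"]
        if rec["level"]["no"] >= min_no:
            kept.append((rec["level"]["name"], rec["message"], rec["extra"]))
    return kept


sample = [
    make_line("INFO", 20, "bound context", user_id=42, request_id="abc-123"),
    make_line("NOTICE", 22, "custom level between INFO and SUCCESS"),
    make_line("ERROR", 40, "handled inside a with-block"),
]

# NOTICE was registered with no=22, so a threshold of 22 drops plain INFO lines.
important = load_records(sample, min_no=22)
for name, message, extra in important:
    print(name, message, extra)
```

With a real file, the same function could be fed an open file handle, for example `load_records(open("structured.jsonl"), min_no=22)`, since it only iterates over lines.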
