Building a Document-Intelligence Backend with iii Using Workers, Functions, and Cron Triggers
Summary
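This tutorial builds a document-intelligence workflow with the iii engine and its Python SDK. The engine is first started as a background process, and a Python worker then connects to it. Once setup is complete, separate functions are registered for text normalization, tokenization, sentiment analysis, keyword extraction, reporting, and heartbeat monitoring, and they are combined into a single analysis pipeline. The same logic is then executed through direct invocation, an HTTP endpoint, fire-and-forget dispatch, and a scheduled cron trigger. Basic runtime state is tracked along the way, which brings the workflow closer to a real backend system than a static notebook demo. The full code is given in the walkthrough below.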
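The central idea of this design is that functions are registered under string IDs such as `text::normalize`, and every call, including calls from one function to another, goes through `trigger` with a `function_id` and a `payload`. Below is a minimal in-process sketch of that routing pattern. `ToyRegistry` is a hypothetical stand-in written only for illustration; it is not the iii SDK and does not model its networking, triggers, or concurrency.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


class ToyRegistry:
    """Hypothetical in-process router keyed by function ID (NOT the iii SDK)."""

    def __init__(self) -> None:
        self._functions: Dict[str, Handler] = {}

    def register_function(self, function_id: str, fn: Handler) -> None:
        # Refuse silent overwrites so two handlers cannot share one ID.
        if function_id in self._functions:
            raise ValueError(f"duplicate function id: {function_id}")
        self._functions[function_id] = fn

    def trigger(self, request: Dict[str, Any]) -> Dict[str, Any]:
        # Look up the handler by ID and call it with the payload (KeyError if unknown).
        fn = self._functions[request["function_id"]]
        return fn(request.get("payload", {}))


registry = ToyRegistry()
registry.register_function("text::upper", lambda d: {"text": d.get("text", "").upper()})
# A composite step calls another registered function through the router, not directly.
registry.register_function(
    "pipeline::shout",
    lambda d: registry.trigger({"function_id": "text::upper", "payload": d}),
)

print(registry.trigger({"function_id": "pipeline::shout", "payload": {"text": "hi"}}))
# prints {'text': 'HI'}
```

The composite `pipeline::shout` step mirrors how the tutorial's analysis function calls each text step through `worker.trigger` rather than invoking the Python functions directly.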
In this tutorial, we build a document-intelligence workflow with iii. We begin by installing the iii engine and Python SDK, then start the engine as a background process and connect a Python worker to it. After the setup, we register separate functions for text normalization, tokenization, sentiment analysis, keyword extraction, reporting, and heartbeat tracking. We then combine these functions into a single analysis pipeline and run the same logic via direct invocation, an HTTP endpoint, fire-and-forget execution, and a scheduled cron trigger. Along the way, we also track basic runtime state, making the workflow feel closer to a real backend system than a static notebook demo.

```python
import os, sys, subprocess, time, socket, json, threading
from collections import Counter

# Install location for the iii binary; prepend it to PATH for this process.
HOME = os.path.expanduser("~")
BIN_DIR = f"{HOME}/.local/bin"
os.environ["PATH"] = BIN_DIR + os.pathsep + os.environ.get("PATH", "")

def sh(cmd):
    """Echo and run a shell command, raising CalledProcessError on failure."""
    print(f"$ {cmd}")
    subprocess.run(cmd, shell=True, check=True)

# Install the engine only if the binary is not already present.
if not os.path.exists(f"{BIN_DIR}/iii"):
    sh(f"curl -fsSL https://install.iii.dev/iii/main/install.sh | BIN_DIR={BIN_DIR} sh")
sh(f"{sys.executable} -m pip install -q iii-sdk requests")

III = f"{BIN_DIR}/iii"
sh(f"{III} --version")
```

We start by importing the required Python modules and setting up the local binary path for the iii engine. We define a small helper that runs shell commands, and we install the iii engine only if it is not already available. We also install the Python SDK and the requests package, then verify the installation by printing the iii version.
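Prepending BIN_DIR to PATH changes the environment of the current Python process and of any subprocess it starts afterwards, so a command that runs `iii` by name (rather than through the absolute `III` path) can find it. The sketch below checks that behavior with the standard library alone. It creates a throwaway executable named `iii-path-demo` (a hypothetical name, not part of iii) in a temporary directory and asks `shutil.which` for it before and after the prepend. It assumes a POSIX system; on Windows, `shutil.which` also consults PATHEXT, so an extensionless file would not be found.

```python
import os
import shutil
import stat
import tempfile

# Hypothetical binary name used only for this demo; it is not part of iii.
DEMO_BIN = "iii-path-demo"

with tempfile.TemporaryDirectory() as bin_dir:
    exe = os.path.join(bin_dir, DEMO_BIN)
    with open(exe, "w") as f:
        f.write("#!/bin/sh\necho demo\n")
    # Mark the file executable so shutil.which() accepts it.
    os.chmod(exe, os.stat(exe).st_mode | stat.S_IXUSR)

    old_path = os.environ.get("PATH", "")
    found_before = shutil.which(DEMO_BIN)  # not on PATH yet
    # Same pattern as the tutorial: prepend the install directory to PATH.
    os.environ["PATH"] = bin_dir + os.pathsep + old_path
    try:
        found_after = shutil.which(DEMO_BIN)
    finally:
        os.environ["PATH"] = old_path  # restore the caller's environment

print(found_before, found_after)
```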
```python
WS_URL, HTTP_URL = "ws://localhost:49134", "http://localhost:3111"

# Start the engine in the background and send its output to a log file.
engine_log = open("/tmp/iii-engine.log", "w")
engine = subprocess.Popen([III, "--use-default-config"],
                          stdout=engine_log, stderr=subprocess.STDOUT)

def wait_port(host, port, timeout=90):
    """Poll until a TCP connection to (host, port) succeeds or the timeout elapses."""
    end = time.time() + timeout
    while time.time() < end:
        with socket.socket() as s:
            s.settimeout(1)
            try:
                s.connect((host, port))
                return True
            except OSError:
                time.sleep(0.5)
    return False

assert wait_port("localhost", 49134), "engine never came up; see /tmp/iii-engine.log"
print(f"✓ engine up: WS {WS_URL} | HTTP {HTTP_URL}")

from iii import register_worker
# TriggerAction (used for fire-and-forget calls) may be missing in some SDK builds.
try:
    from iii import TriggerAction
except Exception:
    TriggerAction = None

worker = register_worker(WS_URL)

# Shared in-memory state, guarded by a lock in case handlers run concurrently.
_STATE = {"docs_analyzed": 0, "heartbeats": 0, "keyword_totals": Counter()}
_LOCK = threading.Lock()

# Tiny lexicons for a naive word-count sentiment score.
POSITIVE = {"good", "great", "love", "excellent", "happy", "fast", "reliable", "amazing", "best", "win"}
NEGATIVE = {"bad", "terrible", "hate", "slow", "broken", "sad", "worst", "bug", "crash", "fail"}
```

We launch the iii engine as a background process and poll until its WebSocket port accepts connections. We then connect a Python worker to the running engine and import TriggerAction, if this SDK build provides it, for fire-and-forget calls. Finally, we define a shared in-memory state, a thread lock that protects it, and small positive and negative word sets for sentiment analysis.
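`subprocess.Popen` returns as soon as the engine process starts, not when it is ready to accept connections, which is why the script polls the WebSocket port before connecting the worker. The sketch below exercises the same `wait_port` logic against local sockets instead of the iii engine: one port with a listener bound to it (standing in for a ready engine) and one port with nothing listening (standing in for an engine that never came up). The OS picks both port numbers, so nothing here depends on 49134.

```python
import socket
import time


def wait_port(host, port, timeout=90):
    """Poll until a TCP connection to (host, port) succeeds or the timeout elapses."""
    end = time.time() + timeout
    while time.time() < end:
        with socket.socket() as s:
            s.settimeout(1)
            try:
                s.connect((host, port))
                return True
            except OSError:
                time.sleep(0.5)
    return False


# Stand-in for a ready engine: a listening socket on an OS-assigned free port.
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen()
open_port = server.getsockname()[1]
up = wait_port("127.0.0.1", open_port, timeout=5)

# Stand-in for a dead engine: grab a free port number, then release it unused.
probe = socket.socket()
probe.bind(("127.0.0.1", 0))
closed_port = probe.getsockname()[1]
probe.close()
down = wait_port("127.0.0.1", closed_port, timeout=1.5)

server.close()
print(up, down)  # expected: True False
```

A connect to a listening socket succeeds even before the server calls `accept()`, because the kernel queues the connection in the listen backlog; that is what makes a plain TCP connect a cheap readiness probe.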
```python
def normalize(data):
    # Trim whitespace and lowercase; tolerate a missing or None "text".
    return {"text": (data.get("text") or "").strip().lower()}

def tokenize(data):
    text = data.get("text", "")
    # Replace punctuation with spaces, keeping letters, digits, and whitespace.
    cleaned = "".join(c if (c.isalnum() or c.isspace()) else " " for c in text)
    tokens = [t for t in cleaned.split() if t]
    return {"tokens": tokens, "count": len(tokens)}

def sentiment(data):
    toks = data.get("tokens", [])
    pos = sum(t in POSITIVE for t in toks)
    neg = sum(t in NEGATIVE for t in toks)
    score = pos - neg
    label = "positive" if score > 0 else "negative" if score < 0 else "neutral"
    return {"label": label, "score": score, "pos": pos, "neg": neg}

def keywords(data):
    toks = data.get("tokens", [])
    stop = {"the", "a", "an", "is", "it", "to", "of", "and", "in", "for", "on", "how"}
    freq = Counter(t for t in toks if t not in stop and len(t) > 2)
    return {"keywords": freq.most_common(data.get("top_n", 5))}

def analyze(data):
    # Each step is routed through the engine by function ID, not called directly.
    norm = worker.trigger({"function_id": "text::normalize", "payload": {"text": data.get("text", "")}})
    toks = worker.trigger({"function_id": "text::tokenize", "payload": norm})
    sent = worker.trigger({"function_id": "text::sentiment", "payload": toks})
    keys = worker.trigger({"function_id": "text::keywords",
                           "payload": {**toks, "top_n": data.get("top_n", 5)}})
    with _LOCK:
        _STATE["docs_analyzed"] += 1
        for k, c in keys["keywords"]:
            _STATE["keyword_totals"][k] += c
        n = _STATE["docs_analyzed"]
    return {"tokens": toks["count"], "sentiment": sent,
            "keywords": keys["keywords"], "docs_analyzed": n}

def report(data):
    with _LOCK:
        return {"docs_analyzed": _STATE["docs_analyzed"],
                "heartbeats": _STATE["heartbeats"],
                "top_keywords_all_docs": _STATE["keyword_totals"].most_common(5)}

def http_analyze(data):
    # HTTP trigger handler: the request body becomes the pipeline payload.
    body = data.get("body") or {}
    result = worker.trigger({"function_id": "pipeline::analyze", "payload": body})
    return {"status_code": 200, "body": result,
            "headers": {"Content-Type": "application/json"}}

def heartbeat(data):
    with _LOCK:
        _STATE["heartbeats"] += 1
    return {"ok": True}

for fid, fn in [
    ("text::normalize", normalize),
    ("text::tokenize", tokenize),
    ("text::sentiment", sentiment),
    ("text::keywords", keywords),
    ("pipeline::analyze", analyze),
    ("stats::report", report),
    ("http::analyze", http_analyze),
    ("cron::heartbeat", heartbeat),
]:
    worker.register_function(fid, fn)
```

We define the core functions of the text-analysis workflow: normalization, tokenization, sentiment detection, and keyword extraction. We then create an analysis function that routes each step through the iii engine with worker.trigger instead of calling the functions directly. We also add reporting, HTTP handling, and heartbeat functions before registering all of them with the worker.

```python
# Expose http::analyze as POST /analyze on the engine's HTTP port.
worker.register_trigger({"type": "http", "function_id": "http::analyze",
                         "config": {"api_path": "/analyze", "http_method": "POST"}})

# Cron trigger for the heartbeat; skipped if this engine build rejects the schema.
cron_ok = False
try:
    worker.register_trigger({"type": "cron", "function_id": "cron::heartbeat",
                             "config": {"schedule": "*/2 * * * * *"}})
    cron_ok = True
except Exception as e:
    print("cron trigger skipped:", e)

# Best-effort explicit connect; any error here is deliberately ignored.
try:
    worker.connect()
except Exception:
    pass
time.sleep(2)  # give the engine a moment to pick up the registrations
```

We register an HTTP trigger so that the analysis pipeline can be invoked with a POST request to /analyze. We also try to register a cron trigger that runs the heartbeat function on a fixed schedule, and we skip it safely if the engine build does not support that trigger schema. The schedule `*/2 * * * * *` has six fields; read with a leading seconds field, as the five-second wait in step D below suggests, it fires every two seconds. We then connect the worker and pause briefly so the registered functions and triggers are ready to use.
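Standard five-field cron has no seconds field, whereas a six-field expression such as `*/2 * * * * *` is commonly read as seconds, minutes, hours, day of month, month, day of week. Assuming that reading (the tutorial does not spell it out), the sketch below lists the seconds a schedule matches within a time window, which shows how many heartbeats a short sleep should observe. It handles only `*` and `*/N`, requires the day and month fields to be `*`, and is an illustration rather than iii's scheduler.

```python
from datetime import datetime, timedelta


def field_matches(field, value):
    """Match a single cron field written as '*' or '*/N' (the only forms handled here)."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return value % int(field[2:]) == 0
    raise ValueError(f"unsupported cron field in this sketch: {field!r}")


def fire_times(schedule, start, window_seconds):
    """Return the whole seconds in [start, start + window) matched by a six-field schedule.

    ASSUMPTION: field order is sec, min, hour, day-of-month, month, day-of-week,
    and the day/month fields are '*' (anything else is rejected in this sketch).
    """
    sec_f, min_f, hour_f, dom_f, mon_f, dow_f = schedule.split()
    if (dom_f, mon_f, dow_f) != ("*", "*", "*"):
        raise ValueError("day/month fields other than '*' are not handled here")
    hits = []
    for i in range(window_seconds):
        t = start + timedelta(seconds=i)
        if (field_matches(sec_f, t.second)
                and field_matches(min_f, t.minute)
                and field_matches(hour_f, t.hour)):
            hits.append(t)
    return hits


schedule = "*/2 * * * * *"
even_start = datetime(2024, 1, 1, 12, 0, 0)  # window covers :00 to :04
odd_start = datetime(2024, 1, 1, 12, 0, 1)   # window covers :01 to :05
print(len(fire_times(schedule, even_start, 5)), len(fire_times(schedule, odd_start, 5)))
# prints 3 2
```

Under this assumption, a five-second sleep sees two or three new heartbeats depending on phase; the count printed in step D also includes any heartbeats that fired during the earlier steps.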
```python
print("\n=== A) Direct invocation, orchestrated through the engine ===")
docs = [
    "iii makes the backend amazing and fast, I love how reliable it is",
    "The legacy gateway was slow and broken, a terrible buggy experience",
    "Workers register functions and triggers; the engine routes every call",
]
for d in docs:
    r = worker.trigger({"function_id": "pipeline::analyze", "payload": {"text": d, "top_n": 4}})
    print(f" [{r['sentiment']['label']:>8}] tokens={r['tokens']:>2} keywords={r['keywords']}")

print("\n=== B) The SAME function over HTTP (:3111), zero handler changes ===")
import requests
try:
    resp = requests.post(f"{HTTP_URL}/analyze",
                         json={"text": "great great product, best ever", "top_n": 3},
                         timeout=10)
    print(" HTTP", resp.status_code, "->", resp.json())
except Exception as e:
    print(" HTTP call failed (engine HTTP module/version?):", e)

print("\n=== C) Fire-and-forget invocation ===")
if TriggerAction:
    # Void action: dispatch the call without waiting for a result.
    worker.trigger({"function_id": "pipeline::analyze",
                    "payload": {"text": "async win, no waiting"},
                    "action": TriggerAction.Void()})
    print(" dispatched (no result awaited)")
else:
    print(" TriggerAction not in this SDK build; skipping")

print("\n=== D) Cron trigger firing on its own ===")
if cron_ok:
    time.sleep(5)  # let the cron trigger fire a few heartbeats
    print(" heartbeats so far:",
          worker.trigger({"function_id": "stats::report", "payload": {}})["heartbeats"])
else:
    print(" cron not registered on this engine build")

print("\n=== E) Aggregate state report ===")
print(json.dumps(worker.trigger({"function_id": "stats::report", "payload": {}})))
```
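Steps C and E interact: a fire-and-forget call returns before its work is done, so whether the step E report already counts the dispatched document may depend on timing, and every handler that touches the shared counters has to do so under the lock. The sketch below models both points with the standard library instead of the iii engine. `analyze_local` and `report_local` are hypothetical stand-ins for `pipeline::analyze` and `stats::report`, and a `ThreadPoolExecutor` plays the role of asynchronous dispatch.

```python
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Shared state guarded by a lock, mirroring _STATE/_LOCK in the tutorial.
state = {"docs_analyzed": 0, "keyword_totals": Counter()}
lock = threading.Lock()


def analyze_local(text):
    """Hypothetical stand-in for pipeline::analyze that only updates counters."""
    words = [w for w in text.lower().split() if len(w) > 2]
    with lock:
        state["docs_analyzed"] += 1
        state["keyword_totals"].update(words)


def report_local():
    # Take the snapshot under the lock so the two values are mutually consistent.
    with lock:
        return {
            "docs_analyzed": state["docs_analyzed"],
            "top_keywords": state["keyword_totals"].most_common(1),
        }


docs = ["async win no waiting", "fast reliable win", "win again"] * 50

with ThreadPoolExecutor(max_workers=8) as pool:
    for d in docs:
        # Fire-and-forget: submit and discard the Future instead of awaiting a result.
        pool.submit(analyze_local, d)
    # Leaving the with-block waits for all queued work, so the report below is complete.

print(report_local())
# prints {'docs_analyzed': 150, 'top_keywords': [('win', 150)]}
```

One trade-off of discarding the Future: an exception raised inside `analyze_local` is stored on the Future and never surfaced. Keeping the futures and calling `result()` later, or attaching `add_done_callback`, is one way to log failures without blocking at dispatch time.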