The State Management Dilemma in Python
Picture this: You’re designing a FastAPI WebSocket to broadcast real-time sensor data. Or orchestrating an IoT pipeline that reacts to environmental changes. Or building a long-running CLI tool that updates its UI based on user input.
In all these scenarios, a critical question arises: How do you efficiently propagate state changes without drowning in boilerplate?
Traditional approaches-manual subscriptions, observer patterns, or asyncio
event loops-often lead to:
- Spaghetti Code: Callbacks entangled like headphone wires.
- Resource Leaks: Forgotten subscriptions haunting your memory.
- Concurrency Bugs: Race conditions that defy logical explanation.
Enter reaktiv, a library that borrows the reactive programming paradigm from frontend frameworks like Angular and SolidJS, adapting it for Python’s async-first world.
Reactive Programming: A 30-Second History Lesson
Reactivity emerged in frontend ecosystems to solve a critical problem: How can UIs automatically update when data changes? Frameworks introduced signals-variables that track dependencies and notify consumers on mutation.
The result? Cleaner code, implicit dependency management, and frontend devs who stopped manually calling render()
.
Python, however, lagged behind. While libraries like RxPy offered reactive extensions, they demanded complex setups. reaktiv fills this gap with a minimalist, Pythonic API tailored for real-world backend and IoT use cases.
Core Concepts: Signals, Computed, and Effects
reaktiv’s power lies in three primitives:
- Signal: A mutable value container that notifies dependents on change.
<span>from</span> <span>reaktiv</span> <span>import</span> <span>signal</span><span>temperature</span> <span>=</span> <span>signal</span><span>(</span><span>25.0</span><span>)</span> <span># Initial value </span> <span>temperature</span><span>.</span><span>set</span><span>(</span><span>30.0</span><span>)</span> <span># Update value </span><span>from</span> <span>reaktiv</span> <span>import</span> <span>signal</span> <span>temperature</span> <span>=</span> <span>signal</span><span>(</span><span>25.0</span><span>)</span> <span># Initial value </span> <span>temperature</span><span>.</span><span>set</span><span>(</span><span>30.0</span><span>)</span> <span># Update value </span>from reaktiv import signal temperature = signal(25.0) # Initial value temperature.set(30.0) # Update value
Enter fullscreen mode Exit fullscreen mode
- Computed: Derived values that auto-update when dependencies change.
<span>from</span> <span>reaktiv</span> <span>import</span> <span>computed</span><span>feels_like</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>temperature</span><span>()</span> <span>*</span> <span>humidity</span><span>())</span><span>from</span> <span>reaktiv</span> <span>import</span> <span>computed</span> <span>feels_like</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>temperature</span><span>()</span> <span>*</span> <span>humidity</span><span>())</span>from reaktiv import computed feels_like = computed(lambda: temperature() * humidity())
Enter fullscreen mode Exit fullscreen mode
- Effect: Side effects triggered by signal/computed changes.
<span>from</span> <span>reaktiv</span> <span>import</span> <span>effect</span><span>async</span> <span>def</span> <span>log_climate</span><span>():</span><span>print</span><span>(</span><span>f</span><span>"</span><span>Feels like: </span><span>{</span><span>feels_like</span><span>()</span><span>}</span><span>°C</span><span>"</span><span>)</span><span>log_effect</span> <span>=</span> <span>effect</span><span>(</span><span>log_climate</span><span>)</span> <span># Runs on `temperature` or `humidity` changes </span><span>from</span> <span>reaktiv</span> <span>import</span> <span>effect</span> <span>async</span> <span>def</span> <span>log_climate</span><span>():</span> <span>print</span><span>(</span><span>f</span><span>"</span><span>Feels like: </span><span>{</span><span>feels_like</span><span>()</span><span>}</span><span>°C</span><span>"</span><span>)</span> <span>log_effect</span> <span>=</span> <span>effect</span><span>(</span><span>log_climate</span><span>)</span> <span># Runs on `temperature` or `humidity` changes </span>from reaktiv import effect async def log_climate(): print(f"Feels like: {feels_like()}°C") log_effect = effect(log_climate) # Runs on `temperature` or `humidity` changes
Enter fullscreen mode Exit fullscreen mode
This triad enables declarative state management-define relationships once, let the library handle updates.
Why reaktiv Shines in Real-World Scenarios
1. FastAPI WebSockets: Real-Time Updates Without the Boilerplate
Consider a live dashboard tracking stock prices. With reaktiv:
<span>from</span> <span>fastapi</span> <span>import</span> <span>WebSocket</span><span>from</span> <span>reaktiv</span> <span>import</span> <span>signal</span><span>,</span> <span>effect</span><span>stocks</span> <span>=</span> <span>signal</span><span>({</span><span>"</span><span>AAPL</span><span>"</span><span>:</span> <span>150.0</span><span>,</span> <span>"</span><span>GOOGL</span><span>"</span><span>:</span> <span>2700.0</span><span>})</span><span>@app.websocket</span><span>(</span><span>"</span><span>/stocks</span><span>"</span><span>)</span><span>async</span> <span>def</span> <span>stock_feed</span><span>(</span><span>websocket</span><span>:</span> <span>WebSocket</span><span>):</span><span>await</span> <span>websocket</span><span>.</span><span>accept</span><span>()</span><span>async</span> <span>def</span> <span>send_updates</span><span>():</span><span>await</span> <span>websocket</span><span>.</span><span>send_json</span><span>(</span><span>stocks</span><span>())</span><span># Assign effect to variable to avoid garbage collection </span> <span>update_effect</span> <span>=</span> <span>effect</span><span>(</span><span>send_updates</span><span>)</span><span>try</span><span>:</span><span>while</span> <span>True</span><span>:</span><span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>1</span><span>)</span> <span># Simulate background updates </span> <span>finally</span><span>:</span><span>update_effect</span><span>.</span><span>dispose</span><span>()</span> <span># Cleanup </span><span>from</span> <span>fastapi</span> <span>import</span> <span>WebSocket</span> <span>from</span> <span>reaktiv</span> <span>import</span> <span>signal</span><span>,</span> <span>effect</span> <span>stocks</span> <span>=</span> <span>signal</span><span>({</span><span>"</span><span>AAPL</span><span>"</span><span>:</span> <span>150.0</span><span>,</span> <span>"</span><span>GOOGL</span><span>"</span><span>:</span> <span>2700.0</span><span>})</span> <span>@app.websocket</span><span>(</span><span>"</span><span>/stocks</span><span>"</span><span>)</span> <span>async</span> <span>def</span> <span>stock_feed</span><span>(</span><span>websocket</span><span>:</span> <span>WebSocket</span><span>):</span> <span>await</span> <span>websocket</span><span>.</span><span>accept</span><span>()</span> <span>async</span> <span>def</span> <span>send_updates</span><span>():</span> <span>await</span> <span>websocket</span><span>.</span><span>send_json</span><span>(</span><span>stocks</span><span>())</span> <span># Assign effect to variable to avoid garbage collection </span> <span>update_effect</span> <span>=</span> <span>effect</span><span>(</span><span>send_updates</span><span>)</span> <span>try</span><span>:</span> <span>while</span> <span>True</span><span>:</span> <span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>1</span><span>)</span> <span># Simulate background updates </span> <span>finally</span><span>:</span> <span>update_effect</span><span>.</span><span>dispose</span><span>()</span> <span># Cleanup </span>from fastapi import WebSocket from reaktiv import signal, effect stocks = signal({"AAPL": 150.0, "GOOGL": 2700.0}) @app.websocket("/stocks") async def stock_feed(websocket: WebSocket): await websocket.accept() async def send_updates(): await websocket.send_json(stocks()) # Assign effect to variable to avoid garbage collection update_effect = effect(send_updates) try: while True: await asyncio.sleep(1) # Simulate background updates finally: update_effect.dispose() # Cleanup
Enter fullscreen mode Exit fullscreen mode
When stocks.set(new_data)
fires, all connected clients receive updates. No manual broadcast()
calls. No polling.
2. IoT Systems: Reacting to Sensor Data
For an IoT pipeline monitoring air quality:
<span>pm25</span> <span>=</span> <span>signal</span><span>(</span><span>35.0</span><span>)</span><span>alert_threshold</span> <span>=</span> <span>signal</span><span>(</span><span>50.0</span><span>)</span><span># Derived air quality index </span><span>aqi</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>pm25</span><span>()</span> <span>/</span> <span>alert_threshold</span><span>()</span> <span>*</span> <span>100</span><span>)</span><span>async</span> <span>def</span> <span>trigger_alerts</span><span>():</span><span>if</span> <span>aqi</span><span>()</span> <span>></span> <span>100</span><span>:</span><span>await</span> <span>notify_admins</span><span>()</span><span>await</span> <span>activate_purifiers</span><span>()</span><span># Assign effect to persist it </span><span>alert_effect</span> <span>=</span> <span>effect</span><span>(</span><span>trigger_alerts</span><span>)</span><span># Simulate sensor input </span><span>pm25</span><span>.</span><span>set</span><span>(</span><span>55.0</span><span>)</span> <span># AQI = 110 → Alerts fire </span><span>pm25</span> <span>=</span> <span>signal</span><span>(</span><span>35.0</span><span>)</span> <span>alert_threshold</span> <span>=</span> <span>signal</span><span>(</span><span>50.0</span><span>)</span> <span># Derived air quality index </span><span>aqi</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>pm25</span><span>()</span> <span>/</span> <span>alert_threshold</span><span>()</span> <span>*</span> <span>100</span><span>)</span> <span>async</span> <span>def</span> <span>trigger_alerts</span><span>():</span> <span>if</span> <span>aqi</span><span>()</span> <span>></span> <span>100</span><span>:</span> <span>await</span> <span>notify_admins</span><span>()</span> <span>await</span> <span>activate_purifiers</span><span>()</span> <span># Assign effect to persist it </span><span>alert_effect</span> <span>=</span> <span>effect</span><span>(</span><span>trigger_alerts</span><span>)</span> <span># Simulate sensor input </span><span>pm25</span><span>.</span><span>set</span><span>(</span><span>55.0</span><span>)</span> <span># AQI = 110 → Alerts fire </span>pm25 = signal(35.0) alert_threshold = signal(50.0) # Derived air quality index aqi = computed(lambda: pm25() / alert_threshold() * 100) async def trigger_alerts(): if aqi() > 100: await notify_admins() await activate_purifiers() # Assign effect to persist it alert_effect = effect(trigger_alerts) # Simulate sensor input pm25.set(55.0) # AQI = 110 → Alerts fire
Enter fullscreen mode Exit fullscreen mode
3. Long-Running Processes with User Interaction
Imagine a CLI tool that processes uploads while accepting user commands:
<span>upload_progress</span> <span>=</span> <span>signal</span><span>(</span><span>0</span><span>)</span><span>user_command</span> <span>=</span> <span>signal</span><span>(</span><span>"</span><span>pause</span><span>"</span><span>)</span><span>async</span> <span>def</span> <span>handle_upload</span><span>():</span><span>while</span> <span>upload_progress</span><span>()</span> <span><</span> <span>100</span><span>:</span><span>if</span> <span>user_command</span><span>()</span> <span>==</span> <span>"</span><span>pause</span><span>"</span><span>:</span><span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>1</span><span>)</span> <span># Wait for resume </span> <span>continue</span><span># Process chunk </span> <span>upload_progress</span><span>.</span><span>update</span><span>(</span><span>lambda</span> <span>x</span><span>:</span> <span>x</span> <span>+</span> <span>10</span><span>)</span><span>upload_effect</span> <span>=</span> <span>effect</span><span>(</span><span>handle_upload</span><span>)</span><span>upload_progress</span> <span>=</span> <span>signal</span><span>(</span><span>0</span><span>)</span> <span>user_command</span> <span>=</span> <span>signal</span><span>(</span><span>"</span><span>pause</span><span>"</span><span>)</span> <span>async</span> <span>def</span> <span>handle_upload</span><span>():</span> <span>while</span> <span>upload_progress</span><span>()</span> <span><</span> <span>100</span><span>:</span> <span>if</span> <span>user_command</span><span>()</span> <span>==</span> <span>"</span><span>pause</span><span>"</span><span>:</span> <span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>1</span><span>)</span> <span># Wait for resume </span> <span>continue</span> <span># Process chunk </span> <span>upload_progress</span><span>.</span><span>update</span><span>(</span><span>lambda</span> <span>x</span><span>:</span> <span>x</span> <span>+</span> <span>10</span><span>)</span> <span>upload_effect</span> <span>=</span> <span>effect</span><span>(</span><span>handle_upload</span><span>)</span>upload_progress = signal(0) user_command = signal("pause") async def handle_upload(): while upload_progress() < 100: if user_command() == "pause": await asyncio.sleep(1) # Wait for resume continue # Process chunk upload_progress.update(lambda x: x + 10) upload_effect = effect(handle_upload)
Enter fullscreen mode Exit fullscreen mode
User inputs (e.g., user_command.set("resume")
) dynamically alter behavior without restarting the process.
Under the Hood: How reaktiv Tracks Dependencies
When you access a signal inside a computed
or effect
, reaktiv automatically records it as a dependency. This dependency graph ensures:
- Efficiency: Only affected computations rerun on changes.
- Glitch-Free: Batched updates prevent inconsistent intermediate states.
For example:
<span>a</span> <span>=</span> <span>signal</span><span>(</span><span>1</span><span>)</span><span>b</span> <span>=</span> <span>signal</span><span>(</span><span>2</span><span>)</span><span>c</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>a</span><span>()</span> <span>+</span> <span>b</span><span>())</span> <span># Depends on `a` and `b` </span><span>eff</span> <span>=</span> <span>effect</span><span>(</span><span>lambda</span><span>:</span> <span>print</span><span>(</span><span>c</span><span>()))</span> <span># Depends on `c` </span><span>a</span><span>.</span><span>set</span><span>(</span><span>3</span><span>)</span> <span># Recomputes `c`, then triggers effect </span><span>a</span> <span>=</span> <span>signal</span><span>(</span><span>1</span><span>)</span> <span>b</span> <span>=</span> <span>signal</span><span>(</span><span>2</span><span>)</span> <span>c</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>a</span><span>()</span> <span>+</span> <span>b</span><span>())</span> <span># Depends on `a` and `b` </span><span>eff</span> <span>=</span> <span>effect</span><span>(</span><span>lambda</span><span>:</span> <span>print</span><span>(</span><span>c</span><span>()))</span> <span># Depends on `c` </span> <span>a</span><span>.</span><span>set</span><span>(</span><span>3</span><span>)</span> <span># Recomputes `c`, then triggers effect </span>a = signal(1) b = signal(2) c = computed(lambda: a() + b()) # Depends on `a` and `b` eff = effect(lambda: print(c())) # Depends on `c` a.set(3) # Recomputes `c`, then triggers effect
Enter fullscreen mode Exit fullscreen mode
Advanced Features for Production Use
- Custom Equality Checks: Avoid unnecessary updates with custom comparators.
<span># Only trigger if the difference exceeds 5 </span> <span>temp</span> <span>=</span> <span>signal</span><span>(</span><span>25.0</span><span>,</span> <span>equal</span><span>=</span><span>lambda</span> <span>old</span><span>,</span> <span>new</span><span>:</span> <span>abs</span><span>(</span><span>old</span> <span>-</span> <span>new</span><span>)</span> <span><</span> <span>5</span><span>)</span><span># Only trigger if the difference exceeds 5 </span> <span>temp</span> <span>=</span> <span>signal</span><span>(</span><span>25.0</span><span>,</span> <span>equal</span><span>=</span><span>lambda</span> <span>old</span><span>,</span> <span>new</span><span>:</span> <span>abs</span><span>(</span><span>old</span> <span>-</span> <span>new</span><span>)</span> <span><</span> <span>5</span><span>)</span># Only trigger if the difference exceeds 5 temp = signal(25.0, equal=lambda old, new: abs(old - new) < 5)
Enter fullscreen mode Exit fullscreen mode
- Untracked Reads: Access a signal without subscribing to it.
<span>from</span> <span>reaktiv</span> <span>import</span> <span>untracked</span><span>eff</span> <span>=</span> <span>effect</span><span>(</span><span>lambda</span><span>:</span> <span>print</span><span>(</span><span>untracked</span><span>(</span><span>temp</span><span>)))</span> <span># No dependency on `temp` </span><span>from</span> <span>reaktiv</span> <span>import</span> <span>untracked</span> <span>eff</span> <span>=</span> <span>effect</span><span>(</span><span>lambda</span><span>:</span> <span>print</span><span>(</span><span>untracked</span><span>(</span><span>temp</span><span>)))</span> <span># No dependency on `temp` </span>from reaktiv import untracked eff = effect(lambda: print(untracked(temp))) # No dependency on `temp`
Enter fullscreen mode Exit fullscreen mode
- Effect Cleanup: Release resources when effects rerun or dispose.
<span>async</span> <span>def</span> <span>fetch_data</span><span>(</span><span>on_cleanup</span><span>):</span><span>timer</span> <span>=</span> <span>start_interval</span><span>(</span><span>update_data</span><span>,</span> <span>seconds</span><span>=</span><span>10</span><span>)</span><span>on_cleanup</span><span>(</span><span>timer</span><span>.</span><span>cancel</span><span>)</span> <span># Cancel on disposal </span><span>async</span> <span>def</span> <span>fetch_data</span><span>(</span><span>on_cleanup</span><span>):</span> <span>timer</span> <span>=</span> <span>start_interval</span><span>(</span><span>update_data</span><span>,</span> <span>seconds</span><span>=</span><span>10</span><span>)</span> <span>on_cleanup</span><span>(</span><span>timer</span><span>.</span><span>cancel</span><span>)</span> <span># Cancel on disposal </span>async def fetch_data(on_cleanup): timer = start_interval(update_data, seconds=10) on_cleanup(timer.cancel) # Cancel on disposal
Enter fullscreen mode Exit fullscreen mode
Performance Considerations
reaktiv excels in lightweight to moderate workloads (e.g., hundreds of clients, IoT edge devices). However:
- Scalability: For 10k+ WebSocket connections, pair reaktiv with dedicated brokers like Redis Pub/Sub.
- Garbage Collection: Effects must be assigned to variables; otherwise, they’re garbage-collected prematurely.
- Async Safety: Designed for thread-unsafe, single-threaded async code. Use with caution in threaded contexts.
When to Use reaktiv (and When Not To)
Ideal For:
- Real-time dashboards (FastAPI/WebSocket).
- IoT/edge computing pipelines.
- CLI tools with dynamic user interaction.
Not Ideal For:
- High-frequency trading systems (nanosecond latency).
- Distributed systems requiring consensus (use actors or CRDTs).
Embrace Reactivity, Reduce Cognitive Load
reaktiv isn’t a silver bullet, but it is a pragmatic tool for simplifying state management in Python. By adopting patterns proven in frontend ecosystems, it lets you:
- Declare relationships, not manual updates.
- Integrate seamlessly with
asyncio
. - Focus on business logic, not boilerplate.
For senior developers, the value lies in reducing incidental complexity-the kind that breeds bugs and burns out teams.
Get Started:
pip <span>install </span>reaktivpip <span>install </span>reaktivpip install reaktiv
Enter fullscreen mode Exit fullscreen mode
Explore the documentation and examples to see reaktiv in action.
暂无评论内容