reaktiv: Reactive State Management for Python

The State Management Dilemma in Python

Picture this: You’re designing a FastAPI WebSocket to broadcast real-time sensor data. Or orchestrating an IoT pipeline that reacts to environmental changes. Or building a long-running CLI tool that updates its UI based on user input.

In all these scenarios, a critical question arises: How do you efficiently propagate state changes without drowning in boilerplate?

Traditional approaches-manual subscriptions, observer patterns, or asyncio event loops-often lead to:

  • Spaghetti Code: Callbacks entangled like headphone wires.
  • Resource Leaks: Forgotten subscriptions haunting your memory.
  • Concurrency Bugs: Race conditions that defy logical explanation.

Enter reaktiv, a library that borrows the reactive programming paradigm from frontend frameworks like Angular and SolidJS, adapting it for Python’s async-first world.

Reactive Programming: A 30-Second History Lesson

Reactivity emerged in frontend ecosystems to solve a critical problem: How can UIs automatically update when data changes? Frameworks introduced signals-variables that track dependencies and notify consumers on mutation.

The result? Cleaner code, implicit dependency management, and frontend devs who stopped manually calling render().

Python, however, lagged behind. While libraries like RxPy offered reactive extensions, they demanded complex setups. reaktiv fills this gap with a minimalist, Pythonic API tailored for real-world backend and IoT use cases.

Core Concepts: Signals, Computed, and Effects

reaktiv’s power lies in three primitives:

  1. Signal: A mutable value container that notifies dependents on change.
<span>from</span> <span>reaktiv</span> <span>import</span> <span>signal</span>
<span>temperature</span> <span>=</span> <span>signal</span><span>(</span><span>25.0</span><span>)</span> <span># Initial value </span> <span>temperature</span><span>.</span><span>set</span><span>(</span><span>30.0</span><span>)</span> <span># Update value </span>
   <span>from</span> <span>reaktiv</span> <span>import</span> <span>signal</span>  
   <span>temperature</span> <span>=</span> <span>signal</span><span>(</span><span>25.0</span><span>)</span>  <span># Initial value </span>   <span>temperature</span><span>.</span><span>set</span><span>(</span><span>30.0</span><span>)</span>       <span># Update value </span>
from reaktiv import signal temperature = signal(25.0) # Initial value temperature.set(30.0) # Update value

Enter fullscreen mode Exit fullscreen mode

  1. Computed: Derived values that auto-update when dependencies change.
<span>from</span> <span>reaktiv</span> <span>import</span> <span>computed</span>
<span>feels_like</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>temperature</span><span>()</span> <span>*</span> <span>humidity</span><span>())</span>
   <span>from</span> <span>reaktiv</span> <span>import</span> <span>computed</span>  
   <span>feels_like</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>temperature</span><span>()</span> <span>*</span> <span>humidity</span><span>())</span>  
from reaktiv import computed feels_like = computed(lambda: temperature() * humidity())

Enter fullscreen mode Exit fullscreen mode

  1. Effect: Side effects triggered by signal/computed changes.
<span>from</span> <span>reaktiv</span> <span>import</span> <span>effect</span>
<span>async</span> <span>def</span> <span>log_climate</span><span>():</span>
<span>print</span><span>(</span><span>f</span><span>"</span><span>Feels like: </span><span>{</span><span>feels_like</span><span>()</span><span>}</span><span>°C</span><span>"</span><span>)</span>
<span>log_effect</span> <span>=</span> <span>effect</span><span>(</span><span>log_climate</span><span>)</span> <span># Runs on `temperature` or `humidity` changes </span>
   <span>from</span> <span>reaktiv</span> <span>import</span> <span>effect</span>  
   <span>async</span> <span>def</span> <span>log_climate</span><span>():</span>  
       <span>print</span><span>(</span><span>f</span><span>"</span><span>Feels like: </span><span>{</span><span>feels_like</span><span>()</span><span>}</span><span>°C</span><span>"</span><span>)</span>  
   <span>log_effect</span> <span>=</span> <span>effect</span><span>(</span><span>log_climate</span><span>)</span>  <span># Runs on `temperature` or `humidity` changes </span>
from reaktiv import effect async def log_climate(): print(f"Feels like: {feels_like()}°C") log_effect = effect(log_climate) # Runs on `temperature` or `humidity` changes

Enter fullscreen mode Exit fullscreen mode

This triad enables declarative state management-define relationships once, let the library handle updates.

Why reaktiv Shines in Real-World Scenarios

1. FastAPI WebSockets: Real-Time Updates Without the Boilerplate

Consider a live dashboard tracking stock prices. With reaktiv:

<span>from</span> <span>fastapi</span> <span>import</span> <span>WebSocket</span>
<span>from</span> <span>reaktiv</span> <span>import</span> <span>signal</span><span>,</span> <span>effect</span>
<span>stocks</span> <span>=</span> <span>signal</span><span>({</span><span>"</span><span>AAPL</span><span>"</span><span>:</span> <span>150.0</span><span>,</span> <span>"</span><span>GOOGL</span><span>"</span><span>:</span> <span>2700.0</span><span>})</span>
<span>@app.websocket</span><span>(</span><span>"</span><span>/stocks</span><span>"</span><span>)</span>
<span>async</span> <span>def</span> <span>stock_feed</span><span>(</span><span>websocket</span><span>:</span> <span>WebSocket</span><span>):</span>
<span>await</span> <span>websocket</span><span>.</span><span>accept</span><span>()</span>
<span>async</span> <span>def</span> <span>send_updates</span><span>():</span>
<span>await</span> <span>websocket</span><span>.</span><span>send_json</span><span>(</span><span>stocks</span><span>())</span>
<span># Assign effect to variable to avoid garbage collection </span> <span>update_effect</span> <span>=</span> <span>effect</span><span>(</span><span>send_updates</span><span>)</span>
<span>try</span><span>:</span>
<span>while</span> <span>True</span><span>:</span>
<span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>1</span><span>)</span> <span># Simulate background updates </span> <span>finally</span><span>:</span>
<span>update_effect</span><span>.</span><span>dispose</span><span>()</span> <span># Cleanup </span>
<span>from</span> <span>fastapi</span> <span>import</span> <span>WebSocket</span>  
<span>from</span> <span>reaktiv</span> <span>import</span> <span>signal</span><span>,</span> <span>effect</span>  

<span>stocks</span> <span>=</span> <span>signal</span><span>({</span><span>"</span><span>AAPL</span><span>"</span><span>:</span> <span>150.0</span><span>,</span> <span>"</span><span>GOOGL</span><span>"</span><span>:</span> <span>2700.0</span><span>})</span>  

<span>@app.websocket</span><span>(</span><span>"</span><span>/stocks</span><span>"</span><span>)</span>  
<span>async</span> <span>def</span> <span>stock_feed</span><span>(</span><span>websocket</span><span>:</span> <span>WebSocket</span><span>):</span>  
    <span>await</span> <span>websocket</span><span>.</span><span>accept</span><span>()</span>  

    <span>async</span> <span>def</span> <span>send_updates</span><span>():</span>  
        <span>await</span> <span>websocket</span><span>.</span><span>send_json</span><span>(</span><span>stocks</span><span>())</span>  

    <span># Assign effect to variable to avoid garbage collection </span>    <span>update_effect</span> <span>=</span> <span>effect</span><span>(</span><span>send_updates</span><span>)</span>  

    <span>try</span><span>:</span>  
        <span>while</span> <span>True</span><span>:</span>  
            <span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>1</span><span>)</span>  <span># Simulate background updates </span>    <span>finally</span><span>:</span>  
        <span>update_effect</span><span>.</span><span>dispose</span><span>()</span>  <span># Cleanup </span>
from fastapi import WebSocket from reaktiv import signal, effect stocks = signal({"AAPL": 150.0, "GOOGL": 2700.0}) @app.websocket("/stocks") async def stock_feed(websocket: WebSocket): await websocket.accept() async def send_updates(): await websocket.send_json(stocks()) # Assign effect to variable to avoid garbage collection update_effect = effect(send_updates) try: while True: await asyncio.sleep(1) # Simulate background updates finally: update_effect.dispose() # Cleanup

Enter fullscreen mode Exit fullscreen mode

When stocks.set(new_data) fires, all connected clients receive updates. No manual broadcast() calls. No polling.

2. IoT Systems: Reacting to Sensor Data

For an IoT pipeline monitoring air quality:

<span>pm25</span> <span>=</span> <span>signal</span><span>(</span><span>35.0</span><span>)</span>
<span>alert_threshold</span> <span>=</span> <span>signal</span><span>(</span><span>50.0</span><span>)</span>
<span># Derived air quality index </span><span>aqi</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>pm25</span><span>()</span> <span>/</span> <span>alert_threshold</span><span>()</span> <span>*</span> <span>100</span><span>)</span>
<span>async</span> <span>def</span> <span>trigger_alerts</span><span>():</span>
<span>if</span> <span>aqi</span><span>()</span> <span>></span> <span>100</span><span>:</span>
<span>await</span> <span>notify_admins</span><span>()</span>
<span>await</span> <span>activate_purifiers</span><span>()</span>
<span># Assign effect to persist it </span><span>alert_effect</span> <span>=</span> <span>effect</span><span>(</span><span>trigger_alerts</span><span>)</span>
<span># Simulate sensor input </span><span>pm25</span><span>.</span><span>set</span><span>(</span><span>55.0</span><span>)</span> <span># AQI = 110 → Alerts fire </span>
<span>pm25</span> <span>=</span> <span>signal</span><span>(</span><span>35.0</span><span>)</span>  
<span>alert_threshold</span> <span>=</span> <span>signal</span><span>(</span><span>50.0</span><span>)</span>  

<span># Derived air quality index </span><span>aqi</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>pm25</span><span>()</span> <span>/</span> <span>alert_threshold</span><span>()</span> <span>*</span> <span>100</span><span>)</span>  

<span>async</span> <span>def</span> <span>trigger_alerts</span><span>():</span>  
    <span>if</span> <span>aqi</span><span>()</span> <span>></span> <span>100</span><span>:</span>  
        <span>await</span> <span>notify_admins</span><span>()</span>  
        <span>await</span> <span>activate_purifiers</span><span>()</span>  

<span># Assign effect to persist it </span><span>alert_effect</span> <span>=</span> <span>effect</span><span>(</span><span>trigger_alerts</span><span>)</span>  

<span># Simulate sensor input </span><span>pm25</span><span>.</span><span>set</span><span>(</span><span>55.0</span><span>)</span>  <span># AQI = 110 → Alerts fire </span>
pm25 = signal(35.0) alert_threshold = signal(50.0) # Derived air quality index aqi = computed(lambda: pm25() / alert_threshold() * 100) async def trigger_alerts(): if aqi() > 100: await notify_admins() await activate_purifiers() # Assign effect to persist it alert_effect = effect(trigger_alerts) # Simulate sensor input pm25.set(55.0) # AQI = 110 → Alerts fire

Enter fullscreen mode Exit fullscreen mode

3. Long-Running Processes with User Interaction

Imagine a CLI tool that processes uploads while accepting user commands:

<span>upload_progress</span> <span>=</span> <span>signal</span><span>(</span><span>0</span><span>)</span>
<span>user_command</span> <span>=</span> <span>signal</span><span>(</span><span>"</span><span>pause</span><span>"</span><span>)</span>
<span>async</span> <span>def</span> <span>handle_upload</span><span>():</span>
<span>while</span> <span>upload_progress</span><span>()</span> <span><</span> <span>100</span><span>:</span>
<span>if</span> <span>user_command</span><span>()</span> <span>==</span> <span>"</span><span>pause</span><span>"</span><span>:</span>
<span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>1</span><span>)</span> <span># Wait for resume </span> <span>continue</span>
<span># Process chunk </span> <span>upload_progress</span><span>.</span><span>update</span><span>(</span><span>lambda</span> <span>x</span><span>:</span> <span>x</span> <span>+</span> <span>10</span><span>)</span>
<span>upload_effect</span> <span>=</span> <span>effect</span><span>(</span><span>handle_upload</span><span>)</span>
<span>upload_progress</span> <span>=</span> <span>signal</span><span>(</span><span>0</span><span>)</span>  
<span>user_command</span> <span>=</span> <span>signal</span><span>(</span><span>"</span><span>pause</span><span>"</span><span>)</span>  

<span>async</span> <span>def</span> <span>handle_upload</span><span>():</span>  
    <span>while</span> <span>upload_progress</span><span>()</span> <span><</span> <span>100</span><span>:</span>  
        <span>if</span> <span>user_command</span><span>()</span> <span>==</span> <span>"</span><span>pause</span><span>"</span><span>:</span>  
            <span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>1</span><span>)</span>  <span># Wait for resume </span>            <span>continue</span>  
        <span># Process chunk </span>        <span>upload_progress</span><span>.</span><span>update</span><span>(</span><span>lambda</span> <span>x</span><span>:</span> <span>x</span> <span>+</span> <span>10</span><span>)</span>  

<span>upload_effect</span> <span>=</span> <span>effect</span><span>(</span><span>handle_upload</span><span>)</span>  
upload_progress = signal(0) user_command = signal("pause") async def handle_upload(): while upload_progress() < 100: if user_command() == "pause": await asyncio.sleep(1) # Wait for resume continue # Process chunk upload_progress.update(lambda x: x + 10) upload_effect = effect(handle_upload)

Enter fullscreen mode Exit fullscreen mode

User inputs (e.g., user_command.set("resume")) dynamically alter behavior without restarting the process.

Under the Hood: How reaktiv Tracks Dependencies

When you access a signal inside a computed or effect, reaktiv automatically records it as a dependency. This dependency graph ensures:

  • Efficiency: Only affected computations rerun on changes.
  • Glitch-Free: Batched updates prevent inconsistent intermediate states.

For example:

<span>a</span> <span>=</span> <span>signal</span><span>(</span><span>1</span><span>)</span>
<span>b</span> <span>=</span> <span>signal</span><span>(</span><span>2</span><span>)</span>
<span>c</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>a</span><span>()</span> <span>+</span> <span>b</span><span>())</span> <span># Depends on `a` and `b` </span><span>eff</span> <span>=</span> <span>effect</span><span>(</span><span>lambda</span><span>:</span> <span>print</span><span>(</span><span>c</span><span>()))</span> <span># Depends on `c` </span>
<span>a</span><span>.</span><span>set</span><span>(</span><span>3</span><span>)</span> <span># Recomputes `c`, then triggers effect </span>
<span>a</span> <span>=</span> <span>signal</span><span>(</span><span>1</span><span>)</span>  
<span>b</span> <span>=</span> <span>signal</span><span>(</span><span>2</span><span>)</span>  
<span>c</span> <span>=</span> <span>computed</span><span>(</span><span>lambda</span><span>:</span> <span>a</span><span>()</span> <span>+</span> <span>b</span><span>())</span>  <span># Depends on `a` and `b` </span><span>eff</span> <span>=</span> <span>effect</span><span>(</span><span>lambda</span><span>:</span> <span>print</span><span>(</span><span>c</span><span>()))</span> <span># Depends on `c` </span>
<span>a</span><span>.</span><span>set</span><span>(</span><span>3</span><span>)</span>  <span># Recomputes `c`, then triggers effect </span>
a = signal(1) b = signal(2) c = computed(lambda: a() + b()) # Depends on `a` and `b` eff = effect(lambda: print(c())) # Depends on `c` a.set(3) # Recomputes `c`, then triggers effect

Enter fullscreen mode Exit fullscreen mode

Advanced Features for Production Use

  1. Custom Equality Checks: Avoid unnecessary updates with custom comparators.
<span># Only trigger if the difference exceeds 5 </span> <span>temp</span> <span>=</span> <span>signal</span><span>(</span><span>25.0</span><span>,</span> <span>equal</span><span>=</span><span>lambda</span> <span>old</span><span>,</span> <span>new</span><span>:</span> <span>abs</span><span>(</span><span>old</span> <span>-</span> <span>new</span><span>)</span> <span><</span> <span>5</span><span>)</span>
   <span># Only trigger if the difference exceeds 5 </span>   <span>temp</span> <span>=</span> <span>signal</span><span>(</span><span>25.0</span><span>,</span> <span>equal</span><span>=</span><span>lambda</span> <span>old</span><span>,</span> <span>new</span><span>:</span> <span>abs</span><span>(</span><span>old</span> <span>-</span> <span>new</span><span>)</span> <span><</span> <span>5</span><span>)</span>  
# Only trigger if the difference exceeds 5 temp = signal(25.0, equal=lambda old, new: abs(old - new) < 5)

Enter fullscreen mode Exit fullscreen mode

  1. Untracked Reads: Access a signal without subscribing to it.
<span>from</span> <span>reaktiv</span> <span>import</span> <span>untracked</span>
<span>eff</span> <span>=</span> <span>effect</span><span>(</span><span>lambda</span><span>:</span> <span>print</span><span>(</span><span>untracked</span><span>(</span><span>temp</span><span>)))</span> <span># No dependency on `temp` </span>
   <span>from</span> <span>reaktiv</span> <span>import</span> <span>untracked</span>  
   <span>eff</span> <span>=</span> <span>effect</span><span>(</span><span>lambda</span><span>:</span> <span>print</span><span>(</span><span>untracked</span><span>(</span><span>temp</span><span>)))</span>  <span># No dependency on `temp` </span>
from reaktiv import untracked eff = effect(lambda: print(untracked(temp))) # No dependency on `temp`

Enter fullscreen mode Exit fullscreen mode

  1. Effect Cleanup: Release resources when effects rerun or dispose.
<span>async</span> <span>def</span> <span>fetch_data</span><span>(</span><span>on_cleanup</span><span>):</span>
<span>timer</span> <span>=</span> <span>start_interval</span><span>(</span><span>update_data</span><span>,</span> <span>seconds</span><span>=</span><span>10</span><span>)</span>
<span>on_cleanup</span><span>(</span><span>timer</span><span>.</span><span>cancel</span><span>)</span> <span># Cancel on disposal </span>
   <span>async</span> <span>def</span> <span>fetch_data</span><span>(</span><span>on_cleanup</span><span>):</span>  
       <span>timer</span> <span>=</span> <span>start_interval</span><span>(</span><span>update_data</span><span>,</span> <span>seconds</span><span>=</span><span>10</span><span>)</span>  
       <span>on_cleanup</span><span>(</span><span>timer</span><span>.</span><span>cancel</span><span>)</span>  <span># Cancel on disposal </span>
async def fetch_data(on_cleanup): timer = start_interval(update_data, seconds=10) on_cleanup(timer.cancel) # Cancel on disposal

Enter fullscreen mode Exit fullscreen mode

Performance Considerations

reaktiv excels in lightweight to moderate workloads (e.g., hundreds of clients, IoT edge devices). However:

  • Scalability: For 10k+ WebSocket connections, pair reaktiv with dedicated brokers like Redis Pub/Sub.
  • Garbage Collection: Effects must be assigned to variables; otherwise, they’re garbage-collected prematurely.
  • Async Safety: Designed for thread-unsafe, single-threaded async code. Use with caution in threaded contexts.

When to Use reaktiv (and When Not To)

Ideal For:

  • Real-time dashboards (FastAPI/WebSocket).
  • IoT/edge computing pipelines.
  • CLI tools with dynamic user interaction.

Not Ideal For:

  • High-frequency trading systems (nanosecond latency).
  • Distributed systems requiring consensus (use actors or CRDTs).

Embrace Reactivity, Reduce Cognitive Load

reaktiv isn’t a silver bullet, but it is a pragmatic tool for simplifying state management in Python. By adopting patterns proven in frontend ecosystems, it lets you:

  • Declare relationships, not manual updates.
  • Integrate seamlessly with asyncio.
  • Focus on business logic, not boilerplate.

For senior developers, the value lies in reducing incidental complexity-the kind that breeds bugs and burns out teams.


Get Started:

pip <span>install </span>reaktiv
pip <span>install </span>reaktiv  
pip install reaktiv

Enter fullscreen mode Exit fullscreen mode

Explore the documentation and examples to see reaktiv in action.

原文链接:reaktiv: Reactive State Management for Python

© 版权声明
THE END
喜欢就支持一下吧
点赞9 分享
People do a lot of thinking, and sometimes, that's what kills us.
有时候是我们自己想太多才让自己如此难受
评论 抢沙发

请登录后发表评论

    暂无评论内容