跳到主要内容

Events

Events are composed of two parts: Event Triggers and Event Handlers.

  • Event Handlers 更新 state 的方式。它们由用户与 UI 的交互触发,例如单击按钮或悬停在元素上。事件也可以由页面加载或其他事件触发。

  • Event Triggers 是一类组件属性,用于发送事件至 Event Handlers。每个组件都支持一组 Event Triggers。

例:尝试将鼠标悬停在标题上以更改单词:

class WordCycleState(rx.State):
# The words to cycle through.
text: list[str] = ["Welcome", "to", "Reflex", "!"]

# The index of the current word.
index: int = 0

@rx.event
def next_word(self):
self.index = (self.index + 1) % len(self.text)

@rx.var
def get_text(self) -> str:
return self.text[self.index]


def event_triggers_example():
return rx.heading(
WordCycleState.get_text,
on_mouse_over=WordCycleState.next_word,
color="green",
)

Adding the @rx.event decorator above the event handler is strongly recommended

Event Arguments 事件参数

The event handler signature needs to match the event trigger definition argument count. If the event handler takes two arguments, the event trigger must be able to provide two arguments.

class EventArgStateSlider(rx.State):
value: int = 50

@rx.event
def set_end(self, value: list[int]):
self.value = value[0]


def slider_max_min_step():
return rx.vstack(
rx.heading(EventArgStateSlider.value),
rx.slider(
default_value=40,
on_value_commit=EventArgStateSlider.set_end,
),
width="100%",
)

When the number of args accepted by an EventHandler differs from that provided by the event trigger, an EventHandlerArgMismatch error will be raised.

Pass Additional Arguments to Event Handlers

将附加参数传递给 Event Handlers

在某些用例中,您可能希望将附加参数传递给您的事件处理程序。为此,您可以将事件触发器绑定到一个 lambda,该 lambda 可以使用您想要的参数调用您的事件处理程序。

class ArgState(rx.State):
colors: list[str] = [
"rgba(245,168,152)",
"MediumSeaGreen",
"#DEADE3",
]

@rx.event
def change_color(self, color: str, index: int):
self.colors[index] = color


def event_arguments_example():
return rx.hstack(
rx.input(
default_value=ArgState.colors[0],
on_blur=lambda c: ArgState.change_color(c, 0),
bg=ArgState.colors[0],
),
rx.input(
default_value=ArgState.colors[1],
on_blur=lambda c: ArgState.change_color(c, 1),
bg=ArgState.colors[1],
),
rx.input(
default_value=ArgState.colors[2],
on_blur=lambda c: ArgState.change_color(c, 2),
bg=ArgState.colors[2],
),
)

on_blur 事件触发器将输入的文本作为参数传递给 Lambda,Lambda 调用 change_color 事件处理程序并传递文本和输入的索引。

Events with Partial Arguments (Advanced)

在 Reflex 中,事件参数是按位置传递的。当事件被触发时,未传递给 EventHandler 的任何额外参数将由事件触发器填充。

以下两个代码示例是等效的:

# Use a lambda to pass event trigger args to the EventHandler.
rx.text(
on_blur=lambda v: MyState.handle_update("field1", v)
)

# Create a partial that passes event trigger args for any args not provided to the EventHandler.
rx.text(on_blur=MyState.handle_update("field1"))

Setters

Every base var has a built-in event handler to set it's value for convenience, called set_VARNAME.

假设您想要更改选择组件的值。您可以编写自己的事件处理程序来实现这一点:

options: list[str] = ["1", "2", "3", "4"]


class SetterState1(rx.State):
selected: str = "1"

@rx.event
def change(self, value):
self.selected = value


def code_setter():
return rx.vstack(
rx.badge(
SetterState1.selected, color_scheme="green"
),
rx.select(
options,
on_change=lambda value: SetterState1.change(
value
),
),
)

或者您可以使用内置的 setter 来简洁地实现。

options: list[str] = ["1", "2", "3", "4"]


class SetterState2(rx.State):
selected: str = "1"


def code_setter_2():
return rx.vstack(
rx.badge(
SetterState2.selected, color_scheme="green"
),
rx.select(
options,
on_change=SetterState2.set_selected,
),
)

在这个例子中, selected 的设置器是 set_selected 。这两个例子是等效的。

Yielding Updates

A regular event handler will send a StateUpdate when it has finished running. This works fine for basic event, but sometimes we need more complex logic. To update the UI multiple times in an event handler, we can yield when we want to send an update.

为此,我们可以使用 Python 关键字 yield 。对于函数内的每个 yield,将使用 StateUpdate 将更改发送到前端,直到事件处理程序执行到此时为止。

下面的示例显示了如何向 UI 产生 100 次更新。

class MultiUpdateState(rx.State):
count: int = 0

@rx.event
def timed_update(self):
for i in range(100):
self.count += 1
yield


def multi_update():
return rx.vstack(
rx.text(MultiUpdateState.count),
rx.button(
"Start", on_click=MultiUpdateState.timed_update
),
)

这里是另一个示例,显示多个更新并带有加载图标。

import asyncio


class ProgressExampleState(rx.State):
count: int = 0
show_progress: bool = False

@rx.event
async def increment(self):
self.show_progress = True
yield
# Think really hard.
await asyncio.sleep(0.5)
self.count += 1
self.show_progress = False


def progress_example():
return rx.button(
ProgressExampleState.count,
on_click=ProgressExampleState.increment,
loading=ProgressExampleState.show_progress,
)

Yielding Other Events

事件也可以产生其他事件。当您想要将事件链接在一起时,这将非常有用。要做到这一点,您可以 yeild 事件处理程序函数本身。

import asyncio


class YieldEventsState(rx.State):
count: int = 0
show_progress: bool = False

@rx.event
async def add_five(self):
self.show_progress = True
yield
# Think really hard.
await asyncio.sleep(1)
self.count += 5
self.show_progress = False

@rx.event
async def increment(self):
yield YieldEventsState.add_five
yield YieldEventsState.add_five
yield YieldEventsState.add_five


def multiple_yield_example():
return rx.button(
YieldEventsState.count,
on_click=YieldEventsState.increment,
loading=YieldEventsState.show_progress,
)

Chaining events 事件链接

Calling Event Handlers From Event Handlers

您可以在 Event Handler 中调用其他 Event Handler,以保持代码模块化。只需使用 self.call_handler 语法运行另一个事件处理程序。与往常一样,您可以在函数内部使用 yield 向前端发送增量更新。

import asyncio


class CallHandlerState(rx.State):
count: int = 0
progress: int = 0

@rx.event
async def run(self):
# Reset the count.
self.set_progress(0)
yield

# Count to 10 while showing progress.
for i in range(10):
# Wait and increment.
await asyncio.sleep(0.5)
self.count += 1

# Update the progress.
self.set_progress(i + 1)

# Yield to send the update.
yield


def call_handler_example():
return rx.vstack(
rx.badge(
CallHandlerState.count,
font_size="1.5em",
color_scheme="green",
),
rx.progress(
value=CallHandlerState.progress,
max=10,
width="100%",
),
rx.button("Run", on_click=CallHandlerState.run),
)

Returning Events From Event Handlers

到目前为止,我们只看到由组件触发的事件。然而,Event Handlers 也可以返回事件。

在 Reflex 中,Event Handler 是同步运行的,因此一次只能运行一个 Event Handler,并且队列中的事件将被阻塞,直到当前 Event Handler 完成。返回事件和调用 Event Handler 之间的区别在于,返回事件将事件发送到前端并解除队列的阻塞。

class CollatzState(rx.State):
count: int = 1

@rx.event
def start_collatz(self, count: str):
"""Run the collatz conjecture on the given number."""
self.count = abs(int(count if count else 1))
return CollatzState.run_step

@rx.event
async def run_step(self):
"""Run a single step of the collatz conjecture."""

while self.count > 1:
await asyncio.sleep(0.5)

if self.count % 2 == 0:
# If the number is even, divide by 2.
self.count /= 2
else:
# If the number is odd, multiply by 3 and add 1.
self.count = self.count * 3 + 1
yield


def collatz_example():
return rx.vstack(
rx.badge(
CollatzState.count,
font_size="1.5em",
color_scheme="green",
),
rx.input(on_blur=CollatzState.start_collatz),
)

在这个例子中,我们对用户输入的一个数字运行 Collatz 猜想。

当触发 on_blur 事件时,将调用事件处理程序 start_collatz 。它设置初始计数,然后调用 run_step ,直到计数达到 1 为止。

Special Events

Reflex 还具有内置的特殊事件

例如,事件处理程序可以在浏览器上触发警报。

class SpecialEventsState(rx.State):
@rx.event
def alert(self):
return rx.window_alert("Hello World!")


def special_events_example():
return rx.button(
"Alert", on_click=SpecialEventsState.alert
)

也可以直接在 UI 的 event trigger 中直接触发特殊事件

def special_events_example():
return rx.button(
"Alert", on_click=rx.window_alert("Hello World!")
)

Page Load Events 页面加载事件

当页面加载时,您还可以指定要运行的函数。这对于仅在每次渲染或状态更改时获取数据一次而不是每次都获取数据非常有用

class State(rx.State):
data: Dict[str, Any]

@rx.event
def get_data(self):
# Fetch data
self.data = fetch_data()


@rx.page(on_load=State.get_data)
def index():
return rx.text("A Beautiful App")

另一个例子是在页面加载时检查用户是否已经通过身份验证。如果用户未经过身份验证,我们会将其重定向到登录页面。如果他们已经通过身份验证,我们不会做任何操作,让他们访问页面。这个 on_load 事件将放置在每个需要身份验证才能访问的页面上。

class State(rx.State):
authenticated: bool

@rx.event
def check_auth(self):
# Check if user is authenticated
self.authenticated = check_auth()
if not self.authenticated:
return rx.redirect("/login")


@rx.page(on_load=State.check_auth)
def index():
return rx.text("A Beautiful App")

Background Tasks 后台任务

后台任务是一种特殊类型的 EventHandler ,可以与其他 EventHandler 函数并发运行。这使得长时间运行的任务可以在不阻塞 UI 交互性的情况下执行。

后台任务是通过在异步 State 方法上添加 @rx.event(background=True) 来定义的。

每当后台任务需要与状态交互时,它必须进入一个 async with self 上下文块,刷新状态并获取独占锁,以防止其他任务或事件处理程序同时修改它。因为其他 EventHandler 函数可能在任务运行时修改状态,所以在上下文块之外,后台任务访问的变量可能是过时的。试图在上下文块之外的后台任务中修改状态将引发 ImmutableStateError 异常。

在下面的示例中, my_task 事件处理程序使用 @rx.event(background=True) 进行装饰,并且每隔半秒增加 counter 变量一次,只要满足某些条件。当它正在运行时,UI 保持交互,并继续正常处理事件。

import asyncio
import reflex as rx


class MyTaskState(rx.State):
counter: int = 0
max_counter: int = 10
running: bool = False
_n_tasks: int = 0

@rx.event
def set_max_counter(self, value: str):
self.max_counter = int(value)

@rx.event(background=True)
async def my_task(self):
async with self:
# The latest state values are always available inside the context
if self._n_tasks > 0:
# only allow 1 concurrent task
return

# State mutation is only allowed inside context block
self._n_tasks += 1

while True:
async with self:
# Check for stopping conditions inside context
if self.counter >= self.max_counter:
self.running = False
if not self.running:
self._n_tasks -= 1
return

self.counter += 1

# Await long operations outside the context to avoid blocking UI
await asyncio.sleep(0.5)

@rx.event
def toggle_running(self):
self.running = not self.running
if self.running:
return MyTaskState.my_task

@rx.event
def clear_counter(self):
self.counter = 0


def background_task_example():
return rx.hstack(
rx.heading(MyTaskState.counter, " /"),
rx.input(
value=MyTaskState.max_counter,
on_change=MyTaskState.set_max_counter,
width="8em",
),
rx.button(
rx.cond(~MyTaskState.running, "Start", "Stop"),
on_click=MyTaskState.toggle_running,
),
rx.button(
"Reset",
on_click=MyTaskState.clear_counter,
),
)

Task Lifecycle 任务生命周期

后台任务被触发时,它会立即启动,并在 app.background_tasks 中保存对任务的引用。任务完成后,它将从集合中移除。

可以同时运行多个相同后台任务的实例,并且框架不会尝试避免重复启动任务。

开发人员需要确保在不希望出现的情况下不会创建重复任务。在上面的示例中, _n_tasks 后端变量用于控制 my_task 是否进入增量循环,或提前退出。

Background Task Limitations

后台任务大多像普通 EventHandler 方法一样工作,但有一些例外情况:

  • 后台任务必须是 async 函数。
  • 后台任务不能在 async with self 上下文块之外修改状态。
  • 后台任务可以在 async with self 上下文块之外读取状态,但值可能过时。
  • 后台任务不能直接从其他 event trigger 或后台任务调用。而应该使用 yieldreturn 来触发。

Event Actions

In Reflex, an event action is a special behavior that occurs during or after processing an event on the frontend.

事件操作可以修改浏览器处理 DOM 事件的方式,或在后端处理事件之前对事件进行节流和防抖。

通过访问所有 EventHandlers 和 EventSpecs 上存在的属性和方法来指定事件操作。

DOM Event Propagation DOM 事件传播

prevent_default 阻止默认行为

.prevent_default 操作会阻止浏览器对该操作的默认行为。可以将此操作添加到任何现有事件中,也可以通过将 rx.prevent_default 指定为事件处理程序来单独使用。

这种情况的常见用法是在单击链接时阻止导航。

rx.link(
"This Link Does Nothing",
href="https://reflex.dev/",
on_click=rx.prevent_default,
)
class LinkPreventDefaultState(rx.State):
status: bool = False

@rx.event
def toggle_status(self):
self.status = not self.status


def prevent_default_example():
return rx.vstack(
rx.heading(
f"The value is {LinkPreventDefaultState.status}"
),
rx.link(
"Toggle Value",
href="https://reflex.dev/",
on_click=LinkPreventDefaultState.toggle_status.prevent_default,
),
)

stop_propagation

.stop_propagation 操作会阻止事件传播到父元素。

下面的例子中,添加了 .stop_propagation 的 button,其 on_click 事件不会外溢给父元素

class StopPropagationState(rx.State):
where_clicked: list[str] = []

@rx.event
def handle_click(self, where: str):
self.where_clicked.append(where)

@rx.event
def handle_reset(self):
self.where_clicked = []


def stop_propagation_example():
return rx.vstack(
rx.button(
"btn1 - Stop Propagation",
on_click=StopPropagationState.handle_click(
"btn1"
).stop_propagation,
),
rx.button(
"btn2 - Normal Propagation",
on_click=StopPropagationState.handle_click(
"btn2"
),
),
rx.foreach(
StopPropagationState.where_clicked, rx.text
),
rx.button(
"Reset",
on_click=StopPropagationState.handle_reset.stop_propagation,
),
padding="2em",
border=f"1px dashed {rx.color('accent', 5)}",
on_click=StopPropagationState.handle_click("outer"),
)

Throttling and Debounce 节流与防抖

对于频繁触发的事件,采用节流或防抖策略有助于减少网络延迟并提升性能。这两种操作均接受一个参数,用于指定delay时间,单位为毫秒。

throttle 油门

被节流的事件将被丢弃

.throttle 操作限制了在给定时间段内事件被处理的次数。它对于 on_scrollon_mouse_move 事件非常有用,这些事件触发非常频繁,导致在后端处理时出现延迟。

在以下示例中, on_scroll 事件被节流为每半秒仅触发一次。

class ThrottleState(rx.State):
last_scroll: datetime.datetime | None

@rx.event
def handle_scroll(self):
self.last_scroll = datetime.datetime.now(
datetime.timezone.utc
)


def scroll_box():
return rx.scroll_area(
rx.heading("Scroll Me"),
*[rx.text(f"Item {i}") for i in range(100)],
height="75px",
width="50%",
border=f"1px solid {rx.color('accent', 5)}",
on_scroll=ThrottleState.handle_scroll.throttle(500), # 500ms
)


def throttle_example():
return (
scroll_box(),
rx.text(
f"Last Scroll Event: ",
rx.moment(
ThrottleState.last_scroll,
format="HH:mm:ss.SSS",
),
),
)

debounce 去抖动

去抖动事件被丢弃。

.debounce 操作将事件的处理延迟到指定的超时发生。如果在超时期间触发了另一个事件,计时器将重置,原始事件将被丢弃。

Debounce is useful for handling the final result of a series of events, such as moving a slider.

在以下示例中,滑块的 on_change 处理程序 update_value 仅在滑块值半秒内未发生变化后才会在后台触发

class DebounceState(rx.State):
settled_value: int = 50

@rx.event
def update_value(self, value: list[int]):
self.settled_value = value[0]


def debounced_slider():
return rx.slider(
key=rx.State.router.session.session_id,
default_value=[DebounceState.settled_value],
on_change=DebounceState.update_value.debounce(500),
width="100%",
)


def debounce_example():
return rx.vstack(
debounced_slider(),
rx.text(
f"Settled Value: {DebounceState.settled_value}"
),
)

为什么在滑块上设置键?

  • key 设置为带有动态 default_valuesession_id ,确保页面刷新时组件会重新渲染,以反映状态中更新的默认值。

  • 如果没有设置 key ,滑块在页面重新加载后将始终显示原始的 settled_value ,而不是其当前值。