Reward System

Reward calculation lives on the agent side rather than the trainer side, in a separate reward layer. There are two reasons for this:

  1. Reward calculation is tied to the task itself, and different tasks may need different rewards.
  2. Reward calculation can be made asynchronous for efficiency.

Definition

Similar to tools, reward functions can be purely functional (no external resources) or stateful (using environments/resources via Context). The return value must be either a float or a dictionary that contains reward as one of its keys. To define a reward, use the @reward decorator or inherit from the BaseReward class. Any additional keys in the returned dict (e.g. em, f1, fmt) are passed through and logged during training and validation.

A purely functional reward decorated with @reward:

from typing import List

@reward(name="qa_f1_reward")
def qa_f1_reward(final_response: str, answer: str, trajectory: List[str]) -> dict:
    """
    Calculate the reward for the agent's response based on the F1 score.

    Args:
        final_response (str): The agent's final response, used as the predicted answer
        answer (str): The correct answer
        trajectory (List[str]): The agent's conversation trajectory

    Returns:
        dict: A dictionary containing the reward, F1 score, EM score, precision, and recall
            - reward (float): The reward value
            - f1 (float): The F1 score
            - em (float): The EM score
            - precision (float): The precision score
            - recall (float): The recall score
    """
    response = final_response
    f1, precision, recall = f1_score(response, answer)
    em = em_score(response, answer)

    return {
        "reward": f1,
        "f1": f1,
        "em": em,
        "precision": precision,
        "recall": recall,
    }

A class-based reward that holds external credentials or state, by inheriting from BaseReward:

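import requests

class APIReward(BaseReward):
    name = "api_reward"

    def __init__(self, api_key, api_url):
        # api_url is a placeholder for your own scoring service endpoint
        self.api_key = api_key
        self.api_url = api_url

    def call(self, query: str):
        # Send the query to the scoring service, authenticating with the API key
        response = requests.post(
            self.api_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"query": query},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["reward"]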

A stateful async reward that shares an environment with the corresponding tools through Context:

from agentfly.core import Context
from agentfly.envs.webshop_text_env import WebShopSpec

@reward(name="webshop_reward")
async def webshop_reward(
    final_response: str, context: Context, task_id: int
) -> dict:
    """
    Calculates the reward for the WebShop environment based on the environment state.
    Uses the same rollout resource as the webshop tools (context.acquire_resource).

    Args:
        final_response (str): The agent's final response. Not used in this reward function.
        context (Context): Injected rollout context; used to acquire the WebShop resource.
        task_id (int): The identifier for the current task. Used to match with golden answer.

    Returns:
        dict: A dictionary containing the reward (float) and output (str) from the environment step.
    """
    try:
        env = await context.acquire_resource(spec=WebShopSpec, scope="global", backend="local")
        result = await env.step("get_reward", task_id)
        return {
            "reward": result["reward"],
            "output": result["observation"],
        }
    except Exception as e:
        return {
            "reward": 0.0,
            "output": f"Error webshop reward function: {e}",
        }

Predefined Fields

When an agent calls a reward function, it automatically fills in several special fields if they appear in the reward function's signature: final_response, trajectory, and context (which carries the rollout-level identifiers).

  • final_response: The final response the agent generates for the task.
  • trajectory: The whole interaction trajectory.
  • context: The rollout execution context, which carries the rollout id and group id and provides acquire_resource for sharing environments with tools.

When defining the function, declare any of these fields as arguments and use them directly in the reward calculation. For stateful rewards that need to share an environment with tools, prefer taking a context: Context argument and calling context.acquire_resource(...) with the same ResourceSpec as the corresponding tools.
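
One way to picture this signature-based filling is the sketch below. It is not AgentFly's actual implementation: fill_reward_kwargs and short_answer_reward are hypothetical names, and the sketch only assumes that matching happens by argument name.

```python
import inspect
from typing import Any, Callable, Dict, List


def fill_reward_kwargs(fn: Callable[..., Any], available: Dict[str, Any]) -> Dict[str, Any]:
    """Select only the values whose names appear in fn's signature (hypothetical helper)."""
    params = inspect.signature(fn).parameters
    return {name: available[name] for name in params if name in available}


def short_answer_reward(final_response: str, trajectory: List[str]) -> float:
    # Toy reward: 1.0 if there is a non-empty answer and at most three turns, else 0.0.
    return 1.0 if final_response and len(trajectory) <= 3 else 0.0


# Values a framework could have on hand at the end of a rollout (illustrative only).
available = {
    "final_response": "Paris",
    "trajectory": ["What is the capital of France?", "Paris"],
    "context": object(),  # stand-in for a rollout Context; not requested by this reward
}

kwargs = fill_reward_kwargs(short_answer_reward, available)
score = short_answer_reward(**kwargs)
```

Because short_answer_reward does not declare a context argument, the stand-in context is simply not passed to it.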

Additional Fields

Besides the predefined fields, you can provide additional fields in your task input. The input takes the following format:

task_messages = {
    "messages": [
        {"role": "user", "content": "Search the information about AgentFly and write a short summary."},
    ],
    "length_penalty": True,
    "max_length": 2048,
}

await agent.run(
    messages=task_messages,
    max_turns=4,
)

In this example, two additional fields, length_penalty and max_length, are defined in the input. Your reward function can declare these two fields as arguments. After the agent finishes the task, it passes their values to the reward function. For example,

@reward(name="summary_reward_with_penalty")
def summary_reward(final_response, length_penalty, max_length):
    if length_penalty:
        if len(final_response) > max_length:
            return 0.0
        else:
            return 1.0
    else:
        return 1.0
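
The flow from task input to reward arguments can be sketched without the framework. The snippet below makes two assumptions that are not confirmed by the library: that every key other than messages counts as an additional field, and that fields are matched to reward arguments by name. The reward is a plain function here, without the @reward decorator, so it runs standalone.

```python
from typing import Any, Dict


def summary_reward(final_response: str, length_penalty: bool, max_length: int) -> float:
    # Same logic as the reward above, written as a plain function.
    if length_penalty and len(final_response) > max_length:
        return 0.0
    return 1.0


task = {
    "messages": [{"role": "user", "content": "Write a short summary."}],
    "length_penalty": True,
    "max_length": 10,
}

# Assumption: everything except "messages" is treated as an additional field.
extra_fields: Dict[str, Any] = {k: v for k, v in task.items() if k != "messages"}

short_score = summary_reward(final_response="Brief.", **extra_fields)
long_score = summary_reward(final_response="A much longer summary.", **extra_fields)
```

Here the six-character response stays under max_length and keeps its reward, while the longer response exceeds it and is scored 0.0.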

Return Values

A reward function returns either a float or a dict containing a reward key. When a float is returned, it is used directly. When a dict is returned, the value at reward is used as the scalar reward and every other key is logged as an extra metric (reward_extra/{key}/mean, .../max, .../min) in the metrics produced by compute_data_metrics.
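
To illustrate the metric naming, the sketch below aggregates extra keys over a small batch of rollouts. It is not the implementation of compute_data_metrics; aggregate_extras is a hypothetical helper that only reproduces the reward_extra/{key}/mean, max, and min naming described above.

```python
from statistics import mean
from typing import Dict, List


def aggregate_extras(extras_per_rollout: List[Dict[str, float]]) -> Dict[str, float]:
    """Collect each extra key across rollouts and report its mean, max, and min."""
    collected: Dict[str, List[float]] = {}
    for extras in extras_per_rollout:
        for key, value in extras.items():
            collected.setdefault(key, []).append(float(value))

    metrics: Dict[str, float] = {}
    for key, values in collected.items():
        metrics[f"reward_extra/{key}/mean"] = mean(values)
        metrics[f"reward_extra/{key}/max"] = max(values)
        metrics[f"reward_extra/{key}/min"] = min(values)
    return metrics


# Extras from two rollouts, e.g. the non-reward keys returned by qa_f1_reward.
batch = [{"f1": 0.5, "em": 0.0}, {"f1": 1.0, "em": 1.0}]
metrics = aggregate_extras(batch)
```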

Internally, the framework normalizes both shapes into a typed RewardResult at the boundary (agentfly.rewards.calculate_reward), exposing .reward: float and .extras: Dict[str, Any]. You don't need to construct RewardResult yourself — the conversion is automatic. The typed form lands on the trajectory as Trajectory.reward (the float) and Trajectory.metrics (the extras dict).
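
The normalization step can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not the code in agentfly.rewards: RewardResultSketch and normalize_reward are hypothetical stand-ins that mirror only the .reward and .extras attributes described above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Union


@dataclass
class RewardResultSketch:
    """Hypothetical stand-in for the framework's RewardResult."""
    reward: float
    extras: Dict[str, Any] = field(default_factory=dict)


def normalize_reward(raw: Union[float, int, Dict[str, Any]]) -> RewardResultSketch:
    """Turn a float or a dict with a 'reward' key into one typed result."""
    if isinstance(raw, dict):
        if "reward" not in raw:
            raise KeyError("a dict return value must contain a 'reward' key")
        # Every key other than 'reward' is kept as an extra.
        extras = {k: v for k, v in raw.items() if k != "reward"}
        return RewardResultSketch(reward=float(raw["reward"]), extras=extras)
    return RewardResultSketch(reward=float(raw))


from_float = normalize_reward(0.5)
from_dict = normalize_reward({"reward": 1.0, "em": 1.0, "output": "ok"})
```

Both return shapes end up with the same two attributes, so downstream code can read .reward and .extras without checking which form the reward function returned.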