Adding a Custom Plugin

Prerequisite: Adding a Custom Behavior Tree

Note

This tutorial is more advanced. We recommend completing Adding a Custom Behavior Tree first and keeping the API Reference open while you work through it.


Example: CBAA

Let’s implement CBAA (Consensus-Based Auction Algorithm) based on the pseudocode from H. Choi, L. Brunet, J.P. How, “Consensus-Based Decentralized Auctions for Robust Task Allocation”, IEEE Transactions on Robotics, 25(4), 2009, pp. 912-926. CBAA is a simplified version of CBBA (Consensus-Based Bundle Algorithm) for the single-assignment case: each robot is assigned at most one task, and each task at most one robot.

CBAA is divided into two phases: Phase 1 (Task Selection) and Phase 2 (Conflict Resolution).

../../_images/cbaa_alg01.png

Figure 1: CBAA Phase 1 (Task Selection)

../../_images/cbaa_alg02.png

Figure 2: CBAA Phase 2 (Conflict Resolution)
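Before turning to the plugin itself, the interplay of the two phases can be previewed with a toy simulation. This is only a sketch under simplifying assumptions: ToyAgent, its fields, and the fixed scores are hypothetical and are not part of the simulator, all agents hear each other directly, and ties between equal bids are not handled.

```python
class ToyAgent:
    """Hypothetical stand-in for a CBAA agent; not the simulator's classes."""

    def __init__(self, agent_id, scores):
        self.agent_id = agent_id
        self.scores = scores   # {task_id: score this agent assigns to the task}
        self.y = {}            # Winning bids known to this agent
        self.assigned = None   # Currently selected task ID
        self.inbox = []        # Received (sender_id, winning_bids) pairs

    def phase1(self):
        """Task selection: bid on the best task we can still outbid."""
        if self.assigned is not None:
            return
        selectable = {t: s for t, s in self.scores.items()
                      if t not in self.y or s > self.y[t]}
        if selectable:
            self.assigned = max(selectable, key=selectable.get)
            self.y[self.assigned] = selectable[self.assigned]

    def phase2(self):
        """Conflict resolution: merge bids, release the task if outbid."""
        my_bid = self.y.get(self.assigned)
        for _sender_id, y_k in self.inbox:
            for t, bid in y_k.items():
                self.y[t] = max(self.y.get(t, bid), bid)
        self.inbox = []
        if self.assigned is not None and self.y[self.assigned] > my_bid:
            self.assigned = None


agents = [ToyAgent(1, {"t1": 90.0, "t2": 50.0}),
          ToyAgent(2, {"t1": 80.0, "t2": 70.0})]

for _ in range(3):  # A few synchronous rounds are enough for this example
    for a in agents:
        a.phase1()
    for a in agents:  # Everyone shares a copy of its winning-bid list
        for b in agents:
            if a is not b:
                b.inbox.append((a.agent_id, dict(a.y)))
    for a in agents:
        a.phase2()

result = {a.agent_id: a.assigned for a in agents}
print(result)  # {1: 't1', 2: 't2'}
```

Both agents first bid on t1. Agent 2 is outbid, releases t1, and selects t2 in the next round.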

Using the basic skeleton of the decision-making plugin shown earlier, we will now implement the CBAA algorithm.

Initialization

First, rename the class to CBAA, then declare and initialize the variables needed for decision-making.

class CBAA:
    def __init__(self, agent):
        self.agent = agent
        self.assigned_task = None
        self.satisfied = False  # Rename if necessary

        # Define any variables if necessary
        self.x = {}  # Task assignment (key: task ID; value: 0 or 1)
        self.y = {}  # Winning bid list (key: task ID; value: bid amount)

In line with Algorithm 1, we have declared and initialized x and y.

Phase 1 (Task Selection)

Next, we implement the task selection process. In the pseudocode of Algorithm 1 above, c_{ij} is the score that agent i assigns to task j. We first define a helper method to calculate this value:

def calculate_score(self, task):
    distance_to_task = self.agent.position.distance_to(task.position)
    # Time-discounted reward
    LAMBDA = 0.999
    expected_reward = LAMBDA**(distance_to_task/self.agent.max_speed + task.amount/self.agent.work_rate) * task.amount
    return expected_reward

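This function returns a time-discounted reward: the task amount is multiplied by LAMBDA raised to the total time the agent needs, that is, the travel time to the task at maximum speed plus the time to finish the task at the agent's work rate. Tasks that are closer or quicker to complete therefore score higher.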
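To get a feel for the numbers, the same formula can be evaluated outside the simulator. The sketch below uses a hypothetical standalone function, discounted_reward, with plain floats in place of the agent and task objects. The input values are made up for illustration.

```python
LAMBDA = 0.999  # Discount factor, same value as in calculate_score


def discounted_reward(distance, max_speed, amount, work_rate):
    """Standalone version of the scoring formula (hypothetical helper)."""
    travel_time = distance / max_speed   # Time to reach the task
    work_time = amount / work_rate       # Time to finish the task
    return LAMBDA ** (travel_time + work_time) * amount


# Same task, seen from a nearby agent and from a distant one
near = discounted_reward(distance=10.0, max_speed=2.0, amount=100.0, work_rate=5.0)
far = discounted_reward(distance=1000.0, max_speed=2.0, amount=100.0, work_rate=5.0)
print(f"near: {near:.2f}, far: {far:.2f}")
```

With these inputs the nearby agent needs 25 time units in total and keeps about 97.5 of the 100 reward units, while the distant agent needs 520 time units and keeps clearly less, so the nearby agent would place the higher bid.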

Now, let’s implement Algorithm 1. We start by implementing Line 5:

if not self.satisfied:
    # Line 5: collect the tasks on which this agent can outbid the known winning bid
    selectable_tasks = {}
    task_rewards = {}
    for task in local_tasks_info:
        task_reward = self.calculate_score(task)
        if task.task_id not in self.y or task_reward > self.y[task.task_id]:
            selectable_tasks[task.task_id] = task
            task_rewards[task.task_id] = task_reward

Lines 6-10 are implemented as follows:

    # Lines 6-10 (still inside `if not self.satisfied:`)
    if selectable_tasks:
        best_task_id = max(task_rewards, key=task_rewards.get)  # Line 7
        self.x[best_task_id] = 1  # Line 8
        self.y[best_task_id] = task_rewards[best_task_id]  # Line 9

        self.assigned_task = selectable_tasks[best_task_id]
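The selection rule of Lines 5-9 can also be checked in isolation. The following is a minimal sketch with plain dictionaries. The function name select_task and the sample values are hypothetical and only mirror the logic of the snippets above.

```python
def select_task(task_scores, winning_bids):
    """Return (task_id, bid) for the best task this agent can outbid, else None.

    task_scores:  {task_id: this agent's score for the task}
    winning_bids: {task_id: highest bid known so far (the y list)}
    """
    # Line 5: a task is selectable if no bid is known or our score beats it
    selectable = {
        task_id: score
        for task_id, score in task_scores.items()
        if task_id not in winning_bids or score > winning_bids[task_id]
    }
    if not selectable:
        return None
    best_task_id = max(selectable, key=selectable.get)  # Line 7
    return best_task_id, selectable[best_task_id]


scores = {"t1": 80.0, "t2": 95.0, "t3": 60.0}
known_bids = {"t2": 99.0}  # Another agent already bid 99 on t2
choice = select_task(scores, known_bids)
print(choice)  # ('t1', 80.0)
```

Although t2 has the highest score for this agent, it is excluded because the known winning bid is higher, so the agent bids on t1 instead.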

Once Algorithm 1 is complete, share the y information with neighboring agents. This can be done by defining message_to_share as follows:

    # Broadcasting (still inside `if not self.satisfied:`)
    if selectable_tasks:
        .....
        self.agent.message_to_share = {
            'agent_id': self.agent.agent_id,
            'winning_bids': self.y
        }
        self.satisfied = True

Note

The agent does not need to share any information if selectable_tasks is empty.

Phase 2 (Conflict Resolution)

Now, let’s implement Algorithm 2. This algorithm runs after the agent has received information from neighboring agents. Because self.satisfied is now True, the following section executes in the next game loop:

# Conflict-mitigating
if self.satisfied:
    best_task_id = self.assigned_task.task_id

    # Lines 4-5
    winner_agent_candidates = {self.agent.agent_id: self.y[best_task_id]}  # Initialize with own data
    for other_agent_message in self.agent.messages_received:
        if other_agent_message:
            y_k = other_agent_message.get('winning_bids')
            self.y = merge_dicts(self.y, y_k)  # Line 4: Update winning bids
            if y_k.get(best_task_id):
                k_agent_id = other_agent_message.get('agent_id')
                winner_agent_candidates[k_agent_id] = y_k[best_task_id]

    winner_agent_id = max(winner_agent_candidates, key=winner_agent_candidates.get)

    # Lines 6-8
    if winner_agent_id != self.agent.agent_id:
        self.x[best_task_id] = 0
        self.satisfied = False
        self.assigned_task = None

The main task of Algorithm 2 is to verify that the agent is the highest bidder for the task it selected. If another agent has a higher bid, the agent gives up the task.
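The snippet above relies on a merge_dicts helper that is not shown on this page. The sketch below assumes it keeps the higher bid for each task ID, which is what the consensus step of Algorithm 2 calls for. Treat both this merge_dicts and the resolve_conflict wrapper as hypothetical stand-ins, not as the simulator's own code.

```python
def merge_dicts(own_bids, other_bids):
    """Assumed behavior: keep the higher bid for every task ID."""
    merged = dict(own_bids)
    for task_id, bid in other_bids.items():
        if task_id not in merged or bid > merged[task_id]:
            merged[task_id] = bid
    return merged


def resolve_conflict(my_id, my_task_id, my_bids, messages):
    """Return (merged_bids, keeps_task) after one round of conflict resolution."""
    candidates = {my_id: my_bids[my_task_id]}  # Start with our own bid
    merged = dict(my_bids)
    for message in messages:
        y_k = message["winning_bids"]
        merged = merge_dicts(merged, y_k)
        if my_task_id in y_k:  # The sender knows a bid on our task
            candidates[message["agent_id"]] = y_k[my_task_id]
    winner_id = max(candidates, key=candidates.get)
    return merged, winner_id == my_id


messages = [
    {"agent_id": 2, "winning_bids": {"t1": 90.0}},
    {"agent_id": 3, "winning_bids": {"t2": 70.0}},
]
merged, keeps_task = resolve_conflict(1, "t1", {"t1": 80.0}, messages)
print(merged, keeps_task)  # {'t1': 90.0, 't2': 70.0} False
```

Agent 1 bid 80 on t1, but agent 2 reports a bid of 90, so agent 1 gives up the task. Its merged bid list now records the higher bid, which keeps t1 out of its selectable tasks in the next Phase 1.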

Warning

Note that it is very important to reset messages_received after processing to avoid performance degradation:

if self.satisfied:
    ....
    # Reset Messages
    self.agent.reset_messages_received()

Post-Process Upon Task Completion

Once the assigned task is completed, post-processing is required. In CBBA, if a task bundle is assigned, the next task in the bundle should be allocated. However, in CBAA, since only a single task is assigned, we mainly need to reset the following values to prepare for a new auction:

# Post-process if the previously assigned task is done
if self.assigned_task is not None and self.assigned_task.completed:
    # Implement your logic here
    self.assigned_task = None
    self.satisfied = False
    self.x = {}
    self.y = {}

Run this Example

Now, let’s run the new decision-making plugin. According to the instructions in Setting config.yaml, you’ll need to modify the configuration file to use this plugin. We’ll use the same configuration as in CBBA, but with the decision-making plugin replaced by the one we just created.

decision_making:
    plugin: plugins.cbaa.cbaa.CBAA

You can download the configuration file here: config.xml.

../../_images/CBAA_a10_t100_2024-08-24_04-55-03.gif