In recent years, Reinforcement Learning has shown significant progress on many tasks, from playing Atari games and Go to plasma control. However, many RL problems require the engineer to correctly define the reward function: how much reward should an agent get for each action? This process is called reward engineering, and it's one of the most difficult parts of building an RL solution. It can take a lot of engineering time and result in unexpected behavior from the agent (see reward hacking). In this post, we will learn how to train RL agents without any reward engineering, using human judgements instead. For this purpose, we'll implement the paper "Deep Reinforcement Learning from Human Preferences" by OpenAI and DeepMind using the Toloka crowdsourcing platform and the imitation Python package.
Reinforcement Learning is a special type of Machine Learning problem where an algorithm interacts with some environment and receives feedback.
More formally, at each moment of time $t$, the agent observes the environment state $s_t$ (e.g., the agent's velocity and position) and needs to take an action $a_t$ from the action space (e.g., the pressure on the acceleration pedal and the rotation angle of the steering wheel). After that, the agent's action changes the environment state and the agent receives a reward $r_t$ representing how good this action was. The process repeats until the agent reaches a final state (a car crash or stopping at the destination point). The machine learning goal here is to predict the best action $a_t$ based on the current state $s_t$. Here, "the best" means the one that maximizes the cumulative reward.
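The interaction loop described above can be sketched in a few lines of Python. This is only an illustration: the toy environment, its hand-written reward, and both policies are assumptions made up for this example and have nothing to do with the Hopper setup used later in the post.

```python
import random


class ToyEnv:
    """Hypothetical 1-D environment: the agent starts at 0 and should reach position 5."""

    def reset(self):
        self.position = 0
        return self.position  # initial state

    def step(self, action):
        # The action is -1 (step left) or +1 (step right).
        self.position += action
        done = abs(self.position) == 5                # final states
        reward = 1.0 if self.position == 5 else -0.1  # hand-written reward
        return self.position, reward, done


def run_episode(env, policy, max_steps=100):
    """Runs one episode and returns the cumulative reward."""
    state = env.reset()
    total_reward = 0.0
    for _ in range(max_steps):
        action = policy(state)                  # choose an action from the state
        state, reward, done = env.step(action)  # the environment changes its state
        total_reward += reward
        if done:
            break
    return total_reward


rng = random.Random(0)
random_policy = lambda state: rng.choice([-1, 1])
always_right = lambda state: 1

print("random policy:", run_episode(ToyEnv(), random_policy))
print("always right: ", run_episode(ToyEnv(), always_right))
```

Even in this tiny example the reward had to be designed by hand, which is exactly the step we want to avoid.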
Thanks to many open-source projects, we can easily simulate the environment (see OpenAI Gym and MuJoCo) and train a state-of-the-art algorithm (see stable-baselines3). However, in order to do this, we need to correctly define the reward function $r$. It takes a lot of time and effort to design a useful reward for complex tasks, so instead we'll train a separate neural network as a reward predictor based on human judgements of the AI's actions. We only need to collect these judgements. Of course, you can do it yourself or ask your friends or colleagues, but that is neither convenient nor, more importantly, scalable. There is a better solution: just use crowdsourcing!
Crowdsourcing has become a reliable solution for many data collection problems. It's widely used for collecting both research and production labeled datasets. For instance, ImageNet, MS COCO, and SQuAD 2.0 were collected with crowdsourcing. Let's dive into how modern crowdsourcing platforms work.
Crowdsourcing platforms are two-sided markets: there are requesters, who provide microtasks (e.g., an image and a list of possible classes) and assign a small price for their completion, and workers, who complete these tasks and earn money.
In this post, we'll use the Toloka crowdsourcing platform since it has low fees, a large number of workers, and a convenient Python API, which is crucial for us because we want to automate our annotation process.
We'll discuss how to use the platform later. Now let's move on to implementing the solution.
For the sake of example, we'll take a popular RL environment called Hopper provided by OpenAI Gym and MuJoCo and train an agent to do flips instead of walking. This follows the original paper and shows the problems of reward engineering: it's relatively simple to define a reasonable reward for walking (one can use the distance walked) but hard for flips since it's easy to explain what a backflip is to humans and difficult to mathematically measure its quality.
We will follow the paper "Deep reinforcement learning from human preferences" by Christiano et al. The core idea of this method is to record some random video clips of the agent's actions, ask crowd workers to compare them, and fit the reward predictor to the annotation results. Now let's dive into each part.
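What are, mathematically, these random video clips? They are sequences of environment states and the actions the agent took in each state:
$$\sigma = \big((s_0, a_0), (s_1, a_1), \dots, (s_{k-1}, a_{k-1})\big)$$
This is also called a trajectory. Now we want to compare trajectories. How can we define an order on them? The authors proposed to compare the cumulative reward of two trajectories:
$$\sigma^1 \succ \sigma^2$$
whenever
$$\sum_{t=0}^{k-1} r(s^1_t, a^1_t) > \sum_{t=0}^{k-1} r(s^2_t, a^2_t)$$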
From the formulas above, we see that our reward predictor (some neural network) should be trained in a way that a comparison of cumulative predicted reward matches the result provided by a human.
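To make the ordering concrete, here is a minimal sketch that compares two trajectories by their cumulative reward. The reward function and the two tiny trajectories are made-up assumptions for illustration; in the real setup the reward comes from the trained predictor.

```python
def cumulative_reward(trajectory, reward_fn):
    """Sums the reward over all (state, action) pairs of a trajectory."""
    return sum(reward_fn(state, action) for state, action in trajectory)


def preferred(trajectory_1, trajectory_2, reward_fn):
    """Returns 1 if the first trajectory has the higher cumulative reward, else 2."""
    r1 = cumulative_reward(trajectory_1, reward_fn)
    r2 = cumulative_reward(trajectory_2, reward_fn)
    return 1 if r1 > r2 else 2


# Hypothetical reward: the larger the action, the better.
toy_reward = lambda state, action: action

sigma_1 = [(0.0, 1.0), (1.0, 0.5)]  # (state, action) pairs
sigma_2 = [(0.0, 0.2), (0.2, 0.1)]
print(preferred(sigma_1, sigma_2, toy_reward))
```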
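Overall, the method can be described as a loop of three steps: (1) the agent interacts with the environment and is trained to maximize the predicted reward; (2) pairs of short clips of the agent's behavior are sampled and sent to crowd workers for comparison; (3) the reward predictor is fitted to the collected comparisons.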
Now let's look at the reward training procedure.
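Assume we have a comparison of two trajectories $\sigma^1$ and $\sigma^2$ of length $k$. For each trajectory we can predict the cumulative reward with the reward predictor $\hat{r}$:
$$\hat{R}^i = \sum_{t=0}^{k-1} \hat{r}(s^i_t, a^i_t), \quad i \in \{1, 2\}$$
The result of the comparison is defined by the function $\mu$. If crowd workers preferred trajectory 1 over 2, $\mu(1) = 1$, $\mu(2) = 0$. Otherwise, $\mu(1) = 0$ and $\mu(2) = 1$.
Now we can define the network's loss on this comparison using the Bradley-Terry model:
$$\mathrm{loss}(\hat{r}) = -\Big(\mu(1) \log \hat{P}[\sigma^1 \succ \sigma^2] + \mu(2) \log \hat{P}[\sigma^2 \succ \sigma^1]\Big)$$
where
$$\hat{P}[\sigma^1 \succ \sigma^2] = \frac{\exp \hat{R}^1}{\exp \hat{R}^1 + \exp \hat{R}^2}$$
and $\hat{P}[\sigma^2 \succ \sigma^1]$ is defined symmetrically.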
This loss forces the reward predictor to predict higher rewards for pairs of states and actions of the preferred trajectory. We will cover all the implementation details below.
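As a sanity check, the loss for a single comparison can be computed in plain Python. This is a minimal sketch of the Bradley-Terry cross-entropy, not the code used by imitation; the function names and the example reward values are assumptions. The log-probabilities are computed through a log-sum-exp to avoid overflow for large rewards.

```python
import math


def log_preference_probs(rewards_1, rewards_2):
    """Returns (log P[1 preferred over 2], log P[2 preferred over 1])."""
    total_1, total_2 = sum(rewards_1), sum(rewards_2)
    # Stable log(exp(total_1) + exp(total_2)).
    m = max(total_1, total_2)
    log_norm = m + math.log(math.exp(total_1 - m) + math.exp(total_2 - m))
    return total_1 - log_norm, total_2 - log_norm


def comparison_loss(rewards_1, rewards_2, mu_1, mu_2):
    """Cross-entropy between the predicted preference and the human label."""
    log_p1, log_p2 = log_preference_probs(rewards_1, rewards_2)
    return -(mu_1 * log_p1 + mu_2 * log_p2)


# Predicted per-step rewards for two hypothetical fragments.
fragment_1 = [1.0, 1.0]
fragment_2 = [0.0, 0.0]

# The loss is small when the label agrees with the predicted rewards...
print(comparison_loss(fragment_1, fragment_2, mu_1=1, mu_2=0))
# ...and large when the workers preferred the other fragment.
print(comparison_loss(fragment_1, fragment_2, mu_1=0, mu_2=1))
```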
We will use the imitation Python package to implement this approach. It's a convenient and powerful library that implements various human-assistive RL approaches. Let's briefly go over the main parts of the final algorithm. We have two neural networks: the policy network, which chooses the agent's actions, and the reward predictor, which estimates the reward for a pair of state and action.
We also need several high-level modules: the trajectory generator, the reward trainer, the fragmenter, and the preference gatherer.
Finally, the overall logic of the algorithm will be incorporated into the Preference Comparisons module. The imitation package is under heavy development, so we can reuse some of its modules as they are, while others need changes to align the implementation with the original paper.
To implement all these modules, we need to install all the necessary packages.
# First, you need to install MuJoCo. You can try this guide:
# https://neptune.ai/blog/installing-mujoco-to-work-with-openai-gym-environments
# Also, check out the PyTorch installation page to make sure it matches your CUDA version:
# https://pytorch.org/get-started/locally/
pip install imitation gym[all] toloka-kit crowd-kit
Let's look at the implementation of each part of the approach.
We use a simple feed-forward policy network trained with the Proximal Policy Optimization (PPO) algorithm. This method is a reliable baseline for various RL tasks, and we can reuse the implementation from stable-baselines3.
import seals
import gym
from imitation.policies.base import FeedForward32Policy, NormalizeFeaturesExtractor
from imitation.util.networks import RunningNorm
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3 import PPO

# First, we need to define an environment. We use the Hopper environment
# wrapped by the seals package.
venv = DummyVecEnv([lambda: gym.make("seals/Hopper-v0")] * 8)

# Second, we define the agent.
agent = PPO(
    policy=FeedForward32Policy,  # Our feed-forward policy
    policy_kwargs=dict(
        # We just normalize state vectors and use them as features
        features_extractor_class=NormalizeFeaturesExtractor,
        features_extractor_kwargs=dict(normalize_class=RunningNorm),
    ),
    env=venv,
    seed=0,
    n_steps=2048 // venv.num_envs,
    batch_size=1024,
    ent_coef=0.0,
    learning_rate=0.0003,
    n_epochs=10,
)
For the reward predictor, we also use a feed-forward network:
from reward_nets import BasicRewardNet

reward_net = BasicRewardNet(
    venv.observation_space,  # The features here are a concatenation of state
    venv.action_space,       # and action vectors.
    normalize_input_layer=RunningNorm,
)
For the trajectory generator, we can fully reuse imitation's AgentTrainer class.
trajectory_generator = preference_comparisons.AgentTrainer(
    algorithm=agent,
    reward_fn=reward_net,
    exploration_frac=0.0,
    seed=0,
)
The idea here is the following. After we have gathered the comparisons, we make a train-validation split: 67% of them go to the training set and 33% to the validation set. We train our reward net until the validation loss stops decreasing. The original paper's authors proposed a trick to avoid significant overfitting and, at the same time, make sure that the loss is changing: adjust the L2 regularization coefficient to keep the validation loss between 1.1 and 1.5 times the training loss. I used a simpler rule: after the training has stopped, we multiply the weight_decay parameter by 2 if the validation loss is more than 1.3 times the training loss and by 0.5 otherwise.
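The two rules from this paragraph, early stopping and the weight decay update, can be isolated into small functions. This is a sketch under assumptions: the function names and the `patience` and `threshold` parameters are hypothetical and only mirror the logic described above.

```python
def should_stop(val_loss_history, patience=3):
    """Stops when the latest validation loss is no better than `patience` epochs ago."""
    if len(val_loss_history) <= patience:
        return False
    return val_loss_history[-1 - patience] <= val_loss_history[-1]


def adjust_weight_decay(weight_decay, train_loss, val_loss, threshold=1.3):
    """Doubles the L2 coefficient when the model overfits, halves it otherwise."""
    ratio = val_loss / train_loss
    return weight_decay * 2.0 if ratio > threshold else weight_decay * 0.5


# The validation loss keeps improving: continue training.
print(should_stop([1.0, 0.9, 0.8, 0.7]))
# The validation loss is worse than three epochs ago: stop.
print(should_stop([0.5, 0.6, 0.7, 0.8]))
# The validation loss is 1.5 times the training loss: regularize more.
print(adjust_weight_decay(0.01, train_loss=1.0, val_loss=1.5))
```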
Here you can see the changes to imitation's reward trainer class (the original implementation just trains the model for a fixed number of epochs). For the full code, please go to the attached repo.
def _update_weight_decay(self, factor):
    # Scale the L2 regularization coefficient of every parameter group.
    for g in self.optim.param_groups:
        g['weight_decay'] *= factor

def _train(self, train_dataset: PreferenceDataset, val_dataset: PreferenceDataset, epoch_multiplier: float = 1.0):
    """Trains on `train_dataset` until the loss on `val_dataset` stops decreasing."""
    # TODO(ejnnr): This isn't specific to the loss function or probability model.
    # In general, it might be best to split the probability model, the loss and
    # the optimization procedure a bit more cleanly so that different versions
    # can be combined
    train_dataloader = th.utils.data.DataLoader(
        train_dataset,
        batch_size=self.batch_size,
        shuffle=True,
        collate_fn=preference_collate_fn,
    )
    val_dataloader = th.utils.data.DataLoader(
        val_dataset,
        batch_size=self.batch_size,
        shuffle=True,
        collate_fn=preference_collate_fn,
    )
    val_loss_history = []
    while True:
        # One pass over the training set.
        train_loss = 0.0
        for fragment_pairs, preferences in train_dataloader:
            self.optim.zero_grad()
            loss = self._loss(fragment_pairs, preferences)
            loss.backward()
            train_loss += loss.item()
            self.optim.step()
            self.logger.record("loss", loss.item())
        train_loss /= len(train_dataloader)

        # One pass over the validation set, without gradients.
        val_loss = 0.0
        with th.no_grad():
            for fragment_pairs, preferences in val_dataloader:
                loss = self._loss(fragment_pairs, preferences)
                val_loss += loss.item()
                self.logger.record("val_loss", loss.item())
        val_loss /= len(val_dataloader)
        val_loss_history.append(val_loss)

        # Stop when the validation loss is no better than three epochs ago.
        if len(val_loss_history) >= 4:
            if val_loss_history[-4] <= val_loss:
                break

    # After the training has stopped, adjust the L2 regularization.
    frac = val_loss / train_loss
    if frac > 1.3:
        self._update_weight_decay(2.0)
    else:
        self._update_weight_decay(0.5)
    print(f'Train loss: {round(train_loss, 4)}, val loss: {round(val_loss, 4)}')
To split trajectories into segments, the paper's authors proposed to use an ensemble of reward prediction nets and choose pairs of fragments with a high variance of predictions. We'll simplify this and follow imitation's implementation of simple random sampling.
fragmenter = preference_comparisons.RandomFragmenter(warning_threshold=0, seed=0)
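To show what random sampling of fragment pairs means, here is a simplified standalone sketch. It is not imitation's RandomFragmenter: the function name, its arguments, and the toy trajectories (plain lists of step indices) are assumptions for illustration.

```python
import random


def sample_fragment_pairs(trajectories, fragment_length, num_pairs, seed=0):
    """Samples `num_pairs` pairs of random contiguous fragments."""
    rng = random.Random(seed)
    # Only trajectories that are long enough can produce a fragment.
    candidates = [t for t in trajectories if len(t) >= fragment_length]
    if not candidates:
        raise ValueError("No trajectory is long enough for the requested fragment length")

    def sample_fragment():
        trajectory = rng.choice(candidates)
        start = rng.randint(0, len(trajectory) - fragment_length)
        return trajectory[start:start + fragment_length]

    return [(sample_fragment(), sample_fragment()) for _ in range(num_pairs)]


# Toy trajectories: each element stands for one (state, action) step.
toy_trajectories = [list(range(10)), list(range(3)), list(range(20))]
for first, second in sample_fragment_pairs(toy_trajectories, fragment_length=5, num_pairs=2):
    print(first, second)
```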
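The preference gatherer is the most interesting and difficult part of the method. Let's take a look at what this module should do in general: it receives pairs of trajectory fragments, turns them into video clips, asks crowd workers to compare the clips, and returns the aggregated preferences.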
Let's figure out how Toloka works. First, I suggest you take a look at the official requester's guide. After you've done this, you are ready to run the annotation.
In the beginning, we need to create a project. In this step, you need to choose the name and description of your project, which are visible to workers. I named my project "Help AI to Play Games (Robot Backflip)".
The next part is configuring the task's interface. We have two links to videos, so we want to place them side by side and add a radio button for choosing one of them. Luckily, Toloka has a built-in component, layout.side-by-side, for this purpose.
Here is the final config:
{
  "view": {
    "type": "layout.side-by-side",
    "items": [
      {
        "type": "view.video",
        "validation": {
          "type": "condition.played",
          "hint": "Play the video"
        },
        "url": {
          "type": "data.input",
          "path": "video1"
        }
      },
      {
        "type": "view.video",
        "validation": {
          "type": "condition.played",
          "hint": "Play the video"
        },
        "url": {
          "type": "data.input",
          "path": "video2"
        }
      }
    ],
    "controls": {
      "type": "view.list",
      "items": [
        {
          "type": "field.button-radio-group",
          "label": "Which clip shows better AI actions?",
          "options": [
            { "label": "A", "value": "left" },
            { "label": "B", "value": "right" },
            { "label": "Failed to load", "value": "error" }
          ],
          "validation": {
            "type": "condition.required",
            "hint": "choose one of the clips"
          },
          "data": {
            "type": "data.output",
            "path": "result"
          }
        }
      ]
    }
  },
  "plugins": [
    {
      "1": {
        "type": "action.set",
        "data": { "type": "data.output", "path": "result" },
        "payload": "left"
      },
      "2": {
        "type": "action.set",
        "data": { "type": "data.output", "path": "result" },
        "payload": "right"
      },
      "3": {
        "type": "action.set",
        "data": { "type": "data.output", "path": "result" },
        "payload": "error"
      },
      "q": {
        "type": "action.play-pause",
        "view": { "$ref": "view.items.0" }
      },
      "w": {
        "type": "action.play-pause",
        "view": { "$ref": "view.items.1" }
      },
      "type": "plugin.hotkeys"
    },
    {
      "type": "plugin.toloka",
      "layout": {
        "kind": "scroll",
        "taskWidth": 1000
      }
    }
  ]
}
The next part of building an interface is to define the input and output data format. We have two URL inputs, video1 and video2, and one text output, result.
The final step of the project's configuration is writing the instruction. It is the most important step. Here you need to explain to the workers what the agent needs to do. You might think about it as replacing the mathematical definition of reward with the definition in natural language. Your instruction should cover:
You can find my instruction here.
Since workers see the task for the first time, we need to show them how to complete it in practice. For this purpose, we employ training tasks. This is a special type of task on Toloka for which you provide the correct response and a hint that will show up when a worker answers incorrectly. You can use random video clips of the agent's actions and annotate them by yourself. Make sure that they are not too difficult. You might skip the training setup since it takes some time. However, I'd suggest you not run the project without a training attached because it will be more difficult to configure the quality control afterward. So, let's create it!
First, go to the "Training" tab on your project's page. Then, click "Add training". Now we need to configure the training pool. You can use my setting here.
Finally, we need to upload our training tasks. They must be in a TSV file with the following structure: INPUT:<input_name> columns for the input fields, GOLDEN:<output_name> columns for the correct responses, and a HINT:text column for the hint. You can use my training file from here.
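If you prefer to generate the training file with code, a sketch like the one below writes the expected columns. The column names follow the fields of our interface (video1, video2, result); the helper name, the example URLs, and the hint text are placeholders invented for this example.

```python
import csv
import io

FIELDNAMES = ["INPUT:video1", "INPUT:video2", "GOLDEN:result", "HINT:text"]


def write_training_tsv(file_obj, rows):
    """Writes training tasks as tab-separated values with a header row."""
    writer = csv.DictWriter(file_obj, fieldnames=FIELDNAMES, delimiter="\t", lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)


# Hypothetical training task: placeholder URLs and a made-up hint.
rows = [
    {
        "INPUT:video1": "https://example.com/train_0_0.mp4",
        "INPUT:video2": "https://example.com/train_0_1.mp4",
        "GOLDEN:result": "left",
        "HINT:text": "The robot in clip A completes a full backflip.",
    },
]

buffer = io.StringIO()
write_training_tsv(buffer, rows)
print(buffer.getvalue())
```

To save the file to disk, pass a file opened with `open(path, "w", newline="", encoding="utf-8")` instead of the in-memory buffer.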
To upload the tasks, click "Upload", select smart mixing and set the number of tasks on a single page. In our case, we will place all the training tasks on one page, so this number will be 10. Then, click "Upload" and choose your TSV file.
Now we will create a pool where the real tasks will be uploaded. To do this, you need to go to the project page and click "Add pool". Here we will configure the quality control.
Here's what we're going to set here
That's it. Now let's discuss how we will implement the gatherer.
Here's the gatherer code:
# Imports assumed by this snippet (they are not shown in the original listing).
# preference_comparisons is the modified imitation module used throughout this post.
import logging
import os
import shutil
import time
from typing import Optional, Sequence

import boto3
import gym
import mujoco_py
import numpy as np
import toloka.client as toloka
from botocore.exceptions import ClientError
from crowdkit.aggregation import MajorityVote
from gym.wrappers import Monitor
from imitation.data.types import TrajectoryWithRewPair
from imitation.util import logger as imit_logger
from tqdm import tqdm


class TolokaGatherer(preference_comparisons.PreferenceGatherer):
    def __init__(
        self,
        venv,
        path,
        aws_access_key_id,
        aws_secret_access_key,
        endpoint_url,
        bucket,
        toloka_token,
        base_pool,
        base_url,
        custom_logger: Optional[imit_logger.HierarchicalLogger] = None,
    ):
        super().__init__(custom_logger=custom_logger)
        self.iteration = 0
        self.venv = venv
        self.path = path
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.endpoint_url = endpoint_url
        self.bucket = bucket
        self.toloka_client = toloka.TolokaClient(toloka_token, 'PRODUCTION')
        self.base_pool = base_pool
        self.base_url = base_url

    def upload_file(self, file_name, object_name=None):
        # If S3 object_name was not specified, use file_name
        if object_name is None:
            object_name = os.path.basename(file_name)

        # Upload the file
        session = boto3.session.Session()
        s3_client = session.client(
            service_name='s3',
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            endpoint_url=self.endpoint_url,
        )
        try:
            response = s3_client.upload_file(file_name, self.bucket, object_name)
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def record_video(self, trajectory, path, output_filename):
        tmp_path = os.path.join(path, 'tmp_video')
        env = Monitor(gym.make(self.venv), tmp_path, force=True)
        _ = env.reset()
        # Restore the simulator state from the first observation of the trajectory.
        initial_obs = trajectory.obs[0]
        initial_state = mujoco_py.MjSimState(
            time=0.0, qpos=initial_obs[:6], qvel=initial_obs[6:], act=None, udd_state={}
        )
        env.unwrapped.sim.set_state(initial_state)
        # Replay the recorded actions one by one.
        for act in trajectory.acts:
            next_state, reward, done, _ = env.step(act)
        env.close()
        # The Monitor wrapper writes an .mp4 file into tmp_path; move it to the target.
        for file in os.listdir(tmp_path):
            if file.endswith('.mp4'):
                tmp_file = os.path.join(tmp_path, file)
                break
        shutil.move(tmp_file, output_filename)
        shutil.rmtree(tmp_path)

    def record_trajectory_pair(self, trajectory_1, trajectory_2, index):
        pair_path = os.path.join(self.path, str(index))
        os.mkdir(pair_path)
        self.record_video(trajectory_1, pair_path, os.path.join(pair_path, '0.mp4'))
        self.record_video(trajectory_2, pair_path, os.path.join(pair_path, '1.mp4'))

    def upload_files(self, iteration, n_comparisons):
        for i in range(n_comparisons):
            self.upload_file(os.path.join(self.path, str(i), '0.mp4'), f'{iteration}_{i}_0.mp4')
            self.upload_file(os.path.join(self.path, str(i), '1.mp4'), f'{iteration}_{i}_1.mp4')

    def make_videos(self, iteration, comparisons):
        os.mkdir(self.path)
        progress = tqdm(enumerate(comparisons), total=len(comparisons))
        progress.set_description('Recording clips')
        for i, pair in progress:
            self.record_trajectory_pair(*pair, i)
        self.upload_files(iteration, len(comparisons))
        shutil.rmtree(self.path)

    def wait_pool_for_close(self, pool_id, minutes_to_wait=1):
        sleep_time = 60 * minutes_to_wait
        pool = self.toloka_client.get_pool(pool_id)
        while not pool.is_closed():
            op = self.toloka_client.get_analytics(
                [toloka.analytics_request.CompletionPercentagePoolAnalytics(subject_id=pool.id)]
            )
            op = self.toloka_client.wait_operation(op)
            percentage = op.details['value'][0]['result']['value']
            print(f'Pool {pool.id} - {percentage}%')
            time.sleep(sleep_time)
            pool = self.toloka_client.get_pool(pool.id)

    def run_toloka_annotation(self, n_comparisons, iteration):
        # Clone the preconfigured base pool for this iteration.
        pool = self.toloka_client.clone_pool(pool_id=self.base_pool)
        pool.set_mixer_config(
            real_tasks_count=5,
            golden_tasks_count=0
        )
        pool.private_name = f'Iteration {iteration}'
        pool = self.toloka_client.update_pool(pool.id, pool)

        tasks = [
            toloka.Task(
                pool_id=pool.id,
                input_values={'video1': f'{self.base_url}/{iteration}_{i}_0.mp4',
                              'video2': f'{self.base_url}/{iteration}_{i}_1.mp4'},
            )
            for i in range(n_comparisons)
        ]
        created_tasks = self.toloka_client.create_tasks(tasks, allow_defaults=True)
        print('Tasks created')
        pool = self.toloka_client.open_pool(pool.id)
        pool_id = pool.id

        self.wait_pool_for_close(pool_id)

        # Download the answers and aggregate them with majority vote.
        answers_df = self.toloka_client.get_assignments_df(pool_id)
        answers_df['task'] = answers_df.apply(
            lambda row: row['INPUT:video1'].split('/')[-1] + ' ' + row['INPUT:video2'].split('/')[-1],
            axis=1,
        )
        agg_df = answers_df[['task', 'ASSIGNMENT:worker_id', 'OUTPUT:result']]
        agg_df.columns = ['task', 'worker', 'label']
        agg_res = MajorityVote().fit_predict(agg_df)

        result = []
        for i in range(n_comparisons):
            task = f'{iteration}_{i}_0.mp4 {iteration}_{i}_1.mp4'
            label = agg_res[task]
            # Note: any label other than 'left' (including 'error') counts as 0.
            if label == 'left':
                result.append(1)
            else:
                result.append(0)
        return np.array(result).astype(np.float32)

    def __call__(self, fragment_pairs: Sequence[TrajectoryWithRewPair]) -> np.ndarray:
        """Computes probability fragment 1 is preferred over fragment 2."""
        self.make_videos(self.iteration, fragment_pairs)
        result = self.run_toloka_annotation(len(fragment_pairs), self.iteration)
        self.iteration += 1
        return result
What's going on here? The gatherer receives a set of trajectory pairs. We need to transform them into videos. To do so, we use the Monitor wrapper from gym: we set the initial state to the first observation of the trajectory and replay the trajectory's actions one by one.
After that, the resulting videos are uploaded to an S3 bucket. You can use any S3 storage you want. The only things you need are the AWS Key ID, AWS Secret Access Key, connection URL, bucket name, and base URL of the uploaded files. Please check your cloud provider's documentation to get all of them.
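Finally, we use the Toloka-Kit package to do the following: clone the base pool, create a task for each pair of videos, open the pool and wait until it closes, and download the workers' answers. The answers are then aggregated with majority vote from Crowd-Kit and converted into preferences.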
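The aggregation step is easy to picture with a small standalone example. The sketch below is a simplified stand-in for Crowd-Kit's MajorityVote, written in plain Python; the function names, task names, and worker IDs are made up for illustration, and ties are resolved in favor of the label seen first.

```python
from collections import Counter, defaultdict


def majority_vote(answers):
    """Picks the most frequent label per task from (task, worker, label) triples."""
    votes = defaultdict(Counter)
    for task, worker, label in answers:
        votes[task][label] += 1
    return {task: counter.most_common(1)[0][0] for task, counter in votes.items()}


def to_preferences(aggregated, tasks):
    """Converts labels to 1.0 (left clip preferred) or 0.0 (anything else)."""
    return [1.0 if aggregated[task] == "left" else 0.0 for task in tasks]


# Hypothetical answers from three workers for two comparison tasks.
answers = [
    ("0_0", "worker_a", "left"),
    ("0_0", "worker_b", "left"),
    ("0_0", "worker_c", "right"),
    ("0_1", "worker_a", "right"),
    ("0_1", "worker_b", "right"),
    ("0_1", "worker_c", "left"),
]

aggregated = majority_vote(answers)
print(aggregated)
print(to_preferences(aggregated, ["0_0", "0_1"]))
```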
Let's create a gatherer.
gatherer = TolokaGatherer(
    "seals/Hopper-v0",
    'pairs',
    <AWS Key ID>,
    <AWS Secret Access Key>,
    <Endpoint URL>,
    <Bucket name>,
    <Toloka token>,
    <Pool ID>,
    <Base URL>
)
To get the Toloka token, go to the "Profile" page -> Integrations -> Get OAuth token. You can find your pool ID in the URL of the created pool's page, which looks like .../requester/project/<project_id>/pool/<pool_id>.
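If you'd rather not copy the ID by hand, a small helper can pull it out of the URL. This is just a convenience sketch: the function name is hypothetical, and the example URL uses a placeholder domain and made-up IDs.

```python
import re


def parse_pool_url(url):
    """Extracts (project_id, pool_id) from the URL of a pool page."""
    match = re.search(r"/requester/project/([^/]+)/pool/([^/?#]+)", url)
    if match is None:
        raise ValueError(f"Not a pool page URL: {url}")
    return match.group(1), match.group(2)


# Placeholder URL with made-up IDs.
project_id, pool_id = parse_pool_url("https://example.com/requester/project/123/pool/456")
print(project_id, pool_id)
```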
For the Preference Comparisons module, the only change we need to make is to add a validation dataset to imitation's implementation. You can find the changed module here.
pref_comparisons = preference_comparisons.PreferenceComparisons(
    trajectory_generator,
    reward_net,
    fragmenter=fragmenter,
    preference_gatherer=gatherer,
    reward_trainer=reward_trainer,
    comparisons_per_iteration=100,
    fragment_length=150,
    transition_oversampling=1,
    initial_comparison_frac=0.1,
    allow_variable_horizon=False,
    seed=0,
    initial_epoch_multiplier=20,
)
Finally, we can run the training.
import torch as th

pref_comparisons.train(
    total_timesteps=12000000,
    total_comparisons=9000,
)

# Save the trained reward predictor and the agent.
th.save(reward_net.state_dict(), 'reward_net_hopper.pt')
agent.save('ppo_agent_hopper')
That's it. Make sure you have enough money in your Toloka account. It might be helpful to set small numbers in the snippet above to debug the code first without spending too much money.
The training takes some time but after a while, you'll get the trained agent and reward predictor. Below you can see the result of my run.
There are many other behaviors you might want to train your agent to do. For example, see https://github.com/nottombrown/rl-teacher.
Thank you for your time! Hope this post will help you to build better RL solutions.