Skip to content

Instantly share code, notes, and snippets.

@codeistpl
Last active March 19, 2021 14:19
Show Gist options
  • Save codeistpl/190b3866871405192975c9f81ee4eca3 to your computer and use it in GitHub Desktop.
Save codeistpl/190b3866871405192975c9f81ee4eca3 to your computer and use it in GitHub Desktop.
This is a reproduction of a bug in CVAT that occurs when creating a task via the REST API.
import os
import time
import requests
class Credentials:
    """Username/password pair used to log in to a service.

    Args:
        username: Login name; defaults to ``None`` and may be assigned later.
        password: Login password; defaults to ``None`` and may be assigned later.
    """

    def __init__(self, username=None, password=None):
        self.username = username
        self.password = password

    def to_dict(self):
        """Return the credentials as a ``{'username': ..., 'password': ...}`` dict."""
        return {'username': self.username, 'password': self.password}
class NetworkServiceConfig:
    """Connection settings for a network service: an address plus login credentials.

    Args:
        address: Service address without the URL scheme (the script in this
            file uses ``"localhost:8080"``); defaults to ``None``.
        credentials: Credentials object to use; when omitted, a fresh empty
            ``Credentials`` instance is created.
    """

    def __init__(self, address=None, credentials=None):
        self.address = address
        self.credentials = Credentials() if credentials is None else credentials

    def __repr__(self):
        # Credentials are deliberately left out so the password never ends up
        # in logs or error messages built from str(config).
        return f"{type(self).__name__}(address={self.address!r})"
# URL scheme prepended to the configured service address when request URLs are built.
PROTOCOL = "http://"
# Path prefix placed between the service address and each endpoint path.
API_VER = "/api/v1"
class LoginError(Exception):
    """Raised when logging in to the service fails."""
class ProcessingTaskFailed(Exception):
    """Raised when a task's processing ends in the ``Failed`` state."""
class CVATSession:
    """Small token-authenticated client for a CVAT server's REST API.

    Call ``login()`` first: it stores the ``Authorization`` header that every
    other request sends.

    Args:
        cvat_config: Object exposing ``address`` and ``credentials`` (the
            latter with a ``to_dict()`` method), e.g. ``NetworkServiceConfig``.
    """

    # States accepted from the task status endpoint; anything else is an error.
    _KNOWN_STATES = ('Queued', 'Started', 'Finished', 'Failed')

    def __init__(self, cvat_config):
        self._headers = None
        self._config = cvat_config

    def _make_url(self, endpoint: str):
        """Return the full URL for ``endpoint`` (a path starting with ``/``)."""
        return PROTOCOL + self._config.address + API_VER + endpoint

    def login(self):
        """Log in with the configured credentials and remember the auth token.

        Raises:
            LoginError: if the server answers with anything but HTTP 200.
        """
        resp = requests.post(self._make_url("/auth/login"), json=self._config.credentials.to_dict())
        if resp.status_code != 200:
            # Report the address and status; the config object's default repr
            # carries no useful information.
            raise LoginError(
                f"Cannot login to cvat instance at {self._config.address} "
                f"(HTTP {resp.status_code})"
            )
        key = resp.json()['key']
        self._headers = {'Authorization': 'Token ' + key}

    def create_task(self, task_data):
        """POST ``task_data`` as JSON to ``/tasks`` and return the decoded JSON reply.

        An HTTP error status is raised through ``raise_for_status()``.
        """
        resp = requests.post(self._make_url("/tasks"), headers=self._headers, json=task_data)
        resp.raise_for_status()
        return resp.json()

    def upload_task_local_media(self, task_id: int, media_files):
        """Upload local files as the data of task ``task_id``.

        Waits for the task to report ``Finished`` before uploading.

        Args:
            task_id: Identifier of the task receiving the data.
            media_files: Iterable of local file paths to upload.

        Raises:
            ProcessingTaskFailed: if the task reports ``Failed`` while waiting.
            ValueError: if the task reports an unknown state while waiting.
        """
        self.wait_for_task_processing_finishes_or_fails(task_id)
        data = {'image_quality': 100}
        opened = []
        try:
            for path in media_files:
                opened.append(open(path, 'rb'))
            files = {f"client_files[{i}]": handle for i, handle in enumerate(opened)}
            response = requests.post(
                self._make_url(f"/tasks/{task_id}/data"),
                headers=self._headers,
                data=data,
                files=files,
            )
            response.raise_for_status()
        finally:
            # Close every handle that was opened, even if a later open() or
            # the request itself failed.
            for handle in opened:
                handle.close()

    def wait_for_task_processing_finishes_or_fails(self, task_id: int, progress_clbk=None):
        """Poll the task status every 0.5 s until it is ``Finished`` or ``Failed``.

        Args:
            task_id: Identifier of the task to watch.
            progress_clbk: Optional callable invoked as
                ``progress_clbk(state, task_id)`` once the task is ``Finished``.

        Raises:
            ProcessingTaskFailed: if the task reports ``Failed``.
            ValueError: if the reported state is not one of the known states.
        """
        while True:
            state = self.get_task_status(task_id)['state']
            if state not in self._KNOWN_STATES:
                raise ValueError(f"get_task_status() got unexpected value of state : {state}")
            if state == "Finished":
                if progress_clbk is not None:
                    progress_clbk(state, task_id)
                return
            if state == "Failed":
                raise ProcessingTaskFailed(f"Processing Task {task_id} Failed!!")
            time.sleep(0.5)

    def get_task_status(self, task_id: int):
        """GET ``/tasks/{task_id}/status`` and return the decoded JSON reply."""
        resp = requests.get(self._make_url(f"/tasks/{task_id}/status"), headers=self._headers)
        resp.raise_for_status()
        return resp.json()

    def get_annotations_for_task(self, task_id: int):
        """Return a list with the annotations of every job of task ``task_id``.

        NOTE(review): ``get_task_jobs`` and ``get_job_annotations`` are not
        defined in the visible part of this file; confirm they exist before
        relying on this method. The wait below runs after the annotations are
        collected, as in the original; confirm that ordering is intended.
        """
        annotations = []
        for job in self.get_task_jobs(task_id):
            annotations.append(self.get_job_annotations(job["id"]))
        self.wait_for_task_processing_finishes_or_fails(task_id)
        return annotations
# --- Reproduction driver: logs in, creates a task, then uploads one image to it. ---

# Directory containing this script; the image to upload is expected next to it.
MEDIA_PATH = os.path.dirname(os.path.realpath(__file__))
IMAGE_PATH = os.path.join(MEDIA_PATH, "image.png")
# Target server and account used for the reproduction.
cvat_config = NetworkServiceConfig()
cvat_config.address = "localhost:8080"
cvat_config.credentials.username = "usr1"
cvat_config.credentials.password = "Password.01"
# Task payload sent to the task-creation endpoint: one label carrying a
# checkbox attribute and a radio attribute.
# NOTE(review): presumably these attribute definitions are what trigger the
# bug being reproduced -- this file does not say; keep the payload unchanged.
task = {'name': 'buggy_task',
'id': None, 'status': 'undefined',
'labels': [{'name': 'buggy_label',
'color': '#000000',
'attributes': [{'name': 'checkbox',
'input_type': 'checkbox',
'mutable': True,
'default_value': 'default_value',
'values': []},
{'name': 'radio',
'input_type': 'radio',
'mutable': False,
'default_value': 'default_value',
'values': ['true', 'false', 'maybe']}]}]}
session = CVATSession(cvat_config)
session.login()
resp = session.create_task(task)
# Show the server's reply to the task creation.
print(resp)
# Upload the local image as the data of the task that was just created.
session.upload_task_local_media(resp["id"], [IMAGE_PATH])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment