Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 49 additions & 14 deletions tracker/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@
return [str(value)]


def _resolve_event_items(payload: dict[str, Any]) -> list[dict[str, Any]]:

Check failure on line 38 in tracker/compatibility.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=Smartappli_PyBehaviorLog&issues=AZ3Otm233L6z4FBH6r9C&open=AZ3Otm233L6z4FBH6r9C&pullRequest=36
schema = payload.get('schema', '')
if schema in {
'cowlog-results-v1',
'cowlog-results-v2',
'pybehaviorlog-0.9-session',
'pybehaviorlog-0.9.1-session',
'pybehaviorlog-0.8.3-session',
Expand All @@ -48,42 +49,76 @@
'boris-tabular-xlsx-v1',
'boris-tabular-spreadsheet-v2',
}:
return [item for item in payload.get('events', []) if isinstance(item, dict)]
events = payload.get('events', [])
if isinstance(events, dict):
events = list(events.values())
return [item for item in events if isinstance(item, dict)]
observations = payload.get('observations')
if isinstance(observations, dict):
observations = list(observations.values())
if isinstance(observations, list) and observations:
first = observations[0]
if isinstance(first, dict):
return [item for item in first.get('events', []) if isinstance(item, dict)]
if isinstance(payload.get('events'), list):
return [item for item in payload['events'] if isinstance(item, dict)]
merged_events: list[dict[str, Any]] = []
for observation in observations:
if not isinstance(observation, dict):
continue
observation_events = observation.get('events', [])
if isinstance(observation_events, dict):
observation_events = list(observation_events.values())
merged_events.extend(
[item for item in observation_events if isinstance(item, dict)]
)
return merged_events
root_events = payload.get('events')
if isinstance(root_events, dict):
root_events = list(root_events.values())
if isinstance(root_events, list):
return [item for item in root_events if isinstance(item, dict)]
return []


def _resolve_annotation_items(payload: dict[str, Any]) -> list[dict[str, Any]]:
if payload.get('schema', '').startswith('pybehaviorlog-'):
return [item for item in payload.get('annotations', []) if isinstance(item, dict)]
annotations = payload.get('annotations', [])
if isinstance(annotations, dict):
annotations = list(annotations.values())
return [item for item in annotations if isinstance(item, dict)]
observations = payload.get('observations')
if isinstance(observations, dict):
observations = list(observations.values())
if isinstance(observations, list) and observations:
first = observations[0]
if isinstance(first, dict):
return [item for item in first.get('annotations', []) if isinstance(item, dict)]
merged_annotations: list[dict[str, Any]] = []
for observation in observations:
if not isinstance(observation, dict):
continue
observation_annotations = observation.get('annotations', [])
if isinstance(observation_annotations, dict):
observation_annotations = list(observation_annotations.values())
merged_annotations.extend(
[item for item in observation_annotations if isinstance(item, dict)]
)
return merged_annotations
return []


def _resolve_segment_items(payload: dict[str, Any]) -> list[dict[str, Any]]:
if payload.get('schema', '').startswith('pybehaviorlog-'):
return [item for item in payload.get('segments', []) if isinstance(item, dict)]
segments = payload.get('segments', [])
if isinstance(segments, dict):
segments = list(segments.values())
return [item for item in segments if isinstance(item, dict)]
observations = payload.get('observations')
if isinstance(observations, dict):
observations = list(observations.values())
if isinstance(observations, list) and observations:
first = observations[0]
if isinstance(first, dict):
return [item for item in first.get('segments', []) if isinstance(item, dict)]
merged_segments: list[dict[str, Any]] = []
for observation in observations:
if not isinstance(observation, dict):
continue
observation_segments = observation.get('segments', [])
if isinstance(observation_segments, dict):
observation_segments = list(observation_segments.values())
merged_segments.extend([item for item in observation_segments if isinstance(item, dict)])
return merged_segments
return []


Expand Down
188 changes: 188 additions & 0 deletions tracker/tests/test_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ObservationEvent,
ObservationSession,
Project,
SessionAnnotation,
Subject,
)
from tracker.views import (
Expand Down Expand Up @@ -62,6 +63,114 @@ def test_load_session_import_payload_supports_cowlog_text(self):
self.assertEqual(payload['events'][0]['behavior'], 'Eat')
self.assertEqual(payload['events'][0]['modifiers'], ['Near'])

def test_load_session_import_payload_supports_cowlog_timecodes(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'00:01:02.500\tEat\tNear\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertEqual(payload['events'][0]['time'], 62.5)

def test_load_session_import_payload_supports_cowlog_iso8601_durations(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'PT1M2.5S\tEat\tNear\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertEqual(payload['events'][0]['time'], 62.5)

def test_load_session_import_payload_supports_cowlog_frame_timecodes(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'00:00:10:12\tEat\tNear\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertAlmostEqual(payload['events'][0]['time'], 10.48, places=3)

def test_load_session_import_payload_supports_smpte_semicolon_timecodes(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'00:00:10;12\tEat\tNear\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertAlmostEqual(payload['events'][0]['time'], 10.48, places=3)

def test_load_session_import_payload_supports_cowlog_frame_rate_metadata(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'# fps\t30\n00:00:10:15\tEat\tNear\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertEqual(report['frame_rate'], '30')
self.assertAlmostEqual(payload['events'][0]['time'], 10.5, places=3)

def test_load_session_import_payload_supports_cowlog_state_aliases(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'2.0\tStand\tbegin\tCow 1\n4.0\tStand\tend\tCow 1\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertEqual(payload['events'][0]['event_kind'], ObservationEvent.KIND_START)
self.assertEqual(payload['events'][1]['event_kind'], ObservationEvent.KIND_STOP)

def test_load_session_import_payload_supports_symbolic_state_aliases(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'2.0\tStand\t+\tCow 1\n4.0\tStand\t-\tCow 1\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertEqual(payload['events'][0]['event_kind'], ObservationEvent.KIND_START)
self.assertEqual(payload['events'][1]['event_kind'], ObservationEvent.KIND_STOP)

def test_load_session_import_payload_supports_semicolon_cowlog_rows(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'2.0;Stand;begin;Cow 1\n4.0;Stand;end;Cow 1\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertEqual(payload['events'][0]['event_kind'], ObservationEvent.KIND_START)
self.assertEqual(payload['events'][1]['event_kind'], ObservationEvent.KIND_STOP)

def test_load_session_import_payload_parses_cowlog_metadata_annotations(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'# annotation\t3.0\tMarker\tInteresting moment\n1.0\tEat\tNear\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertEqual(report['annotation_count'], 1)
self.assertEqual(payload['annotations'][0]['title'], 'Marker')
self.assertEqual(payload['annotations'][0]['note'], 'Interesting moment')

def test_load_session_import_payload_parses_cowlog_headers(self):
upload = SimpleUploadedFile(
'cowlog.txt',
b'# session\tHeader Session\n# project\tHeader Project\n# primary_video\tclip.mp4\n1.0\tEat\tNear\n',
content_type='text/plain',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'cowlog-results-v1')
self.assertEqual(payload['metadata']['session'], 'Header Session')
self.assertEqual(payload['metadata']['project'], 'Header Project')
self.assertEqual(payload['metadata']['primary_video'], 'clip.mp4')

def test_session_import_view_accepts_cowlog_text(self):
upload = SimpleUploadedFile(
'cowlog.txt',
Expand Down Expand Up @@ -89,6 +198,73 @@ def test_load_session_import_payload_supports_state_intervals_from_tabular_rows(
self.assertEqual(payload['events'][0]['event_kind'], 'start')
self.assertEqual(payload['events'][1]['event_kind'], 'stop')

def test_load_session_import_payload_supports_tabular_timecodes(self):
upload = SimpleUploadedFile(
'boris_rows.csv',
b'time,stop,behavior\n00:00:05.100,00:00:08.600,Stand\n',
content_type='text/csv',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1')
self.assertEqual(payload['events'][0]['time'], 5.1)
self.assertEqual(payload['events'][1]['time'], 8.6)

def test_load_session_import_payload_supports_tabular_iso8601_durations(self):
upload = SimpleUploadedFile(
'boris_rows.csv',
b'time,stop,behavior\nPT5S,PT8.5S,Stand\n',
content_type='text/csv',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1')
self.assertEqual(payload['events'][0]['time'], 5.0)
self.assertEqual(payload['events'][1]['time'], 8.5)

def test_load_session_import_payload_supports_tabular_frame_timecodes(self):
upload = SimpleUploadedFile(
'boris_rows.csv',
b'time,stop,behavior\n00:00:05:10,00:00:08:20,Stand\n',
content_type='text/csv',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1')
self.assertAlmostEqual(payload['events'][0]['time'], 5.4, places=3)
self.assertAlmostEqual(payload['events'][1]['time'], 8.8, places=3)

def test_load_session_import_payload_supports_tabular_smpte_semicolon_timecodes(self):
upload = SimpleUploadedFile(
'boris_rows.csv',
b'time,stop,behavior\n00:00:05;10,00:00:08;20,Stand\n',
content_type='text/csv',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1')
self.assertAlmostEqual(payload['events'][0]['time'], 5.4, places=3)
self.assertAlmostEqual(payload['events'][1]['time'], 8.8, places=3)

def test_load_session_import_payload_supports_tabular_custom_frame_rate(self):
upload = SimpleUploadedFile(
'boris_rows.csv',
b'time,stop,behavior,frame_rate\n00:00:05:10,00:00:08:20,Stand,50\n',
content_type='text/csv',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1')
self.assertAlmostEqual(payload['events'][0]['time'], 5.2, places=3)
self.assertAlmostEqual(payload['events'][1]['time'], 8.4, places=3)

def test_load_session_import_payload_supports_state_duration_column(self):
upload = SimpleUploadedFile(
'boris_rows.csv',
b'time,duration,behavior\n10.0,2.5,Stand\n',
content_type='text/csv',
)
payload, report = load_session_import_payload(upload, self.session)
self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1')
self.assertEqual(payload['events'][0]['event_kind'], ObservationEvent.KIND_START)
self.assertEqual(payload['events'][1]['event_kind'], ObservationEvent.KIND_STOP)
self.assertEqual(payload['events'][1]['time'], 12.5)

def test_session_undo_and_redo_endpoints_restore_event_state(self):
response = self.client.post(
reverse('tracker:event_create_api', args=[self.session.pk]),
Expand Down Expand Up @@ -180,11 +356,23 @@ def test_export_endpoints_for_compatibility_formats(self):
event_kind=ObservationEvent.KIND_POINT,
timestamp_seconds=Decimal('1.000'),
)
SessionAnnotation.objects.create(
session=self.session,
timestamp_seconds=Decimal('1.500'),
title='Mark',
note='CowLog annotation line',
color='#f59e0b',
created_by=self.user,
)
response = self.client.get(
reverse('tracker:session_export_cowlog_txt', args=[self.session.pk])
)
self.assertEqual(response.status_code, 200)
self.assertIn('CowLog-compatible', response.content.decode('utf-8'))
self.assertIn(
'# annotation\t1.5\tMark\tCowLog annotation line',
response.content.decode('utf-8'),
)
response = self.client.get(
reverse('tracker:session_export_behavioral_sequences', args=[self.session.pk])
)
Expand Down
Loading
Loading