From 058597b4f4762a5df1413b74385054c0fe739a6c Mon Sep 17 00:00:00 2001 From: Olivier DEBAUCHE Date: Fri, 1 May 2026 13:34:55 +0200 Subject: [PATCH] Support ratio FPS tokens for BORIS/CowLog frame timecodes --- plan.md | 94 ++++++ tracker/compatibility.py | 63 +++- tracker/tests/test_compatibility.py | 269 ++++++++++++++++ tracker/tests/test_helpers.py | 134 ++++++++ tracker/tests/test_roundtrip.py | 24 ++ tracker/views.py | 478 +++++++++++++++++++++++----- 6 files changed, 969 insertions(+), 93 deletions(-) create mode 100644 plan.md diff --git a/plan.md b/plan.md new file mode 100644 index 0000000..02d2d65 --- /dev/null +++ b/plan.md @@ -0,0 +1,94 @@ +# Plan d'implémentation restant BORIS/CowLog + +## Objectif +Atteindre une compatibilité fonctionnelle plus complète avec les formats récents BORIS/CowLog, +au-delà de la compatibilité actuelle orientée import/export principal. + +## 1) Stabiliser la base de compatibilité (priorité haute) +- [ ] **Définir une matrice officielle des schémas supportés** (BORIS v1..vN, CowLog résultats v1..vN, variantes tabulaires CSV/TSV/XLSX, payloads mapping/list). +- [ ] **Versionner explicitement les extensions maison** (ex. métadonnées CowLog enrichies, préfixes d'observation fusionnées). +- [ ] **Éviter les ambiguïtés de parsing automatique** (CowLog texte vs tabulaire) via règles de détection déterministes documentées. +- [ ] **Ajouter un mode strict/lenient** pour import: + - strict = rejet des champs non documentés/incohérents + - lenient = tolérance + warnings détaillés. + +## 2) Compléter la fidélité BORIS (priorité haute) +- [ ] **Importer/exporter toutes les observations BORIS sans perte de contexte**: + - identifiants d'observation + - médias synchronisés par observation + - variables par observation + - commentaires/notes d'observation. +- [ ] **Préserver la structure multi-observation lors de l'export** (pas uniquement fusionnée), avec option de fusion configurable. +- [ ] **Supporter entièrement les colonnes BORIS tabulaires avancées**: + - start/stop/duration/frame/fps + - colonnes alias documentées BORIS + - annotation rows enrichies. +- [ ] **Ajouter un validateur d'intégrité BORIS** dédié (state pairs, ordre temporel, overlap states, comportements inconnus). + +## 3) Compléter la fidélité CowLog (priorité haute) +- [ ] **Normaliser le profil CowLog “texte résultats”**: + - en-têtes standard (session/projet/observer/video/fps) + - annotations métadonnées + - variantes de séparateurs/tabulations. +- [ ] **Améliorer la reconstruction des états CowLog**: + - stratégie configurable pour point/start/stop implicites + - rapport explicite des pertes de fidélité. +- [ ] **Garantir le round-trip CowLog↔PyBehaviorLog↔CowLog** avec mêmes métadonnées clés (dont fps/observer). + +## 4) Timecodes et frame-rate (priorité moyenne) +- [ ] **Centraliser un parseur temporel unique** (décimal, ISO8601, SMPTE, frame). +- [ ] **Ajouter la gestion explicite du drop-frame SMPTE** (si nécessaire selon corpus cible). +- [ ] **Rendre la résolution FPS explicite et traçable**: + - priorité: row > metadata > variable > défaut + - écrire la source FPS utilisée dans le rapport d'import. + +## 5) Rapports et diagnostics (priorité moyenne) +- [ ] **Étendre les rapports de compatibilité** pour lister: + - champs ignorés + - conversions appliquées + - pertes potentielles de fidélité + - niveau de confiance du parsing. +- [ ] **Ajouter un export “diagnostic JSON”** par import, archivable en CI. + +## 6) Tests et certification (priorité haute) +- [ ] **Constituer un corpus de fixtures BORIS/CowLog réels** (versions et variantes récentes). +- [ ] **Ajouter des tests de non-régression paramétrés**: + - mapping vs list + - multi-observation + - timecodes exotiques + - séparateurs régionaux. +- [ ] **Mettre en place des tests round-trip sémantiques** (pas seulement structurels). +- [ ] **Ajouter un job CI “compatibility certification”** avec seuil de réussite. + +## 7) UX / produit (priorité moyenne) +- [ ] **Ajouter un écran de prévisualisation avant import**: + - format détecté + - fps détecté + - nombre d'événements/annotations + - warnings bloquants/non bloquants. +- [ ] **Permettre à l'utilisateur de corriger manuellement**: + - fps + - mapping des colonnes + - stratégie state reconstruction. + +## 8) Documentation (priorité haute) +- [ ] **Mettre à jour la documentation de compatibilité** avec un tableau clair: + - “supporté totalement” + - “supporté partiellement” + - “non supporté”. +- [ ] **Documenter les limites connues** et les chemins recommandés (BORIS JSON vs CowLog texte). +- [ ] **Publier un guide de migration/import** pour laboratoires utilisant BORIS/CowLog. + +--- + +## Plan d'exécution recommandé (ordre) +1. Stabilisation parsing + mode strict/lenient. +2. Fidélité BORIS multi-observation complète. +3. Fidélité CowLog round-trip complète. +4. Diagnostics enrichis et CI de certification. +5. UX de pré-import et documentation finale. + +## Critères de fin +- Corpus de référence BORIS/CowLog passe à > 99% d'équivalence sémantique. +- Différences résiduelles documentées automatiquement dans les rapports. +- Pipeline CI bloque toute régression de compatibilité. diff --git a/tracker/compatibility.py b/tracker/compatibility.py index 574d70e..edd93c8 100644 --- a/tracker/compatibility.py +++ b/tracker/compatibility.py @@ -39,6 +39,7 @@ def _resolve_event_items(payload: dict[str, Any]) -> list[dict[str, Any]]: schema = payload.get('schema', '') if schema in { 'cowlog-results-v1', + 'cowlog-results-v2', 'pybehaviorlog-0.9-session', 'pybehaviorlog-0.9.1-session', 'pybehaviorlog-0.8.3-session', @@ -48,42 +49,76 @@ def _resolve_event_items(payload: dict[str, Any]) -> list[dict[str, Any]]: 'boris-tabular-xlsx-v1', 'boris-tabular-spreadsheet-v2', }: - return [item for item in payload.get('events', []) if isinstance(item, dict)] + events = payload.get('events', []) + if isinstance(events, dict): + events = list(events.values()) + return [item for item in events if isinstance(item, dict)] observations = payload.get('observations') if isinstance(observations, dict): observations = list(observations.values()) if isinstance(observations, list) and observations: - first = observations[0] - if isinstance(first, dict): - return [item for item in first.get('events', []) if isinstance(item, dict)] - if isinstance(payload.get('events'), list): - return [item for item in payload['events'] if isinstance(item, dict)] + merged_events: list[dict[str, Any]] = [] + for observation in observations: + if not isinstance(observation, dict): + continue + observation_events = observation.get('events', []) + if isinstance(observation_events, dict): + observation_events = list(observation_events.values()) + merged_events.extend( + [item for item in observation_events if isinstance(item, dict)] + ) + return merged_events + root_events = payload.get('events') + if isinstance(root_events, dict): + root_events = list(root_events.values()) + if isinstance(root_events, list): + return [item for item in root_events if isinstance(item, dict)] return [] def _resolve_annotation_items(payload: dict[str, Any]) -> list[dict[str, Any]]: if payload.get('schema', '').startswith('pybehaviorlog-'): - return [item for item in payload.get('annotations', []) if isinstance(item, dict)] + annotations = payload.get('annotations', []) + if isinstance(annotations, dict): + annotations = list(annotations.values()) + return [item for item in annotations if isinstance(item, dict)] observations = payload.get('observations') if isinstance(observations, dict): observations = list(observations.values()) if isinstance(observations, list) and observations: - first = observations[0] - if isinstance(first, dict): - return [item for item in first.get('annotations', []) if isinstance(item, dict)] + merged_annotations: list[dict[str, Any]] = [] + for observation in observations: + if not isinstance(observation, dict): + continue + observation_annotations = observation.get('annotations', []) + if isinstance(observation_annotations, dict): + observation_annotations = list(observation_annotations.values()) + merged_annotations.extend( + [item for item in observation_annotations if isinstance(item, dict)] + ) + return merged_annotations return [] def _resolve_segment_items(payload: dict[str, Any]) -> list[dict[str, Any]]: if payload.get('schema', '').startswith('pybehaviorlog-'): - return [item for item in payload.get('segments', []) if isinstance(item, dict)] + segments = payload.get('segments', []) + if isinstance(segments, dict): + segments = list(segments.values()) + return [item for item in segments if isinstance(item, dict)] observations = payload.get('observations') if isinstance(observations, dict): observations = list(observations.values()) if isinstance(observations, list) and observations: - first = observations[0] - if isinstance(first, dict): - return [item for item in first.get('segments', []) if isinstance(item, dict)] + merged_segments: list[dict[str, Any]] = [] + for observation in observations: + if not isinstance(observation, dict): + continue + observation_segments = observation.get('segments', []) + if isinstance(observation_segments, dict): + observation_segments = list(observation_segments.values()) + merged_segments.extend([item for item in observation_segments if isinstance(item, dict)]) + return merged_segments return [] diff --git a/tracker/tests/test_compatibility.py b/tracker/tests/test_compatibility.py index 9a7b2e8..1f62444 100644 --- a/tracker/tests/test_compatibility.py +++ b/tracker/tests/test_compatibility.py @@ -8,10 +8,13 @@ from tracker.models import ( Behavior, + IndependentVariableDefinition, Modifier, ObservationEvent, ObservationSession, + ObservationVariableValue, Project, + SessionAnnotation, Subject, ) from tracker.views import ( @@ -62,6 +65,159 @@ def test_load_session_import_payload_supports_cowlog_text(self): self.assertEqual(payload['events'][0]['behavior'], 'Eat') self.assertEqual(payload['events'][0]['modifiers'], ['Near']) + def test_load_session_import_payload_supports_cowlog_timecodes(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'00:01:02.500\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(payload['events'][0]['time'], 62.5) + + def test_load_session_import_payload_supports_cowlog_iso8601_durations(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'PT1M2.5S\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(payload['events'][0]['time'], 62.5) + + def test_load_session_import_payload_supports_cowlog_frame_timecodes(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'00:00:10:12\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertAlmostEqual(payload['events'][0]['time'], 10.48, places=3) + + def test_load_session_import_payload_supports_smpte_semicolon_timecodes(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'00:00:10;12\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertAlmostEqual(payload['events'][0]['time'], 10.48, places=3) + + def test_load_session_import_payload_supports_cowlog_frame_rate_metadata(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'# fps\t30\n00:00:10:15\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(report['frame_rate'], '30') + self.assertAlmostEqual(payload['events'][0]['time'], 10.5, places=3) + + def test_load_session_import_payload_supports_cowlog_frame_rate_with_unit(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'# fps\t29.97 fps\n00:00:10:15\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(report['frame_rate'], '29.97 fps') + self.assertAlmostEqual(payload['events'][0]['time'], 10.5005, places=3) + + def test_load_session_import_payload_supports_cowlog_frame_rate_ratio(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'# fps\t30000/1001\n00:00:10:15\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(report['frame_rate'], '30000/1001') + self.assertAlmostEqual(payload['events'][0]['time'], 10.5005, places=3) + + def test_load_session_import_payload_supports_cowlog_colon_metadata(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'# fps:30\n00:00:10:15\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(report['frame_rate'], '30') + self.assertAlmostEqual(payload['events'][0]['time'], 10.5, places=3) + + def test_load_session_import_payload_supports_cowlog_state_aliases(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'2.0\tStand\tbegin\tCow 1\n4.0\tStand\tend\tCow 1\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(payload['events'][0]['event_kind'], ObservationEvent.KIND_START) + self.assertEqual(payload['events'][1]['event_kind'], ObservationEvent.KIND_STOP) + + def test_load_session_import_payload_supports_symbolic_state_aliases(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'2.0\tStand\t+\tCow 1\n4.0\tStand\t-\tCow 1\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(payload['events'][0]['event_kind'], ObservationEvent.KIND_START) + self.assertEqual(payload['events'][1]['event_kind'], ObservationEvent.KIND_STOP) + + def test_load_session_import_payload_supports_semicolon_cowlog_rows(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'2.0;Stand;begin;Cow 1\n4.0;Stand;end;Cow 1\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(payload['events'][0]['event_kind'], ObservationEvent.KIND_START) + self.assertEqual(payload['events'][1]['event_kind'], ObservationEvent.KIND_STOP) + + def test_load_session_import_payload_parses_cowlog_metadata_annotations(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'# annotation\t3.0\tMarker\tInteresting moment\n1.0\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(report['annotation_count'], 1) + self.assertEqual(payload['annotations'][0]['title'], 'Marker') + self.assertEqual(payload['annotations'][0]['note'], 'Interesting moment') + + def test_load_session_import_payload_parses_quoted_cowlog_metadata_annotations(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'# annotation 3.0 Marker \"Interesting moment with spaces\"\n1.0\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(report['annotation_count'], 1) + self.assertEqual(payload['annotations'][0]['title'], 'Marker') + self.assertEqual(payload['annotations'][0]['note'], 'Interesting moment with spaces') + + def test_load_session_import_payload_parses_cowlog_headers(self): + upload = SimpleUploadedFile( + 'cowlog.txt', + b'# session\tHeader Session\n# project\tHeader Project\n# primary_video\tclip.mp4\n1.0\tEat\tNear\n', + content_type='text/plain', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'cowlog-results-v1') + self.assertEqual(payload['metadata']['session'], 'Header Session') + self.assertEqual(payload['metadata']['project'], 'Header Project') + self.assertEqual(payload['metadata']['primary_video'], 'clip.mp4') + def test_session_import_view_accepts_cowlog_text(self): upload = SimpleUploadedFile( 'cowlog.txt', @@ -89,6 +245,95 @@ def test_load_session_import_payload_supports_state_intervals_from_tabular_rows( self.assertEqual(payload['events'][0]['event_kind'], 'start') self.assertEqual(payload['events'][1]['event_kind'], 'stop') + def test_load_session_import_payload_supports_tabular_timecodes(self): + upload = SimpleUploadedFile( + 'boris_rows.csv', + b'time,stop,behavior\n00:00:05.100,00:00:08.600,Stand\n', + content_type='text/csv', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1') + self.assertEqual(payload['events'][0]['time'], 5.1) + self.assertEqual(payload['events'][1]['time'], 8.6) + + def test_load_session_import_payload_supports_tabular_iso8601_durations(self): + upload = SimpleUploadedFile( + 'boris_rows.csv', + b'time,stop,behavior\nPT5S,PT8.5S,Stand\n', + content_type='text/csv', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1') + self.assertEqual(payload['events'][0]['time'], 5.0) + self.assertEqual(payload['events'][1]['time'], 8.5) + + def test_load_session_import_payload_supports_semicolon_csv_with_comma_decimals(self): + upload = SimpleUploadedFile( + 'boris_rows.csv', + b'time;stop;behavior\n00:00:05,100;00:00:08,600;Stand\n', + content_type='text/csv', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1') + self.assertAlmostEqual(payload['events'][0]['time'], 5.1, places=3) + self.assertAlmostEqual(payload['events'][1]['time'], 8.6, places=3) + + def test_load_session_import_payload_supports_tabular_frame_timecodes(self): + upload = SimpleUploadedFile( + 'boris_rows.csv', + b'time,stop,behavior\n00:00:05:10,00:00:08:20,Stand\n', + content_type='text/csv', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1') + self.assertAlmostEqual(payload['events'][0]['time'], 5.4, places=3) + self.assertAlmostEqual(payload['events'][1]['time'], 8.8, places=3) + + def test_load_session_import_payload_supports_tabular_smpte_semicolon_timecodes(self): + upload = SimpleUploadedFile( + 'boris_rows.csv', + b'time,stop,behavior\n00:00:05;10,00:00:08;20,Stand\n', + content_type='text/csv', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1') + self.assertAlmostEqual(payload['events'][0]['time'], 5.4, places=3) + self.assertAlmostEqual(payload['events'][1]['time'], 8.8, places=3) + + def test_load_session_import_payload_supports_tabular_custom_frame_rate(self): + upload = SimpleUploadedFile( + 'boris_rows.csv', + b'time,stop,behavior,frame_rate\n00:00:05:10,00:00:08:20,Stand,50 fps\n', + content_type='text/csv', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1') + self.assertAlmostEqual(payload['events'][0]['time'], 5.2, places=3) + self.assertAlmostEqual(payload['events'][1]['time'], 8.4, places=3) + + def test_load_session_import_payload_supports_tabular_ratio_frame_rate(self): + upload = SimpleUploadedFile( + 'boris_rows.csv', + b'time,stop,behavior,frame_rate\n00:00:05:10,00:00:08:20,Stand,30000/1001\n', + content_type='text/csv', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1') + self.assertAlmostEqual(payload['events'][0]['time'], 5.3336, places=3) + self.assertAlmostEqual(payload['events'][1]['time'], 8.6673, places=3) + + def test_load_session_import_payload_supports_state_duration_column(self): + upload = SimpleUploadedFile( + 'boris_rows.csv', + b'time,duration,behavior\n10.0,2.5,Stand\n', + content_type='text/csv', + ) + payload, report = load_session_import_payload(upload, self.session) + self.assertEqual(report['detected_format'], 'boris-tabular-csv-v1') + self.assertEqual(payload['events'][0]['event_kind'], ObservationEvent.KIND_START) + self.assertEqual(payload['events'][1]['event_kind'], ObservationEvent.KIND_STOP) + self.assertEqual(payload['events'][1]['time'], 12.5) + def test_session_undo_and_redo_endpoints_restore_event_state(self): response = self.client.post( reverse('tracker:event_create_api', args=[self.session.pk]), @@ -180,11 +425,35 @@ def test_export_endpoints_for_compatibility_formats(self): event_kind=ObservationEvent.KIND_POINT, timestamp_seconds=Decimal('1.000'), ) + SessionAnnotation.objects.create( + session=self.session, + timestamp_seconds=Decimal('1.500'), + title='Mark', + note='CowLog annotation line', + color='#f59e0b', + created_by=self.user, + ) + fps_definition = IndependentVariableDefinition.objects.create( + project=self.project, + label='fps', + value_type=IndependentVariableDefinition.TYPE_NUMERIC, + ) + ObservationVariableValue.objects.create( + session=self.session, + definition=fps_definition, + value='30', + ) response = self.client.get( reverse('tracker:session_export_cowlog_txt', args=[self.session.pk]) ) self.assertEqual(response.status_code, 200) self.assertIn('CowLog-compatible', response.content.decode('utf-8')) + self.assertIn('# observer\tolivier', response.content.decode('utf-8')) + self.assertIn('# fps\t30', response.content.decode('utf-8')) + self.assertIn( + '# annotation\t1.5\tMark\tCowLog annotation line', + response.content.decode('utf-8'), + ) response = self.client.get( reverse('tracker:session_export_behavioral_sequences', args=[self.session.pk]) ) diff --git a/tracker/tests/test_helpers.py b/tracker/tests/test_helpers.py index c4bf9e8..0a4e849 100644 --- a/tracker/tests/test_helpers.py +++ b/tracker/tests/test_helpers.py @@ -6,11 +6,13 @@ from tracker.models import ( Behavior, + IndependentVariableDefinition, KeyboardProfile, Modifier, ObservationEvent, ObservationSession, ObservationTemplate, + ObservationVariableValue, Project, SessionVideoLink, Subject, @@ -163,6 +165,114 @@ def test_import_session_payload_v83(self): self.assertEqual(event.subjects_display, 'Cow 1') self.assertEqual(self.session.workflow_status, 'validated') + def test_import_session_payload_accepts_newer_cowlog_schema(self): + payload = { + 'schema': 'cowlog-results-v2', + 'events': [{'behavior': 'Eat', 'event_kind': 'point', 'time': 1.5}], + 'annotations': [], + } + event_count, annotation_count = import_session_payload( + self.session, payload, clear_existing=True + ) + self.assertEqual(event_count, 1) + self.assertEqual(annotation_count, 0) + + def test_import_session_payload_applies_cowlog_metadata_to_notes(self): + fps_definition = IndependentVariableDefinition.objects.create( + project=self.project, + label='fps', + value_type=IndependentVariableDefinition.TYPE_NUMERIC, + ) + payload = { + 'schema': 'cowlog-results-v2', + 'metadata': { + 'session': 'Imported header session', + 'project': 'Imported header project', + 'primary_video': 'clip.mp4', + 'fps': '29.97 fps', + }, + 'events': [{'behavior': 'Eat', 'event_kind': 'point', 'time': 1.5}], + 'annotations': [], + } + import_session_payload(self.session, payload, clear_existing=True) + import_session_payload(self.session, payload, clear_existing=True) + self.session.refresh_from_db() + self.assertIn('Imported CowLog metadata:', self.session.notes) + self.assertIn('session: Imported header session', self.session.notes) + self.assertIn('project: Imported header project', self.session.notes) + self.assertIn('primary_video: clip.mp4', self.session.notes) + self.assertEqual(self.session.notes.count('Imported CowLog metadata:'), 1) + fps_value = ObservationVariableValue.objects.get( + session=self.session, + definition=fps_definition, + ) + self.assertEqual(fps_value.value, '29.97 fps') + + def test_import_session_payload_accepts_mapping_event_rows(self): + payload = { + 'schema': 'boris-observation-v4', + 'events': { + 'evt-1': {'behavior': 'Eat', 'event_kind': 'point', 'time': 1.5}, + 'evt-2': {'behavior': 'Stand', 'event_kind': 'start', 'time': 2.0}, + }, + 'annotations': { + 'ann-1': {'time': 2.2, 'title': 'Marker', 'note': 'mapping annotation'}, + }, + 'segments': { + 'seg-1': {'title': 'Segment 1', 'start_seconds': 0.0, 'end_seconds': 3.0}, + }, + } + event_count, annotation_count = import_session_payload( + self.session, payload, clear_existing=True + ) + self.assertEqual(event_count, 2) + self.assertEqual(annotation_count, 1) + self.assertEqual(self.session.events.count(), 2) + self.assertEqual(self.session.annotations.count(), 1) + + def test_import_session_payload_merges_multiple_boris_observations(self): + definition = IndependentVariableDefinition.objects.create( + project=self.project, + label='Temperature', + value_type=IndependentVariableDefinition.TYPE_NUMERIC, + ) + payload = { + 'schema': 'boris-observation-v4', + 'observations': [ + { + 'title': 'Obs A', + 'media_paths': ['videos/a.mp4'], + 'events': [{'behavior': 'Eat', 'event_kind': 'point', 'time': 1.0}], + 'annotations': [{'time': 1.2, 'title': 'A', 'note': 'first'}], + 'variables': {'Temperature': '18'}, + }, + { + 'title': 'Obs B', + 'media_paths': ['videos/b.mp4'], + 'events': [{'behavior': 'Stand', 'event_kind': 'start', 'time': 2.0}], + 'annotations': [{'time': 2.4, 'title': 'B', 'note': 'second'}], + 'independent_variables': {'Temperature': '19'}, + }, + ], + } + event_count, annotation_count = import_session_payload( + self.session, payload, clear_existing=True + ) + self.assertEqual(event_count, 2) + self.assertEqual(annotation_count, 2) + import_session_payload(self.session, payload, clear_existing=True) + self.assertTrue(self.session.events.filter(comment__contains='BORIS observation').exists()) + self.assertTrue(self.session.annotations.filter(title__startswith='[Obs').exists()) + self.session.refresh_from_db() + self.assertIn('Imported BORIS observations: Obs A, Obs B', self.session.notes) + self.assertIn('Imported BORIS media labels: videos/a.mp4, videos/b.mp4', self.session.notes) + self.assertEqual(self.session.notes.count('Imported BORIS observations:'), 1) + self.assertEqual(self.session.notes.count('Imported BORIS media labels:'), 1) + stored_value = ObservationVariableValue.objects.get( + session=self.session, definition=definition + ) + self.assertEqual(stored_value.value, '19') + def test_keyboard_profile_payload_and_reproducibility_bundle(self): profile = KeyboardProfile.objects.create( project=self.project, @@ -332,3 +442,27 @@ def test_import_project_payload_with_templates_and_sessions(self): ) self.assertEqual(imported_session.events.count(), 1) self.assertEqual(imported_session.annotations.count(), 1) + + def test_import_project_payload_accepts_newer_boris_schema(self): + payload = { + 'schema': 'boris-project-v4', + 'ethogram': build_ethogram_payload(self.project), + 'sessions': [], + } + summary = import_project_payload( + self.project, payload, actor=self.user, import_sessions=False + ) + self.assertIn('categories_created', summary) + + def test_import_ethogram_payload_accepts_newer_boris_observation_schema(self): + payload = { + 'schema': 'boris-observation-v4', + 'categories': [{'name': 'Core', 'color': '#222222', 'sort_order': 1}], + 'behaviors': [{'name': 'Graze', 'key_binding': 'g', 'mode': Behavior.MODE_POINT}], + } + categories, modifiers, behaviors = import_ethogram_payload( + self.project, payload, replace_existing=False + ) + self.assertEqual(categories, 1) + self.assertEqual(modifiers, 0) + self.assertEqual(behaviors, 1) diff --git a/tracker/tests/test_roundtrip.py b/tracker/tests/test_roundtrip.py index 362cded..fe6e7f9 100644 --- a/tracker/tests/test_roundtrip.py +++ b/tracker/tests/test_roundtrip.py @@ -126,3 +126,27 @@ def test_roundtrip_report_flags_mismatch(self): report = build_roundtrip_report(left, right, family='session') self.assertFalse(report['equivalent']) self.assertIn('events', report['mismatches']) + + def test_roundtrip_report_normalizes_mapping_rows_and_multiple_observations(self): + left = { + 'schema': 'boris-observation-v4', + 'observations': { + 'obs_a': {'events': {'e1': {'time': 1.0, 'behavior': 'Eat', 'event_kind': 'point'}}}, + 'obs_b': { + 'events': {'e2': {'time': 2.0, 'behavior': 'Stand', 'event_kind': 'start'}}, + 'annotations': {'a1': {'time': 2.5, 'note': 'Mark'}}, + }, + }, + } + right = { + 'schema': 'boris-observation-v4', + 'observations': [ + {'events': [{'time': 1.0, 'behavior': 'Eat', 'event_kind': 'point'}]}, + { + 'events': [{'time': 2.0, 'behavior': 'Stand', 'event_kind': 'start'}], + 'annotations': [{'time': 2.5, 'note': 'Mark'}], + }, + ], + } + report = build_roundtrip_report(left, right, family='session') + self.assertTrue(report['equivalent'], report) diff --git a/tracker/views.py b/tracker/views.py index 03ec865..1ae10d0 100644 --- a/tracker/views.py +++ b/tracker/views.py @@ -6,6 +6,7 @@ import json import math import re +import shlex import wave import zipfile from collections import defaultdict @@ -588,9 +589,58 @@ def _log_audit( ) -def _decimal(value, default: str = '0') -> Decimal: +def _decimal( + value, + default: str = '0', + *, + frame_rate: str | float | Decimal | None = None, +) -> Decimal: + if isinstance(value, Decimal): + return value + token = str(value).strip() if value is not None else '' + if not token: + return Decimal(default) + if re.fullmatch(r'\d{1,3}:\d{1,2}:\d{1,2};\d{1,3}', token): + token = token.replace(';', ':') + iso_match = re.fullmatch( + r'(?i)pt(?:(?P\d+)h)?(?:(?P\d+)m)?(?:(?P\d+(?:[.,]\d+)?)s)?', + token, + ) + if iso_match and any(iso_match.groupdict().values()): + hours = Decimal(iso_match.group('hours') or '0') + minutes = Decimal(iso_match.group('minutes') or '0') + seconds = Decimal((iso_match.group('seconds') or '0').replace(',', '.')) + return (hours * Decimal('3600')) + (minutes * Decimal('60')) + seconds + if ':' in token: + parts = [part.strip() for part in token.split(':')] + if len(parts) in {2, 3, 4} and all(part for part in parts): + try: + if len(parts) == 2: + hours = Decimal('0') + minutes = Decimal(parts[0]) + seconds = Decimal(parts[1].replace(',', '.')) + return (hours * Decimal('3600')) + (minutes * Decimal('60')) + seconds + if len(parts) == 3: + hours = Decimal(parts[0]) + minutes = Decimal(parts[1]) + seconds = Decimal(parts[2].replace(',', '.')) + return (hours * Decimal('3600')) + (minutes * Decimal('60')) + seconds + # HH:MM:SS:FF (BORIS-style frame timecode; default 25 fps) + hours = Decimal(parts[0]) + minutes = Decimal(parts[1]) + seconds = Decimal(parts[2].replace(',', '.')) + frames = Decimal(parts[3]) + fps = _normalize_frame_rate_token(frame_rate) + return ( + (hours * Decimal('3600')) + + (minutes * Decimal('60')) + + seconds + + (frames / fps) + ) + except (InvalidOperation, TypeError, ValueError): + pass try: - return Decimal(str(value)) + return Decimal(token.replace(',', '.')) except (InvalidOperation, TypeError, ValueError): return Decimal(default) @@ -623,6 +673,43 @@ def _relative_media_path(video: VideoAsset | None) -> str | None: return name or None +def _append_note_line(existing: str | None, line: str, *, max_length: int = 2000) -> str: + """Append one note line while avoiding duplicate entries.""" + current = (existing or '').strip() + candidate = (line or '').strip() + if not candidate: + return current[:max_length] + lines = [item.strip() for item in current.splitlines() if item.strip()] + if candidate in lines: + return '\n'.join(lines)[:max_length] + lines.append(candidate) + return '\n'.join(lines)[:max_length] + + +def _normalize_frame_rate_token(value: str | float | Decimal | None) -> Decimal: + """Return a positive frame rate value parsed from tokens like '29.97 fps'.""" + if value is None: + return Decimal('25') + if isinstance(value, Decimal): + return value if value > 0 else Decimal('25') + token = str(value).strip() + ratio_match = re.search( + r'(?P[-+]?\d+(?:[.,]\d+)?)\s*/\s*(?P[-+]?\d+(?:[.,]\d+)?)', token + ) + if ratio_match: + numerator = _decimal(ratio_match.group('num').replace(',', '.'), default='25') + denominator = _decimal(ratio_match.group('den').replace(',', '.'), default='1') + if denominator > 0: + ratio = numerator / denominator + if ratio > 0: + return ratio + match = re.search(r'[-+]?\d+(?:[.,]\d+)?', token) + if not match: + return Decimal('25') + parsed = _decimal(match.group(0), default='25') + return parsed if parsed > 0 else Decimal('25') + + def _resolve_storage_path(video: VideoAsset | None) -> Path | None: """Resolve a local filesystem path when the file is available on local storage.""" if video is None or not getattr(video, 'file', None): @@ -2033,22 +2120,81 @@ def parse_cowlog_results_text(session: ObservationSession, raw_text: str) -> tup lines_processed = 0 warnings: list[str] = [] events: list[dict] = [] + annotations: list[dict] = [] + metadata: dict[str, str] = {} + detected_frame_rate: str | None = None state_marker_used = False for line_number, raw_line in enumerate(raw_text.splitlines(), start=1): line = raw_line.strip() - if not line or line.startswith('#'): + if not line or line.startswith(';'): continue - parts = [ - part.strip() - for part in (line.split(' ') if ' ' in line else line.split()) - if part.strip() - ] + if line.startswith('#'): + metadata_line = line[1:].strip() + if metadata_line: + if ' ' in metadata_line: + raw_metadata_parts = metadata_line.split(' ') + else: + try: + raw_metadata_parts = shlex.split(metadata_line) + except ValueError: + raw_metadata_parts = metadata_line.split() + metadata_parts = [part.strip() for part in raw_metadata_parts if part.strip()] + if len(metadata_parts) == 1 and ':' in metadata_parts[0]: + key, value = metadata_parts[0].split(':', 1) + metadata_parts = [key.strip(), value.strip()] + if metadata_parts and metadata_parts[0].casefold() in {'note', 'annotation'}: + annotation_time = ( + _decimal(metadata_parts[1], default='NaN') + if len(metadata_parts) > 1 + else Decimal('NaN') + ) + if not annotation_time.is_nan(): + title = ( + metadata_parts[2] + if len(metadata_parts) > 2 + else _('Imported note') + ) + note = ( + ' '.join(metadata_parts[3:]) + if len(metadata_parts) > 3 + else ( + metadata_parts[2] + if len(metadata_parts) > 2 + else _('Imported from CowLog metadata') + ) + ) + annotations.append( + { + 'time': float(annotation_time), + 'title': title, + 'note': note, + 'color': '#f59e0b', + } + ) + elif metadata_parts and len(metadata_parts) > 1: + metadata_key = _normalize_import_header(metadata_parts[0]) + if metadata_key: + metadata_value = ' '.join(metadata_parts[1:]) + metadata[metadata_key] = metadata_value + if metadata_key in {'fps', 'frame_rate', 'framerate'}: + detected_frame_rate = metadata_value + continue + if ' ' in line: + raw_parts = line.split(' ') + elif ';' in line and line.count(';') >= 2: + raw_parts = line.split(';') + else: + try: + raw_parts = shlex.split(line) + except ValueError: + raw_parts = line.split() + parts = [part.strip() for part in raw_parts if part and part.strip()] if len(parts) < 2: continue - try: - timestamp = float(parts[0].replace(',', '.')) - except ValueError: + timestamp_decimal = _decimal(parts[0], default='NaN', frame_rate=detected_frame_rate) + if timestamp_decimal.is_nan(): continue + timestamp = float(timestamp_decimal) lines_processed += 1 tokens = parts[1:] behavior = behavior_lookup.get(tokens[0].casefold()) @@ -2063,8 +2209,9 @@ def parse_cowlog_results_text(session: ObservationSession, raw_text: str) -> tup subject_names: list[str] = [] for token in tokens[1:]: lowered = token.casefold() - if lowered in {'point', 'start', 'stop'}: - event_kind = lowered + resolved_kind = _resolve_event_kind_token(lowered) + if resolved_kind is not None: + event_kind = resolved_kind state_marker_used = True continue modifier = modifier_lookup.get(lowered) @@ -2094,12 +2241,16 @@ def parse_cowlog_results_text(session: ObservationSession, raw_text: str) -> tup payload = { 'schema': 'cowlog-results-v1', 'events': events, - 'annotations': [], + 'annotations': annotations, + 'metadata': metadata, } report = { 'detected_format': 'cowlog-results-v1', 'line_count': lines_processed, 'event_count': len(events), + 'annotation_count': len(annotations), + 'metadata_keys': sorted(metadata.keys()), + 'frame_rate': detected_frame_rate, 'warnings': warnings, 'state_marker_used': state_marker_used, } @@ -2134,6 +2285,8 @@ def parse_tabular_session_rows( stop_token = ( row.get('stop') or row.get('end') or row.get('stop_time') or row.get('end_time') ) + duration_token = row.get('duration') or row.get('duration_seconds') or row.get('delta') + frame_rate_token = row.get('fps') or row.get('frame_rate') or row.get('framerate') behavior_token = ( row.get('behavior') or row.get('code') @@ -2147,10 +2300,10 @@ def parse_tabular_session_rows( and note_token not in {None, ''} and time_token not in {None, ''} ): - try: - note_time = float(str(time_token).replace(',', '.')) - except ValueError: + note_time_decimal = _decimal(time_token, default='NaN', frame_rate=frame_rate_token) + if note_time_decimal.is_nan(): continue + note_time = float(note_time_decimal) annotations.append( { 'time': note_time, @@ -2162,20 +2315,27 @@ def parse_tabular_session_rows( continue if time_token in {None, ''} or behavior_token in {None, ''}: continue - try: - timestamp = float(str(time_token).replace(',', '.')) - except ValueError: + timestamp_decimal = _decimal(time_token, default='NaN', frame_rate=frame_rate_token) + if timestamp_decimal.is_nan(): warnings.append( _('Row %(row)s: invalid time value “%(value)s”.') % {'row': index, 'value': time_token} ) continue + timestamp = float(timestamp_decimal) stop_seconds = None if stop_token not in {None, ''}: - try: - stop_seconds = float(str(stop_token).replace(',', '.')) - except ValueError: + stop_seconds_decimal = _decimal(stop_token, default='NaN', frame_rate=frame_rate_token) + if stop_seconds_decimal.is_nan(): stop_seconds = None + else: + stop_seconds = float(stop_seconds_decimal) + elif duration_token not in {None, ''}: + duration_decimal = _decimal( + duration_token, default='NaN', frame_rate=frame_rate_token + ) + if not duration_decimal.is_nan() and duration_decimal >= 0: + stop_seconds = float(timestamp_decimal + duration_decimal) behavior = behavior_lookup.get(str(behavior_token).casefold()) if behavior is None: warnings.append( @@ -2292,6 +2452,10 @@ def parse_tabular_session_file( or filename.endswith('.tsv') else ',' ) + if delimiter == ',' and filename.endswith('.csv'): + first_line = text_payload.splitlines()[0] if text_payload.splitlines() else '' + if ';' in first_line and first_line.count(';') >= first_line.count(','): + delimiter = ';' reader = csv.DictReader(io.StringIO(text_payload), delimiter=delimiter) if not reader.fieldnames: raise ValueError(_('The uploaded tabular file does not contain a header row.')) @@ -2348,7 +2512,7 @@ def load_session_import_payload(uploaded_file, session: ObservationSession) -> t payload, parsed_report = parse_cowlog_results_text(session, text_payload) report.update(parsed_report) return payload, report - if ',' in first_line or ' ' in first_line: + if ',' in first_line or ';' in first_line or ' ' in first_line: payload, parsed_report = parse_tabular_session_file(session, uploaded_file, raw_bytes) report.update(parsed_report) return payload, report @@ -2506,6 +2670,21 @@ def _coerce_named_items(value, *, label_mode: bool = False) -> list[dict]: return items +def _coerce_object_rows(value) -> list[dict]: + """Accept either a list or mapping of object rows and normalize to dict list.""" + if isinstance(value, dict): + rows: list[dict] = [] + for key, item in value.items(): + if isinstance(item, dict): + normalized = dict(item) + normalized.setdefault('id', str(key)) + rows.append(normalized) + return rows + if isinstance(value, list): + return [dict(item) for item in value if isinstance(item, dict)] + return [] + + def _coerce_name_list(value) -> list[str]: """Convert list/dict/string inputs into a flat list of names for imports.""" if value is None: @@ -2563,17 +2742,67 @@ def _resolve_behavior_name(item: dict) -> str: ) +def _schema_matches(value: str | None, pattern: str) -> bool: + return bool(value and re.fullmatch(pattern, value)) + + +def _is_supported_session_schema(value: str | None) -> bool: + return any( + ( + _schema_matches(value, r'cowlog-django-v\d+-session'), + _schema_matches(value, r'pybehaviorlog-0(?:\.\d+)*-session'), + _schema_matches(value, r'cowlog-results-v\d+'), + _schema_matches(value, r'boris-tabular-(?:csv|tsv|xlsx)-v\d+'), + _schema_matches(value, r'boris-tabular-spreadsheet-v\d+'), + ) + ) + + +def _is_supported_observation_schema(value: str | None) -> bool: + return _schema_matches(value, r'boris-observation-v\d+') + + +def _is_supported_project_schema(value: str | None) -> bool: + return any( + ( + _schema_matches(value, r'boris-project-v\d+'), + _schema_matches(value, r'pybehaviorlog-0(?:\.\d+)*-bundle'), + ) + ) + + +def _is_supported_ethogram_schema(value: str | None) -> bool: + return any( + ( + _schema_matches(value, r'cowlog-django-v\d+-ethogram'), + _schema_matches(value, r'pybehaviorlog-0(?:\.\d+)*-ethogram'), + _schema_matches(value, r'boris-project-v\d+'), + _schema_matches(value, r'boris-observation-v\d+'), + ) + ) + + def _resolve_event_kind_token(value: str | None) -> str | None: token = (value or '').strip().lower().replace('_', ' ') mapping = { 'point': ObservationEvent.KIND_POINT, + 'p': ObservationEvent.KIND_POINT, + '.': ObservationEvent.KIND_POINT, + '0': ObservationEvent.KIND_POINT, 'instant': ObservationEvent.KIND_POINT, 'start': ObservationEvent.KIND_START, + 's': ObservationEvent.KIND_START, + '+': ObservationEvent.KIND_START, + '1': ObservationEvent.KIND_START, 'state start': ObservationEvent.KIND_START, 'begin': ObservationEvent.KIND_START, + 'on': ObservationEvent.KIND_START, 'stop': ObservationEvent.KIND_STOP, + '-': ObservationEvent.KIND_STOP, + '2': ObservationEvent.KIND_STOP, 'state stop': ObservationEvent.KIND_STOP, 'end': ObservationEvent.KIND_STOP, + 'off': ObservationEvent.KIND_STOP, } return mapping.get(token) @@ -2645,15 +2874,7 @@ def import_project_payload( """Import a richer BORIS-like project payload into an existing project.""" bundled_sessions = bundled_sessions or {} schema = payload.get('schema') - if schema not in { - 'boris-project-v1', - 'boris-project-v2', - 'boris-project-v3', - 'pybehaviorlog-0.8.3-bundle', - 'pybehaviorlog-0.9-bundle', - 'pybehaviorlog-0.9.1-bundle', - 'pybehaviorlog-0.9.5-bundle', - }: + if not _is_supported_project_schema(schema): raise ValueError(_('Unsupported project payload format.')) ethogram_payload = payload.get('ethogram') or payload @@ -2947,22 +3168,7 @@ def build_ethogram_payload(project: Project) -> dict: # pragma: no cover def import_ethogram_payload( project: Project, payload: dict, replace_existing: bool = False ) -> tuple[int, int, int]: # pragma: no cover - if payload.get('schema') not in { - 'cowlog-django-v3-ethogram', - 'cowlog-django-v4-ethogram', - 'cowlog-django-v5-ethogram', - 'pybehaviorlog-0.8-ethogram', - 'pybehaviorlog-0.8.3-ethogram', - 'pybehaviorlog-0.9-ethogram', - 'pybehaviorlog-0.9.1-ethogram', - 'pybehaviorlog-0.9.5-ethogram', - 'boris-project-v1', - 'boris-project-v2', - 'boris-project-v3', - 'boris-observation-v1', - 'boris-observation-v2', - 'boris-observation-v3', - }: + if not _is_supported_ethogram_schema(payload.get('schema')): raise ValueError('Unsupported JSON schema.') if replace_existing and ( @@ -3252,30 +3458,16 @@ def import_session_payload( annotation_items: list[dict] = [] segment_items: list[dict] = [] variable_items = payload.get('variables', {}) or payload.get('independent_variables', {}) or {} + metadata_items = payload.get('metadata') if isinstance(payload.get('metadata'), dict) else {} - if payload.get('schema') in { - 'cowlog-django-v5-session', - 'pybehaviorlog-v6-session', - 'pybehaviorlog-0.8-session', - 'pybehaviorlog-0.8.3-session', - 'pybehaviorlog-0.9-session', - 'pybehaviorlog-0.9.1-session', - 'pybehaviorlog-0.9.5-session', - 'cowlog-results-v1', - 'boris-tabular-csv-v1', - 'boris-tabular-tsv-v1', - 'boris-tabular-xlsx-v1', + if _is_supported_session_schema(payload.get('schema')) or payload.get('schema') in { + 'pybehaviorlog-v6-session' }: - event_items = payload.get('events', []) - annotation_items = payload.get('annotations', []) - segment_items = payload.get('segments', []) + event_items = _coerce_object_rows(payload.get('events')) + annotation_items = _coerce_object_rows(payload.get('annotations')) + segment_items = _coerce_object_rows(payload.get('segments')) elif ( - payload.get('schema') - in { - 'boris-observation-v1', - 'boris-observation-v2', - 'boris-observation-v3', - } + _is_supported_observation_schema(payload.get('schema')) or payload.get('observations') or payload.get('events') ): @@ -3283,16 +3475,84 @@ def import_session_payload( if isinstance(observations, dict): observations = list(observations.values()) if observations: - first = observations[0] - event_items = first.get('events', []) - annotation_items = first.get('annotations', []) - segment_items = first.get('segments', []) - if isinstance(first.get('variables'), dict): - variable_items = first.get('variables') + merged_variables: dict = {} + multi_observation = len(observations) > 1 + imported_observation_labels: list[str] = [] + imported_media_labels: list[str] = [] + for observation in observations: + if not isinstance(observation, dict): + continue + observation_label = ( + observation.get('title') + or observation.get('description') + or observation.get('id') + or observation.get('observation_id') + or '' + ) + if observation_label: + imported_observation_labels.append(str(observation_label)) + imported_media_labels.extend(_extract_media_labels(observation)) + observation_events = [] + observation_event_rows = _coerce_object_rows(observation.get('events')) + for raw_event in observation_event_rows: + event_item = dict(raw_event) if isinstance(raw_event, dict) else raw_event + if ( + multi_observation + and isinstance(event_item, dict) + and observation_label + ): + existing_comment = ( + event_item.get('comment') + or event_item.get('note') + or event_item.get('remarks') + or '' + ) + event_item['comment'] = ( + f'[BORIS observation: {observation_label}] {existing_comment}'.strip() + ) + observation_events.append(event_item) + observation_annotations = [] + observation_annotation_rows = _coerce_object_rows(observation.get('annotations')) + for raw_annotation in observation_annotation_rows: + annotation_item = ( + dict(raw_annotation) if isinstance(raw_annotation, dict) else raw_annotation + ) + if ( + multi_observation + and isinstance(annotation_item, dict) + and observation_label + ): + annotation_item['title'] = ( + f'[{observation_label}] {annotation_item.get("title") or "Note"}' + )[:120] + observation_annotations.append(annotation_item) + event_items.extend(observation_events) + annotation_items.extend(observation_annotations) + segment_items.extend(_coerce_object_rows(observation.get('segments'))) + if isinstance(observation.get('variables'), dict): + merged_variables.update(observation.get('variables')) + if isinstance(observation.get('independent_variables'), dict): + merged_variables.update(observation.get('independent_variables')) + if merged_variables: + variable_items = merged_variables + if imported_observation_labels: + unique_labels = ', '.join(dict.fromkeys(imported_observation_labels)) + session.notes = _append_note_line( + session.notes, + str(_('Imported BORIS observations: %(labels)s') % {'labels': unique_labels}), + ) + if imported_media_labels: + unique_media = ', '.join(dict.fromkeys(imported_media_labels)) + session.notes = _append_note_line( + session.notes, + str(_('Imported BORIS media labels: %(labels)s') % {'labels': unique_media}), + ) + if imported_observation_labels or imported_media_labels: + session.save(update_fields=['notes']) else: - event_items = payload.get('events', []) - annotation_items = payload.get('annotations', []) - segment_items = payload.get('segments', []) + event_items = _coerce_object_rows(payload.get('events')) + annotation_items = _coerce_object_rows(payload.get('annotations')) + segment_items = _coerce_object_rows(payload.get('segments')) else: raise ValueError(_('Unsupported session payload format.')) @@ -3312,6 +3572,7 @@ def import_session_payload( timestamp_seconds=_decimal( item.get('timestamp_seconds', item.get('time', item.get('timestamp'))), default='0', + frame_rate=item.get('fps') or item.get('frame_rate') or item.get('framerate'), ), frame_index=item.get('frame_index') or item.get('frame') or None, comment=(item.get('comment') or item.get('note') or item.get('remarks') or '').strip(), @@ -3355,6 +3616,40 @@ def import_session_payload( session.review_notes = payload.get('review_notes', session.review_notes or '') session.save(update_fields=['workflow_status', 'review_notes']) + if _schema_matches(payload.get('schema'), r'cowlog-results-v\d+') and metadata_items: + metadata_notes = [] + for key in ('session', 'project', 'primary_video', 'observer'): + value = str(metadata_items.get(key, '')).strip() + if value: + metadata_notes.append(f'{key}: {value}') + fps_metadata = None + for key in ('fps', 'frame_rate', 'framerate'): + value = str(metadata_items.get(key, '')).strip() + if value: + fps_metadata = value + break + if fps_metadata: + fps_definition = next( + ( + definition + for label, definition in variable_map.items() + if label.casefold().replace(' ', '_') in {'fps', 'frame_rate', 'framerate'} + ), + None, + ) + if fps_definition is not None: + ObservationVariableValue.objects.update_or_create( + session=session, + definition=fps_definition, + defaults={'value': fps_metadata}, + ) + if metadata_notes: + session.notes = _append_note_line( + session.notes, + str(_('Imported CowLog metadata: %(items)s') % {'items': '; '.join(metadata_notes)}), + ) + session.save(update_fields=['notes']) + for raw_item in annotation_items: item = dict(raw_item) if isinstance(raw_item, dict) else {} SessionAnnotation.objects.create( @@ -3362,6 +3657,7 @@ def import_session_payload( timestamp_seconds=_decimal( item.get('timestamp_seconds', item.get('time', item.get('timestamp'))), default='0', + frame_rate=item.get('fps') or item.get('frame_rate') or item.get('framerate'), ), title=(item.get('title') or 'Note').strip()[:120] or 'Note', note=(item.get('note') or item.get('comment') or item.get('text') or '').strip(), @@ -5534,10 +5830,34 @@ def session_export_cowlog_txt(request, pk: int): # pragma: no cover response.write('# PyBehaviorLog 0.9.5 CowLog-compatible export\n') response.write(f'# session\t{session.title}\n') response.write(f'# project\t{session.project.name}\n') + response.write(f'# observer\t{session.observer.username if session.observer else ""}\n') response.write(f'# primary_video\t{session.primary_label}\n') + fps_value = ( + ObservationVariableValue.objects.filter( + session=session, + definition__label__iregex=r'^(fps|frame[_ ]?rate|framerate)$', + ) + .order_by('definition__sort_order', 'definition__label') + .values_list('value', flat=True) + .first() + ) + if fps_value: + response.write(f'# fps\t{fps_value}\n') report = build_session_compatibility_report(session) for warning in report['cowlog']['warnings']: response.write(f'# warning\t{warning}\n') + for annotation in session.annotations.all().order_by('timestamp_seconds', 'pk'): + response.write( + '# annotation\t' + + '\t'.join( + [ + _format_seconds_token(annotation.timestamp_seconds), + (annotation.title or 'Note').replace('\t', ' '), + (annotation.note or '').replace('\t', ' ').replace('\n', ' '), + ] + ) + + '\n' + ) for event in session.events.all().order_by('timestamp_seconds', 'pk'): row = [ _format_seconds_token(event.timestamp_seconds),