
Bedrock integration backup#677

Open
vaquarkhan wants to merge 3 commits into apache:main from vaquarkhan:bedrock-integration-backup
Conversation

@vaquarkhan
Contributor

[Short description explaining the high-level reason for the pull request]

Changes

How I tested this

Notes

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

@vaquarkhan vaquarkhan force-pushed the bedrock-integration-backup branch from 995a1ba to f593ee3 Compare March 16, 2026 08:28
Collaborator

@andreahlert andreahlert left a comment


Two features in one PR; this would have been easier to review split. The Bedrock side is straightforward, but the SQS consumer needs work.

if s3_key and s3_key.endswith(".jsonl"):
await self._handle_s3_event(s3_key, event_time)

await sqs_client.delete_message(
Collaborator


nit: s3_key = None and event_time = None are dead assignments. They're set here but never read; the actual values come from s3_keys_with_times on line 855.


async def indexing_jobs(
self, offset: int = 0, limit: int = 100, filter_empty: bool = True
) -> Sequence[schema.IndexingJob]:
Collaborator


blocker: if _handle_s3_event raises on the Nth record of a multi-record SQS message, records 0..N-1 get re-indexed on retry with no dedup. Make _handle_s3_event idempotent (check if s3_path exists in LogFile before insert).
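A minimal sketch of the idempotency guard being asked for, assuming the review's description of the flow. `indexed_keys` here is an in-memory stand-in for the `LogFile` existence check (the real consumer would query the database); `handle_s3_event` and the key names are hypothetical:

```python
import asyncio

indexed_keys: set[str] = set()  # stand-in for a LogFile.filter(s3_path=...) lookup
processed: list[str] = []

async def handle_s3_event(s3_key: str) -> None:
    # Idempotency guard: skip keys already indexed, so a failure partway
    # through a multi-record message plus an SQS redelivery is a no-op
    # for the records that already succeeded.
    if s3_key in indexed_keys:
        return
    processed.append(s3_key)   # ... real indexing work would go here ...
    indexed_keys.add(s3_key)   # record success only after the work is done

async def main() -> None:
    # An SQS redelivery replays every record, including already-handled ones.
    for key in ["run1.jsonl", "run2.jsonl", "run1.jsonl"]:
        await handle_s3_event(key)

asyncio.run(main())
print(processed)  # ['run1.jsonl', 'run2.jsonl'] -- the redelivered key is skipped
```

With at-least-once SQS delivery, the guard makes retries safe regardless of which record in the batch failed first.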

Comment thread burr/integrations/bedrock.py Outdated
self._name = name
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
Collaborator


guardrail_version or "DRAFT" silently defaults to DRAFT when only guardrail_id is set. Is that intentional? Feels risky for prod, someone could set a guardrail ID and not realize they're running against an unpublished draft. Same at line 191.
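One way to make the fallback loud rather than silent, as a sketch; the function name and the warning policy are assumptions, not what the PR does:

```python
import warnings

def resolve_guardrail_version(guardrail_id, guardrail_version):
    # Only default to DRAFT explicitly, and say so, instead of a bare
    # `guardrail_version or "DRAFT"` that hides the fallback.
    if guardrail_id is not None and guardrail_version is None:
        warnings.warn(
            "guardrail_id was set without guardrail_version; "
            "defaulting to the unpublished DRAFT version"
        )
        return "DRAFT"
    return guardrail_version

print(resolve_guardrail_version("gr-abc123", "2"))  # 2
print(resolve_guardrail_version("gr-abc123", None))  # DRAFT (with a warning)
```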

Comment thread burr/integrations/bedrock.py Outdated
return result, new_state


class BedrockStreamingAction(StreamingAction):
Collaborator


IMHO BedrockStreamingAction is a copy-paste of BedrockAction. __init__, _get_client, reads/writes/name properties are all identical, ~70 lines duplicated. Pull them into a _BedrockBase and let each subclass just implement its execution method.
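A sketch of the suggested shape, assuming the attribute names shown in the snippets above; Burr's real `Action`/`StreamingAction` bases, the reads/writes properties, and the client construction are omitted, and the method bodies are stubs:

```python
class _BedrockBase:
    """Shared wiring for the sync and streaming Bedrock actions."""

    def __init__(self, name, region, guardrail_id=None, guardrail_version=None):
        self._name = name
        self._region = region
        self._guardrail_id = guardrail_id
        self._guardrail_version = guardrail_version or "DRAFT"

    def _get_client(self):
        ...  # boto3 client construction lives here, once, for both subclasses

    @property
    def name(self):
        return self._name

class BedrockAction(_BedrockBase):
    def run(self, state):
        ...  # one-shot execution path

class BedrockStreamingAction(_BedrockBase):
    def stream_run(self, state):
        ...  # streaming execution path

action = BedrockAction("summarize", "us-east-1")
print(action.name)  # summarize
```

Each subclass then only carries its execution method, removing the ~70 duplicated lines.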

Comment thread burr/integrations/bedrock.py Outdated
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
self._inference_config = inference_config or {"maxTokens": 4096}
Collaborator


just a small nit: inference_config or {"maxTokens": 4096} means inference_config={} gives you the default because empty dict is falsy. Use if inference_config is not None if you want to allow empty configs.
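The fix being suggested, as a small self-contained sketch (the helper name is made up for illustration):

```python
DEFAULT_INFERENCE_CONFIG = {"maxTokens": 4096}

def resolve_inference_config(inference_config):
    # `inference_config or DEFAULT_INFERENCE_CONFIG` would silently replace
    # an explicitly empty {} with the default, because empty dicts are falsy.
    # An identity check on None preserves the caller's empty config.
    if inference_config is not None:
        return inference_config
    return DEFAULT_INFERENCE_CONFIG

print(resolve_inference_config(None))  # {'maxTokens': 4096}
print(resolve_inference_config({}))    # {} -- preserved, not defaulted
```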

Comment thread .github/workflows/python-package.yml Outdated
- name: Install dependencies
run: |
python -m pip install -e ".[tests,tracking-client,graphviz]"
python -m pip install -e ".[tests,tracking-client,graphviz,tracking-server-s3]"
Collaborator


Adding tracking-server-s3 to the main CI install, plus apache-burr[bedrock] in [tests] (pyproject.toml:101), means every contributor now pulls boto3, aiobotocore, tortoise-orm, and aerich even for unrelated PRs. Keep the AWS deps in a separate CI job and test group, like the existing test-persister-dbs pattern.

"""
if self._tracking_mode != TrackingMode.EVENT_DRIVEN or not self._sqs_queue_url:
logger.info("Event consumer not configured, skipping")
return
Collaborator


Is the data/{project}/... prefix structure guaranteed for all event sources? WDYT about getting the project name from data_file.prefix (you already parse it via DataFile.from_path one line above) instead of a raw split?
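To illustrate the difference: a sketch contrasting the raw split with parsing via the prefix. `DataFile.from_path` is Burr-internal, so both helpers here are hypothetical stand-ins, and the `data/{project}/...` layout is exactly the assumption being questioned:

```python
def project_from_raw_split(s3_key: str) -> str:
    # Hard-codes the data/{project}/... layout; breaks silently if an
    # event source uses a different key structure.
    return s3_key.split("/")[1]

def project_from_prefix(prefix: str) -> str:
    # Uses the prefix a DataFile-style parser already produced,
    # e.g. "data/demo-project/", so the layout is parsed in one place.
    return prefix.rstrip("/").rsplit("/", 1)[-1]

print(project_from_raw_split("data/demo-project/app/log.jsonl"))  # demo-project
print(project_from_prefix("data/demo-project/"))                  # demo-project
```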

response = await sqs_client.receive_message(
QueueUrl=self._sqs_queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=self._sqs_wait_time_seconds,
Collaborator


Nice, auto-creating the project from the S3 event is a good call for the event-driven flow.

})
}

resource "aws_sqs_queue_redrive_policy" "main" {
Collaborator


+1 on the DLQ + redrive policy setup, clean separation into modules.

@skrawcz skrawcz mentioned this pull request Mar 28, 2026
7 tasks
@andreahlert andreahlert added kind/feature Net-new functionality area/integrations External integrations (LLMs, frameworks) labels Mar 30, 2026
@github-actions github-actions bot added area/storage Persisters, state storage area/tracking Telemetry, tracing, OpenTelemetry area/ci Workflows, build, release scripts area/examples Relates to /examples pr/needs-rebase Conflicts with main and removed pr/needs-rebase Conflicts with main labels Apr 11, 2026
@vaquarkhan vaquarkhan force-pushed the bedrock-integration-backup branch from 4c66bf7 to 0499dec Compare April 12, 2026 04:48
@github-actions github-actions bot added the area/website burr.apache.org website label Apr 12, 2026
@vaquarkhan
Contributor Author

PR is ready for review. Also, the documentation GitHub Actions workflow reports "No jobs were run" / failed at startup on the commit:
https://github.com/apache/burr/actions/runs/24301015848

Could a committer confirm whether workflow approval is needed for fork PRs, or if there’s an infra issue with that workflow?

@vaquarkhan
Contributor Author

@andreahlert @skrawcz please review the Bedrock changes; we moved them out to reduce the scope of the previous PR.

Contributor

@skrawcz skrawcz left a comment


Looks good. Can we get an example under /examples using the two actions? (And please also fix the pre-commit.)
