Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions dev_utils/dev_utils/service_helper_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import threading
import queue
import copy
#NEW
import json
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
Expand Down Expand Up @@ -54,6 +56,26 @@ def get_message_source_from_event(event):
"""
return event.message.annotations["iothub-message-source".encode()].decode()

#NEW
def _reported_properties_match(actual, expected):
if isinstance(expected, dict):
if not isinstance(actual, dict):
return False

for key, expected_value in expected.items():
if expected_value is None:
if key in actual and actual[key] is not None:
return False
else:
if key not in actual:
return False
if not _reported_properties_match(actual[key], expected_value):
return False

return True

return actual == expected


class EventhubEvent(object):
def __init__(self):
Expand Down Expand Up @@ -126,6 +148,36 @@ def set_desired_properties(self, desired_props):
self.device_id, Twin(properties=TwinProperties(desired=desired_props)), "*"
)

#NEW
def get_service_twin(self):
if self.module_id:
return self._registry_manager.get_module_twin(self.device_id, self.module_id)
else:
return self._registry_manager.get_twin(self.device_id)

#NEW
def wait_for_reported_properties(self, expected_patch, timeout=240, poll_interval=1):
end_time = time.time() + timeout
last_reported = None

while time.time() < end_time:
twin = self.get_service_twin()
reported = ((twin.properties and twin.properties.reported) or {}).copy()
last_reported = reported

if _reported_properties_match(reported, expected_patch):
return reported

time.sleep(poll_interval)

raise Exception(
"reported properties did not match expected patch within {} seconds. expected={}, actual={}".format(
timeout,
json.dumps(expected_patch, sort_keys=True),
json.dumps(last_reported, sort_keys=True),
)
)

def invoke_method(
self,
method_name,
Expand Down
37 changes: 26 additions & 11 deletions tests/e2e/iothub_e2e/sync/test_sync_twin.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,16 @@ def test_sync_patch_reported_connect_if_necessary(
@pytest.mark.dropped_connection
@pytest.mark.describe("Client Reported Properties with dropped connection")
@pytest.mark.keep_alive(5)
@pytest.mark.skip(reason="Disabling as tests are failing. Needs investigation.")
#@pytest.mark.skip(reason="Disabling as tests are failing. Needs investigation.")
class TestReportedPropertiesDroppedConnection(object):

# TODO: split drop tests between first and second patches

@pytest.mark.it("Updates reported properties if connection drops before sending")
# def test_sync_updates_reported_if_drop_before_sending(
# self, client, random_reported_props, dropper, executor, leak_tracker
# ):
#NEW
def test_sync_updates_reported_if_drop_before_sending(
self, client, random_reported_props, dropper, service_helper, executor, leak_tracker
):
Expand All @@ -135,16 +139,23 @@ def test_sync_updates_reported_if_drop_before_sending(

send_task.result()

received_patch = service_helper.get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT]
== random_reported_props[const.TEST_CONTENT]
)
# received_patch = service_helper.get_next_reported_patch_arrival()
# assert (
# received_patch[const.REPORTED][const.TEST_CONTENT]
# == random_reported_props[const.TEST_CONTENT]
# )
#NEW
reported = service_helper.wait_for_reported_properties(random_reported_props)
assert reported[const.TEST_CONTENT] == random_reported_props[const.TEST_CONTENT]

# TODO: investigate leak
# leak_tracker.check_for_leaks()

@pytest.mark.it("Updates reported properties if connection rejects send")
# def test_sync_updates_reported_if_reject_before_sending(
# self, client, random_reported_props, dropper, executor, leak_tracker
# ):
#NEW
def test_sync_updates_reported_if_reject_before_sending(
self, client, random_reported_props, dropper, service_helper, executor, leak_tracker
):
Expand All @@ -165,11 +176,14 @@ def test_sync_updates_reported_if_reject_before_sending(

send_task.result()

received_patch = service_helper.get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT]
== random_reported_props[const.TEST_CONTENT]
)
# received_patch = service_helper.get_next_reported_patch_arrival()
# assert (
# received_patch[const.REPORTED][const.TEST_CONTENT]
# == random_reported_props[const.TEST_CONTENT]
# )
#NEW
reported = service_helper.wait_for_reported_properties(random_reported_props)
assert reported[const.TEST_CONTENT] == random_reported_props[const.TEST_CONTENT]

# TODO: investigate leak
# leak_tracker.check_for_leaks()
Expand All @@ -178,6 +192,7 @@ def test_sync_updates_reported_if_reject_before_sending(
@pytest.mark.describe("Client Desired Properties")
@pytest.mark.skip(reason="Disabling as tests are failing. Needs investigation.")
class TestDesiredProperties(object):

@pytest.mark.it("Receives a patch for a simple desired property")
@pytest.mark.quicktest_suite
def test_sync_receives_simple_desired_patch(self, client, service_helper, leak_tracker):
Expand Down
Loading