OpenTelemetry interoperability in Python
OpenTelemetry interoperability connects the Dynatrace AWS Lambda extension to the OpenTelemetry Python instrumentation to use the instrumentation packages and extensions. You can then monitor technologies like databases or messaging frameworks that aren't supported by Dynatrace AWS Lambda extension out of the box.
Before you start
-
Ensure that OpenTelemetry interoperability is enabled.
-
Verify that the installed OpenTelemetry Python API version is compatible with the Dynatrace AWS Lambda extension. The following table lists the compatible versions:
OneAgent version Maximum OpenTelemetry API version 1.233+ 1.7.x 1.235+ 1.8.x 1.239+ 1.9.x 1.243+ 1.11.x 1.249+ 1.12.x 1.253+ 1.13.x 1.257+ 1.14.x 1.259+ 1.15.x 1.265+ 1.17.x 1.269+ 1.18.x 1.273+ 1.19.x 1.277+ 1.20.x
Use OpenTelemetry Python instrumentation
OpenTelemetry for Python provides several instrumentation packages in their OpenTelemetry Python contributions repository.
The following code example shows how to instrument PostgreSQL calls to a Python Lambda function by using the aiopg instrumentation package.
1import json2import aiopg34from opentelemetry.instrumentation.aiopg import AiopgInstrumentor56AiopgInstrumentor().instrument()789def lambda_handler(event, context):10 return {11 'statusCode': 200,12 'body': json.dumps(execute_query())13 }1415def execute_query():16 result = []17 with aiopg.connect(database='my_db') as conn:18 with conn.cursor() as cur:19 cur.execute("SELECT 'hello db';")20 for row in cur:21 result.append(row)2223 return result
To instrument boto3, the AWS SDK for Python, OpenTelemetry provides the botocore instrumentation package.
The code example below shows how the botocore
instrumentation can be used to add observability for calls to a DynamoDB database (Dynatrace version 1.244+).
1import boto32import json34from opentelemetry.instrumentation.botocore import BotocoreInstrumentor56BotocoreInstrumentor().instrument()78dynamodb = boto3.resource('dynamodb')9table = dynamodb.Table('MyTable')101112def lambda_handler(event, handler):13 result = table.get_item(Key={'mykey': 42})1415 return {16 "statusCode": 200,17 "answer": json.dumps(result.get("Item"))18 }
Use OpenTelemetry Python API
OpenTelemetry Python can be used in an SDK-like approach to trace additional operations that aren't covered by an instrumentation package.
1import json2from opentelemetry import trace34def lambda_handler(event, context):5 tracer = trace.get_tracer(__name__)67 with tracer.start_as_current_span("do work"):8 # do work9 with tracer.start_as_current_span("do some more work") as span:10 span.set_attribute("foo", "bar")11 # do some more work1213 return {14 'statusCode': 200,15 'body': json.dumps('Hello from Hello world from OpenTelemetry Python!')16 }
These spans are displayed on the Code level tab.
Trace AWS SQS and SNS messages with Python
OneAgent version 1.253+ for SQS OneAgent version 1.257+ for SNS
You can use open-source instrumentation packages to trace AWS SQS and SNS messages and collect them via the Dynatrace AWS Lambda extension.
Install the required dependencies
1pip install -U "opentelemetry-api>=1.12" "opentelemetry-instrumentation-boto3sqs>=0.34b0"
At this point, opentelemetry-instrumentation-boto3sqs
 is a separate package from opentelemetry-instrumentation-botocore
. The latter instruments all AWS SDK calls, but lacks enhanced support for SQS.
If you install the dependencies into a Lambda function or layer, you can use the -t
option to specify a target directory where the installed packages should be copied.
Send an SQS/SNS message
The boto3 package is available out of the box if the code runs in AWS Lambda, but you can also install it using pip install -U boto3
.
This code defining a function named lambda_handler
can be used
-
Inside AWS Lambda (we recommend monitoring it with our AWS Lambda layer)
-
Outside AWS Lambda (monitoring is performed with OpenTelemetry and exported to Dynatrace via OTLP/HTTP ingest)
You might want to remove the function parameters and return value.
1from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor23Boto3SQSInstrumentor().instrument()45import json6import boto37from datetime import datetime89QUEUE_URL = "<Your SQS Queue URL>"1011sqs = boto3.client("sqs")1213def lambda_handler(event, context):14 sent = []15 for i in range(5):16 res = sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=f"hello #{i} at {datetime.now()}")17 sent.append(res["MessageId"])1819 return {20 "statusCode": 200,21 "body": json.dumps({"produced_messages": sent})22 }
Receive an SQS/SNS message
You can trace SQS messages forwarded from
-
An SQS topic
Receiving messages works out of the box when you use an AWS Lambda with an SQS trigger monitored with the Dynatrace AWS Lambda extension. Because a span can have only a single parent, if your Lambda function receives a batch of multiple messages, you need to manually create spans to process every single message if you want to track them separately and have them linked to the sender.
To configure the Dynatrace AWS Lambda extension to allow setting parent spans manually:
-
For the environment variables configuration method, set the
DT_OPEN_TELEMETRY_ALLOW_EXPLICIT_PARENT
environment variable totrue
:1DT_OPEN_TELEMETRY_ALLOW_EXPLICIT_PARENT=true -
For the JSON file configuration method, in
dtconfig.json
, set the following field totrue
:1{2 ...other configuration properties...3 "OpenTelemetry": {4 "AllowExplicitParent": "true"5 }6}
Then new spans can be created with the parent span extracted from each received SQS message.
If you invoke the sender and have deployed the example, it will be invoked automatically by SQS.
1from pprint import pformat23import boto34import json5from opentelemetry import trace, propagate6from opentelemetry.semconv.trace import SpanAttributes, MessagingOperationValues78tracer = trace.get_tracer("lambda-sqs-triggered")910def lambda_handler(event, context):11 recvcount = 012 print("Trigger", pformat(event))13 messages = event.get("Records") or ()14 # Lambda SQS event uses lowerCamelCase in its attribute names15 for msg in messages:16 recvcount += 117 print("Processing", msg["messageId"])18 parent = _extract_parent(msg, from_sns_payload=False)19 with tracer.start_as_current_span("manual-trigger-process", context=parent, kind=trace.SpanKind.CONSUMER, attributes={20 SpanAttributes.MESSAGING_MESSAGE_ID : msg["messageId"],21 SpanAttributes.MESSAGING_URL : msg["eventSourceARN"],22 SpanAttributes.MESSAGING_SYSTEM : msg["eventSource"],23 SpanAttributes.MESSAGING_OPERATION : MessagingOperationValues.PROCESS.value,24 }):25 # ... Here your actual processing would go...26 pass27 print("Processed", recvcount, "messages")2829def _extract_parent(msg, from_sns_payload=False):30 if from_sns_payload:31 try:32 body = json.loads(msg.get("body", "{}"))33 except json.JSONDecodeError:34 body = {}35 carrier = {key: value["Value"] for key, value in body.get("MessageAttributes", {}).items() if "Value" in value}36 else:37 carrier = {key: value["stringValue"] for key, value in msg.get("messageAttributes", {}).items() if "stringValue" in value}38 return propagate.extract(carrier)The resulting path looks as follows:
The invoked Lambda function is a child of one of the messages by which it's triggered. Since there can only be one parent, the other manual-trigger–process spans aren't linked directly to the Lambda invocation in which they are handled. Often, there's more than one Lambda invocation node for a batch of messages. In those cases, AWS distributed the batch over multiple Lambda invocations. This can happen even if the messages are delivered within your configured batch window time and number less than your configured batch size.
If you have deployed the example that uses the
receive
API in code, you need to invoke it manually and it will attempt to read all messages from the queue.This example uses the boto3sqs instrumentation. If you don't want to use it, you need to uncomment the
MessageAttributeNames
argument in thereceive_message
function, otherwise, SQS will omit data required to link the message to its sender from the retrieved data.This code can also be used outside a Lambda function and monitored with OpenTelemetry without the Dynatrace AWS Lambda extension.
1from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor23Boto3SQSInstrumentor().instrument()45from pprint import pformat67import boto38import json9from opentelemetry import trace, propagate10from opentelemetry.semconv.trace import SpanAttributes, MessagingOperationValues1112QUEUE_URL = '<Your SQS Queue URL>'13sqs = boto3.client("sqs")14tracer = trace.get_tracer("lambda-receive-function")1516def lambda_handler(event, context):17 recvcount = 018 while True:19 msg_receive_result = sqs.receive_message(20 MaxNumberOfMessages=10,21 QueueUrl=QUEUE_URL,22 WaitTimeSeconds=1, # WaitTime of zero would use sampled receive, may return empty even if there is a message2324 # This argument is only required if you do not use the boto3sqs instrumentation:25 #MessageAttributeNames=list(propagate.get_global_textmap().fields)26 )27 print("Received", pformat(msg_receive_result))28 if not msg_receive_result.get('Messages'):29 break30 messages = msg_receive_result.get("Messages")3132 # receive result uses PascalCase in its attribute names33 for msg in messages:34 recvcount += 135 print("Processing", msg["MessageId"])36 parent = _extract_parent(msg, from_sns_payload=False)37 with tracer.start_as_current_span("manual-receive-process", context=parent, kind=trace.SpanKind.CONSUMER, attributes={38 SpanAttributes.MESSAGING_MESSAGE_ID: msg["MessageId"],39 SpanAttributes.MESSAGING_URL: QUEUE_URL,40 SpanAttributes.MESSAGING_SYSTEM: "aws.sqs",41 SpanAttributes.MESSAGING_OPERATION: MessagingOperationValues.PROCESS.value,42 }):43 # ... Here your actual processing would go...4445 print("Delete result", sqs.delete_message(46 QueueUrl=QUEUE_URL,47 ReceiptHandle=msg['ReceiptHandle'],48 ))49 print("Processed", recvcount, "messages")5051def _extract_parent(msg, from_sns_payload=False):52 if from_sns_payload:53 try:54 body = json.loads(msg.get("Body", "{}"))55 except json.JSONDecodeError:56 body = {}57 carrier = {key: value["Value"] for key, value in body.get("MessageAttributes", {}).items() if "Value" in value}58 else:59 carrier = {key: value["StringValue"] for key, value in msg.get("MessageAttributes", {}).items() if "StringValue" in value}6061 return propagate.extract(carrier)Creating the
manual-receive-process
span manually is necessary because the boto3sqs instrumentation doesn't set the sender as a parent for the processing span, but uses OpenTelemetry links, which are currently not supported by Dynatrace. For themanual-receive-process
span linking to work correctly, you need to configure the Dynatrace AWS Lambda extension to allow setting parent spans manually. See the previous example for guidance.Invoking first the code that sends SQS messages, then the manual receive code, deployed as Lambda functions, results in two traces:
-
The first trace shows the flow of the message from the sender to the processor:
There are additional
Requests to public networks
nodes because the boto3 package uses HTTP requests to send SQS messages, which are captured by Dynatrace HTTP instrumentation.You'll notice that the invocation and receive node of the second Lambda invocation are missing from this trace, even though the
manual-receive-process
nodes are there. This is because the Lambda function was triggered independently of the message flow, and just happened to receive the message as part of its handler code. -
The second trace in Dynatrace shows the Lambda invocation until it's cut in two by setting the explicit parent:
-
-
An SNS topic
For SNS messages that are forwarded to SQS, the message format depends on the raw message delivery configuration on the SNS subscription.
Raw message delivery Message format Example Enabled
The SNS message attributes are converted to SQS message attributes and the parent can be directly extracted from the
MessageAttributes
of the SQS message.Disabled
The SNS message and its
MessageAttributes
are delivered as a serialized JSON string in the body of the received SQS message. To correctly link the receive span, the parent needs to be extracted from theMessageAttributes
of the serialized SNS message.Additional configuration is required for both examples; when calling the
_extract_parent
method, set the value of thefrom_sns_payload
parameter toTrue
.