-
Notifications
You must be signed in to change notification settings - Fork 504
Expand file tree
/
Copy path_llmobs.py
More file actions
2972 lines (2663 loc) · 135 KB
/
_llmobs.py
File metadata and controls
2972 lines (2663 loc) · 135 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import asyncio
import csv
from dataclasses import dataclass
from dataclasses import field
import inspect
import json
import math
import os
import sys
import time
from typing import Any
from typing import Callable
from typing import Literal
from typing import Optional
from typing import Sequence
from typing import Union
from typing import cast
import urllib.parse
import ddtrace
from ddtrace import config
from ddtrace import patch
from ddtrace._trace.apm_filter import APMTracingEnabledFilter
from ddtrace._trace.context import Context
from ddtrace._trace.span import Span
from ddtrace._trace.tracer import Tracer
from ddtrace.constants import ERROR_MSG
from ddtrace.constants import ERROR_STACK
from ddtrace.constants import ERROR_TYPE
from ddtrace.ext import SpanTypes
from ddtrace.internal import atexit
from ddtrace.internal import core
from ddtrace.internal import forksafe
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.logger import get_logger
from ddtrace.internal.native import generate_128bit_trace_id
from ddtrace.internal.native import rand64bits
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.service import Service
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.telemetry import get_config as _get_config
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.internal.threads import RLock
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import format_trace_id
from ddtrace.internal.utils.formats import parse_tags_str
from ddtrace.llmobs import _constants as constants
from ddtrace.llmobs import _telemetry as telemetry
from ddtrace.llmobs._constants import AGENT_MANIFEST
from ddtrace.llmobs._constants import ANNOTATIONS_CONTEXT_ID
from ddtrace.llmobs._constants import DECORATOR
from ddtrace.llmobs._constants import DEFAULT_PROJECT_NAME
from ddtrace.llmobs._constants import DEFAULT_PROMPTS_CACHE_TTL
from ddtrace.llmobs._constants import DEFAULT_PROMPTS_TIMEOUT
from ddtrace.llmobs._constants import DISPATCH_ON_GUARDRAIL_SPAN_START
from ddtrace.llmobs._constants import DISPATCH_ON_LLM_SPAN_FINISH
from ddtrace.llmobs._constants import DISPATCH_ON_LLM_TOOL_CHOICE
from ddtrace.llmobs._constants import DISPATCH_ON_OPENAI_AGENT_SPAN_FINISH
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL_OUTPUT_USED
from ddtrace.llmobs._constants import EXPERIMENT_CONFIG
from ddtrace.llmobs._constants import EXPERIMENT_CSV_FIELD_MAX_SIZE
from ddtrace.llmobs._constants import EXPERIMENT_DATASET_NAME_KEY
from ddtrace.llmobs._constants import EXPERIMENT_EXPECTED_OUTPUT
from ddtrace.llmobs._constants import EXPERIMENT_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_NAME_KEY
from ddtrace.llmobs._constants import EXPERIMENT_PROJECT_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_PROJECT_NAME_KEY
from ddtrace.llmobs._constants import EXPERIMENT_RUN_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_RUN_ITERATION_KEY
from ddtrace.llmobs._constants import EXPERIMENTS_INPUT
from ddtrace.llmobs._constants import EXPERIMENTS_OUTPUT
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_PROMPT
from ddtrace.llmobs._constants import INPUT_VALUE
from ddtrace.llmobs._constants import INSTRUMENTATION_METHOD_ANNOTATED
from ddtrace.llmobs._constants import INTEGRATION
from ddtrace.llmobs._constants import LLMOBS_STRUCT
from ddtrace.llmobs._constants import LLMOBS_TRACE_ID
from ddtrace.llmobs._constants import MCP_TOOL_CALL_INTENT
from ddtrace.llmobs._constants import METADATA
from ddtrace.llmobs._constants import METRICS
from ddtrace.llmobs._constants import ML_APP
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
from ddtrace.llmobs._constants import OUTPUT_DOCUMENTS
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import PARENT_ID_KEY
from ddtrace.llmobs._constants import PROMPT_TRACKING_INSTRUMENTATION_METHOD
from ddtrace.llmobs._constants import PROPAGATED_LLMOBS_TRACE_ID_KEY
from ddtrace.llmobs._constants import PROPAGATED_ML_APP_KEY
from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
from ddtrace.llmobs._constants import ROOT_PARENT_ID
from ddtrace.llmobs._constants import SESSION_ID
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_LINKS
from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._constants import TOOL_DEFINITIONS
from ddtrace.llmobs._context import LLMObsContextProvider
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._experiment import AsyncEvaluatorType
from ddtrace.llmobs._experiment import AsyncSummaryEvaluatorType
from ddtrace.llmobs._experiment import AsyncTaskType
from ddtrace.llmobs._experiment import BaseAsyncEvaluator
from ddtrace.llmobs._experiment import BaseAsyncSummaryEvaluator
from ddtrace.llmobs._experiment import BaseEvaluator
from ddtrace.llmobs._experiment import BaseSummaryEvaluator
from ddtrace.llmobs._experiment import ConfigType
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._experiment import DatasetRecord
from ddtrace.llmobs._experiment import DatasetRecordInputType
from ddtrace.llmobs._experiment import EvaluatorType
from ddtrace.llmobs._experiment import Experiment
from ddtrace.llmobs._experiment import ExperimentResult
from ddtrace.llmobs._experiment import JSONType
from ddtrace.llmobs._experiment import Project
from ddtrace.llmobs._experiment import SummaryEvaluatorType
from ddtrace.llmobs._experiment import SyncExperiment
from ddtrace.llmobs._experiment import TaskType
from ddtrace.llmobs._experiment import _deep_eval_async_evaluator_wrapper
from ddtrace.llmobs._experiment import _deep_eval_evaluator_wrapper
from ddtrace.llmobs._experiment import _get_base_url
from ddtrace.llmobs._experiment import _is_deep_eval_evaluator
from ddtrace.llmobs._experiment import _is_pydantic_evaluator
from ddtrace.llmobs._experiment import _pydantic_async_evaluator_wrapper
from ddtrace.llmobs._experiment import _pydantic_evaluator_wrapper
from ddtrace.llmobs._prompt_optimization import PromptOptimization
from ddtrace.llmobs._prompt_optimization import validate_dataset
from ddtrace.llmobs._prompt_optimization import validate_dataset_split
from ddtrace.llmobs._prompt_optimization import validate_evaluators
from ddtrace.llmobs._prompt_optimization import validate_optimization_task
from ddtrace.llmobs._prompt_optimization import validate_task
from ddtrace.llmobs._prompt_optimization import validate_test_dataset
from ddtrace.llmobs._prompts import ManagedPrompt
from ddtrace.llmobs._prompts.cache import WarmCache
from ddtrace.llmobs._prompts.manager import PromptManager
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import LinkTracker
from ddtrace.llmobs._utils import _batched
from ddtrace.llmobs._utils import _get_llmobs_data_metastruct
from ddtrace.llmobs._utils import _get_ml_app
from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor
from ddtrace.llmobs._utils import _get_parent_prompt
from ddtrace.llmobs._utils import _get_session_id
from ddtrace.llmobs._utils import _get_span_kind
from ddtrace.llmobs._utils import _get_span_name
from ddtrace.llmobs._utils import _is_evaluation_span
from ddtrace.llmobs._utils import _validate_prompt
from ddtrace.llmobs._utils import add_span_link
from ddtrace.llmobs._utils import enforce_message_role
from ddtrace.llmobs._utils import get_span_links
from ddtrace.llmobs._utils import safe_json
from ddtrace.llmobs._writer import LLMObsEvalMetricWriter
from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent
from ddtrace.llmobs._writer import LLMObsExperimentsClient
from ddtrace.llmobs._writer import LLMObsSpanData
from ddtrace.llmobs._writer import LLMObsSpanEvent
from ddtrace.llmobs._writer import LLMObsSpanWriter
from ddtrace.llmobs._writer import should_use_agentless
from ddtrace.llmobs.types import ExportedLLMObsSpan
from ddtrace.llmobs.types import Message
from ddtrace.llmobs.types import Prompt
from ddtrace.llmobs.types import PromptFallback
from ddtrace.llmobs.types import _ErrorField
from ddtrace.llmobs.types import _Meta
from ddtrace.llmobs.types import _MetaIO
from ddtrace.llmobs.types import _SpanField
from ddtrace.llmobs.utils import Documents
from ddtrace.llmobs.utils import Messages
from ddtrace.llmobs.utils import extract_tool_definitions
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.version import __version__
log = get_logger(__name__)

# Maps LLMObs integration name -> ddtrace patch target module name.
# Keys are the names LLMObs knows integrations by; values are what gets patched.
SUPPORTED_LLMOBS_INTEGRATIONS = {
    "anthropic": "anthropic",
    "bedrock": "botocore",
    "openai": "openai",
    "langchain": "langchain",
    "google_adk": "google_adk",
    "google_genai": "google_genai",
    "vertexai": "vertexai",
    "langgraph": "langgraph",
    "litellm": "litellm",
    "crewai": "crewai",
    "openai_agents": "openai_agents",
    "mcp": "mcp",
    "pydantic_ai": "pydantic_ai",
    "claude_agent_sdk": "claude_agent_sdk",
    # requests/concurrent frameworks for distributed injection/extraction
    "requests": "requests",
    "httpx": "httpx",
    "urllib3": "urllib3",
    "grpc": "grpc",
    "flask": "flask",
    "starlette": "starlette",
    "fastapi": "fastapi",
    "aiohttp": "aiohttp",
    "asyncio": "asyncio",
    "futures": "futures",
}
# Constants for validation
# Parameter names every experiment task function must accept.
_TASK_REQUIRED_PARAMS = {"input_data", "config"}
# Parameter names every evaluator function must accept.
_EVALUATOR_REQUIRED_PARAMS = ("input_data", "output_data", "expected_output")
# Parameter names every summary evaluator function must accept.
_SUMMARY_EVALUATOR_REQUIRED_PARAMS = (
    "inputs",
    "outputs",
    "expected_outputs",
    "evaluators_results",
)
def _validate_task_signature(task: Callable, is_async: bool) -> None:
if not callable(task):
raise TypeError("task must be a callable function.")
if is_async and not asyncio.iscoroutinefunction(task):
raise TypeError("task must be an async function (coroutine function).")
sig = inspect.signature(task)
params = sig.parameters
if not all(param in params for param in _TASK_REQUIRED_PARAMS):
raise TypeError("Task function must have 'input_data' and 'config' parameters.")
def _validate_evaluator_signature(evaluator: Any, is_async: bool) -> None:
    """Validate that an evaluator is a supported instance or a compatible callable.

    Instances of the evaluator base classes (plus DeepEval/Pydantic evaluators)
    are accepted as-is; otherwise the evaluator must be a callable accepting
    all of the ``_EVALUATOR_REQUIRED_PARAMS`` parameters.

    :raises TypeError: if the evaluator is neither a supported instance nor a
        callable with the required parameters.
    """
    # Async experiments accept both sync and async evaluator instances.
    allowed_bases: tuple[type, ...] = (BaseEvaluator, BaseAsyncEvaluator) if is_async else (BaseEvaluator,)
    if isinstance(evaluator, allowed_bases):
        return
    if _is_deep_eval_evaluator(evaluator) or _is_pydantic_evaluator(evaluator):
        return
    if not callable(evaluator):
        if is_async:
            raise TypeError(
                f"Evaluator {evaluator} must be callable or an instance of BaseEvaluator/BaseAsyncEvaluator."
            )
        raise TypeError(f"Evaluator {evaluator} must be callable or an instance of BaseEvaluator.")
    evaluator_params = inspect.signature(evaluator).parameters
    if any(required not in evaluator_params for required in _EVALUATOR_REQUIRED_PARAMS):
        raise TypeError("Evaluator function must have parameters {}.".format(tuple(_EVALUATOR_REQUIRED_PARAMS)))
def _validate_summary_evaluator_signature(evaluator: Any, is_async: bool) -> None:
    """Validate that a summary evaluator is a supported instance or a compatible callable.

    Instances of the summary-evaluator base classes are accepted as-is;
    otherwise the evaluator must be a callable accepting all of the
    ``_SUMMARY_EVALUATOR_REQUIRED_PARAMS`` parameters.

    :raises TypeError: if the evaluator is neither a supported instance nor a
        callable with the required parameters.
    """
    # Async experiments accept both sync and async summary evaluator instances.
    allowed_bases: tuple[type, ...] = (
        (BaseSummaryEvaluator, BaseAsyncSummaryEvaluator) if is_async else (BaseSummaryEvaluator,)
    )
    if isinstance(evaluator, allowed_bases):
        return
    if not callable(evaluator):
        if is_async:
            raise TypeError(
                f"Summary evaluator {evaluator} must be callable "
                "or an instance of BaseSummaryEvaluator/BaseAsyncSummaryEvaluator."
            )
        raise TypeError(f"Summary evaluator {evaluator} must be callable or an instance of BaseSummaryEvaluator.")
    evaluator_params = inspect.signature(evaluator).parameters
    if any(required not in evaluator_params for required in _SUMMARY_EVALUATOR_REQUIRED_PARAMS):
        raise TypeError(
            "Summary evaluator function must have parameters {}.".format(tuple(_SUMMARY_EVALUATOR_REQUIRED_PARAMS))
        )
class LLMObsExportSpanError(Exception):
    """Raised when an LLMObs span cannot be exported."""
class LLMObsAnnotateSpanError(Exception):
    """Raised when an LLMObs span cannot be annotated."""
class LLMObsSubmitEvaluationError(Exception):
    """Raised when an evaluation cannot be submitted."""
class LLMObsInjectDistributedHeadersError(Exception):
    """Raised when distributed headers cannot be injected."""
class LLMObsActivateDistributedHeadersError(Exception):
    """Raised when distributed headers cannot be activated."""
@dataclass
class LLMObsSpan:
    """Mutable view of an LLMObs span handed to user ``span_processor`` callbacks.

    A processor registered via ``enable`` or ``register_processor`` receives one
    of these per span; it may edit ``input``/``output`` or return ``None`` to
    drop the span entirely.

    Example::

        def span_processor(span: LLMObsSpan) -> Optional[LLMObsSpan]:
            if span.get_tag("omit_span") == "1":
                return None  # drop the span
            if span.get_tag("no_input") == "1":
                span.input = []  # scrub the input
            return span
    """

    input: list[Message] = field(default_factory=list)
    output: list[Message] = field(default_factory=list)
    _tags: dict[str, str] = field(default_factory=dict)

    def get_tag(self, key: str) -> Optional[str]:
        """Return the span tag stored under ``key``, or ``None`` if it is not set."""
        try:
            return self._tags[key]
        except KeyError:
            return None
def _build_llmobs_span(
    span_kind: str,
    llmobs_input: _MetaIO,
    llmobs_output: _MetaIO,
) -> tuple[LLMObsSpan, Literal["value", "messages", ""], Literal["value", "messages", ""]]:
    """Assemble the LLMObsSpan handed to the user span processor.

    For LLM spans, structured messages are preferred when present; otherwise a
    raw value is wrapped in a single role-less message.

    Returns a ``(llmobs_span, input_type, output_type)`` triple, where each type
    is ``"messages"``, ``"value"``, or ``""`` when that side is absent.
    """

    def _route(io_struct: _MetaIO) -> tuple[list[Message], Literal["value", "messages", ""]]:
        # Messages (LLM spans only) take precedence over a stringified value.
        routed: list[Message] = []
        routed_type: Literal["value", "messages", ""] = ""
        raw_value = io_struct.get(LLMOBS_STRUCT.VALUE)
        if raw_value is not None:
            routed = [Message(content=safe_json(raw_value, ensure_ascii=False) or "", role="")]
            routed_type = "value"
        messages = io_struct.get(LLMOBS_STRUCT.MESSAGES)
        if span_kind == "llm" and messages is not None:
            routed = enforce_message_role(messages)
            routed_type = "messages"
        return routed, routed_type

    llmobs_span = LLMObsSpan()
    routed_input, input_type = _route(llmobs_input)
    if input_type:
        llmobs_span.input = routed_input
    routed_output, output_type = _route(llmobs_output)
    if output_type:
        llmobs_span.output = routed_output
    return llmobs_span, input_type, output_type
def _build_span_meta(
    span: Span,
    llmobs_span: LLMObsSpan,
    llmobs_meta: _Meta,
    span_kind: str,
    input_type: Literal["value", "messages", ""],
    output_type: Literal["value", "messages", ""],
) -> _Meta:
    """Build and return the full meta dict for a span event.

    Combines the stored meta_struct fields with the (possibly user-mutated)
    input/output carried on ``llmobs_span``. ``input_type``/``output_type``
    indicate how each side was routed by ``_build_llmobs_span``.
    """
    llmobs_input: _MetaIO = llmobs_meta.get(LLMOBS_STRUCT.INPUT) or _MetaIO()
    llmobs_output: _MetaIO = llmobs_meta.get(LLMOBS_STRUCT.OUTPUT) or _MetaIO()
    meta = _Meta(
        span=_SpanField(kind=span_kind),
        input=llmobs_input,
        output=llmobs_output,
        model_name=llmobs_meta.get(LLMOBS_STRUCT.MODEL_NAME) or "",
        # Provider defaults to "custom" and is normalized to lowercase.
        model_provider=(llmobs_meta.get(LLMOBS_STRUCT.MODEL_PROVIDER) or "custom").lower(),
        metadata=llmobs_meta.get(LLMOBS_STRUCT.METADATA) or {},
        tool_definitions=llmobs_meta.get(LLMOBS_STRUCT.TOOL_DEFINITIONS) or [],
        intent=str(llmobs_meta.get(LLMOBS_STRUCT.INTENT) or ""),
        error=_ErrorField(
            message=span.get_tag(ERROR_MSG) or "",
            stack=span.get_tag(ERROR_STACK) or "",
            type=span.get_tag(ERROR_TYPE) or "",
        ),
    )
    # Prompts are only valid on LLM spans: drop them elsewhere, and inherit the
    # nearest parent's prompt on LLM spans that have none of their own.
    input_prompt = llmobs_input.get(LLMOBS_STRUCT.PROMPT)
    if input_prompt is not None and span_kind != "llm":
        log.warning("Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds.")
        meta["input"].pop(LLMOBS_STRUCT.PROMPT, None)
    elif input_prompt is None and span_kind == "llm":
        parent_prompt = _get_parent_prompt(span)
        if parent_prompt is not None:
            meta["input"]["prompt"] = parent_prompt
    # expected_output is only attached to experiment spans within an experiment run.
    expected_output = llmobs_meta.get(LLMOBS_STRUCT.EXPECTED_OUTPUT)
    if span.context.get_baggage_item(EXPERIMENT_ID_KEY) and span_kind == "experiment" and expected_output is not None:
        meta["expected_output"] = expected_output
    # Write the (possibly user-mutated) I/O back into meta, in the form it was routed as.
    if input_type == "messages":
        meta["input"]["messages"] = llmobs_span.input
    elif input_type == "value" and llmobs_span.input:
        meta["input"]["value"] = llmobs_span.input[0].get("content", "")
    if output_type == "messages":
        meta["output"]["messages"] = llmobs_span.output
    elif output_type == "value" and llmobs_span.output:
        meta["output"]["value"] = llmobs_span.output[0].get("content", "")
    return meta
class LLMObs(Service):
_instance = None # type: LLMObs
enabled = False
_app_key: str = os.getenv("DD_APP_KEY", "")
_project_name: str = os.getenv("DD_LLMOBS_PROJECT_NAME", DEFAULT_PROJECT_NAME)
    def __init__(
        self,
        tracer: Optional[Tracer] = None,
        span_processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None,
    ) -> None:
        """Initialize the LLMObs service.

        :param tracer: Tracer to attach to; defaults to the global ``ddtrace.tracer``.
        :param span_processor: Optional user callback that can mutate or drop
            LLMObs spans before they are submitted.
        """
        super(LLMObs, self).__init__()
        self.tracer = tracer or ddtrace.tracer
        self._llmobs_context_provider = LLMObsContextProvider()
        self._user_span_processor = span_processor
        # Agentless mode defaults to enabled unless explicitly configured off.
        agentless_enabled = config._llmobs_agentless_enabled if config._llmobs_agentless_enabled is not None else True
        self._llmobs_span_writer = LLMObsSpanWriter(
            interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
            timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
            is_agentless=agentless_enabled,
        )
        self._llmobs_eval_metric_writer = LLMObsEvalMetricWriter(
            interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
            timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
            is_agentless=agentless_enabled,
        )
        self._evaluator_runner = EvaluatorRunner(
            interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)),
            llmobs_service=self,
        )
        self._dne_client = LLMObsExperimentsClient(
            interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
            timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
            _app_key=self._app_key,
            _default_project=Project(name=self._project_name, _id=""),
            is_agentless=True,  # agent proxy doesn't seem to work for experiments
        )
        # Re-register state in child processes after a fork.
        forksafe.register(self._child_after_fork)
        self._link_tracker = LinkTracker()
        # Pending annotation contexts, applied to spans as they start (see _do_annotations).
        self._annotations: list[tuple[str, str, dict[str, Any]]] = []
        self._annotation_context_lock = RLock()
def _on_span_start(self, span: Span) -> None:
if self.enabled and span.span_type == SpanTypes.LLM:
self._activate_llmobs_span(span)
telemetry.record_span_started()
self._do_annotations(span)
def _on_span_finish(self, span: Span) -> None:
if self.enabled and span.span_type == SpanTypes.LLM:
self._submit_llmobs_span(span)
telemetry.record_span_created(span)
    def _submit_llmobs_span(self, span: Span) -> None:
        """Generate and submit an LLMObs span event to be sent to LLMObs.

        Malformed spans are logged and dropped rather than raising. Successfully
        built events for non-evaluation LLM spans are also handed to the
        evaluator runner.
        """
        span_event = None
        try:
            span_event = self._llmobs_span_event(span)
            if span_event is None:
                # A user span processor dropped this span; nothing to submit.
                return
            self._llmobs_span_writer.enqueue(span_event)
        except (KeyError, TypeError, ValueError):
            # Raised by the event builders when span data is missing/malformed.
            log.error(
                "Error generating LLMObs span event for span %s, likely due to malformed span",
                span,
                exc_info=True,
            )
        finally:
            # Feed built LLM span events to the evaluator runner, skipping spans
            # that are themselves evaluation spans.
            if span_event and span._get_ctx_item(SPAN_KIND) == "llm" and not _is_evaluation_span(span):
                if self._evaluator_runner:
                    self._evaluator_runner.enqueue(span_event, span)
def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]:
"""Generate LLMObs span event using either the meta_struct path or the legacy _store path."""
llmobs_data = _get_llmobs_data_metastruct(span)
if llmobs_data:
return self._build_span_event_from_meta_struct(span, llmobs_data)
return self._build_span_event_from_ctx_items(span)
    def _apply_user_span_processor(self, llmobs_span: LLMObsSpan, llmobs_data: LLMObsSpanData) -> Optional[LLMObsSpan]:
        """Run the user span processor.

        Returns the possibly mutated span, or None if the span should be dropped.
        On error, logs and returns the original span unchanged.
        """
        if self._user_span_processor is None:
            return llmobs_span
        error = False
        try:
            # Expose the span's tags so the processor can filter via get_tag().
            llmobs_span._tags = cast(dict[str, str], llmobs_data.get(LLMOBS_STRUCT.TAGS, {}))
            result = self._user_span_processor(llmobs_span)
            if result is None:
                # The processor chose to drop this span.
                return None
            if not isinstance(result, LLMObsSpan):
                raise TypeError("User span processor must return an LLMObsSpan or None, got %r" % type(result))
            return result
        except Exception as e:
            log.error("Error in LLMObs span processor (%r): %r", self._user_span_processor, e)
            error = True
            return llmobs_span
        finally:
            # Telemetry counts every invocation and whether it errored.
            telemetry.record_llmobs_user_processor_called(error)
    def _build_span_event_from_meta_struct(self, span: Span, llmobs_data: LLMObsSpanData) -> Optional[LLMObsSpanEvent]:
        """Build a span event from the span's meta_struct payload.

        Returns None when the user span processor drops the span.

        :raises KeyError: if the span kind cannot be resolved.
        :raises ValueError: if the ML app or LLMObs trace ID is missing.
        """
        llmobs_meta = llmobs_data.get(LLMOBS_STRUCT.META) or _Meta()
        llmobs_input = llmobs_meta.get(LLMOBS_STRUCT.INPUT) or _MetaIO()
        llmobs_output = llmobs_meta.get(LLMOBS_STRUCT.OUTPUT) or _MetaIO()
        span_kind = _get_span_kind(span)
        if not span_kind:
            raise KeyError("Span kind not found in span context")
        ml_app = _get_ml_app(span)
        if ml_app is None:
            raise ValueError(
                "ML app is required for sending LLM Observability data. "
                "Ensure this configuration is set before running your application."
            )
        span._set_ctx_item(ML_APP, ml_app)
        parent_id = llmobs_data.get(LLMOBS_STRUCT.PARENT_ID) or ROOT_PARENT_ID
        llmobs_trace_id = llmobs_data.get(LLMOBS_STRUCT.TRACE_ID)
        if llmobs_trace_id is None:
            raise ValueError("Failed to extract LLMObs trace ID from span context.")
        if span_kind == "llm":
            # Notify listeners that an LLM span is finishing (core event dispatch).
            core.dispatch(DISPATCH_ON_LLM_SPAN_FINISH, (span,))
        llmobs_span, input_type, output_type = _build_llmobs_span(span_kind, llmobs_input, llmobs_output)
        user_processed_span = self._apply_user_span_processor(llmobs_span, llmobs_data)
        if user_processed_span is None:
            # The user span processor dropped this span.
            return None
        llmobs_span = user_processed_span
        # Wait to build meta until after user processors apply and potentially mutate I/O
        meta = _build_span_meta(span, llmobs_span, llmobs_meta, span_kind, input_type, output_type)
        metrics = llmobs_data.get(LLMOBS_STRUCT.METRICS) or {}
        session_id = _get_session_id(span)
        tags = self._llmobs_tags(span, ml_app, session_id, True, llmobs_data)
        span_links = get_span_links(span)
        _dd_attrs = {
            "span_id": str(span.span_id),
            "trace_id": format_trace_id(span.trace_id),
            "apm_trace_id": format_trace_id(span.trace_id),
        }
        if span.context.get_baggage_item(EXPERIMENT_ID_KEY):
            # Spans created during an experiment run are scoped to experiments.
            _dd_attrs["scope"] = "experiments"
        llmobs_span_event: LLMObsSpanEvent = {
            "trace_id": llmobs_trace_id,
            "span_id": str(span.span_id),
            "parent_id": parent_id,
            "name": _get_span_name(span),
            "start_ns": span.start_ns,
            "duration": cast(int, span.duration_ns),
            "status": "error" if span.error else "ok",
            "meta": meta,
            "metrics": metrics,
            "session_id": session_id or "",
            "tags": tags,
            "span_links": span_links,
            "_dd": _dd_attrs,
        }
        experiment_config = llmobs_data.get(LLMOBS_STRUCT.CONFIG)
        if experiment_config:
            llmobs_span_event["config"] = experiment_config
        return llmobs_span_event
def _build_span_event_from_ctx_items(self, span: Span) -> Optional[LLMObsSpanEvent]:
"""Build span event from ctx_item data (legacy path for backward compatibility)."""
span_kind = span._get_ctx_item(SPAN_KIND)
if not span_kind:
raise KeyError("Span kind not found in span context")
if span_kind == "llm":
core.dispatch(DISPATCH_ON_LLM_SPAN_FINISH, (span,))
llmobs_span = LLMObsSpan()
_dd_attrs = {
"span_id": str(span.span_id),
"trace_id": format_trace_id(span.trace_id),
"apm_trace_id": format_trace_id(span.trace_id),
}
meta = _Meta(span=_SpanField(kind=span_kind), input=_MetaIO(), output=_MetaIO())
if span_kind in ("llm", "embedding") and span._get_ctx_item(MODEL_NAME) is not None:
meta["model_name"] = span._get_ctx_item(MODEL_NAME) or ""
meta["model_provider"] = (span._get_ctx_item(MODEL_PROVIDER) or "custom").lower()
metadata = span._get_ctx_item(METADATA) or {}
if span_kind == "agent" and span._get_ctx_item(AGENT_MANIFEST) is not None:
metadata_dd = _dd_val if isinstance(_dd_val := metadata.get("_dd"), dict) else {}
metadata_dd["agent_manifest"] = span._get_ctx_item(AGENT_MANIFEST)
metadata["_dd"] = metadata_dd
meta["metadata"] = metadata
input_type: Literal["value", "messages", ""] = ""
output_type: Literal["value", "messages", ""] = ""
if span._get_ctx_item(INPUT_VALUE) is not None:
input_type = "value"
llmobs_span.input = [
Message(
content=safe_json(span._get_ctx_item(INPUT_VALUE), ensure_ascii=False) or "",
role="",
)
]
if span.context.get_baggage_item(EXPERIMENT_ID_KEY):
_dd_attrs["scope"] = "experiments"
if span_kind == "experiment":
expected_output = span._get_ctx_item(EXPERIMENT_EXPECTED_OUTPUT)
if expected_output:
meta["expected_output"] = expected_output
input_data = span._get_ctx_item(EXPERIMENTS_INPUT)
if input_data:
meta["input"] = input_data
output_data = span._get_ctx_item(EXPERIMENTS_OUTPUT)
if output_data:
meta["output"] = output_data
input_messages = span._get_ctx_item(INPUT_MESSAGES)
if span_kind == "llm" and input_messages is not None:
input_type = "messages"
llmobs_span.input = cast(list[Message], enforce_message_role(input_messages))
if span._get_ctx_item(OUTPUT_VALUE) is not None:
output_type = "value"
llmobs_span.output = [
Message(
content=safe_json(span._get_ctx_item(OUTPUT_VALUE), ensure_ascii=False) or "",
role="",
)
]
output_messages = span._get_ctx_item(OUTPUT_MESSAGES)
if span_kind == "llm" and output_messages is not None:
output_type = "messages"
llmobs_span.output = cast(list[Message], enforce_message_role(output_messages))
if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None:
meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS) or []
if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None:
meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS) or []
if span._get_ctx_item(INPUT_PROMPT) is not None:
prompt_json_str = span._get_ctx_item(INPUT_PROMPT)
if span_kind != "llm":
log.warning(
"Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds."
)
else:
prompt_dict = cast(Prompt, prompt_json_str)
meta["input"]["prompt"] = prompt_dict
elif span_kind == "llm":
parent_span = _get_nearest_llmobs_ancestor(span)
if parent_span is not None:
parent_llmobs_data = _get_llmobs_data_metastruct(parent_span)
if parent_llmobs_data:
parent_llmobs_input = parent_llmobs_data.get(LLMOBS_STRUCT.META, {}).get(LLMOBS_STRUCT.INPUT, {})
parent_prompt = (
parent_llmobs_input.get(LLMOBS_STRUCT.PROMPT) if isinstance(parent_llmobs_input, dict) else None
)
else:
parent_prompt = parent_span._get_ctx_item(INPUT_PROMPT)
if parent_prompt is not None:
meta["input"]["prompt"] = parent_prompt
if span._get_ctx_item(TOOL_DEFINITIONS) is not None:
meta["tool_definitions"] = span._get_ctx_item(TOOL_DEFINITIONS) or []
intent = span._get_ctx_item(MCP_TOOL_CALL_INTENT)
if intent is not None:
meta["intent"] = str(intent)
if span.error:
meta["error"] = _ErrorField(
message=span.get_tag(ERROR_MSG) or "",
stack=span.get_tag(ERROR_STACK) or "",
type=span.get_tag(ERROR_TYPE) or "",
)
if self._user_span_processor:
error = False
try:
llmobs_span._tags = cast(dict[str, str], span._get_ctx_item(TAGS))
user_llmobs_span = self._user_span_processor(llmobs_span)
if user_llmobs_span is None:
return None
if not isinstance(user_llmobs_span, LLMObsSpan):
raise TypeError(
"User span processor must return an LLMObsSpan or None, got %r" % type(user_llmobs_span)
)
llmobs_span = user_llmobs_span
except Exception as e:
log.error(
"Error in LLMObs span processor (%r): %r",
self._user_span_processor,
e,
)
error = True
finally:
telemetry.record_llmobs_user_processor_called(error)
if llmobs_span.input is not None:
if input_type == "messages":
meta["input"]["messages"] = llmobs_span.input
elif input_type == "value":
meta["input"]["value"] = llmobs_span.input[0].get("content", "")
if llmobs_span.output is not None:
if output_type == "messages":
meta["output"]["messages"] = llmobs_span.output
elif output_type == "value":
meta["output"]["value"] = llmobs_span.output[0].get("content", "")
if not meta["input"]:
meta.pop("input")
if not meta["output"]:
meta.pop("output")
metrics = span._get_ctx_item(METRICS) or {}
ml_app = _get_ml_app(span)
if ml_app is None:
raise ValueError(
"ML app is required for sending LLM Observability data. "
"Ensure this configuration is set before running your application."
)
span._set_ctx_item(ML_APP, ml_app)
parent_id = span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID
llmobs_trace_id = span._get_ctx_item(LLMOBS_TRACE_ID)
if llmobs_trace_id is None:
raise ValueError("Failed to extract LLMObs trace ID from span context.")
llmobs_span_event: LLMObsSpanEvent = {
"trace_id": format_trace_id(llmobs_trace_id),
"span_id": str(span.span_id),
"parent_id": parent_id,
"name": _get_span_name(span),
"start_ns": span.start_ns,
"duration": cast(int, span.duration_ns),
"status": "error" if span.error else "ok",
"meta": meta,
"metrics": metrics,
"tags": [],
"_dd": _dd_attrs,
}
session_id = _get_session_id(span)
if session_id is not None:
span._set_ctx_item(SESSION_ID, session_id)
llmobs_span_event["session_id"] = session_id
llmobs_span_event["tags"] = self._llmobs_tags(span, ml_app, session_id, False, None)
span_links = span._get_ctx_item(SPAN_LINKS)
if isinstance(span_links, list) and span_links:
llmobs_span_event["span_links"] = span_links
experiment_config = span._get_ctx_item(EXPERIMENT_CONFIG)
if experiment_config:
llmobs_span_event["config"] = experiment_config
return llmobs_span_event
@staticmethod
def _llmobs_tags(
    span: Span,
    ml_app: str,
    session_id: Optional[str] = None,
    use_meta_struct: bool = False,
    llmobs_data: Optional[LLMObsSpanData] = None,
) -> list[str]:
    """Build the flat ``"key:value"`` tag list attached to an LLMObs span event.

    Starts from the global ``config.tags`` plus standard service/runtime tags,
    layers on error/session/integration tags, any user-set span tags, and
    finally experiment metadata propagated through trace baggage.
    """
    span_tags: dict = {
        **config.tags,
        "version": config.version or "",
        "env": config.env or "",
        "service": span.service or "",
        "source": "integration",
        "ml_app": ml_app,
        "ddtrace.version": __version__,
        "language": "python",
        "error": span.error,
    }
    error_type = span.get_tag(ERROR_TYPE)
    if error_type:
        span_tags["error_type"] = error_type
    if session_id:
        span_tags["session_id"] = session_id
    # Pull the integration name and user-set tags either from the serialized
    # meta-struct payload or from the span's context items.
    if use_meta_struct and llmobs_data:
        user_tags: Optional[dict[str, str]] = llmobs_data.get(LLMOBS_STRUCT.TAGS, {})
        integration = user_tags.get("integration") if user_tags else None
    else:
        user_tags = span._get_ctx_item(TAGS)
        integration = span._get_ctx_item(INTEGRATION)
    if integration:
        span_tags["integration"] = integration
    if _is_evaluation_span(span):
        span_tags[constants.RUNNER_IS_INTEGRATION_SPAN_TAG] = "ragas"
    # User-set tags may override any of the defaults above.
    if user_tags is not None:
        span_tags.update(user_tags)
    # set experiment tags on children spans if the tags do not already exist
    baggage_to_tag = (
        (EXPERIMENT_ID_KEY, "experiment_id"),
        (EXPERIMENT_RUN_ID_KEY, "run_id"),
        (EXPERIMENT_RUN_ITERATION_KEY, "run_iteration"),
        (EXPERIMENT_DATASET_NAME_KEY, "dataset_name"),
        (EXPERIMENT_PROJECT_NAME_KEY, "project_name"),
        (EXPERIMENT_PROJECT_ID_KEY, "project_id"),
        (EXPERIMENT_NAME_KEY, "experiment_name"),
    )
    for baggage_key, tag_name in baggage_to_tag:
        baggage_value = span.context.get_baggage_item(baggage_key)
        if baggage_value and tag_name not in span_tags:
            span_tags[tag_name] = baggage_value
    return [f"{key}:{value}" for key, value in span_tags.items()]
def _do_annotations(self, span: Span) -> None:
    """Apply all registered annotation contexts that match the active trace context to *span*."""
    # Only LLM spans are annotated; skipping early avoids the warning log in `annotate`.
    if span.span_type != SpanTypes.LLM:
        return
    active_context = self._instance.tracer.current_trace_context()
    if active_context is None:
        return
    active_context_id = active_context.get_baggage_item(ANNOTATIONS_CONTEXT_ID)
    with self._annotation_context_lock:
        matching_kwargs = [
            annotation_kwargs
            for _, registered_context_id, annotation_kwargs in self._instance._annotations
            if registered_context_id == active_context_id
        ]
        for annotation_kwargs in matching_kwargs:
            self.annotate(span, **annotation_kwargs, _suppress_span_kind_error=True)
def _child_after_fork(self) -> None:
    # Fork handler (registered with `forksafe` — see its `unregister` call in
    # `_stop_service`): background writer/runner threads do not survive a fork,
    # so recreate them in the child process.
    self._llmobs_span_writer = self._llmobs_span_writer.recreate()
    self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate()
    self._evaluator_runner = self._evaluator_runner.recreate()
    # Reset the class-level prompt manager so the child builds its own.
    LLMObs._prompt_manager = None
    # Restart the recreated services only if LLMObs was enabled in the parent.
    if self.enabled:
        self._start_service()
def _start_service(self) -> None:
    """Start the LLMObs background services.

    Startup failures are logged at debug level rather than raised, so that
    enabling LLMObs never takes down the host application.
    """
    try:
        for writer in (self._llmobs_span_writer, self._llmobs_eval_metric_writer):
            writer.start()
    except ServiceStatusError:
        log.debug("Error starting LLMObs writers")
    try:
        self._evaluator_runner.start()
    except ServiceStatusError:
        log.debug("Error starting evaluator runner")
def _stop_service(self) -> None:
    # Tear down LLMObs: stop the evaluator first so no new evaluations are
    # produced, flush pending data, stop the writers, then detach all hooks.
    try:
        self._evaluator_runner.stop()
        # flush remaining evaluation spans & evaluations
        # NOTE(review): these two flushes go through the singleton
        # (`self._instance`) while the rest of this method uses `self`
        # directly — presumably the same object; confirm.
        self._instance._llmobs_span_writer.periodic()
        self._instance._llmobs_eval_metric_writer.periodic()
    except ServiceStatusError:
        log.debug("Error stopping evaluator runner")
    try:
        self._llmobs_span_writer.stop()
        self._llmobs_eval_metric_writer.stop()
    except ServiceStatusError:
        log.debug("Error stopping LLMObs writers")
    # Remove listener hooks for span events
    core.reset_listeners("trace.span_start", self._on_span_start)
    core.reset_listeners("trace.span_finish", self._on_span_finish)
    core.reset_listeners("http.span_inject", self._inject_llmobs_context)
    core.reset_listeners(
        "http.activate_distributed_headers",
        self._activate_llmobs_distributed_context_soft_fail,
    )
    # Threading/asyncio context-propagation hooks.
    core.reset_listeners("threading.submit", self._current_trace_context)
    core.reset_listeners("threading.execution", self._llmobs_context_provider.activate)
    core.reset_listeners("asyncio.create_task", self._on_asyncio_create_task)
    core.reset_listeners("asyncio.execute_task", self._on_asyncio_execute_task)
    # Span-link tracking hooks (tool calls, guardrails, agent spans).
    core.reset_listeners(DISPATCH_ON_LLM_TOOL_CHOICE, self._link_tracker.on_llm_tool_choice)
    core.reset_listeners(DISPATCH_ON_TOOL_CALL, self._link_tracker.on_tool_call)
    core.reset_listeners(
        DISPATCH_ON_TOOL_CALL_OUTPUT_USED,
        self._link_tracker.on_tool_call_output_used,
    )
    core.reset_listeners(DISPATCH_ON_GUARDRAIL_SPAN_START, self._link_tracker.on_guardrail_span_start)
    core.reset_listeners(DISPATCH_ON_LLM_SPAN_FINISH, self._link_tracker.on_llm_span_finish)
    core.reset_listeners(
        DISPATCH_ON_OPENAI_AGENT_SPAN_FINISH,
        self._link_tracker.on_openai_agent_span_finish,
    )
    # Undo the fork handler registered for `_child_after_fork`.
    forksafe.unregister(self._child_after_fork)
@classmethod
def enable(
cls,
ml_app: Optional[str] = None,
integrations_enabled: bool = True,
agentless_enabled: Optional[bool] = None,
instrumented_proxy_urls: Optional[set[str]] = None,
site: Optional[str] = None,
api_key: Optional[str] = None,
app_key: Optional[str] = None,
project_name: Optional[str] = None,
env: Optional[str] = None,
service: Optional[str] = None,
span_processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None,
_tracer: Optional[Tracer] = None,
_auto: bool = False,
) -> None:
"""
Enable LLM Observability tracing.
:param str ml_app: The name of your ml application.
:param bool integrations_enabled: set to `true` to enable LLM integrations.
:param bool agentless_enabled: set to `true` to disable sending data that requires a Datadog Agent.
:param set[str] instrumented_proxy_urls: A set of instrumented proxy URLs to help detect when to emit LLM spans.
:param str site: Your datadog site.
:param str api_key: Your datadog api key.
:param str app_key: Your datadog application key.
:param str project_name: Your project name used for experiments.
:param str env: Your environment name.
:param str service: Your service name.
:param Callable[[LLMObsSpan], Optional[LLMObsSpan]] span_processor: A function that takes an LLMObsSpan and
returns an LLMObsSpan or None. If None is returned, the span will be omitted and not sent to LLMObs.
"""
if cls.enabled:
log.debug("%s already enabled", cls.__name__)
return
cls._warn_if_litellm_was_imported()
if os.getenv("DD_LLMOBS_ENABLED") and not asbool(os.getenv("DD_LLMOBS_ENABLED")):
log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service")
return
# grab required values for LLMObs
config._dd_site = site or config._dd_site
config._dd_api_key = api_key or config._dd_api_key
cls._app_key = app_key or cls._app_key
cls._project_name = project_name or cls._project_name or DEFAULT_PROJECT_NAME