Skip to content

Commit

Permalink
CrewAI support (#212)
Browse files Browse the repository at this point in the history
* Write a stream wrapper

* Example

* run formatter
  • Loading branch information
karthikscale3 authored Jun 24, 2024
1 parent 4316b4c commit aad958d
Show file tree
Hide file tree
Showing 11 changed files with 435 additions and 212 deletions.
70 changes: 70 additions & 0 deletions src/examples/crewai_example/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os

os.environ["OPENAI_MODEL_NAME"] = "gpt-3.5-turbo"
os.environ["SERPER_API_KEY"] = "" # serper.dev API key
from langtrace_python_sdk import langtrace
from langtrace_python_sdk.utils.with_root_span import with_langtrace_root_span
from crewai import Crew, Process
from crewai import Task
from crewai import Agent
from crewai_tools import SerperDevTool
from crewai_tools import YoutubeVideoSearchTool

langtrace.init()

search_tool = SerperDevTool()

# Targeted search within a specific Youtube video's content
youtube_tool = YoutubeVideoSearchTool(
youtube_video_url="https://www.youtube.com/watch?v=blqIZGXWUpU"
)

# Creating a senior researcher agent with memory and verbose mode
researcher = Agent(
role="Senior Researcher",
goal="Uncover groundbreaking technologies in {topic}",
verbose=True,
memory=True,
backstory=(
"Driven by curiosity, you're at the forefront of"
"innovation, eager to explore and share knowledge that could change"
"the world."
),
tools=[youtube_tool],
)

# Research task
research_task = Task(
description=(
"Do a {topic} of the given youtube video."
"Focus on identifying the overall narrative."
"Your final report should clearly articulate the key points."
),
expected_output="10 key points from the shared video.",
tools=[youtube_tool],
agent=researcher,
callback="research_callback", # Example of task callback
human_input=True,
)


# Forming the tech-focused crew with some enhanced configurations
crew = Crew(
agents=[researcher],
tasks=[research_task],
process=Process.sequential, # Optional: Sequential task execution is default
memory=False,
cache=False,
max_rpm=20,
)

# Starting the task execution process with enhanced feedback


@with_langtrace_root_span("Crew")
def test_crew():
result = crew.kickoff(inputs={"topic": "summary"})
return result


test_crew()
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"ANTHROPIC": "Anthropic",
"AZURE": "Azure",
"CHROMA": "Chroma",
"CREWAI": "CrewAI",
"DSPY": "DSPy",
"GROQ": "Groq",
"LANGCHAIN": "Langchain",
Expand Down
6 changes: 4 additions & 2 deletions src/langtrace_python_sdk/instrumentation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .anthropic import AnthropicInstrumentation
from .chroma import ChromaInstrumentation
from .cohere import CohereInstrumentation
from .crewai import CrewAIInstrumentation
from .groq import GroqInstrumentation
from .langchain import LangchainInstrumentation
from .langchain_community import LangchainCommunityInstrumentation
Expand All @@ -12,12 +13,13 @@
from .qdrant import QdrantInstrumentation
from .weaviate import WeaviateInstrumentation
from .ollama import OllamaInstrumentor
from .dspy import DspyInstrumentor
from .dspy import DspyInstrumentation

__all__ = [
"AnthropicInstrumentation",
"ChromaInstrumentation",
"CohereInstrumentation",
"CrewAIInstrumentation",
"GroqInstrumentation",
"LangchainInstrumentation",
"LangchainCommunityInstrumentation",
Expand All @@ -29,5 +31,5 @@
"QdrantInstrumentation",
"WeaviateInstrumentation",
"OllamaInstrumentor",
"DspyInstrumentor",
"DspyInstrumentation",
]
3 changes: 3 additions & 0 deletions src/langtrace_python_sdk/instrumentation/crewai/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .instrumentation import CrewAIInstrumentation

__all__ = ["CrewAIInstrumentation"]
53 changes: 53 additions & 0 deletions src/langtrace_python_sdk/instrumentation/crewai/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Copyright (c) 2024 Scale3 Labs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper as _W
from typing import Collection
from importlib_metadata import version as v
from .patch import patch_crew


class CrewAIInstrumentation(BaseInstrumentor):
"""
The CrewAIInstrumentation class represents the CrewAI instrumentation"""

def instrumentation_dependencies(self) -> Collection[str]:
return ["crewai >= 0.32.0"]

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", tracer_provider)
version = v("crewai")
_W(
"crewai.crew",
"Crew.kickoff",
patch_crew("Crew.kickoff", version, tracer),
)
_W(
"crewai.agent",
"Agent.execute_task",
patch_crew("Agent.execute_task", version, tracer),
)
_W(
"crewai.task",
"Task.execute",
patch_crew("Task.execute", version, tracer),
)

def _uninstrument(self, **kwargs):
pass
173 changes: 173 additions & 0 deletions src/langtrace_python_sdk/instrumentation/crewai/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import json
from importlib_metadata import version as v
from langtrace_python_sdk.constants import LANGTRACE_SDK_NAME
from langtrace_python_sdk.utils import set_span_attribute
from langtrace_python_sdk.utils.silently_fail import silently_fail
from langtrace_python_sdk.constants.instrumentation.common import (
LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY,
SERVICE_PROVIDERS,
)
from opentelemetry import baggage
from langtrace.trace_attributes import FrameworkSpanAttributes
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode


crew_properties = {
"tasks": "object",
"agents": "object",
"cache": "bool",
"process": "object",
"verbose": "bool",
"memory": "bool",
"embedder": "json",
"full_output": "bool",
"manager_llm": "object",
"manager_agent": "object",
"manager_callbacks": "object",
"function_calling_llm": "object",
"config": "json",
"id": "object",
"max_rpm": "int",
"share_crew": "bool",
"step_callback": "object",
"task_callback": "object",
"prompt_file": "object",
"output_log_file": "object",
}

task_properties = {
"id": "object",
"used_tools": "int",
"tools_errors": "int",
"delegations": "int",
"i18n": "object",
"thread": "object",
"prompt_context": "object",
"description": "str",
"expected_output": "str",
"config": "object",
"callback": "str",
"agent": "object",
"context": "object",
"async_execution": "bool",
"output_json": "object",
"output_pydantic": "object",
"output_file": "object",
"output": "object",
"tools": "object",
"human_input": "bool",
}

agent_properties = {
"formatting_errors": "int",
"id": "object",
"role": "str",
"goal": "str",
"backstory": "str",
"cache": "bool",
"config": "object",
"max_rpm": "int",
"verbose": "bool",
"allow_delegation": "bool",
"tools": "object",
"max_iter": "int",
"max_execution_time": "object",
"agent_executor": "object",
"tools_handler": "object",
"force_answer_max_iterations": "int",
"crew": "object",
"cache_handler": "object",
"step_callback": "object",
"i18n": "object",
"llm": "object",
"function_calling_llm": "object",
"callbacks": "object",
"system_template": "object",
"prompt_template": "object",
"response_template": "object",
}


def patch_crew(operation_name, version, tracer):
def traced_method(wrapped, instance, args, kwargs):

service_provider = SERVICE_PROVIDERS["CREWAI"]
extra_attributes = baggage.get_baggage(LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY)
span_attributes = {
"langtrace.sdk.name": "langtrace-python-sdk",
"langtrace.service.name": service_provider,
"langtrace.service.type": "framework",
"langtrace.service.version": version,
"langtrace.version": v(LANGTRACE_SDK_NAME),
**(extra_attributes if extra_attributes is not None else {}),
}

crew_config = {}
for key, value in instance.__dict__.items():
if instance.__class__.__name__ == "Crew":
if key in crew_properties and value is not None:
if crew_properties[key] == "json":
crew_config[key] = json.dumps(value)
elif crew_properties[key] == "object":
crew_config[key] = str(value)
else:
crew_config[key] = value
elif instance.__class__.__name__ == "Agent":
if key in agent_properties and value is not None:
if agent_properties[key] == "json":
crew_config[key] = json.dumps(value)
elif agent_properties[key] == "object":
crew_config[key] = str(value)
else:
crew_config[key] = value
elif instance.__class__.__name__ == "Task":
if key in task_properties and value is not None:
if task_properties[key] == "json":
crew_config[key] = json.dumps(value)
elif task_properties[key] == "object":
crew_config[key] = str(value)
else:
crew_config[key] = value
if crew_config:
if instance.__class__.__name__ == "Crew":
if "inputs" in kwargs and kwargs["inputs"]:
crew_config["inputs"] = json.dumps(kwargs["inputs"])
span_attributes["crewai.crew.config"] = json.dumps(crew_config)
elif instance.__class__.__name__ == "Agent":
if "context" in kwargs and kwargs["context"]:
crew_config["context"] = json.dumps(kwargs["context"])
span_attributes["crewai.agent.config"] = json.dumps(crew_config)
elif instance.__class__.__name__ == "Task":
span_attributes["crewai.task.config"] = json.dumps(crew_config)

attributes = FrameworkSpanAttributes(**span_attributes)

with tracer.start_as_current_span(operation_name, kind=SpanKind.CLIENT) as span:
_set_input_attributes(span, kwargs, attributes)

try:
result = wrapped(*args, **kwargs)
if result:
span.set_status(Status(StatusCode.OK))

span.end()
return result

except Exception as err:
# Record the exception in the span
span.record_exception(err)

# Set the span status to indicate an error
span.set_status(Status(StatusCode.ERROR, str(err)))

# Reraise the exception to ensure it's not swallowed
raise

return traced_method


@silently_fail
def _set_input_attributes(span, kwargs, attributes):
for field, value in attributes.model_dump(by_alias=True).items():
set_span_attribute(span, field, value)
4 changes: 2 additions & 2 deletions src/langtrace_python_sdk/instrumentation/dspy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .instrumentation import DspyInstrumentor
from .instrumentation import DspyInstrumentation

__all__ = ["DspyInstrumentor"]
__all__ = ["DspyInstrumentation"]
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .patch import patch_bootstrapfewshot_optimizer, patch_signature, patch_evaluate


class DspyInstrumentor(BaseInstrumentor):
class DspyInstrumentation(BaseInstrumentor):
"""
The DspyInstrumentor class represents the DSPy instrumentation"""

Expand Down
Loading

0 comments on commit aad958d

Please sign in to comment.