diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala
index 729fee36b2..c60eac2634 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala
@@ -48,17 +48,26 @@ object DatabricksUtilities {
   val Folder = s"/SynapseMLBuild/build_${BuildInfo.version}"
   val ScalaVersion: String = BuildInfo.scalaVersion.split(".".toCharArray).dropRight(1).mkString(".")

-  val Libraries: String = List(
-    Map("maven" -> Map("coordinates" -> PackageMavenCoordinate, "repo" -> PackageRepository)),
-    Map("pypi" -> Map("package" -> "nltk")),
-    Map("pypi" -> Map("package" -> "bs4")),
-    Map("pypi" -> Map("package" -> "plotly")),
-    Map("pypi" -> Map("package" -> "Pillow")),
-    Map("pypi" -> Map("package" -> "onnxmltools==1.7.0")),
-    Map("pypi" -> Map("package" -> "lightgbm")),
-    Map("pypi" -> Map("package" -> "mlflow")),
-    Map("pypi" -> Map("package" -> "openai"))
-  ).toJson.compactPrint
+  val PipPackages: Seq[String] = Seq(
+    "nltk",
+    "bs4",
+    "plotly",
+    "Pillow",
+    "onnxmltools==1.7.0",
+    "lightgbm",
+    "mlflow",
+    "openai",
+    "langchain",
+    "pdf2image",
+    "pdfminer.six",
+    "pytesseract",
+    "unstructured"
+  )
+
+  val Libraries: String = (
+    List(Map("maven" -> Map("coordinates" -> PackageMavenCoordinate, "repo" -> PackageRepository))) ++
+      PipPackages.map(p => Map("pypi" -> Map("package" -> p)))
+  ).toJson.compactPrint

   // TODO: install synapse.ml.dl wheel package here
   val GPULibraries: String = List(
@@ -285,7 +294,7 @@ object DatabricksUtilities {
     val (url, nbName) = getRunUrlAndNBName(runId)
     if (logLevel >= 1) println(s"Started Monitoring notebook $nbName, url: $url")

-    while (finalState.isEmpty & //scalastyle:ignore while
+    while (finalState.isEmpty & //scalastyle:ignore while
      (System.currentTimeMillis() - startTime) < timeout &
      lifeCycleState != "INTERNAL_ERROR"
    ) {
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseTests.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseTests.scala
index fd126004b9..5dfd76c4ae 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseTests.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseTests.scala
@@ -48,6 +48,7 @@ class SynapseTests extends TestBase {
     .filterNot(_.getAbsolutePath.contains("DeepLearningDeepVisionClassification")) // Excluded by design task 1829306
     .filterNot(_.getAbsolutePath.contains("VowpalWabbitClassificationusingVWnativeFormat"))
     .filterNot(_.getAbsolutePath.contains("VowpalWabbitMulticlassclassification")) // Wait for Synpase fix
+    .filterNot(_.getAbsolutePath.contains("Langchain")) // Wait for Synapse fix
     .sortBy(_.getAbsolutePath)

   val expectedPoolCount: Int = selectedPythonFiles.length
diff --git a/notebooks/features/cognitive_services/CognitiveServices - LangchainTransformer.ipynb b/notebooks/features/cognitive_services/CognitiveServices - LangchainTransformer.ipynb
new file mode 100644
index 0000000000..577aedc2b8
--- /dev/null
+++ b/notebooks/features/cognitive_services/CognitiveServices - LangchainTransformer.ipynb
@@ -0,0 +1,540 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "b8534344-9012-4ed2-9901-d080390bccdf",
+     "showTitle": false,
"title": "" + } + }, + "source": [ + "# Using the LangChain Transformer\n", + "\n", + "LangChain is a software development framework designed to simplify the creation of applications using large language models (LLMs). Chains in LangChain go beyond just a single LLM call and are sequences of calls (can be a call to an LLM or a different utility), automating the execution of a series of calls and actions.\n", + "To make it easier to scale up the LangChain execution on a large dataset, we have integrated LangChain with the distributed machine learning library [SynapseML](https://www.microsoft.com/en-us/research/blog/synapseml-a-simple-multilingual-and-massively-parallel-machine-learning-library/). This integration makes it easy to use the [Apache Spark](https://spark.apache.org/) distributed computing framework to process millions of data with the LangChain Framework.\n", + "\n", + "This tutorial shows how to apply LangChain at scale for paper summarization and organization. We start with a table of arxiv links and apply the LangChain Transformerto automatically extract the corresponding paper title, authors, summary, and some related works." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "29295c7f-c3ca-4717-9ae0-9d0f7b35247f", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Step 1: Prerequisites\n", + "\n", + "The key prerequisites for this quickstart include a working Azure OpenAI resource, and an Apache Spark cluster with SynapseML installed. We suggest creating a Synapse workspace, but an Azure Databricks, HDInsight, or Spark on Kubernetes, or even a python environment with the `pyspark` package will work. If you need to use the last component of the chain - An agent with web searching capabilities, you also need a SerpAPIKey.\n", + "\n", + "1. An Azure OpenAI resource – request access [here](https://customervoice.microsoft.com/Pages/ResponsePage.aspx?id=v4j5cvGGr0GRqy180BHbR7en2Ais5pxKtso_Pz4b1_xUOFA5Qk1UWDRBMjg0WFhPMkIzTzhKQ1dWNyQlQCN0PWcu) before [creating a resource](https://docs.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource)\n", + "1. [Create a Synapse workspace](https://docs.microsoft.com/en-us/azure/synapse-analytics/get-started-create-workspace)\n", + "1. [Create a serverless Apache Spark pool](https://docs.microsoft.com/en-us/azure/synapse-analytics/get-started-analyze-spark#create-a-serverless-apache-spark-pool)\n", + "1. Get a SerpAPIKey from [SerpApi](https://serpapi.com/)." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "2aac717c-7d43-4181-bf29-c7dcd64aa846", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Step 2: Import this guide as a notebook\n", + "\n", + "The next step is to add this code into your Spark cluster. You can either create a notebook in your Spark platform and copy the code into this notebook to run the demo. Or download the notebook and import it into Synapse Analytics\n", + "\n", + "1. 
+    "1. Import the notebook [into the Synapse Workspace](https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-development-using-notebooks#create-a-notebook) or, if using Databricks, [into the Databricks Workspace](https://docs.microsoft.com/en-us/azure/databricks/notebooks/notebooks-manage#create-a-notebook)\n",
+    "1. Install SynapseML on your cluster. See the installation instructions for Synapse at the bottom of [the SynapseML website](https://microsoft.github.io/SynapseML/). Note that this requires pasting an additional cell at the top of the notebook you just imported\n",
+    "1. Connect your notebook to a cluster and follow along, editing and running the cells below."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "d0642e69-1669-4b18-94a2-258af0fbcf9f",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# This cell ensures that magic commands like '%pip install' work on Synapse scheduled Spark jobs\n",
+    "from synapse.ml.core.platform import running_on_synapse\n",
+    "\n",
+    "if running_on_synapse():\n",
+    "    from IPython import get_ipython\n",
+    "    from IPython.terminal.interactiveshell import TerminalInteractiveShell\n",
+    "\n",
+    "    try:\n",
+    "        shell = TerminalInteractiveShell.instance()\n",
+    "    except:\n",
+    "        pass\n",
+    "    from notebookutils.visualization import display"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "be13665b-f9fc-46e8-b83b-ad7d57057a2f",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "%pip install langchain openai pdf2image pdfminer.six pytesseract unstructured"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "cec093ca-2d2e-422e-91bb-19e57216b01e",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import os, openai, langchain, uuid\n",
+    "from langchain.llms import AzureOpenAI, OpenAI\n",
+    "from langchain.agents import load_tools, initialize_agent, AgentType\n",
+    "from langchain.chat_models import AzureChatOpenAI\n",
+    "from langchain.chains import TransformChain, LLMChain, SimpleSequentialChain\n",
+    "from langchain.document_loaders import OnlinePDFLoader\n",
+    "from langchain.prompts import PromptTemplate\n",
+    "import pyspark.sql.functions as f\n",
+    "from synapse.ml.cognitive.langchain import LangchainTransformer\n",
+    "from synapse.ml.core.platform import running_on_synapse, find_secret"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "487f439a-6993-4f1f-ae5a-25d15200f4e3",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "## Step 3: Fill in the service information and construct the LLM\n",
+    "Next, edit the cell below to point to your service. In particular, set the `model_name`, `deployment_name`, `openai_api_base`, and `openai_api_key` variables to match those for your OpenAI service. Feel free to replace `find_secret` with your key as follows:\n",
+    "\n",
+    "`openai_api_key = \"99sj2w82o....\"`\n",
+    "\n",
+    "Note: if you are using SerpAPI, you first need to [create a key](https://serpapi.com/dashboard)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "4ce8c876-8bdb-4acc-9921-5aed52b57156",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "os.environ[\"SERPAPI_API_KEY\"] = \"YOURSERPAPIKEY\"\n",
+    "openai_api_key = find_secret(\"openai-api-key\")\n",
+    "openai_api_base = \"https://synapseml-openai.openai.azure.com/\"\n",
+    "openai_api_version = \"2022-12-01\"\n",
+    "openai_api_type = \"azure\"\n",
+    "\n",
+    "os.environ[\"OPENAI_API_TYPE\"] = openai_api_type\n",
+    "os.environ[\"OPENAI_API_VERSION\"] = openai_api_version\n",
+    "os.environ[\"OPENAI_API_BASE\"] = openai_api_base\n",
+    "os.environ[\"OPENAI_API_KEY\"] = openai_api_key\n",
+    "llm = AzureOpenAI(\n",
+    "    deployment_name=\"text-davinci-003\",\n",
+    "    model_name=\"text-davinci-003\",\n",
+    "    temperature=0.1,\n",
+    "    verbose=True,\n",
+    ")"
+   ]
+  },
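+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Before building any chains, it can help to sanity-check the service configuration with a single driver-side call. The next cell is a minimal, optional check: it assumes the `text-davinci-003` deployment configured above exists on your Azure OpenAI resource, and the completion text it prints will vary."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional sanity check: one driver-side completion to verify the\n",
+    "# credentials and deployment before doing any distributed work.\n",
+    "# Assumes the `llm` object defined above; the output text will vary.\n",
+    "print(llm(\"Define the following word: spark\"))"
+   ]
+  },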
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "d3e929d7-6df3-44e8-9508-55e65095bb35",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "## Step 4: Basic usage of the LangChain Transformer\n",
+    "\n",
+    "### Create a chain\n",
+    "We start by demonstrating basic usage with a simple chain that creates definitions for input words."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "b5147d56-0a71-4c13-b0b8-41b2ac3f96f4",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "copy_prompt = PromptTemplate(\n",
+    "    input_variables=[\"technology\"],\n",
+    "    template=\"Define the following word: {technology}\",\n",
+    ")\n",
+    "\n",
+    "chain = LLMChain(llm=llm, prompt=copy_prompt)\n",
+    "transformer = (\n",
+    "    LangchainTransformer()\n",
+    "    .setInputCol(\"technology\")\n",
+    "    .setOutputCol(\"definition\")\n",
+    "    .setChain(chain)\n",
+    "    .setSubscriptionKey(openai_api_key)\n",
+    "    .setUrl(openai_api_base)\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "3221b3c0-d185-4c21-a8a9-1a97024156f6",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "### Create a dataset and apply the chain"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "57ba7d0b-be1a-40ba-b1f5-15001590d706",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# construct a test DataFrame\n",
+    "df = spark.createDataFrame(\n",
+    "    [(0, \"docker\"), (1, \"spark\"), (2, \"python\")], [\"label\", \"technology\"]\n",
+    ")\n",
+    "display(transformer.transform(df))"
+   ]
+  },
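+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Because the transformer returns an ordinary Spark DataFrame, its output composes with regular Spark operations. As a small illustrative sketch (the `definition_wordcount` column name is invented for this example), the next cell uses the `pyspark.sql.functions` module imported above as `f` to count the words in each generated definition."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Post-process the output like any Spark DataFrame; the\n",
+    "# `definition_wordcount` column name is invented for this sketch.\n",
+    "results = transformer.transform(df)\n",
+    "display(\n",
+    "    results.withColumn(\n",
+    "        \"definition_wordcount\", f.size(f.split(f.col(\"definition\"), \" \"))\n",
+    "    )\n",
+    ")"
+   ]
+  },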
"rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "85729799-aa27-4514-b1f4-78a023867039", + "showTitle": false, + "title": "" + } + }, + "source": [ + "### Save and load the LangChain transformer\n", + "LangChain Transformers can be saved and loaded. Note that LangChain serialization only works for chains that don't have memory." + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "7e79dfc7-3c9e-4df2-9df1-07f8d588bb72", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "temp_dir = \"tmp\"\n", + "if not os.path.exists(temp_dir):\n", + " os.mkdir(temp_dir)\n", + "path = os.path.join(temp_dir, \"langchainTransformer\")\n", + "transformer.save(path)\n", + "loaded = LangchainTransformer.load(path)\n", + "display(loaded.transform(df))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "d63922ac-4f38-4d0e-b409-747078378821", + "showTitle": false, + "title": "" + } + }, + "source": [ + "## Step 5: Using LangChain for Large scale literature review" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "6674f86f-05e9-4cd7-8f9e-adf7821034da", + "showTitle": false, + "title": "" + } + }, + "source": [ + "### Create a Sequential Chain for paper summarization\n", + "\n", + "We will now construct a Sequential Chain for extracting structured information from an arxiv link. In particular, we will ask langchain to extract the title, author information, and a summary of the paper content. After that, we use a web search tool to find the recent papers written by the first author.\n", + "\n", + "To summarize, our sequential chain contains the following steps:\n", + "\n", + "1. **Transform Chain**: Extract Paper Content from arxiv Link **=>**\n", + "1. **LLMChain**: Summarize the Paper, extract paper title and authors **=>**\n", + "1. **Transform Chain**: to generate the prompt **=>**\n", + "1. **Agent with Web Search Tool**: Use Web Search to find the recent papers by the first author (this part is commented out as it needs the SerpAPIKey to run successfully)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "882a8f56-1a51-4fbd-b984-1df4b844f018", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "def paper_content_extraction(inputs: dict) -> dict:\n", + " arxiv_link = inputs[\"arxiv_link\"]\n", + " loader = OnlinePDFLoader(arxiv_link)\n", + " pages = loader.load_and_split()\n", + " return {\"paper_content\": pages[0].page_content + pages[1].page_content}\n", + "\n", + "\n", + "def prompt_generation(inputs: dict) -> dict:\n", + " output = inputs[\"Output\"]\n", + " prompt = (\n", + " \"find the paper title, author, summary in the paper description below, output them. 
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "67309f0a-6c03-4c0b-89df-1f98d09f0ded",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "### Apply the LangChain transformer to perform this workload at scale\n",
+    "\n",
+    "We can now run our chain at scale with the `LangchainTransformer`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "9edc7b1c-46ab-45e9-9919-89d17b8740bf",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "paper_df = spark.createDataFrame(\n",
+    "    [\n",
+    "        (0, \"https://arxiv.org/pdf/2107.13586.pdf\"),\n",
+    "        (1, \"https://arxiv.org/pdf/2101.00190.pdf\"),\n",
+    "        (2, \"https://arxiv.org/pdf/2103.10385.pdf\"),\n",
+    "        (3, \"https://arxiv.org/pdf/2110.07602.pdf\"),\n",
+    "    ],\n",
+    "    [\"label\", \"arxiv_link\"],\n",
+    ")\n",
+    "\n",
+    "# construct a LangChain transformer using the paper summarizer chain defined above\n",
+    "paper_info_extractor = (\n",
+    "    LangchainTransformer()\n",
+    "    .setInputCol(\"arxiv_link\")\n",
+    "    .setOutputCol(\"paper_info\")\n",
+    "    .setChain(sequential_chain)\n",
+    "    .setSubscriptionKey(openai_api_key)\n",
+    "    .setUrl(openai_api_base)\n",
+    ")\n",
+    "\n",
+    "\n",
+    "# extract paper information from the arXiv links; the paper information includes:\n",
+    "# paper title, paper authors, a brief paper summary, and recent papers published by the first author\n",
"display(paper_info_extractor.transform(paper_df))" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "dashboards": [], + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "LangchainTransformer", + "widgets": {} + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}