Skip to content

Commit

Permalink
fix: gsheet request too large -> use a function created in data
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorentLvr committed Mar 13, 2024
1 parent 9b4fac7 commit 88a6be3
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 91 deletions.
7 changes: 3 additions & 4 deletions __pipeline__.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,10 @@
" prompt=\"Find timezone in the format 'Region/City'. If there is no exact match, please return a subjective answer based on the data you received\",\n",
" message=f\"Region: {region}, Country: {country}\",\n",
" )\n",
" print(\"- Timezone:\", timezone)\n",
" print()\n",
" naas.set_remote_timezone(timezone)\n",
" pdump(output_dir, timezone, \"timezone\")\n",
" naas.dependency.add(os.path.join(output_dir, \"timezone.pickle\"))\n",
" print(\"- Timezone:\", timezone)\n",
" naas.set_remote_timezone(timezone)\n",
" print()\n",
" \n",
" # Create notebook steps\n",
Expand Down Expand Up @@ -358,7 +357,7 @@
"outputs": [],
"source": [
"# Schedule pipeline\n",
"cron = \"0 8 * * *\"\n",
"cron = \"0 12 * * *\"\n",
"print(\"⏰ Scheduler:\", cron)\n",
"naas.scheduler.add(cron=cron)"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
"outputs": [],
"source": [
"df_init = gsheet.connect(spreadsheet_url).get(sheet_name=sheet_contacts)\n",
"df_init = pload(output_dir, file_contacts)\n",
"if not isinstance(df_init, pd.DataFrame):\n",
" df_init = pd.DataFrame()\n",
" messaging_options = {}\n",
Expand Down Expand Up @@ -430,7 +431,9 @@
" messaging = create_chat_completion(api_key, prompt_messaging, str(tmp_df.to_dict()))\n",
" except Exception as e:\n",
" print(e)\n",
" print(\"Messaging options:\", messaging)\n",
" if str(messaging) == \"None\":\n",
" messaging = \"TBD\"\n",
" print(\"Messaging options:\", messaging)\n",
" print()\n",
" messaging_options[profile_url] = messaging\n",
" pdump(output_dir, messaging_options, \"messaging_options\")\n",
Expand Down Expand Up @@ -491,31 +494,14 @@
{
"cell_type": "code",
"execution_count": null,
"id": "32e91b35-c1f8-4fe6-ae78-a2d4b79c8be7",
"id": "ea691820-773d-4148-943b-34aad838a759",
"metadata": {
"papermill": {},
"tags": []
},
"outputs": [],
"source": [
"df_check = pd.concat([df_init.astype(str), df_contacts.astype(str)]).drop_duplicates(keep=False)\n",
"if len(df_check) > 0:\n",
" if len(df_contacts) < 3000:\n",
" gsheet.connect(spreadsheet_url).send(data=df_contacts, sheet_name=sheet_contacts, append=False)\n",
" else:\n",
" gsheet.connect(spreadsheet_url).send(data=df_contacts[:3000], sheet_name=sheet_contacts, append=False)\n",
" gsheet.connect(spreadsheet_url).send(data=df_contacts[3000:], sheet_name=sheet_contacts, append=True)\n",
"else:\n",
" print(\"Noting to update in Google Sheets!\")"
"send_data_to_gsheet(df_contacts, df_init, spreadsheet_url, sheet_contacts)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ea691820-773d-4148-943b-34aad838a759",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
"# Inputs\n",
"entity_dir = pload(os.path.join(naas_data_product.OUTPUTS_PATH, \"entities\", \"0\"), \"entity_dir\")\n",
"input_dir = os.path.join(entity_dir, \"growth-engine\", date.today().isoformat())\n",
"file_interactions = \"linkedin_interactions\"\n",
"file_interactions = \"interactions\"\n",
"\n",
"# Outputs\n",
"spreadsheet_url = pload(os.path.join(naas_data_product.OUTPUTS_PATH, \"entities\", \"0\"), \"abi_spreadsheet\")\n",
Expand Down Expand Up @@ -439,17 +439,13 @@
},
"outputs": [],
"source": [
"df_check = pd.concat([df_init.astype(str), df_growth.astype(str)]).drop_duplicates(keep=False)\n",
"if len(df_check) > 0:\n",
" gsheet.connect(spreadsheet_url).send(sheet_name=sheet_growth, data=df_growth, append=False)\n",
"else:\n",
" print(\"Noting to update in Google Sheets!\")"
"send_data_to_gsheet(df_growth, df_init, spreadsheet_url, sheet_growth)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "13cc4241-9a56-4495-9369-bf3de2bcfb42",
"id": "c81ada64-6da0-4c75-a433-751e28b62b47",
"metadata": {},
"outputs": [],
"source": []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
"\n",
"# Outputs\n",
"output_dir = os.path.join(entity_dir, \"growth-engine\", date.today().isoformat())\n",
"output_file = \"linkedin_interactions\"\n",
"output_file = \"interactions\"\n",
"spreadsheet_url = pload(os.path.join(naas_data_product.OUTPUTS_PATH, \"entities\", \"0\"), \"abi_spreadsheet\")\n",
"sheet_interaction = \"INTERACTIONS\""
]
Expand Down Expand Up @@ -216,11 +216,7 @@
" tmp_posts_url = tmp_df_reactions[\"POST_URL\"].unique().tolist()\n",
" for x in tmp_posts_url:\n",
" if x not in posts_url:\n",
" # Histo\n",
" if date_dir < date(2024, 5 , 1):\n",
" tmp_df_reactions[\"DATE_REACTION\"] = pd.to_datetime(tmp_df_reactions['PUBLISHED_DATE'], format='%Y-%m-%d %H:%M:%S%z').dt.tz_convert(TIMEZONE).dt.strftime(\"%Y-%m-%d %H:%M:%S%z\")\n",
" else:\n",
" tmp_df_reactions[\"DATE_REACTION\"] = tmp_df_reactions['PUBLISHED_DATE']\n",
" tmp_df_reactions[\"DATE_REACTION\"] = tmp_df_reactions['PUBLISHED_DATE']\n",
" posts_url.append(x)\n",
" else:\n",
" tmp_df_reactions[\"DATE_REACTION\"] = pd.to_datetime(tmp_df_reactions['DATE_EXTRACT'], format='%Y-%m-%d %H:%M:%S').dt.tz_localize(pytz.timezone(\"Europe/Paris\")).dt.tz_convert(TIMEZONE).dt.strftime(\"%Y-%m-%d %H:%M:%S%z\")\n",
Expand Down Expand Up @@ -329,10 +325,10 @@
" \n",
" # Concat df\n",
" df = pd.concat([df1, df2]).reset_index(drop=True)\n",
" \n",
"\n",
" # Exclude Entity from Full name\n",
" if len(df) > 0:\n",
" df.insert(loc=2, column=\"DATE\", value=pd.to_datetime(df['DATE_INTERACTION'], format=\"%Y-%m-%d %H:%M:%S%z\").dt.strftime(\"%a. %d %b.\"))\n",
" df.insert(loc=2, column=\"DATE\", value=pd.to_datetime(df['DATE_INTERACTION'].str[:19], format=\"%Y-%m-%d %H:%M:%S\").dt.strftime(\"%a. %d %b.\"))\n",
" entity = df.loc[0 , \"ENTITY\"]\n",
" df = df[df[\"FULLNAME\"] != entity]\n",
" \n",
Expand Down Expand Up @@ -419,17 +415,13 @@
},
"outputs": [],
"source": [
"df_check = pd.concat([df_init.astype(str), df_interactions.astype(str)]).drop_duplicates(keep=False)\n",
"if len(df_check) > 0:\n",
" gsheet.connect(spreadsheet_url).send(sheet_name=sheet_interaction, data=df_interactions, append=False)\n",
"else:\n",
" print(\"Noting to update in Google Sheets!\")"
"send_data_to_gsheet(df_interactions, df_init, spreadsheet_url, sheet_interaction)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "29368ef4-de34-44bc-8d8a-346909b8ea30",
"id": "c15fc896-7362-4ab9-8c99-6c45fc41c9d1",
"metadata": {},
"outputs": [],
"source": []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
"id": "4a5d888e-9fa7-4fe9-983b-c028cd0bb2a9",
"metadata": {},
"source": [
"### Get companies"
"### Get organizations"
]
},
{
Expand Down Expand Up @@ -611,12 +611,13 @@
" # Update org name with company name from LinkedIn\n",
" df = df_organizations.copy()\n",
" df = df[(df[\"ORGANIZATION\"] != df[\"ORG_NAME\"]) & (df[\"ORG_LINKEDIN_ID\"] != \"TBD\")]\n",
" org_names_1 = get_dict_from_df(df, \"ORG_NAME\", \"ORGANIZATION\", f\"organizations_names_{datetime.now().isoformat()}\", output_dir)\n",
" org_names_1 = get_dict_from_df(df, \"ORG_NAME\", \"ORGANIZATION\", f\"organizations_names\", output_dir)\n",
" print(\"-> New Organization to be updated in People db:\", len(org_names_1))\n",
" if len(org_names_1) > 0:\n",
" df_org, df_p = update_dfs_from_dict(org_names_1, df_p, df_org)\n",
" org_lk_urls = pload(output_dir, \"org_lk_urls\")\n",
" for key, value in org_names_1.items():\n",
" key = key.replace(\"Not Found\", \"\")\n",
" org_lk_urls[value] = org_lk_urls[key]\n",
" pdump(output_dir, org_lk_urls, \"org_lk_urls\")\n",
" \n",
Expand All @@ -626,7 +627,7 @@
" # Get org name with data tbd with same LinkedIn URL as org found\n",
" df1 = df_org.copy()\n",
" df1 = df1[(df1[\"LINKEDIN_URL\"] != \"NA\") & (df1[\"ORG_LINKEDIN_ID\"] != \"TBD\")]\n",
" organizations_org_urls = get_dict_from_df(df1, \"ORGANIZATION\", \"LINKEDIN_URL\", f\"organizations_org_urls_{datetime.now().isoformat()}\", output_dir)\n",
" organizations_org_urls = get_dict_from_df(df1, \"ORGANIZATION\", \"LINKEDIN_URL\", f\"organizations_org_urls\", output_dir)\n",
"\n",
" # Get org name with data tbd with similar LinkedIn URL\n",
" df2 = df_org.copy()\n",
Expand Down Expand Up @@ -717,18 +718,13 @@
{
"cell_type": "code",
"execution_count": null,
"id": "32e91b35-c1f8-4fe6-ae78-a2d4b79c8be7",
"id": "5098f953-0607-4225-b3f3-39e06dd301db",
"metadata": {
"papermill": {},
"tags": []
},
"outputs": [],
"source": [
"df_check = pd.concat([df_init.astype(str), df_organizations_u.astype(str)]).drop_duplicates(keep=False)\n",
"if len(df_check) > 0:\n",
" gsheet.connect(spreadsheet_url).send(data=df_organizations_u, sheet_name=sheet_people_organizations, append=False)\n",
"else:\n",
" print(\"Noting to update in Google Sheets!\")"
"send_data_to_gsheet(df_organizations_u, df_init, spreadsheet_url, sheet_people_organizations)"
]
},
{
Expand All @@ -742,24 +738,19 @@
{
"cell_type": "code",
"execution_count": null,
"id": "f3c0277a-239f-4d5d-b34f-655c9f7c0028",
"id": "23bcfff6-e962-4191-b79c-164ab9f65ff6",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"df_check = pd.concat([df_people.astype(str), df_people_u.astype(str)]).drop_duplicates(keep=False)\n",
"if len(df_check) > 0:\n",
" pdump(output_dir, df_people_u, file_people)\n",
" gsheet.connect(spreadsheet_url).send(data=df_people_u, sheet_name=sheet_people, append=False)\n",
"else:\n",
" print(\"Noting to update in Google Sheets!\")"
"send_data_to_gsheet(df_people_u, df_people, spreadsheet_url, sheet_people)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "23bcfff6-e962-4191-b79c-164ab9f65ff6",
"id": "f0ca1da1-617f-459e-a698-c5bb695a83be",
"metadata": {},
"outputs": [],
"source": []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
"# Inputs\n",
"entity_dir = pload(os.path.join(naas_data_product.OUTPUTS_PATH, \"entities\", \"0\"), \"entity_dir\")\n",
"input_dir = os.path.join(entity_dir, \"growth-engine\", date.today().isoformat())\n",
"file_interactions = \"linkedin_interactions\"\n",
"file_interactions = \"interactions\"\n",
"api_key = os.environ.get(\"NAAS_API_TOKEN\") or naas.secret.get('NAAS_API_TOKEN')\n",
"li_at = os.environ.get(\"LINKEDIN_LI_AT\") or naas.secret.get(\"LINKEDIN_LI_AT\")\n",
"JSESSIONID = os.environ.get(\"LINKEDIN_JSESSIONID\") or naas.secret.get(\"LINKEDIN_JSESSIONID\")\n",
Expand Down Expand Up @@ -676,23 +676,19 @@
{
"cell_type": "code",
"execution_count": null,
"id": "59a498dd-a1d3-46fd-8b81-efa4df1813f1",
"id": "6219b07a-9052-4f7f-8754-b2aa22eeabc7",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"df_check = pd.concat([df_init.astype(str), df_people.astype(str)]).drop_duplicates(keep=False)\n",
"if len(df_check) > 0:\n",
" gsheet.connect(spreadsheet_url).send(data=df_people, sheet_name=sheet_people, append=False)\n",
"else:\n",
" print(\"Noting to update in Google Sheets!\") "
"send_data_to_gsheet(df_people, df_init, spreadsheet_url, sheet_people)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6219b07a-9052-4f7f-8754-b2aa22eeabc7",
"id": "5f9806ef-87dc-4582-8d32-772eb036c8ad",
"metadata": {},
"outputs": [],
"source": []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
"\n",
"# Outputs\n",
"output_dir = os.path.join(entity_dir, \"growth-engine\", date.today().isoformat())\n",
"file_posts = \"linkedin_posts\"\n",
"file_posts = \"posts\"\n",
"file_reactions = \"linkedin_post_reactions\"\n",
"file_comments = \"linkedin_post_comments\""
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,30 +392,15 @@
"### Send data to Google Sheets spreadsheet"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "32e91b35-c1f8-4fe6-ae78-a2d4b79c8be7",
"metadata": {
"papermill": {},
"tags": []
},
"outputs": [],
"source": [
"df_check = pd.concat([db_deals.astype(str), df_init.astype(str)]).drop_duplicates(keep=False)\n",
"if len(df_check) > 0:\n",
" gsheet.connect(spreadsheet_url).send(sheet_name=sheet_name_output, data=db_deals, append=False)\n",
"else:\n",
" print(\"Noting to update in Google Sheets!\") "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "13cc4241-9a56-4495-9369-bf3de2bcfb42",
"metadata": {},
"outputs": [],
"source": []
"source": [
"send_data_to_gsheet(db_deals, df_init, spreadsheet_url, sheet_name_output)"
]
}
],
"metadata": {
Expand Down
61 changes: 60 additions & 1 deletion utils/data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@
"import naas\n",
"from naas_drivers import linkedin\n",
"import naas_data_product\n",
"import shutil"
"import shutil\n",
"import pandas as pd"
]
},
{
Expand Down Expand Up @@ -136,6 +137,64 @@
" return None"
]
},
{
"cell_type": "markdown",
"id": "8cee202d-2085-46ff-ba31-54baee04f0be",
"metadata": {},
"source": [
"### Send data to Gsheet"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c3baae15-fcf9-4c22-9940-fedbeb20b262",
"metadata": {},
"outputs": [],
"source": [
"def send_data_to_gsheet(\n",
"    df: pd.DataFrame,\n",
"    df_init: pd.DataFrame, \n",
"    spreadsheet_url: str, \n",
"    sheet_name: str, \n",
"    chunck_size: int = 100000\n",
"):\n",
"    \"\"\"\n",
"    Send `df` to a Google Sheet, but only when it differs from `df_init`.\n",
"\n",
"    Both dataframes are cast to strings and concatenated; rows that appear only once\n",
"    in the result are treated as differences. When there is none, nothing is sent.\n",
"    Otherwise `df` (the first argument) replaces the sheet content. When `df` holds\n",
"    `chunck_size` cells or more, it is sent in several requests: the first request\n",
"    overwrites the sheet and the following ones append to it.\n",
"\n",
"    :param df: The dataframe to send to the Google Sheet.\n",
"    :param df_init: The reference dataframe `df` is compared with.\n",
"    :param spreadsheet_url: The URL of the Google Sheet to send data to.\n",
"    :param sheet_name: The name of the sheet in the Google Sheet to send data to.\n",
"    :param chunck_size: Maximum number of cells (rows x columns) sent per request. Default is 100000.\n",
"    \"\"\"\n",
"    # Compare dataframes: keep=False drops every row present more than once,\n",
"    # so only rows that differ between the two dataframes remain.\n",
"    df_check = pd.concat([df.astype(str), df_init.astype(str)]).drop_duplicates(keep=False)\n",
"\n",
"    # Do nothing when both dataframes hold the same rows\n",
"    if len(df_check) == 0:\n",
"        print(\"Noting to update in Google Sheets!\")\n",
"        return\n",
"\n",
"    n_rows = len(df)\n",
"    n_cols = len(df.columns)\n",
"\n",
"    # Small enough: send everything in a single request\n",
"    if n_rows * n_cols < chunck_size:\n",
"        gsheet.connect(spreadsheet_url).send(sheet_name=sheet_name, data=df, append=False)\n",
"        print(\"✅ DataFrame successfully sent to Google Sheets!\")\n",
"        return\n",
"\n",
"    # Too large: send by chunks of rows.\n",
"    # max_rows is forced to at least 1: a value of 0 (more columns than chunck_size,\n",
"    # or chunck_size <= 0) would never advance and loop forever.\n",
"    max_rows = max(1, chunck_size // max(1, n_cols))\n",
"    # max(n_rows, 1) keeps one overwrite request even when df has no rows\n",
"    for start in range(0, max(n_rows, 1), max_rows):\n",
"        limit = min(start + max_rows, n_rows)\n",
"        # First chunk overwrites the sheet, the next ones are appended\n",
"        gsheet.connect(spreadsheet_url).send(sheet_name=sheet_name, data=df[start:limit], append=start > 0)\n",
"        print(f\"✅ Rows {start} to {limit} successfully added to Google Sheets!\")"
]
},
{
"cell_type": "markdown",
"id": "e929759d-1080-40df-912a-873fa21deb92",
Expand Down

0 comments on commit 88a6be3

Please sign in to comment.