File size: 4,484 Bytes
e67043b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import requests
import json
from datetime import date, datetime, timedelta
import os
from ..tool import Tool


def build_tool(config) -> Tool:
    tool = Tool(
        "Stock Info",
        "Look up stock information",
        name_for_model="Stock",
        description_for_model="Plugin for look up stock information",
        logo_url="https://your-app-url.com/.well-known/logo.png",
        contact_email="[email protected]",
        legal_info_url="[email protected]",
    )

    functions = [
        "TIME_SERIES_INTRADAY",
        "TIME_SERIES_INTRADAY_EXTENDED",
        "TIME_SERIES_DAILY",
        "TIME_SERIES_DAILY_ADJUSTED",
    ]
    types = ["open", "close", "high", "low"]

    KEY = config["subscription_key"]
    BASE_URL = "https://www.alphavantage.co/query?"

    def get_json_data(
        function,
        symbol,
        interval="5min",
        adjusted="true",
        outputsize="compact",
        datatype="json",
    ):
        url = BASE_URL + "function=" + function + "&symbol=" + symbol + "&apikey=" + KEY
        r = requests.get(url)
        data = json.loads(r.text)
        return data

    @tool.get("/get_today_date")
    def get_today_date():
        """Get today's date"""
        today = date.today()
        return today.strftime("%Y-%m-%d")

    @tool.get("/add_date")
    def add_date(date: str, days: int):
        """Add days to a date. Date should be pass as 'yyyy-mm-dd'."""
        date = datetime.strptime(date, "%Y-%m-%d")
        new_date = date + timedelta(days=days)
        return new_date.strftime("%Y-%m-%d")

    @tool.get("/get_daily_prices")
    def get_daily_prices(symbol: str, date: str = ""):
        """Get the stock price of an entity in the stock market. Date should be pass as 'yyyy-mm-dd'."""
        if "," in symbol:
            symbol, date = symbol.split(",")
        if date.strip() == "":
            return "Please specify a date and try again. You can you get_today_date to up-to-date time information."
        data = get_json_data("TIME_SERIES_DAILY_ADJUSTED", symbol)
        # print(data.keys())
        time_series = data["Time Series (Daily)"]
        final_time = ""
        print(time_series)
        # 查找最接近查询日期的数据
        for timestamp, daily_data in time_series.items():
            print(timestamp)
            if timestamp == date:
                open_price = daily_data["1. open"]
                high_price = daily_data["2. high"]
                low_price = daily_data["3. low"]
                close_price = daily_data["4. close"]
                volume = daily_data["6. volume"]
                break
            elif timestamp < date:
                final_time = timestamp
                open_price = time_series[timestamp]["1. open"]
                high_price = time_series[timestamp]["2. high"]
                low_price = time_series[timestamp]["3. low"]
                close_price = time_series[timestamp]["4. close"]
                volume = time_series[timestamp]["6. volume"]
                break
        return {
            "open": open_price,
            "close": close_price,
            "high": high_price,
            "low": low_price,
            "symbol": symbol,
            "date": final_time,
            "volume": volume,
        }

    @tool.get("/get_open_info")
    def get_open_info(region: str = "United States"):
        """get information about if the market in the region is open"""
        url = "https://www.alphavantage.co/query?function=MARKET_STATUS&apikey=" + KEY
        r = requests.get(url)
        data = json.loads(r.text)
        for item in data["markets"]:
            if item["region"] == region:
                return item["current_status"]
        return " not found"

    @tool.get("/get_exchange_rate")
    def get_exchange_rate(from_currency: str = "USD", to_currency: str = "BTC"):
        """This API returns the realtime exchange rate for a pair of digital currency (e.g., Bitcoin) and physical currency (e.g., USD)."""
        url = (
            "https://www.alphavantage.co/query?function=CURRENCY_EXCHANGE_RATE&from_currency="
            + from_currency
            + "&to_currency="
            + to_currency
            + "&apikey="
            + KEY
        )
        r = requests.get(url)
        data = json.loads(r.text)
        try:
            rate = data["Realtime Currency Exchange Rate"]["5. Exchange Rate"]
            return rate
        except:
            return "error"

    return tool