File size: 2,038 Bytes
0c2ac5a
 
9dfc538
0c2ac5a
9dfc538
7989aa4
 
0c2ac5a
 
 
 
5f63c51
0c2ac5a
 
 
 
 
 
 
5078916
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from google.oauth2 import service_account
from google.auth.transport.requests import Request
from dotenv import load_dotenv
import json
import os
import logging


def get_access_token():
    # Load service account credentials from JSON file or environment variable
    credentials = service_account.Credentials.from_service_account_info(
        json.loads(os.getenv('ACCOUNT_CREDS')),
        scopes=['https://www.googleapis.com/auth/cloud-platform']
    )
    
    # Refresh token if needed
    if credentials.expired:
        credentials.refresh(Request())
    
    return credentials.token

def get_access_token_v1():
    """
    Generate a Google Cloud access token from service account credentials.

    The service-account key is read as a JSON string from the
    ``ACCOUNT_CREDS`` environment variable and the credentials are scoped to
    ``https://www.googleapis.com/auth/cloud-platform``. Progress and failures
    are logged through this module's logger.

    Returns:
        The access token string.

    Raises:
        ValueError: If ``ACCOUNT_CREDS`` is unset or empty, if it does not
            contain valid JSON (the original ``json.JSONDecodeError`` is
            attached as ``__cause__``), or if no token is present after the
            refresh.
        Exception: Any other error raised while building or refreshing the
            credentials is logged and re-raised unchanged.
    """
    # Configure logging and bind the logger before entering the try block so
    # the except handler below can never reference an unbound ``logger``.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    try:
        # Get credentials from environment variable
        creds_json = os.getenv('ACCOUNT_CREDS')
        if not creds_json:
            raise ValueError("ACCOUNT_CREDS environment variable not found")

        # Parse credentials JSON, keeping the decoder error as the cause
        try:
            creds_dict = json.loads(creds_json)
        except json.JSONDecodeError as err:
            raise ValueError("Invalid JSON in ACCOUNT_CREDS") from err

        # Create credentials object
        credentials = service_account.Credentials.from_service_account_info(
            creds_dict,
            scopes=['https://www.googleapis.com/auth/cloud-platform'],
        )

        # Ensure token is valid and refresh if needed
        if not credentials.valid:
            logger.info("Token expired or invalid, refreshing...")
            credentials.refresh(Request())

        if not credentials.token:
            raise ValueError("No token generated after refresh")

        logger.info("Successfully generated access token")
        return credentials.token

    except Exception as e:
        # Lazy %-formatting produces the same message text as before.
        logger.error("Error generating access token: %s", e)
        raise