opencv | create live stream | bind youtube stream | Search

The code imports necessary libraries, defines constants and functions for interacting with the YouTube API, and exports two main functions: list_livestream and youtube_authorize. These functions allow users to authorize with the YouTube API and list their live streams, with the option to sanitize filenames and handle OAuth 2.0 credentials.

Run example

npm run import -- "list live stream"

list live stream

import os
import re
import json
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
import_notebook("authorize youtube in python",
"globals("))


SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl']
HOME_DIR = os.environ.get("HOME") or os.environ.get("USERPROFILE")

def youtube_authorize():

    # Load OAuth 2.0 credentials from a saved token file
    sanitized_name = sanitize_filename("_".join(SCOPES))
    filename = f"{os.path.join(HOME_DIR, '.credentials')}/{sanitized_name}.json"
    #credentials = Credentials.from_authorized_user_file(filename, SCOPES)


    with open(filename, "r") as f:
        credentials_dict = json.load(f)

    if(not credentials_dict or not os.path.exists(filename)):
        authorize()

    # Load credentials from the session.
    credentials = Credentials(
    credentials_dict["token"],
    refresh_token = credentials_dict["refresh_token"],
    token_uri = credentials_dict["token_uri"],
    client_id = credentials_dict["client_id"],
    client_secret = credentials_dict["client_secret"],
    scopes = SCOPES)
    return credentials

def list_livestream():
    credentials = youtube_authorize()

    # Build the YouTube API client
    youtube = build("youtube", "v3", credentials=credentials)

    request = youtube.liveStreams().list(
        part="snippet,cdn,status",
        mine=True)

    response = request.execute()

    filtered_streams = [
        stream for stream in response["items"]
        if stream["status"]["streamStatus"] != "active"
    ]

    return filtered_streams[0]

def sanitize_filename(name):
    """Sanitize a string to be a valid filename."""
    return re.sub(r"[^\w.-]", "_", name)  # Replace invalid characters with "_"


__all__ = {
  "list_livestream": list_livestream,
  "youtube_authorize": youtube_authorize
}

What the code could have been:

import os
import re
import json
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

def youtube_authorize() -> Credentials:
    """
    Load OAuth 2.0 credentials from a saved token file or authorize a new user.

    Returns:
        A Credentials object for the YouTube API client.
    """
    SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl']
    HOME_DIR = os.environ.get("HOME") or os.environ.get("USERPROFILE")

    sanitized_name = sanitize_filename("_".join(SCOPES))
    filename = f"{os.path.join(HOME_DIR, '.credentials')}/{sanitized_name}.json"

    try:
        with open(filename, "r") as f:
            credentials_dict = json.load(f)
    except FileNotFoundError:
        # If credentials file does not exist, authorize a new user
        authorize()
        return youtube_authorize()

    credentials = Credentials(
        credentials_dict["token"],
        refresh_token=credentials_dict["refresh_token"],
        token_uri=credentials_dict["token_uri"],
        client_id=credentials_dict["client_id"],
        client_secret=credentials_dict["client_secret"],
        scopes=SCOPES
    )
    return credentials

def list_livestream() -> dict:
    """
    List the user's live streams.

    Returns:
        A dictionary containing the first live stream with an active status.
    """
    credentials = youtube_authorize()

    # Build the YouTube API client
    youtube = build("youtube", "v3", credentials=credentials)

    try:
        request = youtube.liveStreams().list(
            part="snippet,cdn,status",
            mine=True)

        response = request.execute()

        # Filter streams with inactive status
        filtered_streams = [stream for stream in response["items"]
                            if stream["status"]["streamStatus"] == "active"]

        return filtered_streams[0]
    except HttpError as e:
        print(f"An error occurred: {e}")
        return None

def sanitize_filename(name: str) -> str:
    """
    Sanitize a string to be a valid filename.

    Replaces invalid characters with "_".

    Args:
        name: The string to sanitize.

    Returns:
        A sanitized string.
    """
    return re.sub(r"[^\w.-]", "_", name)

def authorize() -> None:
    """
    Authorize a new user with the YouTube API.

    This function is a placeholder and should be implemented separately.

    TODO: Implement authorization logic
    """
    print("Authorize a new user with the YouTube API")
    # TODO: Implement authorization logic

if __name__ == "__main__":
    print(list_livestream())

Code Breakdown

Importing Libraries

The code starts by importing the necessary libraries:

Defining Constants and Functions

The code defines two constants:

It also defines two functions:

Helper Function

The code defines a helper function:

Exporting Functions

The code exports two functions:

These functions are available for use in other parts of the code.