cloud-function-base/examples/examle_request.py
ry.yamafuji 47d1f0b982
Some checks failed
Python Test / python-test (push) Failing after 9s
テストとロガーを追加します
2025-12-06 03:38:24 +09:00

105 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Cloud Functionにリクエストを送信するサンプルコード
"""
import requests
# Base URL that every example request in this file is sent to.
BASE_URL = "http://localhost:8080"
def example_get_request(*, timeout: float = 10.0) -> None:
    """Send a GET request with a ``name`` query parameter and print the result.

    Args:
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without this an unresponsive
            server would block the script indefinitely.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    print("=== GET Request ===")

    # Pass the name as a query parameter.
    params = {"name": "Python"}
    response = requests.get(BASE_URL, params=params, timeout=timeout)
    response.raise_for_status()

    print(f"Status Code: {response.status_code}")
    print(f"Response: {response.json()}")
def example_get_request_no_params(*, timeout: float = 10.0) -> None:
    """Send a GET request without any query parameters and print the result.

    Args:
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without this an unresponsive
            server would block the script indefinitely.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    print("=== GET Request (No Parameters) ===")

    response = requests.get(BASE_URL, timeout=timeout)
    response.raise_for_status()

    print(f"Status Code: {response.status_code}")
    print(f"Response: {response.json()}")
def example_post_request(*, timeout: float = 10.0) -> None:
    """Send a POST request with a JSON body and print the result.

    Args:
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without this an unresponsive
            server would block the script indefinitely.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    print("=== POST Request ===")

    # Send the payload as JSON.
    data = {"name": "Python"}
    headers = {"Content-Type": "application/json"}
    response = requests.post(
        BASE_URL, json=data, headers=headers, timeout=timeout
    )
    response.raise_for_status()

    print(f"Status Code: {response.status_code}")
    print(f"Response: {response.json()}")
def example_put_request(*, timeout: float = 10.0) -> None:
    """Send a PUT request, which this example treats as an unsupported method.

    An error status is the outcome this example is meant to show, so the
    status and body are printed instead of being turned into an exception.

    Args:
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without this an unresponsive
            server would block the script indefinitely.

    Raises:
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    print("=== PUT Request (Unsupported Method) ===")

    data = {"name": "Python"}
    headers = {"Content-Type": "application/json"}
    response = requests.put(
        BASE_URL, json=data, headers=headers, timeout=timeout
    )

    # Deliberately no raise_for_status(): raising on the error status would
    # skip the prints below and abort any remaining examples in the caller.
    print(f"Status Code: {response.status_code}")
    try:
        body = response.json()
    except ValueError:
        # An error response is not guaranteed to carry a JSON body.
        body = response.text
    print(f"Response: {body}")
def example_event_request(*, timeout: float = 10.0) -> None:
    """Send a Cloud Event style POST request and print the result.

    The event attributes travel in ``Ce-*`` HTTP headers and the message
    payload is sent as the JSON body.

    Args:
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without this an unresponsive
            server would block the script indefinitely.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    print("=== Cloud Event Request ===")

    # Build the Cloud Event payload.
    event_payload = {
        "message": {
            "data": "aGVsbG8tbG9jYWw="  # Base64 encoding of "hello-local"
        }
    }
    headers = {
        "Content-Type": "application/json",
        "Ce-Specversion": "1.0",
        "Ce-Type": "google.cloud.pubsub.topic.v1.messagePublished",
        "Ce-Source": "//pubsub.googleapis.com/projects/test-project/topics/test-topic",
        "Ce-Id": "1234567890",
        "Ce-Time": "2025-12-06T00:00:00Z",
    }

    response = requests.post(
        BASE_URL, json=event_payload, headers=headers, timeout=timeout
    )
    response.raise_for_status()

    print(f"Status Code: {response.status_code}")
    print(f"Response: {response.json()}")
if __name__ == "__main__":
    # Banner: title, target URL, a separator rule, then one blank line.
    print("Cloud Function Request Examples")
    print(f"Target URL: {BASE_URL}")
    print("=" * 50, end="\n\n")

    try:
        # Run the request examples in order; the first failure stops the run.
        # NOTE(review): example_event_request is not part of this run list —
        # confirm whether that is intentional.
        for run_example in (
            example_get_request,
            example_get_request_no_params,
            example_post_request,
            example_put_request,
        ):
            run_example()
        print("All examples completed!")
    except requests.exceptions.ConnectionError:
        # The server is unreachable: explain how to start it locally.
        print(
            "Error: Could not connect to the server.",
            "Please make sure the Cloud Function is running on port 8080.",
            "\nStart the server with:",
            "functions-framework --source=src/main.py --target=main --signature-type=http --port=8080",
            sep="\n",
        )
    except Exception as err:
        # Top-level boundary of the script: report any other failure.
        print(f"Error: {err}")