105 lines
3.2 KiB
Python
105 lines
3.2 KiB
Python
"""
|
||
Cloud Functionにリクエストを送信するサンプルコード
|
||
"""
|
||
|
||
import requests
|
||
|
||
BASE_URL = "http://localhost:8080"
|
||
|
||
|
||
def example_get_request():
|
||
"""GETリクエストの例"""
|
||
print("=== GET Request ===")
|
||
# クエリパラメータを指定
|
||
params = {"name": "Python"}
|
||
response = requests.get(BASE_URL, params=params)
|
||
|
||
response.raise_for_status()
|
||
print(f"Status Code: {response.status_code}")
|
||
print(f"Response: {response.json()}")
|
||
|
||
|
||
def example_get_request_no_params():
|
||
"""GETリクエスト(パラメータなし)の例"""
|
||
print("=== GET Request (No Parameters) ===")
|
||
response = requests.get(BASE_URL)
|
||
|
||
response.raise_for_status()
|
||
print(f"Status Code: {response.status_code}")
|
||
print(f"Response: {response.json()}")
|
||
|
||
|
||
def example_post_request():
|
||
"""POSTリクエストの例"""
|
||
print("=== POST Request ===")
|
||
# JSONデータを送信
|
||
data = {"name": "Python"}
|
||
headers = {"Content-Type": "application/json"}
|
||
|
||
response = requests.post(BASE_URL, json=data, headers=headers)
|
||
|
||
response.raise_for_status()
|
||
print(f"Status Code: {response.status_code}")
|
||
print(f"Response: {response.json()}")
|
||
|
||
|
||
def example_put_request():
|
||
"""PUTリクエストの例(サポートされていないメソッド)"""
|
||
print("=== PUT Request (Unsupported Method) ===")
|
||
|
||
data = {"name": "Python"}
|
||
headers = {"Content-Type": "application/json"}
|
||
|
||
response = requests.put(BASE_URL, json=data, headers=headers)
|
||
|
||
response.raise_for_status()
|
||
print(f"Status Code: {response.status_code}")
|
||
print(f"Response: {response.json()}")
|
||
|
||
|
||
def example_event_request():
|
||
"""Cloud Eventリクエストの例"""
|
||
print("=== Cloud Event Request ===")
|
||
# Cloud Eventのペイロードを作成
|
||
event_payload = {
|
||
"message": {
|
||
"data": "aGVsbG8tbG9jYWw=" # "hello-local"をBase64エンコードしたもの
|
||
}
|
||
}
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"Ce-Specversion": "1.0",
|
||
"Ce-Type": "google.cloud.pubsub.topic.v1.messagePublished",
|
||
"Ce-Source": "//pubsub.googleapis.com/projects/test-project/topics/test-topic",
|
||
"Ce-Id": "1234567890",
|
||
"Ce-Time": "2025-12-06T00:00:00Z",
|
||
}
|
||
response = requests.post(BASE_URL, json=event_payload, headers=headers)
|
||
response.raise_for_status()
|
||
print(f"Status Code: {response.status_code}")
|
||
print(f"Response: {response.json()}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print("Cloud Function Request Examples")
|
||
print(f"Target URL: {BASE_URL}")
|
||
print("=" * 50)
|
||
print()
|
||
|
||
try:
|
||
# 各種リクエストの実行例
|
||
example_get_request()
|
||
example_get_request_no_params()
|
||
example_post_request()
|
||
example_put_request()
|
||
print("All examples completed!")
|
||
except requests.exceptions.ConnectionError:
|
||
print("Error: Could not connect to the server.")
|
||
print("Please make sure the Cloud Function is running on port 8080.")
|
||
print("\nStart the server with:")
|
||
print(
|
||
"functions-framework --source=src/main.py --target=main --signature-type=http --port=8080"
|
||
)
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|