# Skip to main content  -- web-page navigation residue captured with the paste; not part of the program.
# ⚫ BLACK_BLOCK_RED_MASTER_SETUP v13.2 — SAFE MODE (NO THREAD LIMIT ISSUES)
# Fixes:
# - Removed background thread (sandbox can't spawn threads)
# - Converted async system to inline-safe execution
# - Preserved API + tests
########################################
# ⚫ ENV CONFIG
########################################
import os, uuid, time, json, hashlib, threading
class Settings:
    """Process-wide configuration, read from environment variables.

    The values are class attributes, so the environment is consulted once,
    when this class body is executed.
    """

    APP_NAME = "BLACK_BLOCK_MASTER"
    # Deployment environment label.
    ENV = os.environ.get("ENV", "prod")
    # A block hash must start with this prefix to satisfy proof of work.
    DIFFICULTY_PREFIX = os.environ.get("DIFFICULTY_PREFIX", "000")
    # NOTE(review): the fallback is a well-known placeholder; set API_KEY
    # explicitly in any real deployment.
    API_KEY = os.environ.get("API_KEY", "secret123")
    # Maximum number of calls the rate limiter will allow.
    RATE_LIMIT = int(os.environ.get("RATE_LIMIT", "50"))


# Shared configuration instance used by the rest of the module.
settings = Settings()
########################################
# ⚫ RATE LIMIT
########################################
class RateLimiter:
    """Call counter that allows calls until ``settings.RATE_LIMIT`` is exceeded.

    The counter only ever grows; nothing in this class resets it, so once the
    limit is passed every later call is refused until ``calls`` is reset
    externally.
    """

    def __init__(self):
        # Number of calls recorded so far.
        self.calls = 0

    def allow(self):
        """Record one call and return True while the total is within the limit."""
        self.calls = self.calls + 1
        return not (self.calls > settings.RATE_LIMIT)


# Shared limiter instance.
rate_limiter = RateLimiter()
########################################
# ⚫ BLOCKCHAIN
########################################
class Blockchain:
    """In-memory, append-only list of hash-linked blocks.

    Every block is a plain dict with the keys ``index``, ``timestamp``,
    ``data``, ``prev_hash``, ``nonce`` and ``hash``.  A genesis block is
    appended on construction, so ``chain`` is never empty afterwards.
    """

    def __init__(self):
        self.chain = []
        self.create_genesis()

    def create_genesis(self):
        """Append the genesis block (index 0, ``prev_hash`` of ``"0"``).

        The genesis block is hashed but not mined, so its hash is not
        required to carry the difficulty prefix.
        """
        genesis = {
            "index": 0,
            "timestamp": time.time(),
            "data": {"genesis": True},
            "prev_hash": "0",
            "nonce": 0,
        }
        genesis["hash"] = self.hash_block(genesis)
        self.chain.append(genesis)

    def hash_block(self, block):
        """Return the SHA-256 hex digest of *block*, ignoring its ``hash`` key.

        The block is serialized as JSON with sorted keys so the digest does
        not depend on dict insertion order.

        Raises:
            TypeError: if the block contains a value ``json.dumps`` cannot
                serialize.
        """
        payload = {k: v for k, v in block.items() if k != "hash"}
        encoded = json.dumps(payload, sort_keys=True).encode()
        return hashlib.sha256(encoded).hexdigest()

    def proof_of_work(self, block):
        """Find a nonce whose block hash starts with the difficulty prefix.

        Mutates ``block["nonce"]`` while searching and leaves the winning
        value in place.

        Returns:
            A ``(nonce, hash)`` tuple for the first nonce (counting up from 0)
            that satisfies ``settings.DIFFICULTY_PREFIX``.
        """
        nonce = 0
        while True:
            block["nonce"] = nonce
            digest = self.hash_block(block)
            if digest.startswith(settings.DIFFICULTY_PREFIX):
                return nonce, digest
            nonce += 1

    def write(self, data):
        """Mine a new block carrying *data*, append it and return it."""
        prev = self.chain[-1]
        block = {
            "index": len(self.chain),
            "timestamp": time.time(),
            "data": data,
            "prev_hash": prev["hash"],
            "nonce": 0,
        }
        block["nonce"], block["hash"] = self.proof_of_work(block)
        self.chain.append(block)
        return block

    def is_valid(self):
        """Return True when every block's hash and back-link are intact.

        Each block, including the genesis block, is re-hashed and compared
        with its stored ``hash``; every block after the first must also point
        at its predecessor's hash.  A block missing either key makes the
        chain invalid rather than raising.
        """
        prev = None
        for block in self.chain:
            if self.hash_block(block) != block.get("hash"):
                return False
            if prev is not None and block.get("prev_hash") != prev["hash"]:
                return False
            prev = block
        return True

    def get_chain(self):
        """Return the live list of blocks (not a copy)."""
        return self.chain


# Shared chain instance.
blockchain = Blockchain()
# Held while a block is mined and appended to ``blockchain``.
chain_lock = threading.Lock()
# Held while the credit store is modified.
data_lock = threading.Lock()
########################################
# ⚫ AI ANALYZER
########################################
class Analyzer:
    """Keyword-based scorer for free-text goals (Turkish and English keywords)."""

    # Signal name -> substrings whose presence in the lower-cased text raises it.
    # Insertion order fixes the order of names in the returned "explanation".
    _KEYWORDS = {
        "growth": ("büyü", "scale", "growth"),
        "risk": ("risk", "tehlike", "uncertain"),
        "aggressive": ("hack", "zorla", "manipüle"),
        "build": ("kur", "build", "create"),
    }
    # Score adjustments, applied in this order to a starting score of 1.0.
    _ADJUSTMENTS = (
        ("aggressive", -0.7),
        ("risk", -0.2),
        ("growth", 0.1),
        ("build", 0.1),
    )

    def analyze(self, text: str):
        """Score *text* by keyword signals.

        Args:
            text: The goal text.  Falsy values are treated as empty; any other
                non-string value (e.g. a number or list from a JSON payload)
                is analyzed via its ``str()`` form instead of raising.

        Returns:
            A dict with ``score`` (float clamped to [0.0, 1.0], rounded to 4
            places), ``signals`` (signal name -> bool) and ``explanation``
            (names of the raised signals, or ``["neutral"]`` when none fired).
        """
        raw = text or ""
        if not isinstance(raw, str):
            raw = str(raw)
        # "İ".lower() yields "i" + U+0307 (combining dot above); drop the mark
        # so upper-case Turkish input such as "MANİPÜLE" still matches.
        normalized = raw.lower().replace("\u0307", "")

        signals = {
            name: any(word in normalized for word in words)
            for name, words in self._KEYWORDS.items()
        }

        score = 1.0
        for name, delta in self._ADJUSTMENTS:
            if signals[name]:
                score += delta
        score = max(0.0, min(1.0, round(score, 4)))

        explanation = [name for name, raised in signals.items() if raised] or ["neutral"]
        return {
            "score": score,
            "signals": signals,
            "explanation": explanation,
        }


# Shared analyzer instance.
analyzer = Analyzer()
########################################
# ⚫ CORE ENGINE
########################################
class UltraCore:
    """Turns a request payload into a scored decision recorded on the chain."""

    def decision(self, payload):
        """Score ``payload["hedef"]``, write the outcome to the chain and return it.

        A payload that is not a dict is treated as empty.  The returned dict
        carries the decision fields plus, under ``"blockchain"``, the block
        that stores a copy of those fields.
        """
        fields = payload if isinstance(payload, dict) else {}
        goal = fields.get("hedef", "")
        analysis = analyzer.analyze(goal)
        verdict = "BLOCKED" if analysis["score"] < 0.5 else "APPROVED"

        result = {
            "id": str(uuid.uuid4()),
            "durum": verdict,
            "skor": analysis["score"],
            "analiz": analysis,
            "aksiyon": goal,
            "timestamp": time.time(),
        }

        # The copy is taken before "blockchain" is attached, so the stored
        # block never references itself.
        with chain_lock:
            result["blockchain"] = blockchain.write(result.copy())
        return result


# Shared engine instance.
engine = UltraCore()
########################################
# ⚫ STORAGE + MONETIZATION
########################################
# (DEDUP FIXED — single source of truth)
########################################
# pricing config (FIX: previously missing -> NameError)
# Plan name -> credits granted by /buy and the listed price (served as-is by /pricing).
PRICING = {
    "starter": {"credits": 10, "price": 5},
    "pro": {"credits": 50, "price": 15},
    "ultra": {"credits": 200, "price": 40},
}

# API keys accepted by the authenticated endpoints; seeded with the configured key.
VALID_KEYS = {settings.API_KEY}

# API key -> list of past /execute results.
USER_HISTORY = {}

# API key -> remaining credits; the configured key starts with 10.
USER_CREDITS = {settings.API_KEY: 10}

# Credits deducted per /execute or /async_execute call.
COST_PER_CALL = 1

# Task id -> stored /async_execute result.
RESULT_STORE = {}
def add_credits(key, amount):
    """Increase the credit balance of *key* by *amount* (a missing key starts at 0)."""
    with data_lock:
        balance = USER_CREDITS.get(key, 0)
        USER_CREDITS[key] = balance + amount
def generate_api_key():
    """Create, register and return a new API key.

    The key is a random UUID4 string.  It is added to ``VALID_KEYS`` and given
    an empty history and a starting balance of 10 credits.  The stores are
    updated under ``data_lock``, matching ``add_credits``.

    Returns:
        The newly registered key.
    """
    key = str(uuid.uuid4())
    with data_lock:
        VALID_KEYS.add(key)
        USER_HISTORY[key] = []
        USER_CREDITS[key] = 10
    return key
########################################
# ⚫ WSGI
########################################
def parse_json_safe(body):
    """Parse *body* as JSON, returning ``{}`` when it cannot be decoded.

    Args:
        body: ``str``, ``bytes`` or ``bytearray`` holding a JSON document.

    Returns:
        The decoded value (not necessarily a dict), or ``{}`` on failure.
    """
    try:
        return json.loads(body)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError, TypeError
        # covers non-text input such as None, RecursionError covers
        # pathologically nested documents.  Unlike a bare ``except``, this
        # lets KeyboardInterrupt and SystemExit propagate.
        return {}
def _json_response(start_response, status, payload):
    """Start a JSON response with *status* and return the WSGI body list.

    The payload is serialized before ``start_response`` is called, so a
    serialization failure cannot leave a response half-started.
    """
    body = json.dumps(payload).encode()
    start_response(status, [("Content-Type", "application/json")])
    return [body]


def _parse_query(query):
    """Decode a URL query string into a dict (the last value wins for repeated keys)."""
    # Function-scope import keeps this block self-contained.
    from urllib.parse import parse_qsl

    return dict(parse_qsl(query, keep_blank_values=True))


def _read_json_body(environ):
    """Read ``CONTENT_LENGTH`` bytes from ``wsgi.input`` and parse them as JSON."""
    raw_len = environ.get("CONTENT_LENGTH")
    # isdecimal() accepts only characters int() can parse; anything else is
    # treated as "no body".
    length = int(raw_len) if raw_len and raw_len.isdecimal() else 0
    body = environ["wsgi.input"].read(length) if length > 0 else b""
    return parse_json_safe(body)


def _authenticated_key(environ):
    """Return the request's API key if it is registered, otherwise None."""
    key = environ.get("HTTP_X_API_KEY")
    return key if key in VALID_KEYS else None


def _charge(key):
    """Deduct one call's cost from *key* under the lock; False if the balance is too low."""
    with data_lock:
        if USER_CREDITS.get(key, 0) < COST_PER_CALL:
            return False
        USER_CREDITS[key] -= COST_PER_CALL
        return True


def application(environ, start_response):
    """WSGI entry point; every response body is a JSON document.

    Routes (matched on ``PATH_INFO``):
        /pricing        plan table.
        /buy            add the credits of ``?plan=...`` to the caller's key.
        /register       create and return a new API key.
        /async_execute  run a decision inline, store it and return a task id.
        /result         fetch a stored result by ``?task_id=...``.
        /execute        POST only; run a decision and return it.
        /verify         report whether the chain validates.

    /buy, /async_execute and /execute require a registered key in the
    ``HTTP_X_API_KEY`` environ entry (403 otherwise); the two execute routes
    also cost ``COST_PER_CALL`` credits (402 when the balance is too low).
    Unknown paths get 404 and any unexpected exception is reported as 500.
    """
    unauthorized = {"error": "UNAUTHORIZED_ACCESS"}
    no_credits = {"error": "NO_CREDITS"}
    try:
        path = environ.get("PATH_INFO", "/")
        method = environ.get("REQUEST_METHOD", "GET")

        if path == "/pricing":
            return _json_response(start_response, "200 OK", PRICING)

        if path == "/buy":
            key = _authenticated_key(environ)
            if key is None:
                return _json_response(start_response, "403 FORBIDDEN", unauthorized)
            plan = _parse_query(environ.get("QUERY_STRING", "")).get("plan")
            if plan not in PRICING:
                return _json_response(
                    start_response, "400 BAD REQUEST", {"error": "invalid_plan"}
                )
            add_credits(key, PRICING[plan]["credits"])
            return _json_response(
                start_response,
                "200 OK",
                {"status": "credits_added", "credits": USER_CREDITS[key]},
            )

        if path == "/register":
            return _json_response(
                start_response, "200 OK", {"api_key": generate_api_key()}
            )

        if path == "/async_execute":
            key = _authenticated_key(environ)
            if key is None:
                return _json_response(start_response, "403 FORBIDDEN", unauthorized)
            if not _charge(key):
                return _json_response(
                    start_response, "402 PAYMENT REQUIRED", no_credits
                )
            task_id = str(uuid.uuid4())
            # Processed inline (no background thread), so the result is
            # available as soon as the task id is returned.
            RESULT_STORE[task_id] = engine.decision(_read_json_body(environ))
            return _json_response(start_response, "200 OK", {"task_id": task_id})

        if path == "/result":
            task_id = _parse_query(environ.get("QUERY_STRING", "")).get("task_id")
            result = RESULT_STORE.get(task_id)
            if not result:
                # NOTE(review): unknown task ids are also reported as
                # "processing"; kept for compatibility with existing clients.
                return _json_response(
                    start_response, "202 ACCEPTED", {"status": "processing"}
                )
            return _json_response(start_response, "200 OK", result)

        if path == "/execute":
            key = _authenticated_key(environ)
            if key is None:
                return _json_response(start_response, "403 FORBIDDEN", unauthorized)
            # Reject the wrong method before charging, so a rejected request
            # does not cost a credit.
            if method != "POST":
                return _json_response(
                    start_response,
                    "405 METHOD NOT ALLOWED",
                    {"error": "method_not_allowed"},
                )
            if not _charge(key):
                return _json_response(
                    start_response, "402 PAYMENT REQUIRED", no_credits
                )
            res = engine.decision(_read_json_body(environ))
            USER_HISTORY.setdefault(key, []).append(res)
            return _json_response(start_response, "200 OK", res)

        if path == "/verify":
            return _json_response(
                start_response, "200 OK", {"valid": blockchain.is_valid()}
            )

        return _json_response(start_response, "404 NOT FOUND", {"error": "not_found"})
    except Exception as e:
        # Top-level boundary: turn any failure into a JSON 500.
        # NOTE(review): the exception text is returned to the client, which
        # may expose internal details.
        return _json_response(
            start_response, "500 INTERNAL SERVER ERROR", {"error": str(e)}
        )
########################################
# ⚫ TESTS
########################################
# helper to reset shared state
def _reset_state():
    """Zero the shared rate limiter's call counter before a test runs."""
    rate_limiter.calls = 0
def test_pricing():
    """/pricing responds with a body that mentions the starter plan."""
    environ = {"REQUEST_METHOD": "GET", "PATH_INFO": "/pricing"}
    body = application(environ, lambda status, headers: None)
    assert b"starter" in body[0]
def test_buy_flow():
    """A freshly registered key can buy the starter plan via /buy."""
    api_key = generate_api_key()
    environ = {
        "PATH_INFO": "/buy",
        "REQUEST_METHOD": "GET",
        "HTTP_X_API_KEY": api_key,
        "QUERY_STRING": "plan=starter",
    }
    body = application(environ, lambda status, headers: None)
    assert b"credits_added" in body[0]
########################################
def test_auth_fail():
    """/execute rejects a key that was never registered."""
    _reset_state()
    environ = {
        "PATH_INFO": "/execute",
        "REQUEST_METHOD": "POST",
        "HTTP_X_API_KEY": "bad",
    }
    body = application(environ, lambda status, headers: None)
    assert b"UNAUTHORIZED_ACCESS" in body[0]
def test_execute_success():
    """An authenticated POST to /execute returns a decision containing "durum"."""
    _reset_state()
    request_body = json.dumps({"hedef": "build startup"}).encode()

    class _Input:
        """Minimal stand-in for ``wsgi.input``: returns the whole body on read."""

        def read(self, n):
            return request_body

    environ = {
        "PATH_INFO": "/execute",
        "REQUEST_METHOD": "POST",
        "HTTP_X_API_KEY": settings.API_KEY,
        "CONTENT_LENGTH": str(len(request_body)),
        "wsgi.input": _Input(),
    }
    body = application(environ, lambda status, headers: None)
    assert b"durum" in body[0]
def test_async_flow():
    """/async_execute with a fresh key answers with a task id."""
    _reset_state()
    api_key = generate_api_key()
    request_body = json.dumps({"hedef": "async test"}).encode()

    class _Input:
        """Minimal stand-in for ``wsgi.input``: returns the whole body on read."""

        def read(self, n):
            return request_body

    environ = {
        "PATH_INFO": "/async_execute",
        "REQUEST_METHOD": "POST",
        "HTTP_X_API_KEY": api_key,
        "CONTENT_LENGTH": str(len(request_body)),
        "wsgi.input": _Input(),
    }
    body = application(environ, lambda status, headers: None)
    decoded = json.loads(body[0].decode())
    assert "task_id" in decoded
########################################
# ⚫ RUN
########################################
if __name__ == "__main__":
    # Running the file as a script only prints a readiness banner; it does
    # not start a server or run the tests above.
    print("SYSTEM READY")