Skip to main content
# ⚫ BLACK_BLOCK_RED_MASTER_SETUP v13.2 — SAFE MODE (NO THREAD LIMIT ISSUES) # Fixes: # - Removed background thread (sandbox can't spawn threads) # - Converted async system to inline-safe execution # - Preserved API + tests ######################################## # ⚫ ENV CONFIG ######################################## import os, uuid, time, json, hashlib, threading class Settings: APP_NAME = "BLACK_BLOCK_MASTER" ENV = os.getenv("ENV", "prod") DIFFICULTY_PREFIX = os.getenv("DIFFICULTY_PREFIX", "000") API_KEY = os.getenv("API_KEY", "secret123") RATE_LIMIT = int(os.getenv("RATE_LIMIT", "50")) settings = Settings() ######################################## # ⚫ RATE LIMIT ######################################## class RateLimiter: def __init__(self): self.calls = 0 def allow(self): self.calls += 1 return self.calls <= settings.RATE_LIMIT rate_limiter = RateLimiter() ######################################## # ⚫ BLOCKCHAIN ######################################## class Blockchain: def __init__(self): self.chain = [] self.create_genesis() def create_genesis(self): genesis = { "index": 0, "timestamp": time.time(), "data": {"genesis": True}, "prev_hash": "0", "nonce": 0 } genesis["hash"] = self.hash_block(genesis) self.chain.append(genesis) def hash_block(self, block): b = dict(block) b.pop("hash", None) return hashlib.sha256(json.dumps(b, sort_keys=True).encode()).hexdigest() def proof_of_work(self, block): nonce = 0 while True: block["nonce"] = nonce h = self.hash_block(block) if h.startswith(settings.DIFFICULTY_PREFIX): return nonce, h nonce += 1 def write(self, data): prev = self.chain[-1] block = { "index": len(self.chain), "timestamp": time.time(), "data": data, "prev_hash": prev["hash"], "nonce": 0 } nonce, h = self.proof_of_work(block) block["nonce"] = nonce block["hash"] = h self.chain.append(block) return block def is_valid(self): for i in range(1, len(self.chain)): c, p = self.chain[i], self.chain[i-1] if c["prev_hash"] != p["hash"]: return False if self.hash_block(c) != c["hash"]: return False return True def get_chain(self): return self.chain blockchain = Blockchain() chain_lock = threading.Lock() data_lock = threading.Lock() ######################################## # ⚫ AI ANALYZER ######################################## class Analyzer: def analyze(self, text: str): t = (text or "").lower() signals = { "growth": any(k in t for k in ["büyü", "scale", "growth"]), "risk": any(k in t for k in ["risk", "tehlike", "uncertain"]), "aggressive": any(k in t for k in ["hack", "zorla", "manipüle"]), "build": any(k in t for k in ["kur", "build", "create"]) } score = 1.0 if signals["aggressive"]: score -= 0.7 if signals["risk"]: score -= 0.2 if signals["growth"]: score += 0.1 if signals["build"]: score += 0.1 score = max(0.0, min(1.0, round(score, 4))) explanation = [k for k, v in signals.items() if v] or ["neutral"] return { "score": score, "signals": signals, "explanation": explanation } analyzer = Analyzer() ######################################## # ⚫ CORE ENGINE ######################################## class UltraCore: def decision(self, payload): if not isinstance(payload, dict): payload = {} text = payload.get("hedef", "") ai = analyzer.analyze(text) result = { "id": str(uuid.uuid4()), "durum": "APPROVED" if ai["score"] >= 0.5 else "BLOCKED", "skor": ai["score"], "analiz": ai, "aksiyon": text, "timestamp": time.time() } with chain_lock: block = blockchain.write(result.copy()) result["blockchain"] = block return result engine = UltraCore() ######################################## # ⚫ STORAGE + MONETIZATION ######################################## # (DEDUP FIXED — single source of truth) ######################################## # pricing config (FIX: previously missing -> NameError) PRICING = { "starter": {"credits": 10, "price": 5}, "pro": {"credits": 50, "price": 15}, "ultra": {"credits": 200, "price": 40} } VALID_KEYS = set([settings.API_KEY]) USER_HISTORY = {} USER_CREDITS = {settings.API_KEY: 10} COST_PER_CALL = 1 RESULT_STORE = {} def add_credits(key, amount): with data_lock: USER_CREDITS[key] = USER_CREDITS.get(key, 0) + amount def generate_api_key(): key = str(uuid.uuid4()) VALID_KEYS.add(key) USER_HISTORY[key] = [] USER_CREDITS[key] = 10 return key def generate_api_key(): key = str(uuid.uuid4()) VALID_KEYS.add(key) USER_HISTORY[key] = [] USER_CREDITS[key] = 10 return key ######################################## # ⚫ WSGI ######################################## def parse_json_safe(body): try: return json.loads(body) except: return {} def application(environ, start_response): try: path = environ.get("PATH_INFO", "/") method = environ.get("REQUEST_METHOD", "GET") headers = [("Content-Type", "application/json")] if path == "/pricing": start_response("200 OK", headers) return [json.dumps(PRICING).encode()] if path == "/buy": key = environ.get("HTTP_X_API_KEY") if key not in VALID_KEYS: start_response("403 FORBIDDEN", headers) return [json.dumps({"error":"UNAUTHORIZED_ACCESS"}).encode()] query = environ.get("QUERY_STRING", "") params = dict(x.split("=") for x in query.split("&") if "=" in x) plan = params.get("plan") if plan not in PRICING: start_response("400 BAD REQUEST", headers) return [json.dumps({"error":"invalid_plan"}).encode()] add_credits(key, PRICING[plan]["credits"]) start_response("200 OK", headers) return [json.dumps({ "status":"credits_added", "credits": USER_CREDITS[key] }).encode()] if path == "/register": key = generate_api_key() start_response("200 OK", headers) return [json.dumps({"api_key": key}).encode()] if path == "/async_execute": key = environ.get("HTTP_X_API_KEY") if key not in VALID_KEYS: start_response("403 FORBIDDEN", headers) return [json.dumps({"error": "UNAUTHORIZED_ACCESS"}).encode()] if USER_CREDITS.get(key, 0) < COST_PER_CALL: start_response("402 PAYMENT REQUIRED", headers) return [json.dumps({"error": "NO_CREDITS"}).encode()] with data_lock: USER_CREDITS[key] -= COST_PER_CALL raw_len = environ.get("CONTENT_LENGTH") length = int(raw_len) if raw_len and raw_len.isdigit() else 0 body = environ["wsgi.input"].read(length) if length > 0 else b"" data = parse_json_safe(body) task_id = str(uuid.uuid4()) # SAFE: process inline (no threads) result = engine.decision(data) RESULT_STORE[task_id] = result start_response("200 OK", headers) return [json.dumps({"task_id": task_id}).encode()] if path == "/result": query = environ.get("QUERY_STRING", "") params = dict(x.split("=") for x in query.split("&") if "=" in x) task_id = params.get("task_id") result = RESULT_STORE.get(task_id) if not result: start_response("202 ACCEPTED", headers) return [json.dumps({"status": "processing"}).encode()] start_response("200 OK", headers) return [json.dumps(result).encode()] if path == "/execute": key = environ.get("HTTP_X_API_KEY") if key not in VALID_KEYS: start_response("403 FORBIDDEN", headers) return [json.dumps({"error": "UNAUTHORIZED_ACCESS"}).encode()] if USER_CREDITS.get(key, 0) < COST_PER_CALL: start_response("402 PAYMENT REQUIRED", headers) return [json.dumps({"error": "NO_CREDITS"}).encode()] with data_lock: USER_CREDITS[key] -= COST_PER_CALL if method != "POST": start_response("405 METHOD NOT ALLOWED", headers) return [json.dumps({"error": "method_not_allowed"}).encode()] raw_len = environ.get("CONTENT_LENGTH") length = int(raw_len) if raw_len and raw_len.isdigit() else 0 body = environ["wsgi.input"].read(length) if length > 0 else b"" data = parse_json_safe(body) res = engine.decision(data) USER_HISTORY.setdefault(key, []).append(res) start_response("200 OK", headers) return [json.dumps(res).encode()] if path == "/verify": start_response("200 OK", headers) return [json.dumps({"valid": blockchain.is_valid()}).encode()] start_response("404 NOT FOUND", headers) return [json.dumps({"error": "not_found"}).encode()] except Exception as e: start_response("500 INTERNAL SERVER ERROR", headers) return [json.dumps({"error": str(e)}).encode()] ######################################## # ⚫ TESTS ######################################## # helper to reset shared state def _reset_state(): rate_limiter.calls = 0 def test_pricing(): env = {"PATH_INFO": "/pricing", "REQUEST_METHOD": "GET"} res = application(env, lambda s,h: None) assert b"starter" in res[0] def test_buy_flow(): key = generate_api_key() env = { "PATH_INFO": "/buy", "REQUEST_METHOD": "GET", "HTTP_X_API_KEY": key, "QUERY_STRING": "plan=starter" } res = application(env, lambda s,h: None) assert b"credits_added" in res[0] ######################################## def _reset_state(): rate_limiter.calls = 0 def test_auth_fail(): _reset_state() env = {"PATH_INFO": "/execute", "REQUEST_METHOD": "POST", "HTTP_X_API_KEY": "bad"} res = application(env, lambda s, h: None) assert b"UNAUTHORIZED_ACCESS" in res[0] def test_execute_success(): _reset_state() payload = json.dumps({"hedef": "build startup"}).encode() env = { "PATH_INFO": "/execute", "REQUEST_METHOD": "POST", "HTTP_X_API_KEY": settings.API_KEY, "CONTENT_LENGTH": str(len(payload)), "wsgi.input": type("obj", (), {"read": lambda self, n: payload})() } res = application(env, lambda s, h: None) assert b"durum" in res[0] def test_async_flow(): _reset_state() key = generate_api_key() payload = json.dumps({"hedef": "async test"}).encode() env = { "PATH_INFO": "/async_execute", "REQUEST_METHOD": "POST", "HTTP_X_API_KEY": key, "CONTENT_LENGTH": str(len(payload)), "wsgi.input": type("obj", (), {"read": lambda self, n: payload})() } res = application(env, lambda s, h: None) data = json.loads(res[0].decode()) assert "task_id" in data ######################################## # ⚫ RUN ######################################## if __name__ == "__main__": print("SYSTEM READY")