Custom HummingBot for Whitebit
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
hummingbot/scripts/klc_vol.py

96 lines
4.3 KiB

import logging
import random
from decimal import Decimal
from typing import List, Optional
from hummingbot.core.data_type.common import OrderType, PriceType, TradeType
from hummingbot.core.data_type.order_candidate import OrderCandidate
from hummingbot.strategy.script_strategy_base import ScriptStrategyBase
from hummingbot.logger import HummingbotLogger
from hummingbot.core.event.events import OrderFilledEvent
# Import necessary modules from Whitebit exchange connector
from hummingbot.connector.exchange.whitebit.whitebit_exchange import WhitebitExchange
class VolumeTradingStrategy(ScriptStrategyBase):
    """
    This strategy places a limit buy order and then a market sell order to increase trading volume.
    It operates on a 15-second cycle with variable order amounts in KLC tokens, using the current market mid price.

    Each cycle cancels the strategy's active orders, builds one limit buy at the
    reference price with a random amount between ``min_order_amount_klc`` and
    ``max_order_amount_klc``, passes it through the connector's budget checker and
    submits it. Every buy fill reported to ``did_fill_order`` is answered with a
    market sell of the filled amount.

    NOTE(review): all hooks are plain (non-async) methods because the
    ScriptStrategyBase hooks and its buy/sell/cancel helpers are expected to be
    synchronous -- confirm against the installed Hummingbot version.
    """

    _logger: Optional[HummingbotLogger] = None

    min_order_amount_klc = 400  # Minimum order amount in KLC tokens
    max_order_amount_klc = 900  # Maximum order amount in KLC tokens
    order_refresh_time = 15  # Time interval in seconds for each cycle
    trading_pair = "KLC-USDT"  # Trading pair
    exchange = "whitebit"  # Exchange name
    price_source = "current_market"  # Current market as price source (not read by this class)
    price_type = PriceType.MidPrice  # Mid price as price type
    markets = {exchange: {trading_pair}}

    @classmethod
    def logger(cls) -> HummingbotLogger:
        """Return the logger shared by the class, creating it on first use."""
        if cls._logger is None:
            cls._logger = logging.getLogger(__name__)
        return cls._logger

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base strategy and reset the cycle timer."""
        super().__init__(*args, **kwargs)
        # 0 makes the first eligible tick start a cycle immediately.
        self.last_order_timestamp = 0

    def on_tick(self):
        """Run one cancel / propose / budget-adjust / place cycle when the refresh time has elapsed."""
        if self.current_timestamp - self.last_order_timestamp < self.order_refresh_time:
            return
        self.logger().info("Executing trading cycle.")
        if not self._exchange_ready():
            self.logger().warning("Whitebit exchange is not ready. Waiting...")
            # The timer is deliberately not advanced, so the check repeats on the next tick.
            return
        self.last_order_timestamp = self.current_timestamp
        self.cancel_all_orders()
        proposal = self.create_proposal()
        proposal_adjusted = self.adjust_proposal_to_budget(proposal)
        self.place_orders(proposal_adjusted)

    def _exchange_ready(self) -> bool:
        """Return True when the configured connector is a WhitebitExchange that reports ready."""
        connector = self.connectors[self.exchange]
        # NOTE(review): readiness is read from the connector's `ready` property;
        # confirm the installed connector exposes it under that name.
        is_ready = bool(connector.ready) if isinstance(connector, WhitebitExchange) else False
        self.logger().debug(f"Exchange ready status: {is_ready}")
        return is_ready

    def create_proposal(self) -> List[OrderCandidate]:
        """
        Build the order candidates for this cycle.

        :return: a single-element list holding a limit buy at the reference price,
                 or an empty list when no usable reference price is available.
        """
        self.logger().info("Creating order proposal.")
        ref_price = self.connectors[self.exchange].get_price_by_type(self.trading_pair, self.price_type)
        # is_nan() must be tested before the comparison: ordering a Decimal NaN raises.
        if ref_price is None or ref_price.is_nan() or ref_price <= 0:
            self.logger().warning(f"No valid reference price for {self.trading_pair}; skipping this cycle.")
            return []
        order_amount_klc = random.uniform(self.min_order_amount_klc, self.max_order_amount_klc)
        buy_order = OrderCandidate(
            trading_pair=self.trading_pair,
            is_maker=True,
            order_type=OrderType.LIMIT,
            order_side=TradeType.BUY,
            # Go through str so the Decimal holds the float's short repr rather
            # than its full binary expansion.
            amount=Decimal(str(order_amount_klc)),
            price=ref_price,
        )
        return [buy_order]

    def adjust_proposal_to_budget(self, proposal: List[OrderCandidate]) -> List[OrderCandidate]:
        """Return the candidates as adjusted by the connector's budget checker (all_or_none=True)."""
        self.logger().info("Adjusting proposal to budget.")
        return self.connectors[self.exchange].budget_checker.adjust_candidates(proposal, all_or_none=True)

    def place_orders(self, proposal: List[OrderCandidate]) -> None:
        """Submit every candidate in the proposal on the configured exchange."""
        for order in proposal:
            self.logger().info(f"Placing order: {order}")
            self.place_order(connector_name=self.exchange, order=order)

    def place_order(self, connector_name: str, order: OrderCandidate) -> None:
        """Route a single candidate to ``buy`` or ``sell`` according to its side."""
        if order.order_side == TradeType.SELL:
            self.sell(
                connector_name=connector_name,
                trading_pair=order.trading_pair,
                amount=order.amount,
                order_type=order.order_type,
                price=order.price,
            )
        elif order.order_side == TradeType.BUY:
            self.buy(
                connector_name=connector_name,
                trading_pair=order.trading_pair,
                amount=order.amount,
                order_type=order.order_type,
                price=order.price,
            )

    def cancel_all_orders(self):
        """Request cancellation of every active order this strategy has on the configured exchange."""
        self.logger().info("Cancelling all orders.")
        for order in self.get_active_orders(connector_name=self.exchange):
            self.cancel(self.exchange, order.trading_pair, order.client_order_id)

    def did_fill_order(self, event: OrderFilledEvent):
        """Log the fill and, for buy fills, place a market sell of the filled amount."""
        self.logger().info(f"Order filled: {event}")
        # Sell fills are only logged, so the market sells placed here do not
        # trigger further orders.
        if event.trade_type == TradeType.BUY:
            self.logger().info(f"Placing market sell order for {event.amount} of {event.trading_pair}")
            self.sell(
                connector_name=self.exchange,
                trading_pair=event.trading_pair,
                amount=event.amount,
                order_type=OrderType.MARKET,
            )