Developing Event-Driven Microservices – DZone Microservices

December 14, 2021
in Software Development


Oracle infographic.

This is the second in a series of blogs on data-driven microservices design mechanisms and transaction patterns with the Oracle converged database. The first blog illustrated how to connect to an Oracle database in Java, JavaScript, Python, .NET, and Go as succinctly as possible. The goal of this second blog is to use that connection to receive and send messages with Oracle AQ (Advanced Queuing) queues and topics, and to update and read from the database, in all of these same languages.

Advanced Queuing (AQ) is a messaging system that is part of every Oracle database edition and was first released in 2002. AQ sharded queues, which introduced partitioning in release 12c, are now called Transactional Event Queues (TEQ).

Microservices increasingly use event-driven architectures for communication, and many data-driven systems also employ an event sourcing pattern of one form or another. In this pattern, data changes (e.g., a SQL command to “insert order”) are sent as events that describe the change (e.g., an “orderPlaced” event) and are received by interested services. Thus, the data is sourced from the events, and event sourcing in general moves the source of truth for data to the event broker. This fits nicely with the decoupling paradigm of microservices.
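
To make the idea concrete, here is a minimal Python sketch of deriving state from an event log instead of from a mutable row. Only the “orderPlaced” event name comes from the text above; the `inventoryAdded` event, the field names, and the `apply` and `replay` helpers are hypothetical and exist purely for illustration. A real service would read these events from the broker.

```python
import json

# Hypothetical event log; in practice these would be messages read from the event broker.
events = [
    json.dumps({"type": "inventoryAdded", "itemid": "sushi", "count": 3}),
    json.dumps({"type": "orderPlaced", "orderid": "66", "itemid": "sushi"}),
    json.dumps({"type": "orderPlaced", "orderid": "67", "itemid": "sushi"}),
]

def apply(state, event):
    """Return a new inventory state with one event applied."""
    new_state = dict(state)
    item = event["itemid"]
    if event["type"] == "inventoryAdded":
        new_state[item] = new_state.get(item, 0) + event["count"]
    elif event["type"] == "orderPlaced":
        new_state[item] = new_state.get(item, 0) - 1
    return new_state

def replay(raw_events):
    """Rebuild the current state purely from the ordered event log."""
    state = {}
    for raw in raw_events:
        state = apply(state, json.loads(raw))
    return state

inventory = replay(events)
print(inventory)
```

Because the state is a pure function of the log, any interested service can rebuild its own view by replaying the same events.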

Note that event sourcing actually involves two operations: the data change itself and the communication (the event) of that data change. There is, therefore, a transactional consideration: any inconsistency or failure that breaks atomicity between these two operations must be accounted for. This is where TEQ has a significant and unique advantage: because the messaging/eventing system is part of the database itself, it can conduct both operations in the same local transaction and provide this atomicity guarantee.
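
The following sketch illustrates why keeping both operations in one local transaction matters. It is not Oracle code: an in-memory SQLite database stands in for the database, and a plain `inventoryqueue` table stands in for a queue that lives inside it. The schema, the `reserve` and `snapshot` helpers, and the simulated failure are all assumptions made for the example.

```python
import json
import sqlite3

# In-memory SQLite stands in for the database; the "inventoryqueue" table is a
# stand-in for a queue that lives in the same database (hypothetical schema).
conn = sqlite3.connect(":memory:")
conn.execute("create table inventory (inventoryid text primary key, inventorycount integer)")
conn.execute("create table inventoryqueue (payload text)")
conn.execute("insert into inventory values ('sushi', 1)")
conn.commit()

def reserve(conn, orderid, itemid, fail_before_commit=False):
    """Apply the data change and record the event in one local transaction."""
    try:
        conn.execute(
            "update inventory set inventorycount = inventorycount - 1 "
            "where inventoryid = ?", (itemid,))
        conn.execute(
            "insert into inventoryqueue values (?)",
            (json.dumps({"orderid": orderid, "itemid": itemid}),))
        if fail_before_commit:
            raise RuntimeError("simulated failure after both operations, before commit")
        conn.commit()      # both changes become visible together
        return True
    except RuntimeError:
        conn.rollback()    # neither the update nor the event survives
        return False

def snapshot(conn):
    """Return (inventory count, number of queued events)."""
    count = conn.execute("select inventorycount from inventory").fetchone()[0]
    queued = conn.execute("select count(*) from inventoryqueue").fetchone()[0]
    return count, queued

failed = reserve(conn, "66", "sushi", fail_before_commit=True)
after_failure = snapshot(conn)
succeeded = reserve(conn, "66", "sushi")
after_success = snapshot(conn)
print(after_failure, after_success)
```

After the simulated failure, the inventory count and the queue are both unchanged; after the successful call, both have changed. With a separate broker, the same guarantee would typically require a distributed transaction or compensating logic.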

More details on AQ can be found at the blog Apache Kafka versus Oracle Transactional Event Queues (TEQ) as Microservices Event Mesh.

The examples provided in this blog will succinctly show the pattern just described using an order/inventory scenario:

  • A local transaction starts.
  • A message with a JSON payload for an order dequeues from the orderqueue queue.
  • The inventory for the item requested in the order reduces in the inventory table and the inventory location returns.
  • A message with a JSON payload (including the inventory status and location) enqueues on the inventoryqueue queue.
  • The local transaction commits. 

Note that the SYS.AQ$_JMS_TEXT_MESSAGE type is interoperable between all languages and supports a JSON payload making it perfect for writing microservices that work together regardless of language.

The PL/SQL to create the order and inventory queues:

  BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE (
  queue_table          => 'ORDERQUEUETABLE',
  queue_payload_type   => 'SYS.AQ$_JMS_TEXT_MESSAGE',
  multiple_consumers   => true,
  compatible           => '8.1');

  DBMS_AQADM.CREATE_QUEUE (
  queue_name          => 'ORDERQUEUE',
  queue_table         => 'ORDERQUEUETABLE');

  DBMS_AQADM.START_QUEUE (
  queue_name          => 'ORDERQUEUE');
  END;
  /
  BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE (
   queue_table          => 'INVENTORYQUEUETABLE',
   queue_payload_type   => 'SYS.AQ$_JMS_TEXT_MESSAGE',
   multiple_consumers   => true,
   compatible           => '8.1');

   DBMS_AQADM.CREATE_QUEUE (
   queue_name          => 'INVENTORYQUEUE',
   queue_table         => 'INVENTORYQUEUETABLE');

   DBMS_AQADM.START_QUEUE (
   queue_name          => 'INVENTORYQUEUE');
   END;
   /

The SQL to create the inventory table and insert a sample item:

   create table inventory (
     inventoryid varchar(16) PRIMARY KEY NOT NULL,
     inventorylocation varchar(32),
     inventorycount integer CONSTRAINT positive_inventory CHECK (inventorycount >= 0) );

   insert into inventory values ('sushi', '1468 WEBSTER ST,San Francisco,CA', 0);
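
As a rough illustration of how this table behaves under the conditional update used in the snippets that follow, here is a Python sketch that uses an in-memory SQLite database as a stand-in for Oracle. Several things are assumptions: the sample row starts with a count of 1 (not 0 as above) so that one decrement succeeds, the `take_one` helper is hypothetical, and a follow-up `select` replaces Oracle's `returning ... into` clause.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""create table inventory (
    inventoryid text primary key not null,
    inventorylocation text,
    inventorycount integer constraint positive_inventory check (inventorycount >= 0))""")
conn.execute("insert into inventory values ('sushi', '1468 WEBSTER ST,San Francisco,CA', 1)")

def take_one(conn, itemid):
    """Decrement stock only if some is left; return the location or a sentinel."""
    cur = conn.execute(
        "update inventory set inventorycount = inventorycount - 1 "
        "where inventoryid = ? and inventorycount > 0", (itemid,))
    if cur.rowcount != 1:
        return "inventorydoesnotexist"
    row = conn.execute(
        "select inventorylocation from inventory where inventoryid = ?", (itemid,)).fetchone()
    return row[0]

first = take_one(conn, "sushi")    # stock goes 1 -> 0
second = take_one(conn, "sushi")   # no stock left, zero rows updated

# The CHECK constraint is the backstop if a statement forgets the guard.
try:
    conn.execute("update inventory set inventorycount = inventorycount - 1")
    constraint_held = False
except sqlite3.IntegrityError:
    constraint_held = True
print(first, second, constraint_held)
```

The `inventorycount > 0` predicate makes the out-of-stock case a normal zero-row result, while the `positive_inventory` constraint rejects any statement that would drive the count below zero.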

Note that JSON can be manipulated using plain string operations; however, the code snippets presented here use JSON parsers for each language along with order and inventory object representations. The complete source can be found here, and you can take the “Building Microservices with converged Oracle Database Workshop” found here at any time for no charge to set up a full microservices environment, complete with a Kubernetes cluster, 2 ATP (Autonomous Transaction Processing) Oracle databases, AQ messaging propagation, etc., in ~15 minutes!
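
For readers who want to see what such object representations might look like, here is a small Python sketch using dataclasses and the standard `json` module. The field names mirror those used in the snippets in this post; the sample values (order id `66`, the delivery address) and the `order_from_json` and `inventory_to_json` helpers are hypothetical.

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Order:
    orderid: str
    itemid: str
    deliverylocation: str = ""
    status: str = ""

@dataclass
class Inventory:
    orderid: str
    itemid: str
    inventorylocation: str
    suggestiveSale: str

def order_from_json(text):
    """Parse an order message, ignoring any fields the dataclass does not declare."""
    data = json.loads(text)
    known = {name: data[name] for name in Order.__dataclass_fields__ if name in data}
    return Order(**known)

def inventory_to_json(inventory):
    """Serialize the inventory reply to the JSON text carried in the message."""
    return json.dumps(asdict(inventory))

order = order_from_json('{"orderid": "66", "itemid": "sushi", "deliverylocation": "780 PANORAMA DR"}')
reply = Inventory(order.orderid, order.itemid, "1468 WEBSTER ST,San Francisco,CA", "beer")
reply_text = inventory_to_json(reply)
print(reply_text)
```

Typed objects keep field names in one place, which helps when several services in different languages have to agree on the same payload.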

We’ll provide just the basic facts you need… the source.

 

Coffee graphic.
Java

    
    QueueConnectionFactory qcfact = AQjmsFactory.getQueueConnectionFactory(inventoryResource.atpInventoryPDB);
    QueueConnection qconn = qcfact.createQueueConnection(inventoryResource.inventoryuser, inventoryResource.inventorypw);
    QueueSession qsess = qconn.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
    qconn.start();
    Queue queue = ((AQjmsSession) qsess).getQueue(inventoryResource.inventoryuser, inventoryResource.orderQueueName);
    AQjmsConsumer consumer = (AQjmsConsumer) qsess.createConsumer(queue);

    // Dequeue from order queue
    TextMessage orderMessage = (TextMessage) (consumer.receive(-1));
    String txt = orderMessage.getText();
    Order order = JsonUtils.read(txt, Order.class);
    dbConnection = session.getDBConnection();
    OraclePreparedStatement st = (OraclePreparedStatement) dbConnection.prepareStatement("update inventory set inventorycount = inventorycount - 1 where inventoryid = ? and inventorycount > 0 returning inventorylocation into ?");
    st.setString(1, id);
    st.registerReturnParameter(2, Types.VARCHAR);

    // Update inventory and return inventory location
    int i = st.executeUpdate();
    ResultSet res = st.getReturnResultSet();
    inventorylocation = (i > 0 && res.next())?res.getString(1):"inventorydoesnotexist";
    Inventory inventory = new Inventory(orderid, itemid, inventorylocation, suggestiveSale);
    String jsonString = JsonUtils.writeValueAsString(inventory);
    Topic inventoryTopic = session.getTopic(InventoryResource.inventoryuser, InventoryResource.inventoryQueueName);
    TextMessage objmsg = session.createTextMessage();
    objmsg.setText(jsonString);
    TopicPublisher publisher = session.createPublisher(inventoryTopic);

    // Enqueue the response on the inventory queue and commit
    publisher.publish(inventoryTopic, objmsg, DeliveryMode.PERSISTENT, 2, AQjmsConstants.EXPIRATION_NEVER);
    qsess.commit();

Python

    
    orderQueue = conn.queue(queue_owner + ".orderqueue", conn.gettype("SYS.AQ$_JMS_TEXT_MESSAGE"))
    inventoryQueue = conn.queue(queue_owner + ".inventoryqueue", conn.gettype("SYS.AQ$_JMS_TEXT_MESSAGE"))
    cursor = conn.cursor()

    # Dequeue from order queue
    conn.autocommit = False
    payload = orderQueue.deqOne().payload
    logger.debug(payload.TEXT_VC)
    orderInfo = simplejson.loads(payload.TEXT_VC)

    # Update inventory and return inventory location
    ilvar = cursor.var(str)
    cursor.execute("""update inventory set inventorycount = inventorycount - 1
         where inventoryid = :inventoryid and inventorycount > 0
         returning inventorylocation into :inventorylocation""", [orderInfo["itemid"], ilvar])

    # Enqueue the response on the inventory queue and commit
    payload = conn.gettype("SYS.AQ$_JMS_TEXT_MESSAGE").newobject()
    payload.TEXT_VC = simplejson.dumps(
        {'orderid': orderInfo["orderid"],
         'itemid': orderInfo["itemid"],
         'inventorylocation': ilvar.getvalue(0)[0] if cursor.rowcount == 1 else "inventorydoesnotexist",
         'suggestiveSale': "beer"})
    payload.TEXT_LEN = len(payload.TEXT_VC)
    inventoryQueue.enqOne(conn.msgproperties(payload = payload))
    conn.commit()

 

JavaScript

  
  let connection;
  let bindVariables = {};  
  let options = {};
  oracledb.autoCommit = false;
  connection = await oracledb.getConnection();
  const orderQueue = await connection.getQueue(queueConfig.orderQueue, queueOptions);

  // Dequeue from order queue
  const orderMsg = await orderQueue.deqOne();
  const orderMsgContent = JSON.parse(orderMsg.payload.TEXT_VC);
  const updateSQL = `update inventory  set inventorycount = inventorycount - 1 where 1=1 and inventoryid = :inventoryid and inventorycount > 0  returning inventorylocation into :inventorylocation`; 
  bindVariables.inventoryid = orderMsgContent.itemid
  bindVariables.inventorylocation = {
      dir: oracledb.BIND_OUT,
      type: oracledb.STRING
  };
  options.outFormat = oracledb.OUT_FORMAT_OBJECT;

  // Update inventory and return inventory location
  const queryResult = await connection.execute(updateSQL, bindVariables, options);
  let location;
  if (queryResult.rowsAffected && queryResult.rowsAffected === 1) {
      location = queryResult.outBinds.inventorylocation[0];
  } else {
      location = "inventorydoesnotexist";
  }
  const inventoryQueue = await connection.getQueue(queueConfig.inventoryQueue, queueOptions);
  const inventoryMsgText = JSON.stringify({
      orderid: orderMsgContent.orderid,
      itemid: orderMsgContent.itemid,
      inventorylocation: location,
      suggestiveSale: "beer"
  });
  const inventoryMsgContent = new inventoryQueue.payloadTypeClass({
      TEXT_VC: inventoryMsgText,
      TEXT_LEN: inventoryMsgText.length
  });

  // Enqueue the response on the inventory queue and commit
  const inventoryMsg = await inventoryQueue.enqOne({payload: inventoryMsgContent, priority: 2});
  await connection.commit();

 

.NET

   
  
  //for .NET we will show usage of PL/SQL procedures which can be found/installed from here
  connection.Open();
  OracleTransaction tx = connection.BeginTransaction();
  OracleCommand orderReceiveMessageCommand = new OracleCommand();
  orderReceiveMessageCommand.Connection = connection;
  orderReceiveMessageCommand.CommandText = "dequeueOrderMessage";
  orderReceiveMessageCommand.CommandType = CommandType.StoredProcedure;
  OracleParameter p_orderInfoParam = new OracleParameter("p_orderInfo", OracleDbType.Varchar2, 32767);
  p_orderInfoParam.Direction = ParameterDirection.Output;
  orderReceiveMessageCommand.Parameters.Add (p_orderInfoParam);

  // Dequeue from order queue
  orderReceiveMessageCommand.ExecuteNonQuery();
  Order order = JsonConvert.DeserializeObject<Order>("" + orderReceiveMessageCommand.Parameters["p_orderInfo"].Value);
  OracleCommand checkInventoryReturnLocationCommand = new OracleCommand();
  checkInventoryReturnLocationCommand.Connection = connection;
  checkInventoryReturnLocationCommand.CommandText = "checkInventoryReturnLocation";
  checkInventoryReturnLocationCommand.CommandType =  CommandType.StoredProcedure;
  OracleParameter p_itemIdParam = new OracleParameter("p_inventoryId", OracleDbType.Varchar2, 32767);
  p_itemIdParam.Direction =ParameterDirection.Input;
  p_itemIdParam.Value = order.itemid;
  checkInventoryReturnLocationCommand.Parameters.Add (p_itemIdParam);
  OracleParameter p_inventorylocationParam = new OracleParameter("p_inventorylocation", OracleDbType.Varchar2, 32767);
  p_inventorylocationParam.Direction = ParameterDirection.Output;
  checkInventoryReturnLocationCommand.Parameters.Add (p_inventorylocationParam);

  // Update inventory and return inventory location
  checkInventoryReturnLocationCommand.ExecuteNonQuery();
  Inventory inventory = new Inventory();
  var inventoryLocation = "" + checkInventoryReturnLocationCommand.Parameters["p_inventorylocation"].Value;
  inventory.inventorylocation = inventoryLocation.Equals("null") ? "inventorydoesnotexist" : inventoryLocation;
  inventory.itemid = order.itemid;
  inventory.orderid = order.orderid;
  inventory.suggestiveSale = inventoryLocation.Equals("null") ? "" : "beer";
  string inventoryJSON = JsonConvert.SerializeObject(inventory);
  OracleCommand inventorySendMessageCommand = new OracleCommand();
  inventorySendMessageCommand.Connection = connection;
  inventorySendMessageCommand.CommandText = "enqueueInventoryMessage";
  inventorySendMessageCommand.CommandType = CommandType.StoredProcedure;
  OracleParameter p_inventoryInfoParam = new OracleParameter("p_inventoryInfo", OracleDbType.Varchar2, 32767);
  p_inventoryInfoParam.Direction = ParameterDirection.Input;
  p_inventoryInfoParam.Value = inventoryJSON;
  inventorySendMessageCommand.Parameters.Add (p_inventoryInfoParam);

  // Enqueue the response on the inventory queue and commit
  inventorySendMessageCommand.ExecuteNonQuery();
  tx.Commit();

 

Go

   
   tx, err := db.BeginTx(ctx, nil)
   defer tx.Rollback()
   orderqueue, err := godror.NewQueue(ctx, tx, "orderqueue", "SYS.AQ$_JMS_TEXT_MESSAGE",
      godror.WithDeqOptions(godror.DeqOptions{
         Mode:       godror.DeqRemove,
         Visibility: godror.VisibleOnCommit,
         Navigation: godror.NavNext,
         Wait:       10000,
      }))
   defer orderqueue.Close()
   msgs := make([]godror.Message, 1)

  // Dequeue from order queue
   n, err := orderqueue.Dequeue(msgs)
   textVCIntf, _ := msgs[0].Object.Get("TEXT_VC")
   textVC := fmt.Sprintf("%s", textVCIntf)
   type Order struct {
      Orderid           string
      Itemid            string
      Deliverylocation  string
      Status            string
      Inventorylocation string
      SuggestiveSale    string
   }
   order := Order{}
   json.Unmarshal([]byte(textVC), &order)
   var inventorylocation string
   sqlString := "update INVENTORY set INVENTORYCOUNT = INVENTORYCOUNT - 1 where INVENTORYID = :inventoryid and INVENTORYCOUNT > 0 returning inventorylocation into :inventorylocation"

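   // Update inventory and return inventory location (run on tx so the update shares the dequeue's transaction)
   res, errFromInventoryCheck := tx.ExecContext(ctx, sqlString, sql.Named("inventoryid", order.Itemid), sql.Named("inventorylocation", sql.Out{Dest: &inventorylocation}))
   if errFromInventoryCheck != nil {
      fmt.Println(errFromInventoryCheck)
   } else if numRows, err := res.RowsAffected(); err != nil || numRows == 0 {
      inventorylocation = ""
   }
   if inventorylocation == "" {
      inventorylocation = "inventorydoesnotexist"
   }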
   type Inventory struct {
      Orderid           string
      Itemid            string
      Inventorylocation string
      SuggestiveSale    string
   }
   inventory := &Inventory{
      Orderid:           order.Orderid,
      Itemid:            order.Itemid,
      Inventorylocation: inventorylocation,
      SuggestiveSale:    "beer",
   }
   inventoryJsonData, err := json.Marshal(inventory)
   if err != nil {
      fmt.Println(err)
   }
   inventoryqueue, err := godror.NewQueue(ctx, tx, "inventoryqueue", "SYS.AQ$_JMS_TEXT_MESSAGE",
      godror.WithEnqOptions(godror.EnqOptions{
         Visibility:   godror.VisibleOnCommit, //Immediate
         DeliveryMode: godror.DeliverPersistent,
      }))
   defer inventoryqueue.Close()
   obj, err := inventoryqueue.PayloadObjectType.NewObject()
   sendmsg := godror.Message{Object: obj}
   sendmsg.Expiration = 10000
   inventoryJsonDatastr := string(inventoryJsonData)
   obj.Set("TEXT_VC",  inventoryJsonDatastr)
   obj.Set("TEXT_LEN", len(inventoryJsonDatastr))
   sendmsgs := make([]godror.Message, 1)
   sendmsgs[0] = sendmsg

   // Enqueue the response on the inventory queue and commit
   if err = inventoryqueue.Enqueue(sendmsgs); err != nil {
      fmt.Println("enqueue error:", err)
   }
   if err := tx.Commit(); err != nil {
      fmt.Println("commit err:", err)
   }

 

Conclusion

We have built upon the first blog, which gave examples of how to connect to the Oracle database and containerize for microservice environments, by showing how to use Oracle AQ/TEQ for eventing together with database updates and queries. Doing all of this in the same local transaction provides a robust and simplified groundwork for an event-driven microservices architecture.

In the next installment of the series, we will look at various frameworks of these languages and how they provide additional convenience and functionality when creating microservices using the converged Oracle Database.

Please feel free to provide any feedback here, on the workshop, on the GitHub repos, or directly. We are happy to hear from you.


